Sift is building an observability platform to help world-class hardware companies ingest and understand modern telemetry. Recently, we added key-level watermarking on top of Flink to more reliably process complex multi-channel rules with imperfect real-world streams of data.
One advanced telemetry feature we’re building at Sift is real-time multi-channel time-aligned stateful rules. That’s a bit of a mouthful, so let’s break it down:
- Real-time: we want to apply your rules to your data with as little latency as possible.
- Multi-channel: we want to let you join data from multiple channels in your rule conditions.
- Time-aligned: each datapoint should only be joined with another channel's latest values as of that datapoint's timestamp (see the sketch after this list).
- Stateful: rules can continuously compute a value based on previous values.
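To make the time-aligned requirement concrete, here’s a minimal, illustrative sketch in plain Java of an "as-of" join: each datapoint on one channel gets paired with the other channel's latest value at or before that datapoint's timestamp. The DataPoint type and the channel names are made up for this example.

```java
import java.util.List;
import java.util.TreeMap;

// Illustration of time alignment: pair each datapoint on one channel with the
// other channel's latest value as of that datapoint's timestamp.
// DataPoint and the channel names are hypothetical, for illustration only.
public class AsOfJoinSketch {
    record DataPoint(long timestampMillis, double value) {}

    public static void main(String[] args) {
        List<DataPoint> pressure = List.of(
            new DataPoint(1_000, 101.2), new DataPoint(2_000, 101.9), new DataPoint(3_000, 102.4));
        List<DataPoint> temperature = List.of(
            new DataPoint(900, 20.1), new DataPoint(2_500, 20.7));

        // Index the other channel by timestamp so we can look up its latest value as of any time.
        TreeMap<Long, Double> temperatureByTime = new TreeMap<>();
        temperature.forEach(p -> temperatureByTime.put(p.timestampMillis(), p.value()));

        for (DataPoint p : pressure) {
            // floorEntry gives the latest temperature at or before this pressure point's timestamp.
            var latestTemp = temperatureByTime.floorEntry(p.timestampMillis());
            System.out.printf("t=%d pressure=%.1f temperature=%s%n",
                p.timestampMillis(), p.value(),
                latestTemp == null ? "none yet" : latestTemp.getValue());
        }
    }
}
```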
Meeting all of these requirements is a challenging engineering problem, and there’s no off-the-shelf software that will do it. So we’re building our own.
Stream Processing
One of the primary challenges with stateful rules lies in data storage and retrieval. Traditional monitoring and alerting systems typically query their data sources periodically, evaluating data against rule conditions. This approach becomes costly and slow at scale: thousands of assets and dozens of rules translate into a flood of queries that strain the data storage system, particularly when rules look back over extended periods.
To address this, stream processing flips the problem on its head. Instead of repeatedly fetching historical data, a streaming job keeps a small amount of memory per rule (called state) and acts only when new data arrives. That makes it possible to respond immediately if something unexpected occurs, without querying for previous values every time a datapoint comes in.
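As a toy illustration of the idea (not our actual implementation), here’s what a stateful evaluator might look like: it keeps a small piece of state per rule, updates it on every incoming datapoint, and can flag a violation immediately. The rule and the shape of the state here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a toy stateful evaluator that keeps per-rule state in memory
// and reacts to each new datapoint, rather than re-querying a database on a schedule.
public class StatefulRuleSketch {
    // Hypothetical rule: alert if a channel's value rises several times in a row.
    static final int MAX_CONSECUTIVE_INCREASES = 3;

    // State per rule key: the last value seen and the current run of increases.
    record RuleState(double lastValue, int consecutiveIncreases) {}

    private final Map<String, RuleState> stateByRule = new HashMap<>();

    // Called once per incoming datapoint; no historical queries needed.
    public void onDataPoint(String ruleKey, double value) {
        RuleState previous = stateByRule.get(ruleKey);
        int run = (previous != null && value > previous.lastValue())
            ? previous.consecutiveIncreases() + 1
            : 0;
        stateByRule.put(ruleKey, new RuleState(value, run));
        if (run >= MAX_CONSECUTIVE_INCREASES) {
            System.out.println("ALERT for " + ruleKey + ": value rose " + run + " times in a row");
        }
    }
}
```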
Recently, to help us more reliably ingest and process our customers’ real-world hardware telemetry, we’ve updated our stream processing infrastructure to add key-level watermarking on top of Apache Flink.
Time Alignment and the Inconvenience of Reality
Aligning multiple data channels by timestamp is a surprisingly complicated problem. In an ideal world, the data points for each channel would come in at exactly the same time and we could simply look at small batches of data for each channel, join them together, and compute the resulting values right away.
But we don’t live in that world. Instead we have to deal with real-world inconveniences like latency and poor connectivity.
Consider assets operating in areas with limited connectivity, such as satellites that only periodically pass over a ground station, or vehicles in a remote desert. They will be collecting telemetry continuously, but will not always be able to send it. The payloads will be buffered while no connection is available, and then sent when the connection is reestablished. When you get a burst of data from reconnected hardware, you want to process it according to its timestamps, not treat it all as if it arrived right now. How do we process this data in a way that makes sense and avoids false alarms caused by communication issues?
The Existing Solution: Global Watermarking
In stream processing, these types of problems are solved with buffering and event time watermarks. The idea is that we hold on to a buffer of data and its corresponding timestamps, and we don’t process any given datapoint until the watermark has advanced past that point’s timestamp. But what is a watermark, and how do we determine it?
The global watermark is set by taking the minimum, across all the channels, of the maximum timestamp we’ve seen per channel. In other words it is “the most recent timestamp from the farthest-behind channel”.
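In code form, that definition might look something like this sketch (the channel names and the map tracking per-channel maximums are illustrative):

```java
import java.util.Map;

// Sketch of the global watermark definition: the minimum, across all channels,
// of the maximum event timestamp seen on each channel.
public class GlobalWatermarkSketch {
    public static long globalWatermark(Map<String, Long> maxSeenTimestampByChannel) {
        return maxSeenTimestampByChannel.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MIN_VALUE); // no data seen yet, so nothing can be processed
    }

    public static void main(String[] args) {
        // "engine_temp" has data through t=500, "gps" only through t=120,
        // so the global watermark is 120: only data up to t=120 is safe to process.
        System.out.println(globalWatermark(Map.of("engine_temp", 500L, "gps", 120L)));
    }
}
```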
Imagine you have a vehicle that has two components sending data at different rates. If you have rules that refer to data from both components, you don’t want to evaluate fresh data against stale values from the other component. Watermarks are how you make sure to wait for all the data to be available first. If we didn’t have this watermarking process, we might raise false alarms by processing incomplete or out of sync data.
This issue is one reason telemetry monitoring for hardware can quickly become very challenging! We’re building Sift because there’s no great off-the-shelf tool for this. Internally we use Apache Flink as our stream processing engine, as it has some extremely powerful capabilities for handling large numbers of parallel channels of streaming data and tracking very large amounts of state.
But out of the box it doesn’t do everything we need.
The Problem with Global Watermarking
One thing Flink doesn’t support is the ability to track watermarks at the per-channel level! It’s designed for data streams with global watermarks – all the events coming off an e-commerce company’s servers, for instance – whereas here at Sift we are dealing with different channels coming from different machines and vehicles through different pipelines.
A global watermark on this sort of data can cause big problems: lost data, late data, or high latency and processing costs due to the need for large data buffers. A single slow data channel could hold back the processing of all of your rules.
So, to solve this problem we built our own key-level watermarking on top of Flink!
Our Solution: Key-level Watermarking
A "key" in Flink represents a distinct partition of events that can be evaluated independently. In our system, that's an individual rule. Each rule is applied to one or more channel’s worth of timeseries data, so when events on those timeseries arrive they'll be sent to the correct server instance for the rules that they match.
In a common Flink scenario you'd use small event-time windows to buffer those events per key, so that when the global watermark advances past the end of any particular window, all of its elements are processed and evaluated. Flink manages this internally across however many parallel data sources you're reading from, combining them to give you a single global watermark.
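For reference, that conventional setup might look roughly like the sketch below (Flink 1.x DataStream API; the RuleEvent type and the EvaluateRuleWindow function are hypothetical placeholders). Note that the watermark strategy here is still Flink’s built-in, global one.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of the conventional approach: keyed event-time windows driven by
// Flink's built-in, *global* watermark.
public class StandardWindowingSketch {

    // Hypothetical event type: one datapoint routed to a particular rule.
    public static class RuleEvent {
        public String ruleId;
        public long eventTimeMillis;
        public double value;

        public String getRuleId() { return ruleId; }
        public long getEventTimeMillis() { return eventTimeMillis; }
    }

    public static void buildPipeline(DataStream<RuleEvent> events) {
        events
            // Built-in global watermarking: tolerate up to 30s of out-of-order arrival.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<RuleEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                    .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimeMillis()))
            // Partition by rule so each rule's events are buffered and evaluated together.
            .keyBy(RuleEvent::getRuleId)
            // Small event-time windows per key...
            .window(TumblingEventTimeWindows.of(Time.seconds(1)))
            // ...which only fire once the single global watermark passes each window's end.
            .process(new EvaluateRuleWindow());
    }

    // Hypothetical placeholder for whatever evaluates a rule against a window of events.
    public static class EvaluateRuleWindow
            extends ProcessWindowFunction<RuleEvent, String, String, TimeWindow> {
        @Override
        public void process(String ruleId, Context ctx, Iterable<RuleEvent> events, Collector<String> out) {
            out.collect("evaluated rule " + ruleId + " for window ending at " + ctx.window().getEnd());
        }
    }
}
```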
Rewriting Flink internals to make this watermarking happen at a key level instead of at its internal parallelism level would be a very involved project, as it is tied into the Flink StreamRecord objects at a low level.
However, the building blocks used by the watermark extraction process – keying events to track each split separately, and firing a periodic timer to consider a new watermark – are essentially the same primitives that a Flink KeyedProcessFunction provides.
So we’ve built, essentially, a “Flink in Flink” approach to watermarking, leveraging custom functions to put events into our own event-time-based window buffers on a per-rule-condition basis. Any time a new event arrives, we keep track of the relevant data, like the “max seen event timestamp”, so that we can derive a per-timeseries-channel watermark. Then we use a periodic processing-time-based timer to check whether those per-timeseries-channel watermarks have advanced past any of the buffered windows, at which point we can flush and process those windows.
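To give a rough sense of the shape of this (heavily simplified, with hypothetical DataPoint and WindowResult types; not our production code), a sketch of such a KeyedProcessFunction might look like this:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event and output types, for illustration only.
record DataPoint(String channel, long timestampMillis, double value) {}
record WindowResult(String ruleId, int evaluatedCount) {}

// Heavily simplified sketch of the "Flink in Flink" idea, keyed by rule: buffer events
// into our own event-time windows, track a max-seen timestamp per channel, and use a
// periodic processing-time timer to flush windows the per-rule watermark has passed.
public class PerRuleWatermarkFunction
        extends KeyedProcessFunction<String, DataPoint, WindowResult> {

    private static final long WINDOW_SIZE_MS = 1_000;    // size of our own event-time windows
    private static final long TIMER_INTERVAL_MS = 1_000; // how often to re-check watermarks

    // Buffered events, grouped by the start timestamp of their event-time window.
    private transient MapState<Long, List<DataPoint>> windowBuffers;
    // Max event timestamp seen so far for each channel this rule has received data on.
    private transient MapState<String, Long> maxSeenByChannel;

    @Override
    public void open(Configuration parameters) {
        windowBuffers = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "windowBuffers",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<DataPoint>>() {})));
        maxSeenByChannel = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("maxSeenByChannel", String.class, Long.class));
    }

    @Override
    public void processElement(DataPoint point, Context ctx, Collector<WindowResult> out)
            throws Exception {
        // Buffer the event into its event-time window.
        long windowStart = point.timestampMillis() - (point.timestampMillis() % WINDOW_SIZE_MS);
        List<DataPoint> buffer = windowBuffers.get(windowStart);
        if (buffer == null) {
            buffer = new ArrayList<>();
        }
        buffer.add(point);
        windowBuffers.put(windowStart, buffer);

        // Track the per-channel "max seen event timestamp" for this rule.
        Long maxSeen = maxSeenByChannel.get(point.channel());
        if (maxSeen == null || point.timestampMillis() > maxSeen) {
            maxSeenByChannel.put(point.channel(), point.timestampMillis());
        }

        // Schedule a periodic processing-time check; timers are deduplicated per key and
        // timestamp, so rounding to the interval makes re-registration a no-op.
        long nextCheck = ctx.timerService().currentProcessingTime() + TIMER_INTERVAL_MS;
        ctx.timerService().registerProcessingTimeTimer(nextCheck - (nextCheck % TIMER_INTERVAL_MS));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WindowResult> out)
            throws Exception {
        // The per-rule watermark: the most recent timestamp from the farthest-behind channel.
        // (Every buffered point also updated maxSeenByChannel, so it is non-empty here.)
        long watermark = Long.MAX_VALUE;
        for (Long channelMax : maxSeenByChannel.values()) {
            watermark = Math.min(watermark, channelMax);
        }

        // Flush and evaluate any windows the per-rule watermark has advanced past.
        List<Long> flushed = new ArrayList<>();
        for (Long windowStart : windowBuffers.keys()) {
            if (windowStart + WINDOW_SIZE_MS <= watermark) {
                out.collect(evaluateRule(ctx.getCurrentKey(), windowBuffers.get(windowStart)));
                flushed.add(windowStart);
            }
        }
        for (Long windowStart : flushed) {
            windowBuffers.remove(windowStart);
        }
    }

    // Placeholder for the actual rule evaluation over a flushed window.
    private WindowResult evaluateRule(String ruleId, List<DataPoint> window) {
        return new WindowResult(ruleId, window.size());
    }
}
```

The key property is that the watermark computed in onTimer comes from this rule’s own keyed state, so one rule’s slow channel can’t hold back the processing of every other rule.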
As mentioned before, we support rules that are tied to multiple timeseries channels, not just a single one. We have to monitor these watermarks by channel and reconcile them together, so that if – for example – a control channel is ahead of the actual data, we don’t continually process the data before most of it arrives!
The logic for determining watermarks for multiple channels is the same as for a global watermark: it’s the most recent timestamp from the farthest-behind channel. Except now we only have to consider the channels a rule is referring to, not every single channel in the system.
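Concretely, that reconciliation might look something like the following sketch, where a referenced channel with no data yet holds the rule’s watermark back entirely (the channel names are illustrative):

```java
import java.util.Map;
import java.util.Set;

// Sketch: a rule's watermark only considers the channels that rule actually references.
public class RuleWatermarkSketch {
    public static long ruleWatermark(Set<String> referencedChannels,
                                     Map<String, Long> maxSeenTimestampByChannel) {
        long watermark = Long.MAX_VALUE;
        for (String channel : referencedChannels) {
            Long maxSeen = maxSeenTimestampByChannel.get(channel);
            if (maxSeen == null) {
                return Long.MIN_VALUE; // a referenced channel has no data yet: hold everything back
            }
            watermark = Math.min(watermark, maxSeen);
        }
        return watermark;
    }

    public static void main(String[] args) {
        Map<String, Long> maxSeen = Map.of("control", 900L, "sensor_a", 450L, "unrelated", 100L);
        // Only "control" and "sensor_a" matter for this rule, so its watermark is 450,
        // even though "unrelated" is much further behind.
        System.out.println(ruleWatermark(Set.of("control", "sensor_a"), maxSeen));
    }
}
```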
Future Work
As we develop this system further, we are building out increasingly sophisticated capabilities for rule processing. For example, with live monitoring and stateful rules there will always be challenges around late data: how long are you willing to wait for data that may have been delayed? If your rule says something like “we shouldn’t get more than 5 events in a given second”, and the 6th datapoint in a given second arrives later than we were willing to wait for it, that would cause a false negative.
We want to provide our users with all the data insights necessary to make appropriate tradeoffs between confidence and latency for their real-time evaluation, while also supporting batch re-evaluation to account for the times when late data was unavoidable.
If you’re interested in helping us solve hard problems to build the world’s best observability stack for hardware sensor data, check out our careers page.