Level: INTERMEDIATE
When starting with EPL you will often create event listeners with the “on” or “on all” keywords. These listeners are checked and executed on every individual incoming event and provide a simple, yet powerful way of writing your rule logic. But what if you need to keep track of multiple events over time?
Typical questions you might want to answer in Cumulocity involve windows or batches of events:
- What is the average of this Measurement over the last hour (or day)?
- What is the rate of Events coming in from my devices?
- What does the standard deviation over the last 50 Measurements look like and is my last Measurement within the expected range?
- What was the maximum (minimum) value for the Measurements today?
One way to answer these or similar questions in EPL would be to store or query the number of events you need and do the calculation in a big loop in your code. While this is a viable approach, it tends to produce buggy code because you have to keep track of the windows (and their lifetime) yourself. Things quickly become complex once you have to partition your calculations by the device the events came from, or do more advanced things such as grouping the events.
A much better approach would be to have something like SQL do all the heavy lifting of tracking and updating windows, partitioning, grouping, and aggregation. This allows a more declarative approach: you describe what you want to calculate instead of the details of how.
Well, here is the good news: the EPL syntax actually allows you to do that. Stream listeners (10.11 documentation) offer an alternative to the “discrete” event listeners. Some of the syntax may look a little more like LINQ than SQL as it reads more intuitively in imperative code. Just look at this example and you can probably figure out easily what it is doing:
from m in all MeasurementFragment(type="c8y_Temperature",
                                  valueFragment="c8y_Temperature",
                                  valueSeries="T")
    partition by m.source within 60.0
    group by m.source
    select MovingAverage(m.source, avg(m.value)) as movingAverage
{
    route movingAverage;
}
This is just a single stream query and a few lines of code that will calculate the average temperature over the last 60 seconds for each device individually.
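The query above relies on a few things the snippet does not show. As a minimal sketch, here is what a complete monitor around it might look like. The MovingAverage event definition (its field names and types), the monitor name, and the log statement are assumptions made for illustration; the snippet above only shows that the event is constructed from a source and an average.

```epl
using com.apama.cumulocity.MeasurementFragment;
using com.apama.aggregates.avg;

/** Hypothetical result event; the field names and types are assumptions. */
event MovingAverage {
    string source;
    float average;
}

monitor TemperatureMovingAverage {
    action onload() {
        // Measurement fragments only arrive once the monitor has subscribed
        // to the channel they are delivered on.
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // One 60-second sliding window per device (partition by),
        // one aggregated result per device (group by).
        from m in all MeasurementFragment(type="c8y_Temperature",
                                          valueFragment="c8y_Temperature",
                                          valueSeries="T")
            partition by m.source within 60.0
            group by m.source
            select MovingAverage(m.source, avg(m.value)) as movingAverage
        {
            log "Average for " + movingAverage.source + ": "
                + movingAverage.average.toString() at INFO;
        }
    }
}
```

Swapping "within 60.0" for "retain 50" would turn the time window into a window over the last 50 measurements per device, which is the shape needed for the standard deviation question from the list above.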
Apama already includes the most commonly used aggregation functions (ApamaDoc), but you are not limited to those. You can create your own custom aggregates to get all the benefits of stream listeners while doing your own calculations on the windows and partitions.
If this is still not enough for your use case, you can also combine stream queries via join, or use stream queries as part of other stream queries to create “stream networks”. And you can mix all of this with your usual “on” or “on all” listeners.
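To give an idea of what a custom aggregate can look like, here is a small sketch of a root-mean-square aggregate used over the last 50 measurements per device. The aggregate name rms, the DeviceRms event, and the monitor name are all made up for this example and are not part of Apama.

```epl
using com.apama.cumulocity.MeasurementFragment;

/** Hypothetical custom aggregate: root mean square of the values in the window. */
aggregate bounded rms(float x) returns float {
    float sumSquares;
    integer n;

    // Called for every item that enters the window.
    action add(float x) {
        sumSquares := sumSquares + x * x;
        n := n + 1;
    }

    // Called for every item that leaves the window.
    action remove(float x) {
        sumSquares := sumSquares - x * x;
        n := n - 1;
    }

    // Called whenever the query needs the current result.
    action value() returns float {
        if n = 0 { return 0.0; }
        return (sumSquares / n.toFloat()).sqrt();
    }
}

/** Hypothetical result event. */
event DeviceRms {
    string source;
    float rms;
}

monitor TemperatureRms {
    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        from m in all MeasurementFragment(type="c8y_Temperature")
            partition by m.source retain 50
            group by m.source
            select DeviceRms(m.source, rms(m.value)) as result
        {
            log result.toString() at INFO;
        }
    }
}
```

The aggregate is declared as bounded because it is used with a window, so it has to provide remove() as well as add(). Keeping a running sum and subtracting on removal is cheap, but small floating-point errors can accumulate over a long-running window, so treat this as a starting point rather than a production-ready implementation.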
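As a rough sketch of such a network, the first query below is stored in a stream variable instead of being attached to a listener, and a second query then consumes its output. The MovingAverage event definition, the 10-minute window, and the monitor name are assumptions chosen only for this illustration.

```epl
using com.apama.cumulocity.MeasurementFragment;
using com.apama.aggregates.avg;
using com.apama.aggregates.max;

/** Hypothetical intermediate event; the field names are assumptions. */
event MovingAverage {
    string source;
    float average;
}

monitor TemperatureStreamNetwork {
    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // Stage 1: per-device 60-second averages, kept as a stream.
        stream<MovingAverage> averages :=
            from m in all MeasurementFragment(type="c8y_Temperature",
                                              valueFragment="c8y_Temperature",
                                              valueSeries="T")
            partition by m.source within 60.0
            group by m.source
            select MovingAverage(m.source, avg(m.value));

        // Stage 2: highest of those averages seen in the last 10 minutes,
        // across all devices.
        from a in averages within 600.0
            select max(a.average) as peak
        {
            log "Highest 60-second average in the last 10 minutes: "
                + peak.toString() at INFO;
        }
    }
}
```

Splitting the calculation into stages like this keeps each query short, and the intermediate stream can feed more than one downstream query.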
Stream listeners make EPL much more powerful, but all these options can confuse beginners, so don’t be afraid to play with them. Use the samples in your on-prem Apama installation or in Cumulocity IoT to get a feeling for them, and refer to the extensive documentation for more detail.
Stream listeners can really simplify a lot of typical analytic use cases.
Put the “Streams” back into “Streaming Analytics”!
Today’s article was kindly provided by Mario Heidenreich from the Global Competency Centre for IoT team.
This is Day #21 of a short series of brief tips, tricks, hints, and reminders of information relating to the Apama Streaming Analytics platform, both from Software AG as well as from the community.
This series of articles will be published Monday-Friday only, with the occasional weekend bonus.