In the Apama EPL first steps blog, we explored the basic concepts and constructs used to create EPL programs. In this blog, we will build upon this and look at some more advanced features of the EPL language, namely those around sending and receiving events, as well as extending our knowledge of filtering specific patterns of events.
Sending and receiving
We know that EPL can process incoming events with a listener, but where do these events come from? How does the Correlator send and receive events for internal communication? The answer is in two related systems: contexts and channels.
Contexts
Contexts are the concurrency construct within EPL, allowing you to easily organise work into threads for parallel execution. Contexts contain monitor instances, listeners that belong to those instances, and a queue for delivering events to those listeners. You create a context by first creating a context object, passing in a name (used only for diagnostics, not an identifier), and whether the context is public or private (more on this later). Then, you specify what you want to run in that context by naming an action and ‘spawning’ to the desired context, as below:
monitor myMon
{
// Monitor scope variable
context myCtx;
action onload()
{
myCtx := context("myCtx", false);
spawn mySpawningAction("ETR:SOW") to myCtx;
}
action mySpawningAction(string param)
{
// do some work
}
}
When you spawn
to a context, you create a new instance of the monitor containing that action, which is started in the target context, running concurrently. So in the above example, we create a new context and spawn a new instance of myMon
to that context, which calls mySpawningAction()
when it starts. A new instance of a monitor is a deep copy of the monitor scope variables (in this case myCtx
) but it does not copy any listeners. Contexts are very lightweight objects and scale readily; creating many contexts for extremely finely grained parallelism is no problem at all. There is always one ‘main’ context running in the Correlator.
Now we know how to do some basic concurrent in EPL. Note that each monitor instance, and thus each context, has a completely independent state – no objects are shared between contexts or monitor instances. This ‘actor model’ removes the need for any locking or mutexes when manipulating data from EPL – each monitor instance’s state is only accessed from one thread or context. The primary way of transferring state between contexts is by sending events – these can be used to both notify something has happened, and to update state. EPL can send an event (which again is copied) from one context to another, like so:
monitor myMon
{
context myCtx;
action onload()
{
myCtx := context("myCtx", false);
spawn setupListener() to myCtx;
// send a MyEvent event to myCtx
send MyEvent() to myCtx;
}
action setupListener()
{
on all MyEvent()
{
// do some work
}
}
}
And that is how events travel within the Correlator. As an aside, we can also send to a sequence of contexts using the same ‘ send...to...
‘ syntax; this will send the event to each context in the sequence.
Channels
We don’t always have a handle to the contexts we want to send to. For example, when sending from an external system or even just a part of our EPL code in which we don’t have access to the desired context reference. To solve this problem we have channels. Like contexts, events can be sent to channels. Contexts can subscribe to channels, which means that any events sent to that channel will be sent to all subscribing contexts. Finally, channels are identified by their name (which is just a string
), meaning that you always have access to them. Public contexts, including the main context, are subscribed to the default channel, whose name is the empty string “”.
The below example combines what we’ve seen before with contexts with our new knowledge of channels:
monitor myMon
{
// these channels could be connected to an external service, such as a connectivity plugin
constant string myChan := "channel1";
constant string myChan2 := "channel2";
action onload()
{
context myCtx := context("myCtx", false);
spawn setupListener() to myCtx;
}
action setupListener()
{
// subscribe this monitor instance to the channel, myCtx is now subscribed to myChan (but the main context is not)
monitor.subscribe(myChan);
// listen for all the MyEvents coming in (from myChan)
on all MyEvent() as myEv;
{
// send it through to myChan2
send myEv to myChan2;
}
}
}
Event Expressions and Complex Pattern Matching
In the previous blog, we saw that listeners can filter on the fields of incoming events. Fortunately, we can do a lot more exciting things than that. Event expressions (the definition of listener patterns) can be thought of as their own little language, described with the following terms:
- Event template – a clause that specifies an event type and filtering options on the fields of that type. Incoming events are compared and matched against existing templates
- Operators – logical operations that can be used to modify/connect event templates together
- Event expression – a pattern made up of event templates and operators. This pattern defines an expression that must be completed before the associated listener is activated
- Complete – whether the conditions defined by the expression have been met. If it has, we say this expression is complete
Operators allow us to define more complex pattern matching logic by combining multiple event templates together with various extra conditions. There are operators for logical connectives, temporal operations, and even operators based on previously received events! Let’s have a look at some of the more common ones:
all
Often, we need to listen for each occurrence of an event expression rather than just the once; for this, we use the all
operator. The below expression will complete every time we receive a MyEvent
:
on all MyEvent()
and
We can create a logical intersection of two templates with the and
operator. The below expression will be complete when both templates are matched:
on MyEvent(id="important") and MyOtherEvent(id="important")
or & xor
Like the and
operator, we can use the or
operator to connect two templates, this time in a logical union. The below expression will be complete when either of the templates are matched:
on MyEvent(id="important") or MyOtherEvent(id="very_important")
Similarly, we can also use the xor
operator to create a logical exclusion. The below expression will be complete when only one of the templates are matched (i.e. sending in MyEvent(1, "important")
will not trigger this listener):
on MyEvent(val=1) xor MyEvent(id="important")
not
Sometimes it is useful to check if something hasn’t happened. For this purpose we have the not
operator, which creates a logical negation. The below expression is complete every time we receive a MyEvent
and haven’t yet received a MyOtherEvent
. As a side note, this listener will be removed internally if we do ever receive a MyOtherEvent
, as it can no longer ever be complete:
on all MyEvent() and not MyOtherEvent()
Followed By (->)
We may require some explicit ordering and want to take action when particular events follow each other. For this purpose we have the followed by operator, which is a right hand arrow ‘ ->
‘. The below expression is complete when a MyEvent
is followed by a MyOtherEvent
which is then followed by another MyEvent
. Note, these events don’t have to be contiguous, so an event stream like ‘ MyEvent
, MyOtherEvent
, MyCompletelyOtherEvent
, MyEvent
‘ would complete this expression:
on MyEvent() -> MyOtherEvent() -> MyEvent()
within
Temporal operations are a common requirement for CEP. For example, we may want to put a time limit on our expressions. The within
operator specifies a time in seconds within which the expression must complete, starting from when a constituent template is first activated. The below expression is complete when we receive both a MyEvent
and a MyOtherEvent
with the appropriate fields inside of 15.0 seconds. To clarify, in this expression the timer starts when either the MyEvent
or MyOtherEvent
is matched:
on all MyEvent(id="important") and MyOtherEvent(id="important") within 15.0
wait
We may want to only consider a certain number of completions within a time window, or only examine sequential events if they have a certain amount of time between them. The wait
operator specifies a time in seconds to wait before it can be considered complete. The below expressions are complete when we receive a MyEvent
but with only one completion per 2 seconds, and when we receive a MyEvent
followed by another MyEvent
at least 2 seconds later:
on all wait(2.0) and MyEvent()
on all MyEvent() -> wait(2.0) -> MyEvent()
at
Finally, we may want to take some action at a specific time, such as filing off a report every weekday, saving to a file every hour, or even to perform some action at 11:56 on the 12th day of March, July and November. The time patterns used in this operator are a bit too expansive for this blog, but you can find more information in the documentation. For reference, the below expression completes at midnight:
on all at(0, 0,*,*,*)
Combining the above
As alluded to earlier, the real power in event expressions comes from combining all the previous operators and filtering we’ve seen together to create complex patterns. Take a look at this listener:
on all ((MySensor(val > 90) and MyOtherSensor(id="temp", val > 200)) or Warning()) -> not AllClear() within 7.0
This listener fires every time when
- a
MySensor
event AND aMyOtherSensor
event are received with the appropriate values, OR just aWarning
event - and then if we do NOT receive an
AllClear
event FOLLOWING this - WITHIN a 7 second window
Putting it all together
So how would all of this come together in a real application? Let’s consider a simple program that monitors IoT devices, such as in a smart factory. We receive all our messages from a connectivity plug-in, i.e. on a channel. For performance reasons, we have decided we want each device we are looking at to be processed in its own context. Finally, to handle the management and monitoring of these devices, we will have a more advanced event expression:
monitor LineManager
{
constant string externalSystem := "myDeviceChain";
constant float warningBufferTime := 7.0;
dictionary<string, context> deviceNamesToContext;
action onload()
{
// Subscribe to the external systems control channel
monitor.subscribe(externalSystem);
on all Begin() as b
{
if (!deviceNamesToContext.hasKey(b.name))
{
context ctx := context(b.name, false);
deviceNamesToContext.add(b.name, ctx);
spawn beginMonitoring(b.name) to ctx;
}
}
}
action beginMonitoring(string name)
{
// Subscribe to the specific sensors channel
monitor.subscribe(name);
on all (SensorReading(deviceId="thermometer", fVal > 195.0) or Warning(device=name)) -> not AllClear(device=name) within warningBufferTime
{
// looks like there's an immediate problem! better take care of it
}
}
}
As always, please post on our forums with any questions you may have. Thanks and happy downloading!
Disclaimer:
Utilities and samples shown here are not official parts of the Software AG products. These utilities and samples are not eligible for technical support through Software AG Customer Care. Software AG makes no guarantees pertaining to the functionality, scalability, robustness, or degree of testing of these utilities and samples. Customers are strongly advised to consider these utilities and samples as “working examples” from which they should build and test their own solutions