More advanced EPL: Contexts, channels and event filtering

In the Apama EPL first steps blog, we explored the basic concepts and constructs used to create EPL programs. In this blog, we will build upon this and look at some more advanced features of the EPL language, namely those around sending and receiving events, as well as extending our knowledge of filtering specific patterns of events.

Sending and receiving

We know that EPL can process incoming events with a listener, but where do these events come from? How does the Correlator send and receive events for internal communication? The answer is in two related systems: contexts and channels.

Contexts

Contexts are the concurrency construct within EPL, allowing you to easily organise work into threads for parallel execution. Contexts contain monitor instances, listeners that belong to those instances, and a queue for delivering events to those listeners. You create a context by first creating a context object, passing in a name (used only for diagnostics, not an identifier), and whether the context is public or private (more on this later). Then, you specify what you want to run in that context by naming an action and ‘spawning’ to the desired context, as below:

monitor myMon
{
    // Monitor scope variable
    context myCtx;
 
    action onload()
    {
        myCtx := context("myCtx", false);
        spawn mySpawningAction("ETR:SOW") to myCtx;
    }
 
    action mySpawningAction(string param)
    {
        // do some work
    }
}

When you spawn to a context, you create a new instance of the monitor containing that action; the new instance is started in the target context and runs concurrently. So in the above example, we create a new context and spawn a new instance of myMon to that context, which calls mySpawningAction() when it starts. The new monitor instance receives a deep copy of the monitor scope variables (in this case myCtx), but listeners are not copied. Contexts are very lightweight objects and scale readily; creating many contexts for extremely fine-grained parallelism is no problem at all. There is always one ‘main’ context running in the Correlator.

Now we know how to do some basic concurrency in EPL. Note that each monitor instance, and thus each context, has completely independent state: no objects are shared between contexts or monitor instances. This ‘actor model’ removes the need for any locking or mutexes when manipulating data from EPL, because each monitor instance’s state is only ever accessed from one thread or context. The primary way of transferring state between contexts is by sending events, which can be used both to notify that something has happened and to update state. EPL can send an event (which again is copied) from one context to another, like so:

monitor myMon
{
    context myCtx;
 
    action onload()
    {
        myCtx := context("myCtx", false);
        spawn setupListener() to myCtx;
 
        // send a MyEvent event to myCtx
        send MyEvent() to myCtx;
    }
 
    action setupListener()
    {
        on all MyEvent()
        {
            // do some work
        }
    }
}

And that is how events travel within the Correlator. As an aside, we can also send to a sequence of contexts using the same ‘send...to...’ syntax; this will send the event to each context in the sequence.
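
As a minimal sketch of sending to a sequence of contexts: the monitor name fanOut, the variable workers and the context names are made up for illustration, and MyEvent is assumed to be defined elsewhere with no fields, as in the example above.

monitor fanOut
{
    action onload()
    {
        sequence<context> workers := new sequence<context>;
        integer i := 0;
        while (i < 3)
        {
            // create a private context and start a listener in it
            context ctx := context("worker" + i.toString(), false);
            spawn setupListener() to ctx;
            workers.append(ctx);
            i := i + 1;
        }

        // a single send delivers a copy of the event to every context in the sequence
        send MyEvent() to workers;
    }

    action setupListener()
    {
        on all MyEvent()
        {
            log "MyEvent received in " + context.current().getName() at INFO;
        }
    }
}

Each of the three spawned monitor instances should log once, since every context in the sequence receives its own copy of the event.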

Channels

We don’t always have a handle to the context we want to send to, for example when sending from an external system, or from a part of our EPL code that has no access to the desired context reference. To solve this problem we have channels. As with contexts, events can be sent to channels. Contexts can subscribe to channels, which means that any event sent to a channel is delivered to every context subscribed to it. Finally, channels are identified by their name (which is just a string), meaning that you always have access to them. Public contexts, including the main context, are subscribed to the default channel, whose name is the empty string "".

The below example combines what we’ve already seen of contexts with our new knowledge of channels:

monitor myMon
{
    // these channels could be connected to an external service, such as a connectivity plugin
    constant string myChan := "channel1";
    constant string myChan2 := "channel2";
 
    action onload()
    {
        context myCtx := context("myCtx", false);
        spawn setupListener() to myCtx;
    }
 
    action setupListener()
    {
        // subscribe this monitor instance to the channel, myCtx is now subscribed to myChan (but the main context is not)
        monitor.subscribe(myChan);
 
        // listen for all the MyEvents coming in (from myChan)
        on all MyEvent() as myEv
        {
            // send it through to myChan2
            send myEv to myChan2;
        }
    }
}
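
For completeness, here is a sketch of the sending side of that channel. Any monitor can reach the listener above by sending to the channel name, without ever holding a reference to myCtx. The monitor name mySender is hypothetical, and MyEvent is again assumed to be defined with no fields.

monitor mySender
{
    action onload()
    {
        // "channel1" is the value of myChan above; no context handle is needed
        send MyEvent() to "channel1";
    }
}

As far as we are aware, a channel does not hold events for contexts that subscribe later, so a sender like this should be injected after the subscribing monitor is up and running.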

Event Expressions and Complex Pattern Matching

In the previous blog, we saw that listeners can filter on the fields of incoming events. Fortunately, we can do a lot more exciting things than that. Event expressions (the definition of listener patterns) can be thought of as their own little language, described with the following terms:

  • Event template – a clause that specifies an event type and filtering options on the fields of that type. Incoming events are compared and matched against existing templates
  • Operators – logical operations that can be used to modify/connect event templates together
  • Event expression – a pattern made up of event templates and operators. This pattern defines an expression that must be completed before the associated listener is activated
  • Complete – whether the conditions defined by the expression have been met. If they have, we say the expression is complete

Operators allow us to define more complex pattern matching logic by combining multiple event templates together with various extra conditions. There are operators for logical connectives, temporal operations, and even operators based on previously received events! Let’s have a look at some of the more common ones:

all

Often, we need to listen for each occurrence of an event expression rather than just the first; for this, we use the all operator. The below expression will complete every time we receive a MyEvent:

on all MyEvent()

and

We can create a logical intersection of two templates with the and operator. The below expression will be complete when both templates are matched:

on MyEvent(id="important") and MyOtherEvent(id="important")

or & xor

Like the and operator, we can use the or operator to connect two templates, this time in a logical union. The below expression will be complete when either of the templates is matched:

on MyEvent(id="important") or MyOtherEvent(id="very_important")

Similarly, we can also use the xor operator to create a logical exclusive or. The below expression will be complete when exactly one of the templates is matched (i.e. sending in MyEvent(1, "important"), which matches both templates, will not trigger this listener):

on MyEvent(val=1) xor MyEvent(id="important")
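
The field names used in these templates imply event definitions along the following lines. This is an assumption for illustration only: the examples in this section do not show the definitions, but the positional form MyEvent(1, "important") suggests that val is declared before id.

event MyEvent
{
    integer val;
    string id;
}

event MyOtherEvent
{
    string id;
}

Under that assumption, MyEvent(1, "important") satisfies both val=1 and id="important", which is why the xor listener ignores it, whereas MyEvent(1, "routine") or MyEvent(2, "important") would each match exactly one template.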

not

Sometimes it is useful to check if something hasn’t happened. For this purpose we have the not operator, which creates a logical negation. The below expression is complete every time we receive a MyEvent, provided we haven’t yet received a MyOtherEvent. As a side note, this listener will be removed internally if we ever do receive a MyOtherEvent, as it can then never be complete:

on all MyEvent() and not MyOtherEvent()

Followed By (->)

We may require some explicit ordering and want to take action when particular events follow each other. For this purpose we have the followed by operator, which is a right-hand arrow ‘->’. The below expression is complete when a MyEvent is followed by a MyOtherEvent which is then followed by another MyEvent. Note that these events don’t have to be contiguous, so an event stream like ‘MyEvent, MyOtherEvent, MyCompletelyOtherEvent, MyEvent’ would complete this expression:

on MyEvent() -> MyOtherEvent() -> MyEvent()

within

Temporal operations are a common requirement for CEP. For example, we may want to put a time limit on our expressions. The within operator specifies a time in seconds within which the expression must complete, starting from when a constituent template is first activated. The below expression is complete when we receive both a MyEvent and a MyOtherEvent with the appropriate fields within 15.0 seconds of each other. To clarify, in this expression the timer starts when either the MyEvent or the MyOtherEvent is matched:

on all MyEvent(id="important") and MyOtherEvent(id="important") within 15.0

wait

We may want to only consider a certain number of completions within a time window, or only examine sequential events if they have a certain amount of time between them. The wait operator specifies a time in seconds that must elapse before it is considered complete. The first expression below is complete when we receive a MyEvent, but with only one completion per 2 seconds; the second is complete when we receive a MyEvent followed by another MyEvent at least 2 seconds later:

on all wait(2.0) and MyEvent()

on all MyEvent() -> wait(2.0) -> MyEvent()

at

Finally, we may want to take some action at a specific time, such as firing off a report every weekday, saving to a file every hour, or even performing some action at 11:56 on the 12th day of March, July and November. The time patterns used in this operator are a bit too expansive for this blog, but you can find more information in the documentation. For reference, the below expression completes at midnight:

on all at(0, 0, *, *, *)
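
As a rough sketch, the three schedules mentioned above might be written as follows. This assumes the argument order is minutes, hours, days of the month, months, days of the week (consistent with the midnight example), that days of the week are numbered from 0 for Sunday, and that ranges and lists are written as [a:b] and [a,b,c]. The 09:00 report time is made up for illustration, and the exact syntax should be checked against the documentation before relying on it.

// on the hour, every hour
on all at(0, *, *, *, *)

// 09:00 on Monday to Friday
on all at(0, 9, *, *, [1:5])

// 11:56 on the 12th day of March, July and November
on all at(56, 11, 12, [3,7,11], *)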

Combining the above

As alluded to earlier, the real power in event expressions comes from combining all the previous operators and filtering we’ve seen together to create complex patterns. Take a look at this listener:

on all ((MySensor(val > 90) and MyOtherSensor(id="temp", val > 200)) or Warning()) -> not AllClear() within 7.0

This listener fires every time:

  • a MySensor event AND a MyOtherSensor event are received with the appropriate values, OR just a Warning event
  • and then if we do NOT receive an AllClear event FOLLOWING this
  • WITHIN a 7 second window

Putting it all together

So how would all of this come together in a real application? Let’s consider a simple program that monitors IoT devices, such as in a smart factory. We receive all our messages from a connectivity plug-in, i.e. on a channel. For performance reasons, we have decided we want each device we are looking at to be processed in its own context. Finally, to handle the management and monitoring of these devices, we will have a more advanced event expression:

monitor LineManager
{
    constant string externalSystem := "myDeviceChain";
    constant float warningBufferTime := 7.0;
    dictionary<string, context> deviceNamesToContext;
 
    action onload()
    {
        // Subscribe to the external system's control channel
        monitor.subscribe(externalSystem);
 
        on all Begin() as b
        {
            if (!deviceNamesToContext.hasKey(b.name))
            {
                context ctx := context(b.name, false);
                deviceNamesToContext.add(b.name, ctx);
                spawn beginMonitoring(b.name) to ctx;
            }
        }
    }
 
    action beginMonitoring(string name)
    {
        // Subscribe to the specific sensor's channel
        monitor.subscribe(name);
 
        on all (SensorReading(deviceId="thermometer", fVal > 195.0) or Warning(device=name)) -> not AllClear(device=name) within warningBufferTime
        {
            // looks like there's an immediate problem! better take care of it
        }
    }
}
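
The monitor above relies on four event types that are not shown. A minimal set of definitions that would be consistent with the fields it uses might look like this; the field types are assumptions inferred from the listener templates (for example, fVal is compared with 195.0, so it is taken to be a float), not definitions taken from a real application.

// sent on the control channel to start monitoring a named device
event Begin
{
    string name;
}

// a reading from one sensor on a device
event SensorReading
{
    string deviceId;
    float fVal;
}

// raised when a device reports a problem
event Warning
{
    string device;
}

// cancels a pending warning for a device
event AllClear
{
    string device;
}

These definitions would need to be injected into the Correlator before the LineManager monitor, since the monitor refers to them.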

As always, please post on our forums with any questions you may have. Thanks and happy downloading!

Disclaimer:
Utilities and samples shown here are not official parts of the Software AG products. These utilities and samples are not eligible for technical support through Software AG Customer Care. Software AG makes no guarantees pertaining to the functionality, scalability, robustness, or degree of testing of these utilities and samples. Customers are strongly advised to consider these utilities and samples as “working examples” from which they should build and test their own solutions