Apama Streaming Analytics version 10.15.3 and above. Cumulocity IoT version 10.17 and above.
The task
The task we’re going to tackle in this post is, given a list of devices in the form of an EPL sequence<ManagedObject>, to check for any active alarms for each of the devices and then clear all of those alarms. Finally, we want to know how many alarms needed clearing and when all of those alarms have been cleared. Using just regular EPL constructs, you would need to:
- Create some sequences to hold the outstanding request ids and the alarms found so far:
sequence<integer> outstandingIds := new sequence<integer>;
sequence<Alarm> allAlarms := new sequence<Alarm>;
- Iterate over the sequence of managed objects:
ManagedObject o;
for o in objects {
- Send a bunch of FindAlarm requests:
integer reqId := Util.generateReqId();
send FindAlarm(reqId, { "source" : o.id }) to FindAlarm.SEND_CHANNEL;
outstandingIds.append(reqId);
- Listen for all the FindAlarmResponse events and collect the alarms to be cleared:
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
on all FindAlarmResponse(reqId=reqId) as resp and not FindAlarmResponseAck(reqId=reqId) {
allAlarms.append(resp.alarm);
}
- Listen for all of the FindAlarmResponseAck events and record that we’ve received them:
on FindAlarmResponseAck(reqId=reqId) {
outstandingIds.remove(outstandingIds.indexOf(reqId));
- Once all of them have been received, clear the alarms and log the result:
if outstandingIds.size() = 0 then {
monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
Alarm a;
for a in allAlarms {
a.status := "CLEARED";
send a to Alarm.SEND_CHANNEL;
}
log "Cleared "+allAlarms.size().toString()+" alarms from "+objects.size().toString()+" devices" at INFO;
}
} // end of the FindAlarmResponseAck listener
} // end of the loop over objects
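Put together, the fragments above might look something like the following. This is only a sketch: the monitor name, the clearAlarms action and the empty device list in onload are hypothetical, the using declarations assume the event types live in the com.apama.cumulocity package, and the subscription has been moved ahead of the loop so that it happens once.

```epl
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.Util;

// Hypothetical monitor wrapping the fragments from the list above.
monitor ClearAlarmsImperative {

	action onload() {
		// Assumption: a real application would obtain the device list elsewhere.
		// An empty list just keeps this sketch self-contained.
		clearAlarms(new sequence<ManagedObject>);
	}

	// Hypothetical action name; the body follows the steps described above.
	action clearAlarms(sequence<ManagedObject> objects) {
		// Nothing to do (and nothing to unsubscribe from later) without devices.
		if objects.size() = 0 then { return; }

		sequence<integer> outstandingIds := new sequence<integer>;
		sequence<Alarm> allAlarms := new sequence<Alarm>;

		// Subscribe once, before any request is sent.
		monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

		ManagedObject o;
		for o in objects {
			// One FindAlarm request per device, tracked by its request id.
			integer reqId := Util.generateReqId();
			send FindAlarm(reqId, { "source" : o.id }) to FindAlarm.SEND_CHANNEL;
			outstandingIds.append(reqId);

			// Collect every alarm returned for this request until its ack arrives.
			on all FindAlarmResponse(reqId=reqId) as resp and not FindAlarmResponseAck(reqId=reqId) {
				allAlarms.append(resp.alarm);
			}

			// The ack marks this request as finished.
			on FindAlarmResponseAck(reqId=reqId) {
				outstandingIds.remove(outstandingIds.indexOf(reqId));

				// Once the last ack is in, clear everything that was found.
				if outstandingIds.size() = 0 then {
					monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
					Alarm a;
					for a in allAlarms {
						a.status := "CLEARED";
						send a to Alarm.SEND_CHANNEL;
					}
					log "Cleared "+allAlarms.size().toString()+" alarms from "+objects.size().toString()+" devices" at INFO;
				}
			}
		}
	}
}
```

Even in this compact form, the bookkeeping (request ids, two listeners per device, the final size check) is spread across nested blocks, which is what makes the overall flow hard to see at a glance.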
Functional programming
The EPL Functional Library provides various functional operations, such as filter, map and reduce, which operate on EPL container types. There are also several functor actions and predicates which can help you use these operators. There are two APIs for accessing the functional operators: com.apama.functional.Fn and com.apama.functional.Functional.
Firstly, you can use the Fn type, which provides static functions that operate on containers, for example:
sequence<integer> evens := <sequence<integer>> Fn.filter(numbers, Fn.even);
Secondly, you can use the Functional type, which wraps your container and provides the functional operators as instance methods, each one returning a new Functional wrapping the result container. This allows the chaining of multiple operators in a fluent style. For example, this code wraps the original sequence of numbers, filters out just the even numbers, and then adds them up, yielding a single integer as the result:
integer evenSum := <integer> Functional(numbers).filter(Fn.even).reduce(Fn.sum);
EPL is a language designed to handle streams of events, so the library also offers functional-style ways of working with events, not just with static containers. For example, if you have a variable number of outstanding operations and you need to wait for a completed event from each one before continuing, you could use:
Functional(sequenceIDs).waitForAllCompleted( "Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout);
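To make the one-liner above more concrete, here is a minimal sketch of how it might sit inside a monitor. The Completed event, the monitor name, the timeout value and the two callback bodies are all hypothetical, and the sketch assumes that both callbacks are plain actions taking no arguments.

```epl
using com.apama.functional.Functional;

// Hypothetical completion event: one is expected per outstanding operation.
event Completed {
	integer id;
}

// Hypothetical monitor showing where the call from the text would live.
monitor WaitForAllSketch {

	// Assumed timeout in seconds; pick a value that suits your operations.
	constant float TIMEOUTSECS := 10.0;

	action onload() {
		// Ids of the operations we are waiting on.
		sequence<integer> sequenceIDs := [1, 2, 3];

		// Wait for a Completed event whose id field matches each entry,
		// then call onCompleted; call onTimeout if that takes too long.
		any _ := Functional(sequenceIDs)
			.waitForAllCompleted(Completed.getName(), "id", onCompleted)
			.onTimeout(TIMEOUTSECS, onTimeout);

		// Simulate the operations finishing by routing one event per id.
		integer id;
		for id in sequenceIDs {
			route Completed(id);
		}
	}

	// Assumption: the completion callback takes no arguments.
	action onCompleted() {
		log "All operations completed" at INFO;
	}

	// Assumption: the timeout callback takes no arguments.
	action onTimeout() {
		log "Timed out waiting for operations to complete" at WARN;
	}
}
```

The listeners for every id are set up by a single call, so the calling code never has to track which completions are still outstanding.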
Partially binding functions
Functional programming, and APIs that rely on callbacks, often need to provide additional arguments to functions that will be invoked by the APIs. The EPL Functional Library allows you to partially bind arguments to actions and provide those as an argument to the API. This is similar to how a lambda would be used in other languages. For example:
Fn.map(["Bob", "Alice"], Fn.partial(Fn.concat, "Hello ")) // returns ["Hello Bob", "Hello Alice"]
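The same technique works with your own actions. In the sketch below, the monitor name and the greet action are hypothetical; the first argument of greet is bound with Fn.partial, and map supplies the second from each element of the sequence.

```epl
using com.apama.functional.Fn;

// Hypothetical monitor illustrating partial binding of a user-defined action.
monitor PartialSketch {

	// Hypothetical helper: 'greeting' is bound up front, 'name' comes from map.
	action greet(string greeting, string name) returns string {
		return greeting + ", " + name + "!";
	}

	action onload() {
		sequence<string> names := ["Bob", "Alice"];

		// Bind the first argument of greet; map passes each name as the second.
		any greetings := Fn.map(names, Fn.partial(greet, "Hello"));

		// The result is held as an 'any', so log its string form.
		log greetings.toString() at INFO;
	}
}
```

Binding the leading arguments this way lets one helper action serve several call sites without a separate wrapper action for each.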
Managing alarms functionally
We’re going to use all of the features above to implement the alarm clearing in a functional fashion. To do this we will write a single Functional chain, using a few simple helper actions. The main functional component looks like this:
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
sequence<Alarm> allAlarms := new sequence<Alarm>;
any _ := Functional(objects)
    .map(Fn.getEntry("id")) // get the id from each object.
    .map(sendFindAlarm) // send a find alarm for each, returning the ids.
    .map(Fn.partial(receiveAlarms, [allAlarms])) // listen for all those alarms.
    .waitForAllCompleted(FindAlarmResponseAck.getName(), "reqId", Fn.partial(clearAllAlarms, [allAlarms])); // clear all the alarms we found.
This can be read as follows:
- Take my list of managed objects and get the id from each one.
- For each id, call the sendFindAlarm functor, which will send a request and return the request id.
- For each request id, call receiveAlarms, which will listen for all of the alarms from that object and build the allAlarms sequence.
- Finally, wait for all of the ids to be complete and call clearAllAlarms with the built-up sequence of alarms.
There’s logic in the helper actions, but the functional style lets you see the overall flow compactly in one place and then drill down into the details when you need them.
The sendFindAlarm action is short: it sends the request and returns its request id:
action sendFindAlarm(string source) returns integer
{
    return <integer> Fn.sendToChannel(FindAlarm.SEND_CHANNEL,
            FindAlarm(Util.generateReqId(), {"source": source.toString() }))
        .getEntry("reqId");
}
The receiveAlarms action uses another part of the API, Fn.getAllEvents:
action receiveAlarms(sequence<Alarm> allAlarms, integer reqId) returns integer
{
    any _ := Fn.getAllEvents(FindAlarmResponse.getName(), {"reqId":<any>reqId},
        FindAlarmResponseAck.getName(), {"reqId":<any>reqId},
        Fn.partial(appendSequence, [allAlarms]));
    return reqId;
}
Here we’re asking the library to collect together all of the responses received before the acknowledgement. The final argument is appendSequence (a helper not shown here) with allAlarms partially bound.
Finally, clearAllAlarms is also implemented as a functional call:
action clearAllAlarms(sequence<Alarm> allAlarms)
{
    monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
    any _ := Functional(allAlarms)
        .map(Fn.setEntry("status", "CLEARED"))
        .map(Fn.partial(Fn.sendToChannel, Alarm.SEND_CHANNEL));
}
Further reading
The full sample is included with the samples in the latest release of Apama. The Apama documentation covers using functional operations in EPL in full, and the API doc describes each operator.
This article is part of the TECHniques newsletter blog - technical tips and tricks for the Software AG community.