Getting started with Streaming Analytics EPL apps

Introduction

Many IoT solutions (if not all) need to perform some real-time analytics on the data from their connected devices. Cumulocity IoT offers different tools to achieve this: starting with the easy-peasy Smart Rules, moving on to the more business-user-oriented Analytics Builder app, and finishing with the more in-depth EPL Apps tool, which is primarily aimed at developers. This article is for first-time EPL developers who want to try it out and create their first real-time analytics rule. The example is deliberately simple, so you can see how EPL works and how to structure an analytics rule. So, let’s get started!

Great! Wait… but what is EPL?


Let’s keep it short and sweet. EPL stands for Event Processing Language, and it is the programming language used by Apama.
Apama is our in-memory system. Its highly performant core engine, called the “correlator”, has been specifically designed to process large volumes of events very quickly in real time. Apama uses EPL because events are the foundation of everything: Apama understands ONLY events, so all communication in Apama is done via events.
More information on Apama can be found under “What is Apama?” in the useful links at the end of this article.

Create your first analytics rule

For this article, we will create an easy-to-follow rule using EPL. Each step explains what you are trying to achieve and how to do it. Some core EPL concepts are introduced along the way.

Context

The example used in this article is kind of mind-blowing and definitely a useful use-case…

Well no, it’s not… But it is a good base that developers can adapt to meet their business needs. The aim of the analytics rule is to check the difference in value between 2 consecutive c8y_Temperature measurements originating from the same device. If the difference is above a certain threshold, an Alarm is raised and an email is sent. Obviously, your Cumulocity tenant needs to have at least one device connected to the platform and sending c8y_Temperature measurements (tip: you can create a Simulator if there is no device :wink: ).
Ready? Let’s go!

1. Create the Monitor

A monitor is one of the basic units of program execution in EPL. You can think of it as something like a class: a monitor groups together variable declarations and actions (aka methods) to define some business logic. Each monitor has an action called onload(), which is the monitor’s entry point: this is where the code starts executing when the monitor is injected into the Apama correlator. You can think of the onload() action as the main() function in C/C++, except that every monitor must have an onload() action.
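
To make this concrete, here is a minimal sketch of a monitor. The name HelloMonitor, the greeting variable and the logGreeting action are made up for illustration; only the onload() action is required.

```epl
// Hypothetical monitor, for illustration only
monitor HelloMonitor {
    // A monitor-level variable, visible to every action in the monitor
    string greeting := "Hello from EPL";

    // Entry point: runs when the monitor is injected into the correlator
    action onload() {
        logGreeting();
    }

    // A second action, called from onload()
    action logGreeting() {
        log greeting at INFO;
    }
}
```

The monitor you create in the steps below has the same shape, just with more useful content inside onload().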

To create your first monitor, perform the below steps:

  1. Connect to your Cumulocity Tenant and click on the App Switcher > Streaming Analytics
  2. Navigate to EPL Apps
  3. Click on the button +New EPL App
  4. In the popup, give a meaningful name to your rule; like “Example1”… :stuck_out_tongue: no, I said meaningful… so let’s call it “AlarmOnDeltaTemperature”
  5. Provide a description if you want to
  6. Click on Done
  7. Scroll down a bit and rename monitor MySampleMonitor (around line 36) to monitor AlarmOnDeltaTemperature
  8. Save it

There you have it, your first monitor! And since we at Software AG are very nice people, your EPL App already contains very useful imports, as well as the onload() action declared in the monitor.

2. Create the listeners

2.a Your first listener

As mentioned previously, Apama understands only events. This means that all data being exchanged is in the form of events. Luckily, the Cumulocity domain model has corresponding Apama events (e.g. Measurement, ManagedObject, Alarm… tip: you can easily find those via the imports in your monitor). In our rule, we want to capture temperature measurements; so basically, we want our EPL code to trigger whenever a device sends a Cumulocity measurement of type c8y_Temperature. But how do we trigger this code? Well, by listening for the related event via a listener.
A listener is the EPL way to trigger some logic in your code. It specifies either a single event or a sequence of events and, whenever the Apama correlator receives a matching event, the listener is triggered.

A listener is constructed with:

  • the on keyword
  • followed by the event name
  • some filtering on the fields of the event if needed

For Cumulocity events, the Apama Cumulocity Events Documentation (see the useful links at the end of this article) specifies the different fields available for each event type.

Let’s use our example: we want to listen for measurements of type temperature:

  • the on keyword → on
  • followed by the event name → MeasurementFragment
  • some filtering on the fields of the event if needed → type="c8y_Temperature"

So end-to-end, our listener would look like: on MeasurementFragment(type="c8y_Temperature")

Well, we are almost where we need to be. At the moment, the above listener will trigger only once: your device might send multiple temperature measurements, but the listener will trigger on the first one and then terminate.
We actually want to listen for all the temperature measurements from the device, and to do that we just need to add the all keyword: on all MeasurementFragment(type="c8y_Temperature")

Finally, let’s discuss the notion of channels. Events are transported to and from the correlator via channels. Each Cumulocity event category is sent on its own channel (measurement channel, managed object channel, alarm channel, etc.). If you want your listener to trigger for a Cumulocity event, your monitor needs to subscribe to the corresponding channel; otherwise it will not receive those events.
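
As a sketch of the difference between the two listener forms, the hypothetical monitor below (ListenerComparison is a made-up name) registers both side by side after subscribing to the measurement fragment channel. The first listener should log once and then go away, while the second keeps logging for every matching measurement.

```epl
using com.apama.cumulocity.MeasurementFragment;

// Hypothetical monitor, for illustration only
monitor ListenerComparison {
    action onload() {
        // Without this subscription, neither listener would receive any event
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // Without "all": triggers on the first matching event, then terminates
        on MeasurementFragment(type="c8y_Temperature") {
            log "First temperature measurement received" at INFO;
        }

        // With "all": stays active and triggers on every matching event
        on all MeasurementFragment(type="c8y_Temperature") {
            log "Temperature measurement received" at INFO;
        }
    }
}
```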

To add your first listener to your EPL rule, perform the below steps:

  1. Open your rule named AlarmOnDeltaTemperature
  2. Scroll to the onload() action
  3. Edit the action by appending the below code:
action onload() {
	// Subscribe to the measurement fragment channel so the monitor will receive this type of events
	monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

	// Listen for measurements of type c8y_Temperature
	on all MeasurementFragment(type="c8y_Temperature") {
		log "Measurement Temperature received";
	}
}
  4. Save your EPL rule

2.b Second listener

Okay so now we know that all temperature measurements will be listened for in the EPL rule. In our example, we want to calculate the delta between subsequent measurements coming from the same device.

Two key phrases here.

  • Subsequent: the beauty of EPL is that it is quite a powerful language. You can create as many nested listeners as needed. A nested listener is created only after its parent listener has triggered. This is very useful when you want to build business logic based on a sequence of events.

    For our EPL rule, we want to listen for 2 measurements following one another. Therefore, let’s adapt the code to look like this:

action onload() {
	// Subscribe to the measurement fragment channel so the monitor will receive this type of events
	monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

	// Listen for measurements of type c8y_Temperature
	on all MeasurementFragment(type="c8y_Temperature") as m1 {
		on MeasurementFragment(type="c8y_Temperature") as m2 {
			log "Measurement Temperature received";
		}
	}
}

You will notice that you can bind the event matched by a listener to a variable by adding the as keyword at the end of the listener declaration.

Warning Alert!

Be careful! You need to use the all keyword sparingly. Imagine we had an all on our second listener: for every single temperature measurement received, we would then listen for all the following temperature measurements. Not 1, not 2, not 3… but ALL of them. The picture below might explain the snowball effect clearly.

This would consume a lot of memory, as every measurement sent could trigger hundreds or thousands of listeners and the business logic within them.

In our example, we do not put the all keyword on the second listener. We just want to calculate the difference in temperature with the immediately following measurement, so we are happy for our nested listener to terminate once the second measurement has been received.
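
The warning above can be sketched with a deliberately bad monitor (NestedAllAntiPattern and nestedTriggers are made-up names, and this is not something to activate on a busy tenant). Assuming a nested listener only matches measurements that arrive after it was created, the n-th measurement triggers n-1 nested listeners, so 100 measurements already add up to 100 × 99 / 2 = 4,950 executions of the nested block, and none of those listeners ever terminates.

```epl
using com.apama.cumulocity.MeasurementFragment;

// Anti-pattern, shown only to illustrate the snowball effect
monitor NestedAllAntiPattern {
    // Counts how many times the nested listener block has run
    integer nestedTriggers := 0;

    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        on all MeasurementFragment(type="c8y_Temperature") {
            // Every measurement adds one more nested listener that never terminates
            on all MeasurementFragment(type="c8y_Temperature") {
                nestedTriggers := nestedTriggers + 1;
                log "Nested listener block has run " + nestedTriggers.toString() + " times" at INFO;
            }
        }
    }
}
```

Removing the inner all, as in the rule built in this article, means each nested listener runs at most once and then terminates.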

  • Coming from the same device: in the current state of the rule, you are listening to all the measurements and their immediately following measurement without filtering on anything but the type. This means that the two listeners can trigger for different devices. For example, if the first listener triggers for Device D1 and the nested listener triggers for Device D2, and the two devices report temperatures in totally different ranges, the delta might always exceed the threshold and raise an alarm. So let’s change the rule to add a filter and make sure the second listener triggers only for the same device as the first listener:

action onload() {
	// Subscribe to the measurement fragment channel so the monitor will receive this type of events
	monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

	// Listen for measurements of type c8y_Temperature
	on all MeasurementFragment(type="c8y_Temperature") as m1 {
		// Listen for the immediate subsequent measurement of same type and same source
		on MeasurementFragment(type=m1.type, source=m1.source) as m2 {
			log "Measurement Temperature received for Device Id " + m1.source at INFO;
		}
	}
}

As you can see, the m1 variable is used to read the type and source fields of the first event and apply them as a filter on the second listener.

Do not forget to Save your EPL rule.

3. Calculate the delta

We want to check the difference between 2 consecutive measurements and raise an Alarm only if this delta is above a certain threshold. So let’s add the logic in our code:

  1. Edit your EPL rule and navigate within the second listener.
  2. Add the below code:
on MeasurementFragment(type=m1.type, source=m1.source) as m2 {
	log "Measurement Temperature received for Device Id " + m1.source at INFO;
	float delta := (m2.value - m1.value).abs();
	if (delta > 30.0) {
		string text := "The delta in temperature from device " + m1.source + " exceeded the threshold of 30.0. The delta was " + delta.toString() + " (measurement value of " + m1.value.toString() + " followed by " + m2.value.toString() + ").";
		log text at WARN;
	}
}
  3. Save your code
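
To see what the delta calculation above does with concrete numbers, here is a small sketch using hard-coded readings instead of real measurements (DeltaExample and the two values are made up). Because of abs(), a rise from 20.0 to 55.5 and a fall from 55.5 to 20.0 both give a delta of 35.5, which is above the threshold of 30.0.

```epl
// Hypothetical monitor with hard-coded readings, for illustration only
monitor DeltaExample {
    action onload() {
        float first := 20.0;
        float second := 55.5;

        // abs() drops the sign, so the direction of the change does not matter
        float rise := (second - first).abs();
        float fall := (first - second).abs();

        log "Delta on rise: " + rise.toString() + ", delta on fall: " + fall.toString() at INFO;

        if (rise > 30.0) {
            log "Threshold of 30.0 exceeded" at WARN;
        }
    }
}
```

If you only care about temperature increases, you could drop abs() and compare m2.value - m1.value directly; the rule in this article treats rises and falls the same way.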

4. Create the global variables

You can create variables locally within your actions or globally in your monitor. Global variables will be accessible across all the actions in your monitor.

We usually like to declare variables globally when we know they will be used multiple times in the monitor, or when they store configurable values. It is good practice to declare the global variables at the top of the monitor, so it is quick and easy to change their values without having to search through the entire code. For example, in the code above, the threshold value sits right in the middle of the file. It is not easy for someone to remember where to find it if they want to change it in 3 months’ time.

Let’s have a think about our EPL rule… which data is most likely to be used several times or to hold a configurable value?

  • the threshold: we want to raise an alarm and send an email when the threshold is breached
  • the alarm severity: the severity of the alarm being raised
  • the alarm type: the type of the alarm being raised
  • the email recipient: the address the email will be sent to

Let’s add the global variables just below the monitor declaration:

  1. Edit your EPL rule and navigate to the monitor declaration (around line 37).
  2. Add the constants below, directly under the monitor declaration:
monitor AlarmOnDeltaTemperature {
	constant float THRESHOLD := 30.0;
	constant string ALARM_TYPE := "Threshold_Exceeded";
	constant string ALARM_SEVERITY := "MAJOR";
	constant string EMAIL_TO := "test@test.com";
  3. Update the THRESHOLD to a suitable value, one that your device’s temperature measurement deltas will actually exceed.
  4. Update the email address to the address where you would like to receive notifications.
  5. Now let’s use the THRESHOLD value in the code we added in the previous step:
on MeasurementFragment(type=m1.type, source=m1.source) as m2 {
	log "Measurement Temperature received for Device Id " + m1.source at INFO;
	float delta := (m2.value - m1.value).abs();
	if (delta > THRESHOLD) {
		string text := "The delta in temperature from device " + m1.source + " exceeded the threshold of " + THRESHOLD.toString()
			+ ". The delta was " + delta.toString() + " (measurement value of " + m1.value.toString() + " followed by " + m2.value.toString() + ").";
		log text at WARN;
	}
}
  6. Save your rule

Note: I like to declare my global variables in capital letters so that I can tell local and global variables apart. It is a preference and not at all mandatory.

5. Generate the Alarm

Let’s start building our business logic. At the moment, we are just logging that the threshold was exceeded. Instead, we want to raise an alarm and send an email. Let’s focus on the Alarm first.
We all like to have our code readable… Right?

So let’s create a new action for the alarm generation. This avoids having a big block of code in the onload() action.

  1. Edit your EPL rule and navigate to the end of the onload() action
  2. Add the below code:
action generateAlarm(string deviceId, string text) {
	Alarm a := new Alarm;
	a.source := deviceId;
	a.type := ALARM_TYPE;
	a.severity := ALARM_SEVERITY;
	a.time := currentTime;
	a.text := text;

	log "Alarm generated " + a.toString() at INFO;
	send a to Alarm.SEND_CHANNEL;
}
  3. Now replace the log text at WARN; statement in the onload() action with:
generateAlarm(m1.source, text);

  4. Save your rule

Notice that when we generate an Alarm, we need to send it to its related channel so that Cumulocity receives it and creates the alarm in its MongoDB database. The same is true for any other type of Cumulocity event: when you create a new one, you need to send it to its proper channel.
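
As a sketch of the same pattern for another type, the hypothetical monitor below creates a Cumulocity Event and sends it to its channel. It assumes that com.apama.cumulocity.Event exposes a SEND_CHANNEL constant in the same way as Alarm does; the monitor name, device id and event type are made up, so check the events documentation for your version before relying on it.

```epl
using com.apama.cumulocity.Event;

// Hypothetical monitor, for illustration only
monitor SendEventExample {
    action onload() {
        Event e := new Event;
        e.source := "12345";              // made-up device id
        e.type := "Delta_Check_Started";  // made-up event type
        e.time := currentTime;
        e.text := "Delta check started";

        // Assumption: Event has a SEND_CHANNEL constant, like Alarm
        send e to Event.SEND_CHANNEL;
    }
}
```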

6. Send the email

Again, we are going to create a separate action for sending the email.

  1. Edit your EPL rule and, at the top of it, add a new import:

using com.apama.cumulocity.SendEmail;

  2. Navigate to the end of the generateAlarm action and add the below code:
action sendEmail(string text) {
	SendEmail se := new SendEmail;
	se.receiver := [EMAIL_TO];
	se.subject := "Delta in Temperature exceeded";
	se.text := text;
	se.replyTo := "noreply@test.com";

	log "Email sent " + se.toString() at INFO;
	send se to SendEmail.SEND_CHANNEL;
}
  3. Now go back to the onload() action and, below generateAlarm(m1.source, text);, add the call to the above action:
generateAlarm(m1.source, text);
sendEmail(text);
  4. Save your rule

Test your analytics rule

Your first rule is done! It is now time to enable it and test it :slight_smile:
First, activate it by clicking on the “Inactive” button at the top right. Its status should change to Active.

You can follow the progress of the rule in the correlator log file. To do that, you need Admin rights on the tenant. From there, just go to:

  1. Administration app from the Application switcher
  2. Navigate to Applications > Subscribed applications
  3. Click on Apama-ctrl-1c-4g
  4. Click on the Logs tab
  5. Scroll to the bottom; you should see the logs of your EPL rule in real time

Then, ensure that at least one of your devices sends temperature measurements and that the threshold is breached at some point (reduce the value of THRESHOLD if needed).
Whenever the log states that an Alarm was generated and an email sent, you can navigate to your device to see the alarm, and you should shortly receive an email!! Happy days :slight_smile:

Not working?

Below is the complete rule; compare it with yours to make sure you did not make any mistakes in your code.
Again, ensure that the threshold is indeed breached and that your device sends c8y_Temperature measurements.
If you still do not see any Alarm or email, please do not hesitate to reach out to the community.

/** Basic event definitions */
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;

using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;


using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;

using com.apama.cumulocity.Operation;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;

using com.apama.cumulocity.ObjectCommitted;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.SendEmail;

/** Miscellaneous utilities */
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;

monitor AlarmOnDeltaTemperature  {
	constant float THRESHOLD := 30.0;
	constant string ALARM_TYPE := "Threshold_Exceeded";
	constant string ALARM_SEVERITY := "MAJOR";
	constant string EMAIL_TO := "test@test.com";

	/** Initialize the application */
	action onload() {
		//Subscribe to the measurement fragment channel so the monitor will receive this type of events
		monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
		// Listen for measurements of type c8y_Temperature
		on all MeasurementFragment(type="c8y_Temperature") as m1 {
			//Listen for the immediate subsequent measurement of same type and same source
			on MeasurementFragment(type=m1.type, source=m1.source) as m2 {
				log "Measurement Temperature received for Device Id " + m1.source at INFO;
				float delta := (m2.value - m1.value).abs();
				if (delta > THRESHOLD) 
				{
					string text := "The delta in temperature from device " + m1.source + " exceeded the threshold of " + THRESHOLD.toString()
					+ ". The delta was " + delta.toString() + " (measurement value of " + m1.value.toString() + " followed by " + m2.value.toString() + ").";
					generateAlarm(m1.source, text);
					sendEmail(text);
				}
			}

		}
	}

	action generateAlarm(string deviceId, string text) {
		Alarm a := new Alarm;
		a.source := deviceId;
		a.type := ALARM_TYPE;
		a.severity := ALARM_SEVERITY;
		a.time := currentTime;
		a.text := text;

		log "Alarm generated " + a.toString() at INFO;
		send a to Alarm.SEND_CHANNEL;
	}
	
	action sendEmail(string text) {
		SendEmail se := new SendEmail;
		se.receiver := [EMAIL_TO];		
		se.subject := "Delta in Temperature exceeded";
		se.text := text;
		se.replyTo := "noreply@test.com";

		log "Email sent " + se.toString() at INFO;
		send se to SendEmail.SEND_CHANNEL;
	}
}

Next steps

Thank you for reading this article till the end :slight_smile:
If you enjoyed doing this exercise and want to learn more about EPL or write more complex analytics, you might be interested in some of our Training courses or Fast Track Services, which will teach you much more about Apama, EPL and how powerful it is…

Useful links | Relevant resources

What is Apama?
Apama Cumulocity Events Documentation
Apama EPL Fundamentals
Apama EPL Notions
