Creating your own Apama Connectivity Plugins

Apama offers a multitude of ways to connect to external systems, and one of the easiest and powerful ways is via Connectivity plug-ins. They can be written in either Java or C++ and run inside the Correlator process.

You can combine individual plugins to form chains that define the path of a message, with the correlator host process at one end and an external system or library at the other, with an optional sequence of message mapping transformations between them.

To describe the chain of plug-ins, you can use a configuration file written in the YAML markup language. Every chain consists of:

  • A host plug-in – responsible for sending and receiving messages to and from the correlator process hosting the chain
  • A Transport plug-in – responsible for sending and receiving messages to and from an external system
  • Codec plug-ins – one or more of which can be used for applying transformations to the messages

You can read comprehensive details about Connectivity Plugins in our documentation (specifically ‘Connecting Apama Applications to External Components‘), which is provided with the full Community Edition installer package and is also viewable and downloadable from the Doc section on this site.

To help you get started we’re providing a sample CSV codec plug-in, which you can use to read and write Comma Separated Value format files – for instance, spreadsheets. We are also bundling a simple File transport to read and write from files. Both the codec and transport are written in Java.

This CSV codec can translate messages both towards the host (correlator) and towards the transport. It extends the ‘com.softwareag.connectivity.AbstractSimpleCodec’ class, overriding both ‘transformMessageTowardsHost’ and ‘transformMessageTowardsTransport’ methods, performing alteration on the payload of the supplied Message parameter, and returning a new Message object that is passed onto the next plug-in in the chain.

Below is an example ‘transformMessageTowardsHost’ method which converts the String payload from Message (which in our example is a line read in from a file via the File Transport) into an event Map where the key is an event field name and the value is from the split String. The event field names/keys have been set from the Codec config during construction, and in this example they are ‘name’, ‘number’ and ’email’.

 * Converts the message in CSV format to event map.
 * @param message The message, guaranteed to be non-null and have a non-null payload.
 * @return a map as a payload of a message, or null if it is to be discarded.
public Message transformMessageTowardsHost(Message message) throws Exception {
    // Extract the String payload from the message
    String msg = (String) message.getPayload();
    // For the purposes of demonstration, we will allow messages delimited
    // by any of ',', tabs or ';'.
    String delimiters = "\\,|\\t|\\;";
    Object[] arr = msg.split(delimiters);
    // Construct a Map of field names (keys) to values
    // keys has been populated by the Codec config during construction
    Map<String, Object> map = new LinkedHashMap<String, Object>();
    for (int i = 0; i < arr.length; i++) {
        // strip any leading/trailing whitespace off and put it in the map
        map.put(keys[i], ((String)arr[i]).trim());
    // Set the payload of the message to an event Map and return it
    return message;

The returned Message from this method could be passed on to another Codec for further processing, but in our example it is converted into an Event for the correlator to process. Below is the definition of this Test Event.

event Test {
    string name;
    integer number;
    string email;

The ‘name’, ‘number’ and ’email’ fields are populated from the key/values in the Map from the Message. Note the ‘number’ field is an integer and the correlator will attempt to convert the String value in the map to an integer. This Event can then be processed by the correlator. Below is example EPL monitor code which will listen for all Test events, log what was received and reflect the Test event back to the plug-in chain for outbound processing towards the transport. In our example the plug-in chain has been configured to listen on the ‘data’ channel.

package com.softwareag.samples;
using com.softwareag.connectivity.ConnectivityPlugins;
monitor Simple {
    ConnectivityPlugins connectivity;
    action onload() {
        Test test;
        on all Test():test {
            log "Received: "+test.toString();
            // Send Test events to the data channel the CSV Plugin is registered for
            send test to "data";
        // Signify our application is ready to receive data

The ‘transformMessageTowardsTransport’ method in the Codec does the above but in reverse, it extracts a Map from the Message payload and constructs a single comma separated String from the Map values.

The Readme file included cover the basics of running a ‘test’ of the plug-in – and we’d encourage you to explore the code for the codec, the transport and the YAML file together (alongside the simple test EPL file). By looking at those together, you’ll get a good idea how the plug-in system works as a whole. You’re free to explore the code and modify it as you wish for your projects. Indeed, we’d encourage you to do so! As we all know, a great way to learn is by experimentation – so go ahead and make some alterations and see how it affects the results.

As always, please post on our forums with any questions you may have. Thanks and happy downloading!

File Transport on GitHub
CSV Codec on GitHub

Utilities and samples shown here are not official parts of the Software AG products. These utilities and samples are not eligible for technical support through Software AG Customer Care. Software AG makes no guarantees pertaining to the functionality, scalability, robustness, or degree of testing of these utilities and samples. Customers are strongly advised to consider these utilities and samples as “working examples” from which they should build and test their own solutions