Chain Managers – Dynamic Connectivity

Connectivity plug-ins can be written in Java or C++. They run inside the correlator process and allow messages to be sent to and received from external systems.

Individual plug-ins are combined together to form chains that define the path of a message, with the correlator host process at one end and an external system or library at the other, and with an optional sequence of message mapping transformations between them.

Refer to the TECHniques blog post on Apama connectivity plug-ins.

Static and dynamic connectivity chains

A chain is a combination of plug-ins with configuration. Every chain consists of the following:

  • Codec plug-in. Optionally, one or more codec plug-ins apply transformations to the messages (for example, the JSON codec in the following example) to prepare them for the next plug-in in the chain.
  • Transport plug-in. One transport plug-in is responsible for sending/receiving messages to/from an external system (for example, httpClient in the following example).
  • Host plug-in. One built-in host plug-in is responsible for sending/receiving messages to/from the correlator process that is hosting the chain (for example, apama.eventMap in the following example). Host plug-ins are built into the correlator and do not need to be specified in the connectivityPlugins stanza. A host plug-in determines the format in which events are passed in and out of the correlator, so a chain should specify a host plug-in that is compatible with the next codec or transport element in the chain. Host plug-ins can also specify the channels on which the chain receives events from the correlator, and a default channel for sending events into the correlator.

Each transport plug-in and codec plug-in used in the chain must also be described in the connectivityPlugins stanza. All of the plug-ins in a chain can optionally take further configuration, dependent on the type of plug-in.

Chains can be created statically using a YAML configuration file or dynamically at runtime.

How the chains are created depends on the type of transport. Connectivity chains can be created in three different ways as mentioned below (1, 2(a), 2(b)):

  1. Some transports have a dynamic chain manager which manages chain creation in a transport-specific way. For example, new chains may be created in response to external requests (for example, for each connection made to the HTTP server connectivity plug-in), or when an Apama channel with a particular prefix is first used from EPL (for example, the Universal Messaging connectivity plug-in creates a chain by default when a channel beginning with “um:” is used from EPL, in a monitor.subscribe(…) method or a send … to statement).
    These transports always have a “dynamicChainManagers” section in their YAML configuration file. The connectivity chains are created dynamically by a transport chain manager plug-in, using chain definitions specified in “dynamicChains”.
  2. For transports that do not provide a dynamic chain manager, chains are created either:
    a. statically, using the “startChains” section of the YAML file, or
    b. dynamically from EPL, using the ConnectivityPlugins.createDynamicChain(chainInstanceId, channels, chainDefnName, …) action with chain definitions specified in “dynamicChains”.

Note: A transport that permits user-controlled chain creation never has a dynamicChainManagers section in its YAML configuration file.

A transport plug-in can control the lifetime of chains involving that transport, by providing a dynamic chain manager.

A static chain configuration file always contains a “startChains” section. Chains under “startChains” are created at startup. The value of startChains is a map where each key is a string that names a chain. Each chain must begin with one host plug-in, which is one of the built-in supported host plug-ins, optionally followed by a number of codecs, and must end with the transport plug-in.
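
The ordering rule for a chain definition can be illustrated with a small C++ sketch that uses only the standard library. It is not part of the Apama API: the isValidChain function and the three name sets passed to it are hypothetical, introduced here only to make the rule concrete (host first, transport last, codecs in between).

```cpp
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper, not an Apama function: checks the ordering rule for a
// chain definition given as a list of plug-in names.
//  - the first entry must be a built-in host plug-in
//  - the last entry must be a transport declared in connectivityPlugins
//  - every entry in between must be a codec declared in connectivityPlugins
bool isValidChain(const std::vector<std::string> &chain,
                  const std::set<std::string> &hostPlugins,
                  const std::set<std::string> &codecs,
                  const std::set<std::string> &transports)
{
    if (chain.size() < 2) return false; // need at least a host and a transport
    if (hostPlugins.count(chain.front()) == 0) return false;
    if (transports.count(chain.back()) == 0) return false;
    for (std::size_t i = 1; i + 1 < chain.size(); ++i) {
        if (codecs.count(chain[i]) == 0) return false;
    }
    return true;
}
```

With the plug-in names from the static sample in this article, the list apama.eventMap, mapperCodec, jsonCodec, httpClient passes this check, whereas a list that starts with jsonCodec does not.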

A dynamic chain configuration file contains a dynamicChainManagers section, a dynamicChains section, or both. These chains are not created on startup.

  • The value of dynamicChainManagers is a map where each key is a manager name, that is, a string naming an instance of a dynamic chain manager plug-in class. Each value is a map providing the configuration for the chain manager instance (such as details for connecting to a specific external system) and the name of the transport plug-in it is associated with. The manager should have the following keys:
    • transport : Specifies the transport plug-in associated with this dynamic chain manager. This must match the key used in connectivityPlugins to load this chain manager, and also the name used in the dynamicChains definition to identify the transport plug-in at the end of each chain. This is the name used for the plug-in in the configuration file, not the name of the class that implements the plug-in.
    • managerConfig : Specifies the configuration map that will be passed to the chain manager constructor when it is created at startup. The available configuration options are defined by the plug-in author; see the plug-in’s documentation for details. If the managerConfig is invalid and the chain manager throws an exception, the correlator logs an error message and fails to start. The managerConfig usually includes details for connecting to a specific external server or system. Some chain managers may also provide some options that are set in the transport plug-in’s configuration section under dynamicChains, for example, options specific to the protocol or message format described by that chain definition.
  • The dynamicChains map is used to provide chain definitions that are used by chain manager plug-ins or EPL code that dynamically create chain instances after the correlator has started. Multiple instances of these chains can be created on demand from EPL using the ConnectivityPlugins EPL API.
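
To make the relationship between a manager's transport key and the dynamicChains definitions more concrete, here is a minimal C++ sketch using only the standard library. The ChainDefinitions alias and the chainsForTransport function are hypothetical illustrations, not Apama types or functions. The sketch simply selects the chain definitions whose last element names the given transport.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory view of the dynamicChains section (not an Apama type):
// chain definition name -> ordered list of plug-in names.
using ChainDefinitions = std::map<std::string, std::vector<std::string>>;

// Returns the names of the chain definitions whose final element (the
// transport) matches the value of the manager's "transport" key.
std::vector<std::string> chainsForTransport(const ChainDefinitions &dynamicChains,
                                            const std::string &transport)
{
    std::vector<std::string> result;
    for (const auto &entry : dynamicChains) {
        const std::vector<std::string> &plugins = entry.second;
        if (!plugins.empty() && plugins.back() == transport) {
            result.push_back(entry.first);
        }
    }
    return result;
}
```

A manager whose transport is SampleTransport would, under this model, be associated with every definition that ends in SampleTransport and with no others.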

Static Chains

A static chain configuration file may look like the following:

Sample YAML configuration file for Static Chain

connectivityPlugins:
   jsonCodec: 
      directory: ${APAMA_HOME}/lib/
      classpath:
            - json-codec.jar
      class: com.softwareag.connectivity.plugins.JSONCodec
   httpClient:
      libraryName: HTTPClientSample
      class: HTTPClient
   mapperCodec: 
      libraryName: MapperCodec
      class: MapperCodec
startChains: # This means, the chain is static and chains under it get created at startup
   weatherService:
      # built-in host plug-in is responsible for sending/receiving messages to/from the correlator process
      - apama.eventMap:
          defaultEventType: com.apamax.Weather
      # Mapper codec can be used to take messages from a transport which do not match the schema expected by the host
      # and turn them into messages which are suitable for the host. This is optional.
      - mapperCodec: 
          "*":
            towardsTransport:
              defaultValue:
                metadata.http.path: /data/2.5/weather?q=Cambridge,uk
                metadata.http.method: GET
      # codec plug-ins are responsible for applying transformations to the messages, this is optional
      - jsonCodec
      # Transport plug-in responsible for sending/receiving messages to/from an external system
      - httpClient: 
          host: api.openweathermap.org

Dynamic Chains

A dynamic chain configuration file may look like the following:

SampleConnectivity.yaml for Dynamic chain

connectivityPlugins:
  SampleTransport:
    libraryName: SampleTransport
    class: SampleTransportChainManager
    directory: ${Location_of_the_class_after_build}
dynamicChainManagers:
  TestManager:
    transport: SampleTransport
    managerConfig:
      channelPrefix: SampleTransport
dynamicChains:
  testChain:
    - apama.eventMap:
        defaultEventType: EventFromTransport
    - SampleTransport

Dynamic Chain Managers

A chain manager decides when to create or destroy chains. It typically does so by listening for channel subscriptions from the correlator host and/or for external connections. For example, on an EPL channel-created notification, the chain manager would check whether there is an external topic/queue to which it can connect, and create a chain instance to connect to that topic/queue on demand. Alternatively, the chain manager may listen for new external connections and create a new chain instance for each one.

In both cases, the chain manager will typically hold the connection to an outside system, which it will then pass to transport instances as they are created. Thus, the chain manager and transport are usually tightly coupled, and a chain manager can only create chains using its own transport class.

Every chain manager is required to implement the following:

  • createTransport() (Java only; for C++, the transport’s constructor is invoked directly)
  • start() – A typical chain manager uses its start() method to create any required connection(s) to external servers, and to add a ChannelLifecycleListener providing notifications when channels with a specific prefix are created or destroyed.
  • shutdown() – This is the place to explicitly destroy all chains as part of the host shutdown and, after the chains have been destroyed, to disconnect from the external system.
  • onChannelCreated(), via the ChannelLifecycleListener – which defines what to do when an EPL channel is created
  • onChannelDestroyed(), via the ChannelLifecycleListener – which defines what to do when an EPL channel is destroyed

Channel lifecycle listeners

The ChannelLifecycleListener helper class is used to tie chain creation and destruction to EPL channels. As listed above, a listener must implement the onChannelCreated() and onChannelDestroyed() functions. A listener is registered with a channel prefix so that it does not have to observe every EPL channel that is created or destroyed; only channels with this prefix are passed to it. For example, the built-in MQTT plug-in uses “mqtt:” as the prefix by default, so only EPL channels such as “mqtt:main_channel” are observed by the MQTT lifecycle listener.

A Chain Manager will typically contain one ChannelLifecycleListener, which is added to the manager using the addChannelLifecycleListener function.
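
The prefix filtering described above can be pictured with the following standard-library C++ sketch. It is a simplified stand-in and does not use the Apama headers: the PrefixDispatcher class and its addListener and channelCreated members are hypothetical names chosen for illustration, and the real host may implement the matching differently.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for the host's listener registry (not the Apama API).
class PrefixDispatcher {
public:
    using Callback = std::function<void(const std::string &channel)>;

    // Register a callback that is only interested in channels starting with 'prefix'.
    void addListener(const std::string &prefix, Callback callback) {
        listeners.emplace_back(prefix, std::move(callback));
    }

    // Notify only those listeners whose prefix the channel name starts with.
    // Returns the number of listeners that were notified.
    int channelCreated(const std::string &channel) const {
        int notified = 0;
        for (const auto &listener : listeners) {
            const std::string &prefix = listener.first;
            if (channel.compare(0, prefix.size(), prefix) == 0) {
                listener.second(channel);
                ++notified;
            }
        }
        return notified;
    }

private:
    std::vector<std::pair<std::string, Callback>> listeners;
};
```

In this model a listener registered for “mqtt:” is called for “mqtt:main_channel” but not for a channel such as “um:orders”, which mirrors the behaviour described for the MQTT plug-in.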

SampleTransport.cpp

#include <sag_connectivity_plugins.hpp>
#include <sag_connectivity_chain_managers.hpp>
#include <stdio.h>
#include <vector>
#include <string>
#include <algorithm>
#include <mutex>
#include <map>

using namespace com::softwareag::connectivity;
using namespace com::softwareag::connectivity::chainmanagers;

namespace sample_namespace {

    /** Sample transport connectivity plug-in class, demonstrating use of chain manager API.
    */
    class SampleTransport : public AbstractSimpleTransport
    {
    public:
        SampleTransport(const PluginConstructorParameters::TransportConstructorParameters &params,
            Direction direction
        ) : AbstractSimpleTransport(params), direction(direction)
        {
        }

        virtual void start()
        {
            logger.debug("Transport instance started.");
        }

        virtual void shutdown()
        {
            logger.debug("Transport instance shutdown.");
        }

        virtual void hostReady()
        {
            logger.debug("Transport instance hostReady.");

            // Simulate a message sent towards the host for demonstration purposes
            // a real transport would not do this            
            if (direction == Direction::TOWARDS_HOST) {
                hostSide->sendBatchTowardsHost(
                    Message{ map_t{ std::make_pair("s", "Hello World") } }
                );
            }
        }

        virtual void deliverMessageTowardsTransport(Message &m)
        {
            map_t &payload = get<map_t>(m.getPayload());
            const char* &str = get<const char*>(payload["s"]);
            logger.info("Sending message towards transport: %s", str);
        }

    private:
        Direction direction;
    }; //end of class SampleTransport


    /** Sample transport chain manager connectivity plug-in class, demonstrating use of chain manager API.
    */
    class SampleTransportChainManager : public AbstractChainManager < SampleTransport >
    {
    public:
        SampleTransportChainManager(const ChainManagerConstructorParameters &params)
            : AbstractChainManager(params)
        {
            // read chain manager configuration
            // optionally add some status reporter items to indicate whether connection is online and provide some statistics
            MapExtractor configEx(config, "config");
            channelPrefix = configEx.getStringDisallowEmpty("channelPrefix");
            configEx.checkNoItemsRemaining();
            // Select a desired chain definition.
            // Some transports only permit a singleton chain definition, others use the dynamic chain
            // definition identifier or something in the transport's configuration to select the right
            // chain
            chainDefinition = getChainDefinition(); // get singleton chain definition
        }

    private:
        /** ChannelLifecycleListener for channels prefixed with 'prefix', creates a transport for each channel/direction combination. */
        class SampleTransportChannelListener;
        std::string channelPrefix;
        ChainDefinition chainDefinition;
    }; //end of class SampleTransportChainManager

     
    class SampleTransportChainManager::SampleTransportChannelListener : public ChannelLifecycleListener
    {
    public:
        explicit SampleTransportChannelListener(SampleTransportChainManager &manager) : manager(manager) {}

        virtual void onChannelCreated(const std::string &channel, Direction direction)
        {
            std::unique_lock<std::mutex> lock(mutex);

            // Need to check if a chain already exists for this channel and direction
            auto it = chains.find(std::make_pair(channel, direction));
            if (it != chains.end())
            {
                // Chain for this channel and direction already exists, so no need to create a new one.
                // Some transports may wish to re-create the chain at this point if the configuration
                // of the channel on the external system has changed since the chain was first created
                return;
            }

            // Create a new chain, with a dedicated transport instance for each (channel, direction) pair.
            std::string chainInstanceId = ((direction == Direction::TOWARDS_TRANSPORT) ? std::string("ToTransport") : std::string("FromTransport")) + "[" + channel + "]";
            map_t substitutions;        // use this to provide @{...} substitution variables for use in the dynamic chain's configuration
            list_t subscribeChannels;

            if (direction == Direction::TOWARDS_TRANSPORT) {
                subscribeChannels.push_back(channel);
            }

            chain_t chain = manager.host.createChain(chainInstanceId,
                channel,
                subscribeChannels,
                manager.chainDefinition,
                substitutions,
                // any following arguments are passed to the transport constructor
                // (whose signature must contain items with the same types),
                direction
            );
            chain->start();
            chains.insert(std::make_pair(std::make_pair(channel, direction), chain));
        }

        virtual void onChannelDestroyed(const std::string &channel, Direction direction)
        {
            std::unique_lock<std::mutex> lock(mutex);

            auto it = chains.find(std::make_pair(channel, direction));
            if (it == chains.end()) return;

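            chain_t chain = it->second;
            chains.erase(it);
            lock.unlock(); // don't hold the lock while calling chain->destroy
            chain->destroy(); // without this call the chain would be forgotten but never destroyed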
        }

        void start()
        {
            manager.logger.debug("Starting SampleTransportChainManager");
            manager.host.addChannelLifecycleListener(this, manager.channelPrefix);
            // This is typically where a connection to the external transport would be established, optionally on a background thread
            manager.logger.debug("SampleTransportChainManager started");
        }

        void shutdown()
        {
            // best practice is to explicitly destroy all chains as part of the host shutdown
            std::unique_lock<std::mutex> lock(mutex);
            std::map<std::pair<std::string, Direction>, chain_t> localChains{ std::move(chains) };
            chains.clear(); // a moved-from map is valid but unspecified, so reset it while still holding the lock
            lock.unlock(); // don't hold the lock while calling chain->destroy
            for (auto it = localChains.begin(); it != localChains.end(); ++it) {
                chain_t chain = it->second;
                chain->destroy();
            }

            // After the chains have been destroyed is a good place to disconnect from the external system
        }
         
        /** The manager this listener is owned by */
        SampleTransportChainManager &manager;
    private:
        std::map<std::pair<std::string, Direction>, chain_t> chains;
        std::mutex mutex;
    }; //end of class SampleTransportChannelListener

    SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(SampleTransportChainManager)
} // end of namespace

The AbstractChainManager base class has a number of member fields that provide access to logging, the configuration for all dynamic chain definitions associated with its transport, and a ChainManagerHost interface which supports creating chains and registering channel listeners.

In summary, the chain manager is responsible for:

  • Creating and destroying chains as needed
  • Instantiating and managing the lifetime of any connection to an external server
  • Optionally, reporting status information that applies to the chain manager rather than to individual transports.
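
The chain bookkeeping performed by the listener in SampleTransport.cpp can be tried out without the Apama headers using the sketch below. It is a simplified model under stated assumptions: FakeChain and ChainRegistry are hypothetical stand-ins, and Direction is redefined locally rather than taken from the connectivity API. What it shows is one chain per (channel, direction) pair, idempotent creation, and destruction of chains outside the lock.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

enum class Direction { TOWARDS_HOST, TOWARDS_TRANSPORT }; // local stand-in

// Stand-in for a chain instance; it only records whether destroy() was called.
struct FakeChain {
    std::string id;
    bool destroyed = false;
    void destroy() { destroyed = true; }
};

class ChainRegistry {
public:
    using Key = std::pair<std::string, Direction>;

    // Returns true if a new chain was created, false if one already existed.
    bool onChannelCreated(const std::string &channel, Direction direction) {
        std::lock_guard<std::mutex> lock(mutex);
        Key key(channel, direction);
        if (chains.count(key) != 0) return false;
        auto chain = std::make_shared<FakeChain>();
        chain->id = std::string(direction == Direction::TOWARDS_TRANSPORT
                                    ? "ToTransport" : "FromTransport") + "[" + channel + "]";
        chains[key] = chain;
        return true;
    }

    // Returns true if a chain existed for this pair and was destroyed.
    bool onChannelDestroyed(const std::string &channel, Direction direction) {
        std::shared_ptr<FakeChain> chain;
        {
            std::lock_guard<std::mutex> lock(mutex);
            auto it = chains.find(Key(channel, direction));
            if (it == chains.end()) return false;
            chain = it->second;
            chains.erase(it);
        }
        chain->destroy(); // called after the lock has been released
        return true;
    }

    // Destroys every remaining chain and returns how many were destroyed.
    std::size_t shutdown() {
        std::map<Key, std::shared_ptr<FakeChain>> local;
        {
            std::lock_guard<std::mutex> lock(mutex);
            local.swap(chains); // leaves 'chains' empty while the lock is held
        }
        for (auto &entry : local) entry.second->destroy();
        return local.size();
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex);
        return chains.size();
    }

private:
    std::map<Key, std::shared_ptr<FakeChain>> chains;
    std::mutex mutex;
};
```

Swapping the map into a local variable before destroying the chains keeps the critical section short, which is the same reason the sample releases its lock before calling destroy.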

Sample Apama application

Plug-in chains support sending events in both directions, to and from the external system:

  • An Apama application can send events to a connectivity chain in the same way as it would send them to any other receiver connected to the correlator, that is, using the send … to statement. Events from the EPL application are translated into the form specified by the host plug-in (the first plug-in in the chain configuration). They are then passed through each codec in turn, and then delivered to the transport. By default, the host plug-in (apama.eventMap in the above example) listens for events from the application using the chain’s name as a channel name. The host plug-in can be configured to listen on a specific set of channels with the subscribeChannels configuration property.
  • Events from a connectivity chain’s transport are passed through the codecs in the reverse order and are translated by the host plug-in to Apama events which are enqueued to the Apama application on the desired channel. The channel can be specified per event, or a default channel can be configured in the host plug-in using the defaultChannel configuration property.
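
The two traversal orders described above can be sketched in a few lines of standard C++. This is only an illustration of the ordering: the Codec struct, makeTaggingCodec, sendTowardsTransport and sendTowardsHost are hypothetical names, and messages are modelled as plain strings rather than the payload types a real chain uses.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical codec model: one transformation for each direction.
struct Codec {
    std::function<std::string(const std::string &)> towardsTransport;
    std::function<std::string(const std::string &)> towardsHost;
};

// Builds a codec that appends its name to the message, so that the order in
// which codecs were applied can be read directly from the result.
Codec makeTaggingCodec(const std::string &name)
{
    return Codec{
        [name](const std::string &m) { return m + ">" + name; },
        [name](const std::string &m) { return m + "<" + name; }
    };
}

// Host -> transport: codecs are applied in the order listed in the chain.
std::string sendTowardsTransport(const std::vector<Codec> &codecs, std::string msg)
{
    for (auto it = codecs.begin(); it != codecs.end(); ++it) {
        msg = it->towardsTransport(msg);
    }
    return msg;
}

// Transport -> host: codecs are applied in the reverse order.
std::string sendTowardsHost(const std::vector<Codec> &codecs, std::string msg)
{
    for (auto it = codecs.rbegin(); it != codecs.rend(); ++it) {
        msg = it->towardsHost(msg);
    }
    return msg;
}
```

For a chain listing mapperCodec and then jsonCodec, a message travelling towards the transport is handled by mapperCodec first, while a message travelling towards the host is handled by jsonCodec first.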

Sample Apama application for the dynamic chain manager above:

SampleApplication.mon

/**
 * An application to demonstrate use of Connectivity Plug-in chain manager.
 */
using com.softwareag.connectivity.ConnectivityPlugins;

event EventToTransport {
    string s;
}

event EventFromTransport {
    string s;
}

monitor TestMonitor {
    action onload() {
        monitor.subscribe("SampleTransport:TestChannel");
         
        on all EventFromTransport() as evt {
            log "Message received from Transport: " + evt.s at INFO;
        }
         
        ConnectivityPlugins.onApplicationInitialized();
         
        EventToTransport mEvt := new EventToTransport;
        mEvt.s := "Hello World";
         
        send mEvt to "SampleTransport:TestChain";
    }
}

Refer to the samples\connectivity_plugin\cpp\skeleton_chainmanager sample in your installation.