Batch Codecs

In Apama 10.3.1 we introduced the “Batch Accumulator” and “Message List” codecs. Both codecs handle batches of multiple events at the same time, to support use cases with high event rates.

The “Batch Accumulator” codec

You can add this codec to your connectivity plug-in chain without changing anything else in your application or anything external, and it will batch the events being sent from the connectivity transport to the correlator (host). Sending events to the correlator in batches rather than individually improves throughput, which is especially useful in high event-rate situations.

If processing each message through the chain takes longer than the gap between incoming events, the transport blocks, because without a queue each message must be processed one at a time. The correlator can handle batches of events at once, but to take advantage of this something needs to take each event, put it onto a queue, and remove a batch of events in one go; this is what the Batch Accumulator codec does.

The codec maintains a queue: as the transport produces events, they are put onto this queue one at a time. The codec does not wait for the queue to fill up; it is processed as soon as possible. If processing is slow, each batch taken off the queue contains everything that has arrived since the last batch was processed. If event rates are always going to be low, this codec only adds latency, so it is recommended only for high event-rate applications.

The main benefits of this codec come from pipelining and batching: the transport puts each message onto the queue and moves on without waiting for the rest of the chain to complete, and the host side takes messages off in batches, which reduces per-message overheads when the event rate is too high to process messages one at a time.

All of our supplied message-bus transports, such as MQTT and Kafka, already provide batching. The HTTPClient is not natively batched, but it would not benefit from the Batch Accumulator codec either, because of the request/response nature of the transport.

If you are using the HTTPServer in a submission-only mode, this codec can supply batching, allowing a high rate of events from multiple clients.

EXAMPLE

To use the codec in your connectivity plug-in, you must load it in the YAML configuration file in the connectivityPlugins section, giving it a section name by which it will be referred to later; in this example it is batchAccumulatorCodec.


batchAccumulatorCodec:
  libraryName: connectivity-batch-accumulator-codec
  class: BatchAccumulator

To actually use the codec, refer to it by the name you gave it in the section above (batchAccumulatorCodec). It is best practice to place this codec just before the transport, in this case the HTTPServer:


dynamicChains:
    http:
      - apama.eventMap
      - mapperCodec:
           ...
      - classifierCodec:
           ...
      - jsonCodec
      - stringCodec
      - batchAccumulatorCodec:
           maxBatchSize: 500
      - httpServer:
           automaticResponses: true

The Batch Accumulator codec exposes the size of its queue via a user-defined status value. This is available through the various monitoring tools for the correlator under the name user-chain.batchAccumulator.queueSize, and represents the size of the most recent batch removed from the queue. Please see the Apama documentation to find out more about user-defined status values.

CONFIGURABLE OPTIONS

The default for maxBatchSize is 1000 and you would not usually need to change it, but it can be configured on this codec as shown in the example above. A maxBatchSize of 1000 means that if more than 1000 messages are waiting to be processed by the host-bound thread, requests from the transport will block. It also means that outstanding events can occupy up to 1000 times your message size in memory, so for applications with large events it may be worth reducing maxBatchSize.
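As a rough illustration (the figures here are assumptions chosen for the example, not recommendations): with events of around 1 MB each, the default of 1000 allows roughly 1 GB of outstanding events, whereas a smaller limit such as the following caps this at roughly 100 MB:

      - batchAccumulatorCodec:
           maxBatchSize: 100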

The “Message List” codec

You can add this codec to your connectivity plug-in chain to combine a batch of outbound events, sent from the correlator (host) to the connectivity transport, into a single event containing a list.
Because the Message List codec changes the type of the events being sent, from a batch of single events into a single event containing a list, it must be used alongside an external service that can process a list of events rather than individual events.

What the Batch Accumulator codec does for inbound messages is already done automatically for outbound messages: if processing one message at a time keeps up, you get messages one at a time; if the system is slow at processing them individually, messages build up and are delivered as a batch. The Message List codec only combines the contents of one such batch into a single event; it does not wait for more messages to accumulate.

With the Message List codec, processing one message at a time gives you a list with one element at a time; if a batch has built up, the messages in that batch are combined into a single event whose body is a list of the original messages. This codec is therefore only useful where high event rates are experienced. It allows one entire batch of messages, rather than just one message, to be handled per request/response round trip.
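As an illustration (the numbers are assumptions, not measurements): if each request/response round trip takes 10 ms, sending one message per request caps throughput at about 100 messages per second, whereas sending batches of 500 messages per request allows up to about 50,000 messages per second for the same round-trip latency.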

As mentioned previously, the HTTPClient can only process events one at a time, which means its maximum throughput is limited by the time taken to process a single event. The HTTPClient can therefore use the Message List codec to help in this scenario.

You need a transport or downstream codec that expects the lists produced by the Message List codec, as well as a service that supports them. Often this will mean encoding the lists in a format such as JSON and then using the String codec to produce a binary message for the transport. The web service must be capable of handling a JSON request that contains a list of requests in one go and of replying accordingly.
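For example, with metadataMode: first (described below) and the JSON and String codecs downstream, a batch of two messages like the one used later in this article would reach the web service as a single request whose body is a JSON list (the field names are taken from that later example):

[ { "name": "Matt", "age": 30 }, { "name": "James", "age": 31 } ]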

The codec can also handle lists of replies (if the service produces them) by splitting them up and delivering them back to the correlator as separate messages.
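For example, if the service replied with a JSON list such as the following (the response fields here are purely hypothetical), the String and JSON codecs would decode it and the Message List codec would then split it into two separate messages towards the correlator, one per element:

[ { "result": "ok" }, { "result": "ok" } ]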

EXAMPLE

To use the codec in your connectivity plug-in, you must load it in the YAML configuration file in the connectivityPlugins section, giving it a section name by which it will be referred to later; in this example it is messageListCodec.


messageListCodec:
   libraryName: connectivity-message-list-codec
   class: MessageListCodec

To actually use the codec, refer to it by the loaded name (messageListCodec), setting its configurable options as required. The example below shows an HTTPClient chain for accessing a web service that has been configured to process lists of messages in JSON format; the lists produced by the Message List codec are encoded into JSON to be consumed by the web service.


startChains:
   webServiceChain:
      - apama.eventMap
      - mapperCodec:
           ...
      - classifierCodec:
           ...
      - messageListCodec:
            metadataMode: first
      - jsonCodec
      - stringCodec
      - httpClient:
           host: ${WEBSERVICE_HOST}
           port: ${WEBSERVICE_PORT}

CONFIGURABLE OPTIONS

As with the Batch Accumulator codec, maxBatchSize is optional and again defaults to 1000. For this codec, the value is the maximum number of events that will be combined into a single list.
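For example, to limit the lists to at most 100 elements (an illustrative value, not a recommendation), you could set maxBatchSize alongside metadataMode in the chain configuration shown above:

      - messageListCodec:
            metadataMode: first
            maxBatchSize: 100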

The main choice to make is how to handle the metadata when combining multiple messages into a single message; which option is right depends on your application.
You are therefore required to set the strategy for handling the metadata of multiple requests via the metadataMode option.
Note that when converting a list back into separate messages, these mappings are performed in reverse.

Let’s assume that we have a batch of two messages as follows:


metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = { name: "Matt", age: 30 }
 
metadata = { requestId: 6, http: { method: PUT, path: /add } }
payload = { name: "James", age: 31 }

Possible values for metadataMode are:

  • first – Just use the metadata from the first message in the batch as the metadata for the list message.

metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 }, { name: "James", age: 31 } ]

  • splitBatch – Only add items from the batch whose metadata is identical to the same list; create a new list message when the metadata changes.

metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 } ]
metadata = { requestId: 6, http: { method: PUT, path: /add } }
payload = [ { name: "James", age: 31 } ]

  • requestIdList – Use the metadata from the first message, but set metadata.requestId to be a list containing the requestId of each message.

metadata = { requestId: [ 5, 6 ], http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 }, { name: "James", age: 31 } ]

  • memberwise – Leave the metadata of the list message empty, and make each element of the payload list a map containing the metadata and payload of one of the original messages.

metadata = { }
payload = [
   {
     metadata: { requestId: 5, http: { method: PUT, path: /add } },
     payload: { name: "Matt", age: 30 }
   },
   {
      metadata: { requestId: 6, http: { method: PUT, path: /add } },
      payload: { name: "James", age: 31 }
   }
]

FULL DOCUMENTATION

Full documentation on the Batch Accumulator and Message List codecs can be found in the online documentation for Apama 10.3.1, within the “Developing Connectivity Plug-ins” section of the Apama documentation.

Feel free to post on our Stack Exchange site with any questions you may have about this, or any other, topic.

DISCLAIMER

Utilities and samples shown here are not official parts of the Software AG products. These utilities and samples are not eligible for technical support through Software AG Customer Care. Software AG makes no guarantees pertaining to the functionality, scalability, robustness, or degree of testing of these utilities and samples. Customers are strongly advised to consider these utilities and samples as “working examples” from which they should build and test their own solutions.