Kafka and Apama

Aims of this blog

My goal with this blog post is to show how to set up Apama with Kafka and how the two can be used together in a stream processing application. Kafka handles the transport and lets you set up the delivery system, while Apama is a high-performance event-processing engine with the flexibility and throughput to suit most applications. This blog post isn’t a tutorial on Kafka, and I have used a containerized Kafka cluster for simplicity. If you have an existing cluster, or you wish to install Kafka yourself, it is simply a matter of changing the configuration described in the body of the blog.

The demo will connect to Twitter and use a stream of tweets to publish messages to the tweets channel in Kafka. Apama will subscribe to this channel and process each message, producing a dictionary of word frequencies, which is periodically published to the summary channel in Kafka. There is a GitHub repository containing the basic script that streams tweets to Kafka, the Apama project directory, and a monitor file that connects to the Kafka cluster, processes the messages, and outputs results back to Kafka.

What is Kafka?

Apache Kafka describes itself as a streaming platform, allowing the user to publish or subscribe to messages on defined topics. Kafka provides fault-tolerant, durable message queues and is used primarily as an intermediary between real-time data streams and the applications that consume them. Each message on the queue has an identifier, a timestamp and a message payload. Kafka is used by many large organizations including Cisco, Oracle, Twitter, Spotify, PayPal and Netflix.

Elements of the demo

This demo uses Python to connect to Twitter and stream messages into the tweets channel of a Kafka cluster. You can get the sources for this demo from this GitHub repository.


git clone https://github.com/CaribouJohn/Apama-Twitter-Kafka kafka
cd kafka

The Python script stream.py, at the top level of the repository, creates the connections to Kafka and Twitter. The following command line installs the Python libraries that the script depends on.


pip install kafka-python python-twitter tweepy

You will also need to create an application in Twitter to obtain the keys required to connect to their API. When creating the application, Twitter may require you to add values for “Website URL” and “Callback URLs”, but the addresses you add are not important. Edit the script stream.py in the kafka directory, filling in the keys you obtained when you created your Twitter application (go to the “Keys and tokens” tab and use the Generate buttons as required). Once that has been done, the script can connect and stream the tweets into Kafka.


# Twitter access tokens - see https://developer.twitter.com/en/apps/create 
# These are secret keys and so need to be obtained by the user by adding an application
access_token = "Your Token"
access_token_secret = "Your Secret Token"
consumer_key = "Your Key"
consumer_secret = "Your secret key"

# Create the connection to Kafka and the producer for publishing messages
# Edit this with your own cluster details if not using the container version of Kafka
kafka = KafkaClient("localhost:9092")
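
For orientation, here is a minimal sketch of how the rest of stream.py might tie these pieces together, continuing from the variables above and assuming tweepy 3.x (a StreamListener subclass) plus an older kafka-python release that still provides SimpleProducer; the code in the repository may differ in detail.


import tweepy
from kafka import SimpleProducer

producer = SimpleProducer(kafka)  # "kafka" is the KafkaClient created above

class TweetForwarder(tweepy.StreamListener):
    def on_data(self, data):
        # Forward the raw tweet JSON into the "tweets" topic
        producer.send_messages("tweets", data.encode("utf-8"))
        return True

    def on_error(self, status):
        return False  # disconnect on error

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
tweepy.Stream(auth, TweetForwarder()).filter(track=["Lady Gaga"])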

Apama has a connectivity layer that includes a Kafka plug-in. The Kafka plug-in, coupled with Apama’s flexible system of configurable codecs, means you can concentrate on coding your domain-specific application rather than writing interfaces and handling connections. You can use the Apama project included in the GitHub repository if you wish, and skip the configuration section below.

Creating the Apama project

You can simply use the Apama project, “KafkaProcessor”, that you got in the kafka directory when you cloned the GitHub repository. Alternatively, delete the KafkaProcessor directory and create it as follows. If you are going to do this, it will save you a moment or two if you first copy KafkaProcessor/monitors/kafka.mon somewhere else so that you can copy it back in later rather than downloading it again.

If you want to create the project from scratch you can use Software AG Designer, but if you are primarily a Linux user, or prefer command-line tools to GUI applications, you can use the apama_project tool (new in 10.3.1) instead. There is a blog page about it here: Introducing a new command-line tool for creating Apama projects – ‘apama_project’

Whichever you start off with, you will be able to chop and change between them to modify the project: they are completely compatible.

I chose a root directory of kafka to contain all the elements of the demo and created the Apama project beneath it using the following commands.


> cd kafka
> apama_project create KafkaProcessor
Project created successfully at location: .../kafka/KafkaProcessor

This will fail if the KafkaProcessor folder already exists, which probably means you have not yet deleted the one that came with the clone. Delete it and try again, optionally copying monitors/kafka.mon out of it first.


> cd KafkaProcessor/
> apama_project list bundles

Bundles that can be added:
    Standard bundles:
...
        2       Automatic onApplicationInitialized
...
    Adapter bundles:
...
    Connectivity bundles:
        32      Cumulocity IoT > Cumulocity Client 9.0+
        33      Cumulocity IoT > Cumulocity Client 9.8+
        34      Digital Event Services
        35      HTTP Client > JSON with application-specific event definitions
        36      HTTP Client > JSON with generic request/response event definitions
        37      HTTP Server
        38      Kafka
        39      MQTT
        40      Universal Messaging > Map to UM message payload with JSON
        41      Universal Messaging > Map to UM message properties
        42      Universal Messaging > UM advanced configuration and mapping rules
        43      User Connectivity

> apama_project add bundle "Kafka"
Adding bundle "Kafka".
"Kafka" bundle added successfully.
You may need to configure instance files by editing the following files.
".../kafka/KafkaProcessor/config/connectivity/Kafka/Kafka.yaml"
".../kafka/KafkaProcessor/config/connectivity/Kafka/Kafka.properties"


> apama_project add bundle "Automatic onApplicationInitialized"
Adding bundle "Automatic onApplicationInitialized".
"Automatic onApplicationInitialized" bundle added successfully.

I added the Kafka bundle and the Automatic onApplicationInitialized bundle using the apama_project tool as shown above, although I deliberately trimmed some of the output for brevity. Running the list bundles command shows the many ways Apama can connect to external sources of information, storage options, and message-format codecs. Finally, copy the kafka.mon file from the KafkaProcessor/monitors directory of the GitHub repository (or the copy you made of it earlier) into the kafka/KafkaProcessor/monitors subdirectory of the project you just created.

The snippet below shows how subscription and publishing of messages between Apama and Kafka can be achieved by using channel names.


monitor Receiver {
    // Channels for I/O - the channel prefix by default is kafka: (all lowercase)
    constant string KAFKA_INPUT_CHANNEL := "kafka:tweets";
    constant string KAFKA_OUTPUT_CHANNEL := "kafka:summary";

    // Just process messages when we load up.
    action onload() {
        monitor.subscribe(KAFKA_INPUT_CHANNEL);
        on all TwitterEvent() as e {
            total_tweets := total_tweets + 1;
            ...

            // Periodically send a summary - every 10 tweets received
            if (total_tweets % 10 = 0) {
                ...
                TwitterSummary summary := TwitterSummary(total_tweets, summary_frequencies);
                send summary to KAFKA_OUTPUT_CHANNEL;
            }
        }
    }
}

Running the tool and then copying in the kafka.mon file should produce a directory structure similar to the one contained in the GitHub repository.
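
For reference, the key files used in this blog end up laid out roughly as follows; this is only a sketch, and the project tool generates a number of other files that are omitted here.


kafka/
├── stream.py
└── KafkaProcessor/
    ├── config/
    │   └── connectivity/
    │       └── Kafka/
    │           ├── Kafka.properties
    │           └── Kafka.yaml
    └── monitors/
        └── kafka.mon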

Configuration

The Kafka.properties file under config/connectivity/Kafka can be edited to contain the correct connection details for your Kafka cluster if required. The Kafka.yaml file in the same directory defines the dynamic chain that converts between Kafka messages and Apama events:


# Template from which a dynamic chain will be created for each channel handled by this plug-in
dynamicChains:
  KafkaChain:
    - apama.eventMap:
        # Towards-host messages will turn into EPL events of this type.
        defaultEventType: TwitterEvent
        suppressLoopback: true
    # Translates between Apama events in the correlator and JSON strings in Kafka. You could use other codecs here.
    - jsonCodec:
    - KafkaTransport
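
If you want to exercise this chain without Twitter credentials, you can push a hand-crafted JSON message onto the tweets topic yourself. Below is a minimal sketch using kafka-python's KafkaProducer; the text key is purely a guess and would need to match the fields that the TwitterEvent definition in kafka.mon actually declares.


import json
from kafka import KafkaProducer

# Serialize dictionaries as JSON so the jsonCodec in the chain can decode them
producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=lambda v: json.dumps(v).encode("utf-8"))

# Hypothetical payload - the keys must match the fields of TwitterEvent in kafka.mon
producer.send("tweets", {"text": "lady gaga at the oscars"})
producer.flush()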

Lastly, once the configuration is correct and the EPL monitor has been copied into place, we need to deploy the application into a form that can be run by Apama.


cd kafka
engine_deploy --outputDeployDir kafka_deployed KafkaProcessor

Next, run the Kafka server so that it is ready to receive the messages from Twitter produced by the stream.py script. If you are using your own cluster you can of course skip this step. The advertised host should be the machine you are running the Kafka container on.


docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=<Your IP or Host> --env ADVERTISED_PORT=9092 --name kafka -d spotify/kafka

Now we can run Apama. If you append & to the command and press return after the header is output, you can issue the next command in the same shell.


correlator --config kafka_deployed

Lastly, we can run the stream.py script in a terminal to connect to Twitter; it should start putting tweets into the “tweets” channel. Again, append & to the command to keep using the same shell.


python ./stream.py

At this point we are processing the stream of tweets, and Apama is posting summary messages back to Kafka. You can observe the summary messages using a command-line tool included in the Kafka installation inside the running image. Because we are running it inside the image we can use localhost; if you are using your own installation or an existing cluster you will need to run the script with the correct host and port.


docker exec -it kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic summary --from-beginning
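
Alternatively, if you have kafka-python installed on the host, a small consumer script does the same job. A minimal sketch (adjust the bootstrap server if you are not using the container):


import json
from kafka import KafkaConsumer

# Read summary messages from the beginning of the topic and print them as they arrive
consumer = KafkaConsumer("summary",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         value_deserializer=lambda m: json.loads(m.decode("utf-8")))
for message in consumer:
    print(message.value["tweet_count"], message.value["wordcounts"])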

This will show the messages as they arrive. For example, if we use the term “Lady Gaga” as our keyword, then (at the time of writing) we see messages like these:


{"wordcounts":{"flex":"0.13","play":"0.19","shallow":"0.21","pulled":"0.09","purse":"0.10","bradley":"0.36","gaga:":"0.05","cooper?":"0.18",
"really":"0.10","gaga":"0.83","givenchy":"0.13","please":"0.19","you!":"0.13","cooper":"0.15","pulling":"0.14","thank":"0.13","lady":"0.89",
"oscar":"0.25","award":"0.05","level:":"0.13"},"tweet_count":630}
...
{"wordcounts":{"flex":"0.12","play":"0.15","shallow":"0.18","pulled":"0.09","purse":"0.10","bradley":"0.34","gaga:":"0.05","never":"0.05",
"really":"0.10","cooper?":"0.14","givenchy":"0.12","love":"0.07","you!":"0.11","gaga":"0.81","please":"0.14","thank":"0.11","cooper":"0.16",
"pulling":"0.14","lady":"0.87","oscar":"0.25","award":"0.06","level:":"0.13"},"tweet_count":1530}
...
{"wordcounts":{"flex":"0.14","play":"0.13","shallow":"0.15","pulled":"0.09","purse":"0.10","bradley":"0.32","cooper?":"0.10",
"love":"0.07","really":"0.10","gaga":"0.79","givenchy":"0.14","please":"0.12","you!":"0.08","cooper":"0.16","pulling":"0.16",
"thank":"0.09","lady":"0.86","oscar":"0.27","award":"0.05","level:":"0.14"},"tweet_count":2600}

The output is a JSON message containing the words that occurred with a calculated frequency of more than 5%; rounding means some words are shown with exactly 5%. I have pasted several messages to show how the frequencies change as more messages are received. We could, for example, use this output to drive a dynamic word cloud that changes over time. Clearly much more could be done to normalize the incoming data and make the analysis more useful.
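
To make the numbers concrete, here is a rough Python illustration of the kind of calculation kafka.mon performs, reading each value as the number of occurrences of a word divided by the number of tweets received; the actual EPL logic, tokenization and rounding may well differ.


from collections import Counter

def word_frequencies(tweets, threshold=0.05):
    # Count word occurrences across all tweets, then keep the words whose
    # occurrence rate per tweet exceeds the threshold (5% here)
    counts = Counter(word.lower() for text in tweets for word in text.split())
    total = len(tweets)
    return {w: round(n / total, 2) for w, n in counts.items() if n / total > threshold}

print(word_frequencies(["lady gaga at the oscars", "bradley cooper and lady gaga"]))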

Finally…

I am sure that most of you can think of better things to do with the data than analyzing the most frequent words in tweets containing the phrase “Lady Gaga”. The beauty of EPL is that it is flexible and powerful, allowing for some very complex processing. Also, when you are happy with what the application is doing, you can enable the LLVM-based compiled runtime to improve performance.