Universal Messaging with MQTT

1. Introduction

Universal Messaging is a Message Orientated Middleware product that guarantees message delivery across public, private and wireless infrastructures. Universal Messaging has been built from the ground up to overcome the challenges of delivering data across different networks. It provides its guaranteed messaging functionality without the use of a web server or modifications to firewall policy.

Universal Messaging offer a wide range of functionality in the areas of Scheduling, Triggers, Clustering, Peer-to-Peer, Realm Federation, Joins, JMS, Pub / Sub (Channels), Pub / Sub (DataGroups), Queues, Plugins, Security and Low Latency IO.


1.1. MQTT Overview

MQTT (Message Queuing Telemetry Transport), is a publish/subscribe, simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth by IBM / Eurotech in 1999. The simplicity and low overhead of the protocol make it ideal for the emerging "machine-to-machine" (M2M) or "Internet of Things" (IoT) world of connected devices, and for mobile applications where bandwidth and battery power are at a premium. The protocol is openly published with a royalty-free license, and a variety of client libraries have been developed especially on popular embedded hardware platforms such as arduino/netduino, mbed and Nanode.

In a nutshell - “A light weight event and message oriented protocol allowing devices to asynchronously communicate efficiently across constrained networks to remote systems”.

1.2 Audience

This document is intended for partners who are interested to consume messages distributed to UM (webMethods Message Orientated Middleware) using MQTT.

2. Connecting

In order to connect to a Universal Messaging server using MQTT, your application needs to use a tcp:host:port URL (NSP Interfaces) or ssl:host:port URL (NSPS Interfaces). MQTT connections are treated in the same way as any other connections by the Universal Messaging realm. If the username is present, the Universal Messaging subject is username@hostname, otherwise the subject is anonymous@hostname.

 MQTT configuration for UM server can be found in config section, when you connect to realm server from Enterprise Manager.            



In order to enable connectivity from external parties to UM topics/channels using MQTT nsp or nsps interface should be created and it should be running. These ports can be created from Enterprise Manager, comms tab. 


Universal Messaging supports connections from any MQTT 3.1 client including the Eclipse Paho client - http://www.eclipse.org/paho/.      

3. Publishing

MQTT applications can publish events to channels. If the specified channels do not already exist in the Universal Messaging realm, they will be automatically created by the server as MIXED type with a JMS engine.

These channels are regular Universal Messaging channels but all events published from MQTT will be persisted to disk. While it is possible to create other channel types using the Administration API / Enterprise Manager, the mixed type with JMS engine is the recommended combination. Events published via MQTT only contain a byte[] payload and are tagged MQTT. They are fully interoperable with any Universal Messaging subscriber on any client platform supported and can be snooped using the Universal Messaging Enterprise Manager.

       All messages published from MQTT are flagged as JMS BytesMessage objects.

STASTANote:  Publishing to queues via MQTT is not supported.STASTA

     

  1. Example using Eclipse Paho Java Client Libraries

 Documentation for eclipse paho java client libraries can be found at the below locaiton. Paho also offers libraries for different MQTT clients (CSLAC++, JavaScript, Python, Co, C# & WinART).

http://www.eclipse.org/paho/files/javadoc/index.html

Following example, demonstrate you send a test message to a topic using MQTT 3.1 eclipse paho client libraries.

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

 public class MqttPublishSample {     public static void main(String[] args) {         String topic        = "Channel1";        String content      = "Message from MqttPublishSample";        int qos             = 2;        String broker       = "tcp:SLASLAlocalhost:1883";        String clientId     = "JavaSample";        MemoryPersistence persistence = new MemoryPersistence();         try {                MqttClient sampleClient = new MqttClient(broker, clientId, persistence);            MqttConnectOptions connOpts = new MqttConnectOptions();            connOpts.setCleanSession(true);            System.out.println("Connecting to broker: "+broker);            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);            sampleClient.connect(connOpts);            System.out.println("Connected");            System.out.println("Publishing message: "+content);            MqttMessage message = new MqttMessage(content.getBytes());            sampleClient.publish(topic, message);            System.out.println("Message published");            sampleClient.disconnect();            System.out.println("Disconnected");            System.exit(0);        } catch(MqttException me) {            System.out.println("reason "+me.getReasonCode());            System.out.println("msg "+me.getMessage());            System.out.println("loc "+me.getLocalizedMessage());            System.out.println("cause "+me.getCause());            System.out.println("excep "+me);            me.printStackTrace();        }    }

}

4. Subscribing

MQTT applications can subscribe to channels. If the specified channels do not already exist in the Universal Messaging realm, they will be automatically created by the server as MIXED type with a JMS engine.

These channels are regular Universal Messaging channels with all messages being persistent, regardless of whether they are published by MQTT or Universal Messaging applications.

STASTANote: Subscribing to queues via MQTT is not supported.STASTA

  1. Example using Eclipse Paho Java Client Libraries

 Following example, demonstrate you send a test message to a topic and subscribe using MQTT 3.1 eclipse paho client libraries. 

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

  class SimpleCallback implements MqttCallback {        ATOverride        public void connectionLost(Throwable cause) { SLASLACalled when the client lost the connection to the broker        }         ATOverride        public void messageArrived(String topic, MqttMessage message) throws Exception {            System.out.println("-------------------------------------------------");            System.out.println("| Topic:" + topic);            System.out.println("| Message: " + new String(message.getPayload()));            System.out.println("-------------------------------------------------");        }         ATOverride        public void deliveryComplete(IMqttDeliveryToken token) {SLASLACalled when a outgoing publish is complete        }

}

  public class MqttPublishSubscribeSample {    public static void main(String] args){        String topic = "test";        String content = "Message from MqttPublishSample 1";        int qos = 2;        String broker = "tcp:localhost:1883";        String clientId = "JavaSample";        MemoryPersistence persistence = new MemoryPersistence();         try {            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);            MqttConnectOptions connOpts = new MqttConnectOptions();            connOpts.setCleanSession(true);            System.out.println("Connecting to broker: " + broker);            sampleClient.connect(connOpts);            sampleClient.subscribe("blah", 1);            System.out.println("Connected");            System.out.println("Publish message: " + content);            MqttMessage message = new MqttMessage(content.getBytes());            message.setQos(qos);            sampleClient.setCallback(new SimpleCallback());            sampleClient.publish(topic, message);            System.out.println("Message published");            try {                Thread.sleep(5000);                sampleClient.disconnect();            } catch(Exception e) {                e.printStackTrace();            }            System.out.println("Disconnected");            System.exit(0);        } catch(MqttException me){            System.out.println("reason " + me.getReasonCode());            System.out.println("msg " + me.getMessage());            System.out.println("loc " + me.getLocalizedMessage());            System.out.println("cause " + me.getCause());            System.out.println("except " + me);            me.printStackTrace();        }    }

}

5. Quality of Service

Universal Messaging supports all three QOS levels defined by the MQTT standard. This is driven by the MQTT client connection and describes the effort the server and client will make to ensure that a message is received, as follows:

  1. QOS 0: The Universal Messaging realm will deliver the message once with no confirmation
  2. QOS 1: The Universal Messaging realm will deliver the message at least once, with confirmation required.
  3. QOS 2: The Universal Messaging realm will deliver the message exactly once, using a 4 phase approach as defined by the standard.

 6.   Will

Universal Messaging fully supports connections with Will settings, which indicate messages that need to be published automatically if the MQTT application disconnects unexpectedly.