In this second follow-on blog post on Cumulocity messaging architecture, we look behind the new Notifications 2.0 API to introduce the new Cumulocity Messaging Service and then see how it was used to enhance the DataBroker. Unlike Notifications 2.0, the DataBroker modifications do not introduce a new API but enhance the underlying quality of service (QoS) offered by the platform.
Cumulocity, in both its cloud and on-premise versions, gained an important new platform service in the 10.10 release: the new Cumulocity Messaging Service, which is based on the open-source Apache Pulsar project. First used for the enhanced DataBroker in 10.10, the messaging service now, as of 10.11, also underlies the new Notifications 2.0 API. As the following diagram shows, messaging is a built-in part of the platform in the logical architecture, but deployment is currently optional.
The Cumulocity Messaging Service is deployed in Kubernetes and, as a platform service, provides “at-least-once” messaging semantics. In 10.11, we’ve extended it with a WebSocket endpoint so that microservices can access it indirectly but securely to receive notifications. Pulsar is not currently available on Edge deployments but will be soon (targeting 10.13, with lesser message persistence guarantees). We expect the new service to become a required platform component of Cumulocity going forward, so that the enhanced DataBroker and the new Notifications 2.0 API are available everywhere and so that messaging capabilities can be leveraged throughout the platform.
Our enhancements mean that the version of Apache Pulsar deployed must come from Cumulocity / Software AG. We provide a Helm chart to install and update deployments (please see the Cumulocity Operations Manual for details).
Introducing Apache Pulsar
Apache Pulsar provides both messaging and streaming at IoT / Internet scale and has many advanced features that make it a great basis to build on. Pulsar was selected after an extensive evaluation and comparison with other message broker and streaming solutions (Apache Kafka for messaging and ActiveMQ for queueing were among the other finalists evaluated). A primary factor in selecting Pulsar was its support for both messaging and streaming with a clean message subscription model that allows both individual (multiple if needed) and grouped subscriptions. Another important consideration was the ordering of messages published by, or resulting from requests from, a single device, even in the presence of failures.
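To make the subscription model concrete, here is a minimal sketch using the Apache Pulsar Java client; the service URL, topic, and subscription names are placeholders for illustration, not Cumulocity-specific values:

```java
import org.apache.pulsar.client.api.*;

public class SubscriptionModes {
    public static void main(String[] args) throws PulsarClientException {
        // Connect to a Pulsar cluster (URL is illustrative).
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // An individual (Exclusive) subscription: a single consumer
        // receives every message, in publish order.
        Consumer<byte[]> individual = client.newConsumer()
                .topic("persistent://tenant/ns/device-events")
                .subscriptionName("audit")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        // A grouped (Shared) subscription: messages are spread across
        // all consumers in the group, trading ordering for throughput.
        Consumer<byte[]> grouped = client.newConsumer()
                .topic("persistent://tenant/ns/device-events")
                .subscriptionName("workers")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        individual.close();
        grouped.close();
        client.close();
    }
}
```

Both subscriptions can coexist on the same topic under different subscription names, which is what makes the model flexible enough to serve messaging and streaming workloads at once.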
Apache Pulsar scales to potentially millions of topics and supports multiple tenants (requirements familiar to all users of IoT and Cumulocity). It offers very strong message persistence guarantees while still providing low average latency, supported by a separately scalable storage layer in a tier of “Bookies” behind stateless “Brokers”; this was another main reason for selecting Apache Pulsar. The ability to scale out message consumers dynamically using “partitioned topics” is another important feature that we hope to leverage more heavily in the future.
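Partitioned topics are created through Pulsar’s admin interface. The following sketch shows the idea, with an illustrative admin URL and topic name:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class CreatePartitionedTopic {
    public static void main(String[] args) throws Exception {
        // Admin endpoint is illustrative.
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // A partitioned topic is served by several internal topics, so
        // a group of consumers on a Shared or Key_Shared subscription
        // can divide the partitions among themselves and be scaled out
        // or in dynamically with load.
        admin.topics().createPartitionedTopic(
                "persistent://tenant/ns/measurements", 8);

        admin.close();
    }
}
```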
The exceptional reliability provided by Pulsar comes from storing 3 separate copies of every message and from being able to scale up and recover failed replicas when inevitable failures happen, whether in the cloud or on on-premise dedicated hardware. The default settings used in Cumulocity trade off some of this fault tolerance for lower latency by requiring only two writes to be acknowledged, but this can be dialled up to 3 write acknowledgements if very strong message persistence guarantees are required. The storage layer (see diagram above) is based on Apache BookKeeper, which maintains a separate journal, or write-ahead log, for recovery, with topic data stored in segments of “Ledger” files. Apache ZooKeeper is used both to coordinate a Pulsar cluster and to store metadata reliably (not shown in the diagram, but there are at least 3 ZooKeeper instances deployed, each with its own persistent state). On Edge deployments, only one copy is saved to disk, using the POSIX fsync system call but without replication, so only temporary failures are recoverable.
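Cumulocity applies these defaults through its own deployment tooling, but in raw Pulsar terms the trade-off corresponds to a namespace persistence policy. A minimal sketch, with placeholder namespace and URL:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistencePolicies;

public class ConfigurePersistence {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // illustrative URL
                .build();

        // An ensemble of 3 Bookies, 3 copies written, but only 2 write
        // acknowledgements required before a message is confirmed: the
        // latency/durability trade-off described above. Raising the ack
        // quorum to 3 gives the strongest persistence guarantee.
        admin.namespaces().setPersistence("tenant/ns",
                new PersistencePolicies(3, 3, 2, 0.0));

        admin.close();
    }
}
```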
The message delivery semantics provided are “at-least-once”: once accepted, each message is delivered repeatedly until the consuming application acknowledges it. This can result in duplicates but ensures that no message is lost, allowing fault-tolerant applications to be constructed relatively simply and then scaled. Many IoT applications can make do with weaker “at-most-once” semantics, where a message is delivered once or not at all, but that is difficult to reason about in practice and certainly does not cover all use cases. Before the features built on the new messaging service, Cumulocity provided only these weaker message delivery guarantees, especially under heavy load or in the face of node failures. The contention behind the new messaging service is that stronger “at-least-once” guarantees can be provided in a performant, scalable and cost-effective way, making it simpler to build IoT applications and integrations. Applications with state must still deal with potential duplicates or use idempotent messages (such as measurements with a unique client-generated timestamp).
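As a sketch of what dealing with duplicates can look like in application code (topic, subscription name, and URL are again placeholders, and we assume producers set a client-generated message key), a consumer can track keys it has already applied and skip redeliveries:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.*;

public class IdempotentConsumer {
    // Keys of messages already applied; a real service would persist
    // this state or derive idempotency from its data store instead.
    private static final Set<String> applied = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // illustrative URL
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://tenant/ns/measurements")
                .subscriptionName("app")
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                // A client-generated key (e.g. device id + timestamp)
                // lets a redelivered duplicate be detected and skipped.
                if (applied.add(msg.getKey())) {
                    process(msg.getValue());
                }
                // Acknowledge only after processing: a crash before this
                // point means redelivery, never loss ("at-least-once").
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg); // request redelivery
            }
        }
    }

    private static void process(byte[] payload) { /* apply to app state */ }
}
```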
In the rest of this blog post, we examine how we have used the new messaging service to enhance the DataBroker feature in Cumulocity, building on the physical architecture diagrammed below:
Microservice-Enhanced DataBroker
The Cumulocity DataBroker has been enhanced with a per-tenant DataBroker microservice agent. A tenant must currently subscribe to this microservice explicitly; when subscribed (as is the case for the tenant in the UI screenshot below), all DataBroker connections are redirected through Pulsar and the per-tenant microservice.
The Cumulocity DataBroker feature allows all API requests, whether over REST or MQTT, to be forwarded to a remote tenant, most likely on a remote Cumulocity system, by defining a forwarding “connection” to a remote URL. Without Cumulocity messaging and the new DataBroker microservice agent, this forwarding is best-effort only: if the remote destination system is unavailable or responding slowly, requests will be dropped and the two Cumulocity systems (local and remote) will diverge.
Adding the DataBroker microservice agent causes messages to be persisted in a per-connector stream in Pulsar for forwarding by the microservice. Using a single microservice agent consumer also allows message ordering to be preserved, which, as noted above, is important for maintaining synchronized state in some applications. Note that the microservice can still process different connectors in parallel if provisioned with multiple CPUs, so some vertical scalability can be achieved even in a single tenant, as the sketch below illustrates. Performance measurements have shown that Pulsar and the microservice can comfortably forward at a similar rate to the unenhanced DataBroker over a typical bandwidth-limited wide-area connection.
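The agent’s internals are not public; the following sketch merely illustrates the design point that per-connector ordering and cross-connector parallelism can coexist, using hypothetical connector topic names:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelConnectors {
    public static void main(String[] args) {
        // Hypothetical per-connector topic names for one tenant.
        List<String> connectorTopics = List.of(
                "connector-a", "connector-b", "connector-c");

        // One single-threaded loop per connector: connectors proceed in
        // parallel across the available CPUs, yet each connector's
        // messages are still forwarded strictly in order.
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        for (String topic : connectorTopics) {
            pool.submit(() -> forwardLoop(topic));
        }
    }

    private static void forwardLoop(String topic) {
        // Consume-forward-acknowledge for one connector topic
        // (see the forwarding sketch below).
    }
}
```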
The microservice reads the current set of connectors on start-up, listens for connection activations, and then consumes messages from a per-connector Pulsar topic. Messages read are queued for forwarding to the configured remote destination tenant using the normal Cumulocity REST API. When a REST call to the destination completes, the microservice acknowledges the Pulsar message; this prevents re-delivery of the acknowledged message, and processing moves on to the next Pulsar message and its corresponding forwarded API request. Crashes of the microservice or its underlying node are handled by Kubernetes restarting the microservice.
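Again, the real microservice code is internal to Cumulocity, but the consume-forward-acknowledge cycle just described looks roughly like this sketch, with a hypothetical connector topic and remote destination URL:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.pulsar.client.api.*;

public class ConnectorForwarder {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // illustrative URL
                .build();
        // One exclusive consumer per connector topic keeps forwarding
        // order identical to ingestion order (topic name is hypothetical).
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://tenant/databroker/connector-42")
                .subscriptionName("forwarder")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        HttpClient http = HttpClient.newHttpClient();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            // Forward via the normal Cumulocity REST API (destination
            // host is a placeholder; authentication omitted for brevity).
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://remote.example.com/measurement/measurements"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofByteArray(msg.getValue()))
                    .build();
            HttpResponse<String> resp =
                    http.send(request, HttpResponse.BodyHandlers.ofString());
            if (resp.statusCode() / 100 == 2) {
                // Only a completed REST call removes the message; a crash
                // anywhere earlier leads to redelivery on restart.
                consumer.acknowledge(msg);
            } else {
                consumer.negativeAcknowledge(msg); // retry later
            }
        }
    }
}
```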
Note that, over the long term, the ingress rate must not be higher than the average forwarding rate, or eventually (Bookie disk) storage will be exhausted. It is possible to set policies that limit message retention and time-to-live; these can be configured on a per-tenant basis and are documented in the Cumulocity operations guide. In the unlikely case of Pulsar being unavailable, client/device requests that are routed to the enhanced DataBroker will fail with a service error, allowing the clients to retry.
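In Cumulocity these limits are set per tenant as described in the operations guide; in underlying Pulsar terms they correspond to namespace retention and message time-to-live policies, sketched below with example (not default) values and a placeholder namespace:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class RetentionConfig {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // illustrative URL
                .build();

        // Keep acknowledged data for at most 7 days or 10 GB, whichever
        // limit is reached first.
        admin.namespaces().setRetention("tenant/databroker",
                new RetentionPolicies(7 * 24 * 60, 10 * 1024));

        // Expire unacknowledged messages after 3 days, so a stalled
        // connector cannot exhaust Bookie disk space indefinitely.
        admin.namespaces().setNamespaceMessageTTL("tenant/databroker",
                3 * 24 * 60 * 60);

        admin.close();
    }
}
```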
The DataBroker microservice agent is optionally available in cloud and on-premise deployments now and will be available in Edge systems soon. With this, Edge deployments effectively gain a large buffer that can survive temporary crashes or wide-area network outages, though not disk crashes or other disasters. In the future, the enhanced DataBroker will be made the default DataBroker implementation. It is also available on eu-latest.cumulocity.com for experimentation and trial use. Please see the operations documentation on how to enable Cumulocity Messaging and the particulars of configuring and using the enhanced DataBroker.
We hope this follow-up blog post has given an overview of the new Cumulocity Messaging Service and introduced the enhanced DataBroker as the first feature to make use of the new platform service. The new 10.11 Notifications 2.0 API is also built on the messaging service, which will become non-optional and central to the Cumulocity platform going forward.
Andre Kramer, Software Architect, Cumulocity Messaging Service.