Any way to subscribe to the s/us topic from an MQTT client to receive events directly?

Product/components used and version/fix level:

Cumulocity/MQTT

Detailed explanation of the problem:

I have code on the edge device that publishes events to Cumulocity on the s/us topic using the 401 event template. Is there a way to subscribe to the s/us topic from my Python Cumulocity microservice so I can react to the event as quickly as Cumulocity can?
The reason I’m asking is that I don’t want to search for Cumulocity events in a polling fashion.
Note that I can’t change the edge code to publish to a different topic, since I don’t own the code.

Error messages / full error message screenshot / log file:

Question related to a free trial, or to a production (customer) instance?

Partner tenant

Hi Eddie,

You cannot subscribe to s/us. However, there is a different solution for your use case. Once the platform has received and created your event, a realtime notification is produced. You can subscribe to this event stream from your microservice via the Notifications API.

Few links:

Thanks Korbinian. This sounds encouraging. But I’d like you to consider the specific use case of detecting when I receive any telemetry from any of my devices and then processing it according to that device. It will be impossible to use the “mo” context, because then I can only have one subscription per device, which, even if allowed, will be unmanageable with hundreds of devices. Is there any possibility of using the “tenant” context and having the payload of the messages in that subscription contain the identification of the device, so I can then process the payload for that particular device?
I hope that makes sense.
Thx

Currently there is no tenant context for measurements. However, to avoid misunderstandings here: just because you’ve created e.g. 1000 subscriptions in MO context does not mean you’ll have to maintain 1000 consumers in your microservice.
You can bundle these MO subscriptions in a single subscription and then have a single consumer that receives the messages of all devices centrally. This is controlled by using the same subscription name:

Note that the subscriptions need to be created only once, not regularly. The received notifications will all carry the source ID, so you know which device they come from.
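
As a rough sketch of the bundling described above, the helper below builds one subscription request body per device, all sharing the same subscription name. The field names (context, source, subscription, subscriptionFilter) follow the Notification 2.0 subscription request body as I understand it; the function name, the device IDs, and the "measurements" filter are assumptions for illustration only. Each body would be POSTed once to /notification2/subscriptions.

```python
import json


def build_mo_subscriptions(device_ids, subscription_name, apis=("measurements",)):
    """Return one subscription request body per device, all sharing one name.

    Hypothetical helper: it only builds the payloads, it does not send them.
    """
    return [
        {
            "context": "mo",                       # managed-object context
            "subscription": subscription_name,     # same name bundles the devices
            "source": {"id": str(device_id)},      # the device's managed object ID
            "subscriptionFilter": {"apis": list(apis)},
        }
        for device_id in device_ids
    ]


if __name__ == "__main__":
    # Example IDs are placeholders, not real devices.
    for body in build_mo_subscriptions(["10416", "10417"], "eddiesSubscription"):
        print(json.dumps(body))
```

Because every body carries the same subscription name, a single consumer connected with a token for that name should receive the notifications of all listed devices.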

Optionally (e.g. in case the throughput is too high for your single consumer) you can make use of shared consumers. Then the messages are distributed to as many shared consumers as you’ve created.

Good to know.
This will work beautifully if the set of devices is fixed. We do get new devices added when new customers are onboarded. Is there an elegant way of handling this special case?
Thx

My idea would be that you mark each device that has a subscription with a fragment like

{
   "c8y_IsDevice": {},
   "name": "...",
   "deviceSubscription": "eddies-subscription", 
   ...
}

And then you could have a job running in your service that periodically queries “all devices without fragment ‘deviceSubscription’”, creates the subscription for any returned items, and adds this fragment to them.
The query for the search would be: GET /inventory/managedObjects?query=$filter=has(c8y_IsDevice) and not has (deviceSubscription).
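
A minimal sketch of the request side of such a job, using only the standard library. The filter string is taken verbatim from the post above (adjust it if your tenant rejects the syntax); the function names, the pageSize default, and the idea of writing the subscription name into the marker fragment are assumptions for illustration.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Filter copied from the post above, not verified against a live tenant.
UNSUBSCRIBED_FILTER = "$filter=has(c8y_IsDevice) and not has (deviceSubscription)"


def unsubscribed_devices_path(page_size=100):
    """Build the URL-encoded inventory path for devices lacking the marker."""
    params = {"query": UNSUBSCRIBED_FILTER, "pageSize": page_size}
    return "/inventory/managedObjects?" + urlencode(params)


def marker_fragment(subscription_name):
    """Body for the managed object update that marks a device as subscribed."""
    return {"deviceSubscription": subscription_name}


if __name__ == "__main__":
    path = unsubscribed_devices_path()
    print(path)
    # Round-trip check: decoding the path yields the original filter again.
    print(parse_qs(urlsplit(path).query)["query"][0])
```

The job would GET this path, create a subscription for each returned device, and then update the device with the marker fragment so it is skipped on the next run.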

Another option could be to set up a dedicated subscription for managed objects in tenant context. This subscription will receive a notification for every newly created inventory object. You would have to check whether it is a device and, if so, add it to your measurement subscription. Personally, I would choose the above option with the marker fragment.

Sounds like a plan. I’ll give it a try.

Though just taking a step back: the Notifications 2.0 docs say that events are covered by the tenant scope, so if you really want the events created by every device, then that should work.

Subscribing to all events may be a lot of data, though, so you’d have to be sure that no more events are being produced than the microservice is capable of handling.

Thanks Rueben. We have hundreds of devices all producing many events so the load on the microservice will definitely be tested. We will test out Korbinian’s first suggestion and will update.

Thanks All.

I tried the example on GitHub suggested by Korbinian. I’m getting a POST error both with an MO that I specifically created and with one that we know exists. I’m sure it is something simple I’m overlooking. Here is what I did:

  1. Using the Cumulocity Python API, if I try to create a managed object:
c8y = SimpleCumulocityApp()

mo_name = "abcdabcd99"
mo = ManagedObject(c8y, name=mo_name, type='c8y_CustomType').create()

I get this error:

ValueError: Unable to perform POST request. Status: 401 Response:
{"error":"security/Unauthorized","message":"Invalid credentials!","info":"https://cumulocity.com/guides/users-guide/getting-started/"}
  2. If I try to create a subscription for an already existing device managed object:
mo_id = "10416"
mo_name = "EF985D802901"
sub_name = f'{mo_name.replace("_", "")}Subscription'
sub = Subscription(c8y, name=sub_name, context=Subscription.Context.MANAGED_OBJECT, source_id=mo_id).create()

KeyError: 'No such object: /notification2/subscriptions'

Any ideas?
Eddie

The SimpleCumulocityApp seems to be intended to run only as a microservice, fetching the credentials from the environment variables. I think this is not working in your environment, which is why you get 401 Unauthorized.
You can try this:

c8y = SimpleCumulocityApp()
print("CumulocityApp initialized.")
print(f"{c8y.base_url}, Tenant: {c8y.tenant_id}, User:{c8y.username}")

and check whether the environment variables are fetched correctly. If not, you could try to create a new instance by defining the required information manually, like this:

c8y = CumulocityApi(base_url=C8Y_BASEURL,
                    tenant_id=C8Y_TENANT,
                    username=C8Y_USER,
                    password=C8Y_PASSWORD)
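
To see quickly which values are missing before constructing the client, a small standard-library check like the following may help. The four variable names are simply the ones used in the snippet above; whether your runtime sets exactly these is an assumption, and read_c8y_settings is a hypothetical helper, not part of the Cumulocity Python API.

```python
import os

# Names taken from the snippet above; adjust to whatever your environment provides.
REQUIRED_VARS = ("C8Y_BASEURL", "C8Y_TENANT", "C8Y_USER", "C8Y_PASSWORD")


def read_c8y_settings(environ=None):
    """Return the required settings, or raise listing every missing variable."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}


if __name__ == "__main__":
    try:
        settings = read_c8y_settings()
        print("All settings present for tenant", settings["C8Y_TENANT"])
    except RuntimeError as err:
        print(err)
```

The returned values could then be passed to the CumulocityApi constructor shown above.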

Are you running Cumulocity in the cloud or Cumulocity Edge? Notification 2.0 is not available on all instances and also not on Cumulocity Edge.
That might be the reason for your second error.

I’m running this in the Cloud.
I think Notification 2.0 is not enabled. Let me check that first before trying your suggestion.
Thx

There is also a dedicated Notifications permission that you’ll need to have:

If you’re running your logic with a service-user you would also have to state ROLE_NOTIFICATION_2_ADMIN in your cumulocity.json.
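
As a small sanity check, the sketch below parses a manifest and reports whether the role is listed. It assumes the role belongs under the manifest's requiredRoles array; the helper name and the sample manifest are made up for illustration.

```python
import json


def has_required_role(manifest_text, role="ROLE_NOTIFICATION_2_ADMIN"):
    """Return True if the manifest JSON lists the role under requiredRoles."""
    manifest = json.loads(manifest_text)
    return role in manifest.get("requiredRoles", [])


if __name__ == "__main__":
    # Sample manifest content; a real cumulocity.json has more keys than this.
    sample = json.dumps({"requiredRoles": ["ROLE_INVENTORY_READ",
                                           "ROLE_NOTIFICATION_2_ADMIN"]})
    print(has_required_role(sample))
```

In practice you would read the text from your own cumulocity.json instead of the inline sample.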

Other than that, Stefan is right: Notification 2.0 is available on all our public cloud instances. For customer-specific instances you would have to check whether it is activated (there it is activated on request, not by default; see also C8Y Notification 2: getting 404 error).

Might not be what you want, but have you also considered an alternate architecture where your Python logic is invoked as an in-process plug-in from an EPL App, and let Streaming Analytics consume the incoming data for you?

(I don’t think we currently have a sample or article showing exactly that, but we have such info for Python plug-ins, and separately for using the block SDK. Maybe someone else here has already done something similar)

Thanks Kevin.
Yes we have but we want a purely microservice based approach.

We have finally gotten it to work. Thanks to everyone’s help. I’ll close this topic once we have more tests done.