Notification 2.0 API: too many subscriptions?

Product/components used and version/fix level you are on:

cumulocity.com: 1015.0.278

Detailed explanation of the problem:

I subscribe to all events and alarms of all devices and assets in my tenant from a Python microservice. When I create a subscription with the tenant as context, I can only subscribe to alarms. Because I also want to receive events, I create one subscription per device/asset, so I have one listener thread and one WebSocket connection for each of them.

My test tenant has about 70 devices and assets.

Sometimes when I start the microservice (and it subscribes), it opens all the connections and works for a while, but at other times the WebSocket connections are not stable; see the messages below.

Error messages / full error message screenshot / log file:

```
INFO:API.mosubscription:Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f9cfb547810>
INFO:API.mosubscription:Websocket closed. Reconnecting...
INFO:API.mosubscription:Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f9cfb547810>
INFO:API.mosubscription:Websocket closed. Reconnecting...
INFO:API.mosubscription:Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f9cfb547810>
INFO:API.mosubscription:Websocket closed. Reconnecting...
INFO:API.mosubscription:Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f9cfb547810>
INFO:API.mosubscription:Websocket closed. Reconnecting...
INFO:API.mosubscription:Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f9cfb547810>
INFO:API.mosubscription:Websocket closed. Reconnecting...
```

…etc

Is your question related to the free trial, or to a production (customer) instance?

xy.cumulocity.com

Would it be better to poll for events instead?
Are there plans to allow subscribing to events at tenant level, as is already possible for alarms, instead of per device/asset?

Hi @simeon.mendgen,

The 10.16 REST API can create subscriptions to events with `tenant` as the context. With the 10.15 REST API, you can try creating all the subscriptions with the same subscription name. Then you only need one token and one WebSocket connection, which may make the connection more stable. Hope this helps.
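A minimal sketch of the shared-name approach in Python. It only builds the request bodies and assumes the Notification 2.0 REST payload fields (`context`, `subscription`, `source`, `subscriptionFilter` for `POST /notification2/subscriptions`; `subscriber`, `subscription`, `expiresInMinutes` for `POST /notification2/token`). The helper names, device IDs and subscriber name are hypothetical.

```python
def subscription_bodies(device_ids, sub_name, apis=("alarms", "events")):
    """Build one managed-object ("mo") subscription body per device.

    All bodies share the same `subscription` name (assumed payload shape
    for POST /notification2/subscriptions).
    """
    return [
        {
            "context": "mo",
            "subscription": sub_name,
            "source": {"id": str(device_id)},
            "subscriptionFilter": {"apis": list(apis)},
        }
        for device_id in device_ids
    ]


def token_body(sub_name, subscriber, expires_in_minutes):
    """Assumed payload shape for POST /notification2/token.

    Because every per-device subscription uses sub_name, one token (and
    therefore one WebSocket connection) covers all of those devices.
    """
    return {
        "subscriber": subscriber,
        "subscription": sub_name,
        "expiresInMinutes": expires_in_minutes,
    }
```

Each subscription body would be POSTed once per device; afterwards a single token for, say, `subt1234` should be enough to open one consumer WebSocket for all of them.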


Hi @simeon.mendgen,

The suggestion from @yiz is a good one: grouping your subscriptions this way is the recommended best practice wherever it makes sense for your use case. Regarding the disconnections you're seeing, there are a few potential causes, and it's not really possible to say exactly what is happening from the information we have so far. Would you be able to share the code for your microservice and a more complete log, ideally with timestamps? Obviously, remove any confidential or proprietary information before doing this!

It could also be that you don't send any pings to keep the connection alive.
Here is an example of keeping the connection alive:

Hi, and thank you all for your suggestions,

After many different attempts, the issue remains: the WebSocket connection gets closed before the token expires. As a workaround, I wrapped the listener in a loop that generates a new token and reconnects, so it works for now.

@Stefan_Witschel the ping is already sent by the library I use (see the "Timeouts" page of the websockets documentation). I can see it in the debug log.

@yiz
Thank you for the explanation and the info about 10.16.
I reduced it to one connection and one token per tenant, but the issue remains. So it is not caused by too many connections, which was my first guess.

CODE

This is the code I use now. It generates a new token after 10 failed reconnection attempts; with the new token and a new connection, it can connect again. This should also cover token expiration.

```python
from threading import Thread, Event
import asyncio
import logging
from c8y_api.app import CumulocityApi
from c8y_api.model import Subscription
import websockets

log = logging.getLogger(__name__)


def subscribe(c8y: CumulocityApi, sub_name: str, callback, event: Event) -> Thread:
    """
    Subscribe to a previously created subscription. Starts a listener thread and returns it.
    """

    async def handle_notification(c8y: CumulocityApi, callback, event: Event):
        # Own listener: the python-c8y-api library listener throws an SSL error, this one does not.
        while not event.is_set():
            try_reconnect = 10
            token = c8y.notification2_tokens.generate(subscription=sub_name, expires=3)
            uri = f"{c8y.base_url.replace('http', 'ws')}/notification2/consumer/?token={token}"
            log.debug(f"websocket uri {uri}")

            try:
                async with websockets.connect(uri) as websocket:
                    while not event.is_set():
                        try:
                            log.info(f"Websocket connected: {str(websocket)}")
                            async for message in websocket:
                                log.debug(f"Received message: {message}")

                                # The third line of the message is checked for the action;
                                # the last line is passed on as the JSON body.
                                if "CREATE" in message.split("\n")[2]:
                                    callback(message.split("\n")[-1])

                                # Acknowledge the message by sending back its ID (first line).
                                msg_id = message.split("\n")[0]
                                await websocket.send(msg_id)

                        except websockets.ConnectionClosed:
                            # Note: `continue` below iterates the same, already closed
                            # `websocket` object again; no new connection is opened here.
                            log.info("Websocket closed. Reconnecting...")
                            try_reconnect -= 1
                            # Non-blocking sleep: time.sleep() would block the event loop.
                            await asyncio.sleep(1)
                            if try_reconnect > 0:
                                continue
                            raise ConnectionError("Reconnecting websocket failed")
            except ConnectionError as e:
                log.error(e)
                log.info("refresh token and try again ...")
                continue

    def listen_websocket(c8y, callback, event):
        log.info("start listen_websocket")
        asyncio.run(handle_notification(c8y, callback, event))  # blocking
        log.info("end listen_websocket")

    _listener_thread = Thread(target=listen_websocket, args=(c8y, callback, event))
    _listener_thread.start()
    return _listener_thread
```
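The listener above relies on the layout of each raw notification: the first line is the ID used for the acknowledgement, the third line carries the action, and the last line is the JSON body. A small parser can make that assumption explicit and avoid splitting the message three times. This is a sketch that additionally assumes the header lines are separated from the body by an empty line and that the second line is the notification source; `Notification` and `parse_notification` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Notification:
    ack_id: str          # first line, sent back to acknowledge the message
    headers: List[str]   # remaining header lines (assumed: source, action, ...)
    body: Any            # decoded JSON payload, or None if there is no body

    @property
    def action(self) -> str:
        # Same line the listener checks with message.split("\n")[2]
        return self.headers[1] if len(self.headers) > 1 else ""


def parse_notification(message: str) -> Notification:
    """Split a raw message into header lines and JSON body at the first empty line."""
    head, _, body = message.partition("\n\n")
    lines = head.split("\n")
    return Notification(
        ack_id=lines[0],
        headers=lines[1:],
        body=json.loads(body) if body else None,
    )
```

In the listener, `n = parse_notification(message)` would then replace the repeated splits: call the callback when `n.action == "CREATE"` and acknowledge with `await websocket.send(n.ack_id)`. Note that the callback would receive a decoded dict instead of the raw JSON string.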

LOG

This log shows the start-up of the microservice, followed by a few "notification of type c8y_ThresholdAlarm_AUTO recived!" messages from the listener. Then the devices of a second tenant are subscribed over a second WebSocket connection ("Subscriptions for t4567 created"). After that, the first WebSocket connection (identifiable by 0x7f8d5d359cd0) closes unexpectedly, after about 1 minute, although the token expiration is set to 3 minutes.

In the last lines, a new token is created and 0x7f8d5d359cd0 is replaced by 0x7f8d5cedbd10.

```
[2023-05-03 14:48:55,175 INFO    ] CumulocityApp initialized.
[2023-05-03 14:48:58,445 INFO    ] Subscription already exists: subt1234
...
[2023-05-03 14:49:03,141 INFO    ] Subscription already exists: subt1234
[2023-05-03 14:49:03,208 INFO    ] Subscription already exists: subt1234
[2023-05-03 14:49:03,209 INFO    ] start listen_websocket
[2023-05-03 14:49:03,209 INFO    ] Subscriptions for t1234 created
 * Serving Flask app 'main'
 * Debug mode: off
[2023-05-03 14:49:03,213 INFO    ] ...
[2023-05-03 14:49:03,213 INFO    ] Press CTRL+C to quit
[2023-05-03 14:49:03,266 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:49:03,267 INFO    ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:17,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:49:17] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:27,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:49:27] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:27,259 INFO    ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:37,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:49:37] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:47,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:49:47] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:52,230 INFO    ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:57,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:49:57] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:03,312 INFO    ] cyclic_check check, all_tenant_ids: ['t1234','t4567'] old tenant_ids: ['t1234']
[2023-05-03 14:50:05,889 INFO    ] Subscription already exists: subt4567
...
[2023-05-03 14:50:05,994 INFO    ] Subscription already exists: subt4567
[2023-05-03 14:50:06,009 INFO    ] Subscription already exists: subt4567
[2023-05-03 14:50:06,009 INFO    ] start listen_websocket
[2023-05-03 14:50:06,009 INFO    ] Subscriptions for t4567 created
[2023-05-03 14:50:06,050 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d190f10>
[2023-05-03 14:50:07,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:07] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:17,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:17] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:18,457 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:19,458 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:19,458 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:20,458 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:20,458 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:21,458 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:21,459 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:22,287 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:22] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:22,459 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:22,459 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:23,459 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:23,459 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:24,460 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:24,460 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:25,460 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:25,460 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:26,460 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:26,461 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:27,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:27] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:27,461 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:27,461 INFO    ] Websocket closed. Reconnecting...
[2023-05-03 14:50:28,461 ERROR   ] Reconnecting websocket failed
[2023-05-03 14:50:28,461 INFO    ] refresh token and try again ...
[2023-05-03 14:50:28,482 INFO    ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5cedbd10>
[2023-05-03 14:50:28,491 INFO    ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:50:37,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:37] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:47,100 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:47] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:52,287 INFO    ] 10.64.5.25 - - [03/May/2023 14:50:52] "GET /health HTTP/1.1" 200 -
```

Hi @simeon.mendgen

The token is not the important part here, and there might be a misunderstanding about how tokens are used. A token can only be used once to connect to the WebSocket endpoint, and only within its validity period. If it has already been used or has expired, you have to get a new one once the WebSocket connection has been fully terminated. That's the reason your reconnects fail.
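Following that explanation, a reconnect loop would request a new token for every new connection instead of retrying the closed one. A minimal sketch, assuming the token and connect functions are passed in (for example, wrappers around `c8y.notification2_tokens.generate` and `websockets.connect` as used in the code above). `consume_forever`, `get_token`, `connect` and `handle` are hypothetical names, and the back-off values are arbitrary.

```python
import asyncio
import logging
import threading

log = logging.getLogger(__name__)


async def consume_forever(get_token, connect, handle, stop: threading.Event,
                          base_delay: float = 1.0, max_delay: float = 60.0) -> None:
    """Consume notifications, opening a NEW connection with a NEW token after every close.

    get_token()          -> str: a fresh, unused token (hypothetical wrapper)
    connect(token)       -> async context manager yielding an async-iterable connection
    handle(ws, message)  coroutine that processes and acknowledges one message
    """
    delay = base_delay
    while not stop.is_set():
        token = get_token()  # never reuse the token of a previous connection
        try:
            async with connect(token) as ws:
                delay = base_delay  # connected: reset the back-off
                async for message in ws:
                    await handle(ws, message)
                    if stop.is_set():
                        break
        except Exception as exc:  # e.g. websockets.ConnectionClosed or OSError
            log.info("connection lost (%s), reconnecting in %.1fs", exc, delay)
        if stop.is_set():
            break
        await asyncio.sleep(delay)  # non-blocking wait before the next attempt
        delay = min(delay * 2, max_delay)
```

With the `websockets` library, `connect` could be something like `lambda token: websockets.connect(f"{ws_base}/notification2/consumer/?token={token}")`, where `ws_base` is a hypothetical variable holding the `wss://` tenant URL. Because each pass through the loop leaves the `async with` block, the closed connection is always discarded before a new one is opened.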

The main question is why your WebSocket connection gets terminated after around 1 minute. My guess is still that ping/pong does not work properly with the Notification API / WebSocket server.

In my client I also enabled ping/pong by setting `connectionLostTimeout(30)`, but the server still terminated my connection after around 1 minute. So I now send a ping manually every minute, and my connection is stable. It might be an issue with the WebSocket server on the Cumulocity side, but it would be good to double-check whether this workaround also resolves your disconnects.
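The manual-ping workaround could be tried in the Python microservice as a separate asyncio task. This is a sketch, not the Java client described above: it assumes the `websockets` connection API, where `await ws.ping()` returns an awaitable that completes when the matching pong arrives. `keep_alive`, `interval` and `pong_timeout` are hypothetical names; the 60-second default mirrors the "every minute" above.

```python
import asyncio


async def keep_alive(ws, interval: float = 60.0, pong_timeout: float = 10.0) -> None:
    """Send a ping on `ws` every `interval` seconds until the task is cancelled.

    Assumes `await ws.ping()` returns an awaitable resolved by the pong
    (the behaviour of the `websockets` library).
    """
    while True:
        await asyncio.sleep(interval)
        pong_waiter = await ws.ping()
        # Raises asyncio.TimeoutError if the server does not answer in time.
        await asyncio.wait_for(pong_waiter, timeout=pong_timeout)
```

After `websockets.connect(...)` succeeds, the task would be started with `pinger = asyncio.create_task(keep_alive(websocket))` and cancelled with `pinger.cancel()` in a `finally` block when the listener leaves the connection, so there is exactly one pinger per connection. It runs in addition to the library's built-in keep-alive; whether the extra pings prevent the server-side disconnect is what the workaround is meant to check.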
