Hi, and thank you all for your suggestions,
after many different attempts, the issue still remains, the websocket connection gets closed before token is expired. I wrapped the listener in a new token generation and reconnection so it works for now.
@Stefan_Witschel the ping is already done by the library i use: Timeouts - websockets 12.0.dev12+gfe629de.dirty documentation . I can see it in the debug log.
@yiz
thank you for the explanation and the info about 10.16.
I reduced to use one connection and token per tenant but the issue remains. So it has nothing to do with that there are to many connections, what my first guess was.
CODE
This is the code i use now, with a generation of a new token after 10 failed reconnections With that new token and new connection it is possible to connect again. This should also cover token expiration.
from threading import Thread, Event
import asyncio
import logging
from c8y_api.app import CumulocityApi
from c8y_api.model import Subscription
import websockets
import time
def subscribe(c8y: CumulocityApi, sub_name: str, callback, event: Event) -> Thread:
"""
subscribe to a beforehand crated subscription. Starts a listener thread and returns it.
"""
async def handle_notification(
c8y: CumulocityApi, callback: callable, event: Event
): # python-c8y-api library listener throws SSL Error, this listener not
while not event.is_set():
try_reconnect = 10
token = c8y.notification2_tokens.generate(
subscription=sub_name, expires=3
)
uri = f"{c8y.base_url.replace('http','ws')}/notification2/consumer/?token={token}"
log.debug(f"websocket uri {uri}")
try:
async with websockets.connect(uri) as websocket:
while not event.is_set():
try:
log.info(f"Websocket connected: {str(websocket)}")
async for message in websocket:
log.debug(f"Received message: {message}")
if "CREATE" in message.split("\n")[2]:
callback(message.split("\n")[-1])
# ACK message
msg_id = message.split("\n")[0]
await websocket.send(msg_id)
except websockets.ConnectionClosed:
log.info("Websocket closed. Reconnecting...")
try_reconnect = try_reconnect - 1
time.sleep(1)
if try_reconnect > 0:
continue
else:
raise ConnectionError(
"Reconnecting websocket failed"
)
except ConnectionError as e:
log.error(e)
log.info("refresh token and try again ...")
continue
def listen_websocket(c8y, callback, event):
log.info("start listen_websocket")
asyncio.run(handle_notification(c8y, callback, event)) # blocking
log.info("end listen_websocket")
_listener_thread = Thread(
target=listen_websocket,
args=(
c8y,
callback,
event,
),
)
_listener_thread.start()
return _listener_thread
LOG
This is the log of the start of the microservice, followed by some notification of type c8y_ThresholdAlarm_AUTO recived!
from the listener, then another tenant of which the devices get subscribed with a second websocket connection ( Subscriptions for t4567 created
). Then the first websocket connection (identifieable by 0x7f8d5d359cd0
) closes randomly. (after about 1 minute, while token expiration is set to 3)
In the last lines another token is created and 0x7f8d5d359cd0
gets replaced by 0x7f8d5cedbd10
[2023-05-03 14:48:55,175 INFO ] CumulocityApp initialized.
[2023-05-03 14:48:58,445 INFO ] Subscription already exists: subt1234
...
[2023-05-03 14:49:03,141 INFO ] Subscription already exists: subt1234
[2023-05-03 14:49:03,208 INFO ] Subscription already exists: subt1234
[2023-05-03 14:49:03,209 INFO ] start listen_websocket
[2023-05-03 14:49:03,209 INFO ] Subscriptions for t1234 created
* Serving Flask app 'main'
* Debug mode: off
[2023-05-03 14:49:03,213 INFO ] ...
[2023-05-03 14:49:03,213 INFO ] e[33mPress CTRL+C to quite[0m
[2023-05-03 14:49:03,266 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:49:03,267 INFO ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:17,100 INFO ] 10.64.5.25 - - [03/May/2023 14:49:17] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:27,100 INFO ] 10.64.5.25 - - [03/May/2023 14:49:27] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:27,259 INFO ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:37,100 INFO ] 10.64.5.25 - - [03/May/2023 14:49:37] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:47,100 INFO ] 10.64.5.25 - - [03/May/2023 14:49:47] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:49:52,230 INFO ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:49:57,100 INFO ] 10.64.5.25 - - [03/May/2023 14:49:57] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:03,312 INFO ] cyclic_check check, all_tenant_ids: ['t1234','t4567'] old tenant_ids: ['t1234']
[2023-05-03 14:50:05,889 INFO ] Subscription already exists: subt4567
...
[2023-05-03 14:50:05,994 INFO ] Subscription already exists: subt4567
[2023-05-03 14:50:06,009 INFO ] Subscription already exists: subt4567
[2023-05-03 14:50:06,009 INFO ] start listen_websocket
[2023-05-03 14:50:06,009 INFO ] Subscriptions for t4567 created
[2023-05-03 14:50:06,050 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d190f10>
[2023-05-03 14:50:07,100 INFO ] 10.64.5.25 - - [03/May/2023 14:50:07] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:17,100 INFO ] 10.64.5.25 - - [03/May/2023 14:50:17] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:18,457 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:19,458 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:19,458 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:20,458 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:20,458 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:21,458 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:21,459 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:22,287 INFO ] 10.64.5.25 - - [03/May/2023 14:50:22] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:22,459 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:22,459 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:23,459 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:23,459 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:24,460 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:24,460 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:25,460 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:25,460 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:26,460 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:26,461 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:27,100 INFO ] 10.64.5.25 - - [03/May/2023 14:50:27] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:27,461 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5d359cd0>
[2023-05-03 14:50:27,461 INFO ] Websocket closed. Reconnecting...
[2023-05-03 14:50:28,461 ERROR ] Reconnecting websocket failed
[2023-05-03 14:50:28,461 INFO ] refresh token and try again ...
[2023-05-03 14:50:28,482 INFO ] Websocket connected: <websockets.legacy.client.WebSocketClientProtocol object at 0x7f8d5cedbd10>
[2023-05-03 14:50:28,491 INFO ] notification of type c8y_ThresholdAlarm_AUTO recived!
[2023-05-03 14:50:37,100 INFO ] 10.64.5.25 - - [03/May/2023 14:50:37] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:47,100 INFO ] 10.64.5.25 - - [03/May/2023 14:50:47] "GET /health HTTP/1.1" 200 -
[2023-05-03 14:50:52,287 INFO ] 10.64.5.25 - - [03/May/2023 14:50:52] "GET /health HTTP/1.1" 200 -