Microservice Issue
- I am creating a micrservice in python flask.
- I have successfully deployed one in the past using the same code with different topic names.
{'subscription_url': 'http://cumulocity:8111/notification2/subscriptions',
'tenant': 't7944',
'token_url': 'http://cumulocity:8111/notification2/token',
'user': 'service_elmmlmanagement',
'username': 't7944/service_elmmlmanagement'}
- Most importantly, it is not catching event with topic el_DemandManagement
tedge mqtt pub 'te/device/main///e/el_DemandManagement' '{"text": "LocUpdate"}'
- The standard routs work fine
service/eldemand/environemnt
Any Assistance would be wonderful as the microservice work locally.
cumulocity.json
{
"apiVersion": "1",
"version": "0.0.6",
"provider": {
"name": "Cumulocity"
},
"isolation": "PER_TENANT",
"requiredRoles": [
"ROLE_NOTIFICATION_2_ADMIN",
"ROLE_INVENTORY_ADMIN",
"ROLE_INVENTORY_CREATE",
"ROLE_INVENTORY_READ",
"ROLE_MANAGED_OBJECT_ADMIN",
"ROLE_MANAGED_OBJECT_CREATE",
"ROLE_MANAGED_OBJECT_READ",
"ROLE_MEASUREMENT_ADMIN",
"ROLE_MEASUREMENT_READ",
"ROLE_ALARM_ADMIN",
"ROLE_ALARM_READ",
"ROLE_DEVICE_CONTROL_READ",
"ROLE_EVENT_ADMIN",
"ROLE_EVENT_READ",
"ROLE_GENERIC_MQTT_ADMIN",
"ROLE_IDENTITY_READ"
],
"roles": [
]
}
Microservice Error
START: ws.run_forever(ping_interval=30)
ERROR:root:Error receiving message: scheme http is invalid
### closed ###
Close status code: None
Close reason: None
END: ws.run_forever(ping_interval=30)
Microservice Implementation
bearer_token = os.getenv("C8Y_BEARER_TOKEN")
tenant = os.getenv('C8Y_TENANT')
user = os.getenv('C8Y_USER', "")
password = os.getenv('C8Y_PASSWORD')
username = f"{tenant}/{user}"
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
auth = HTTPBasicAuth(user, password)
BASE_URL = os.getenv("BASE_URL", "http://cumulocity:8111")
# MICROSERVICE SETTINGS AND NAMES
subscription_url = f"{BASE_URL}/notification2/subscriptions"
token_url = f"{BASE_URL}/notification2/token"
tenant = os.getenv('C8Y_TENANT')
user = os.getenv('C8Y_USER')
password = os.getenv('C8Y_PASSWORD')
username = f"{tenant}/{user}"
subscription = "eldemand"
subscriber = f"{tenant}{subscription}microservice"
subscription_name = f"{tenant}{subscription}"
topics = "el_DemandManagement"
fragmentsToCopy = ["cropId", "deviceId", "payload", "status", "action", "text", "type", "el_DemandManagement"]
object_type = ["events"]
subscription_data = {
"context": "tenant",
"subscription": subscription_name,
"subscriptionFilter": {
"apis": object_type,
"typeFilter": topics
},
"fragmentsToCopy": fragmentsToCopy
}
token_data = {
"subscriber": subscriber,
"subscription": subscription_name
}
print("SUBSCRIPTION")
pprint.pprint(subscription_data)
env_vars = {
"subscription_url": subscription_url,
"token_url": token_url,
"tenant": tenant,
"user": user,
"username": username
}
print("ENVIRONMENT VARIABLES")
pprint.pprint(env_vars)
# MESSAGE MANAGEMENT
def on_message(ws, event):
# PRINT MESSAGE DETAILS
if DEBUG: print("----------------------------------------------------")
if DEBUG: print("Message Received: ", type(event))
if DEBUG: pprint.pprint(event)
if DEBUG: print("----------------------------------------------------")
# CHECK MESSAGE CONTENTS
event = message_check(event)
if event is False: return False
# IDENTIFY EVENT TYPE
data = event
demand = Demands(data["payload"], data["source"]["id"])
validate = demand.validate()
if "error" in validate and validate["error"] is True:
logging.error(f"Failed to validate demand: {validate}")
return validate
investigate = demand.investigate()
if "error" in investigate and investigate["error"] is True:
logging.error(f"Failed to investigate demand: {investigate}")
return investigate
analyse = demand.analyse()
if "error" in analyse and analyse["error"] is True:
logging.error(f"Failed to analyse demand: {analyse}")
return analyse
process = demand.process()
if "error" in process and process["error"] is True:
logging.error(f"Failed to process demand: {process}")
return process
if DEBUG: print("----------------------------------------------------")
if DEBUG: print("####################################################")
# WEB SOCKET MANAGEMENT
def on_error(ws, error):
logging.error(f"Error receiving message: {error}")
def on_close(ws, close_status_code, close_reason):
logging.info("### closed ###")
if DEBUG: print("### closed ###")
logging.info("Close status code:", close_status_code)
if DEBUG: print("Close status code:", close_status_code)
logging.info("Close reason:", close_reason)
if DEBUG: print("Close reason:", close_reason)
def on_open(ws):
logging.info("### websocket on_open ###")
if DEBUG: print("### websocket on_open ###")
def run_websocket_client():
websocket.enableTrace(True)
cumulocity_url = f"{BASE_URL}/notification2/consumer/"
if not check_subscription():
logging.info("Creating Subscription")
if DEBUG: print("Creating Subscription")
create_subscription()
else:
logging.info("Not Creating Subscription, Subscription may already exists")
if DEBUG: print("Not Creating Subscription, Subscription may already exists")
logging.info(f"Subscriptions: {get_subscriptions()}")
if DEBUG: print(f"Subscriptions: {get_subscriptions()}")
token_info = create_token()
if token_info:
logging.info(f"Token created successfully: {token_info}")
if DEBUG: print(f"Token created successfully: {token_info}")
ws_url = cumulocity_url + "?token=" + urllib.parse.quote(token_info['token'])
logging.info(f"Starting WebSocket Client:, {ws_url}")
if DEBUG: print(f"Starting WebSocket Client:, {ws_url}")
ws = websocket.WebSocketApp(ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
try:
logging.info("START: ws.run_forever(ping_interval=30)")
if DEBUG: print("START: ws.run_forever(ping_interval=30)")
ws.run_forever(ping_interval=30)
logging.info("END: ws.run_forever(ping_interval=30)")
if DEBUG: print("END: ws.run_forever(ping_interval=30)")
except Exception as e:
logging.error(f"Error running WebSocket Client: {e}")
else:
logging.error("Failed to obtain token.")
# SUBSCRIPTION MANAGEMENT
def check_subscription():
# response = requests.get(subscription_url, headers=headers, auth=(username, password))
response = requests.get(subscription_url, headers=headers)
if response.ok:
logging.info("retrieved subscriptions")
if DEBUG: print("retrieved subscriptions")
try:
response_dict = response.json()
subscriptions = response_dict.get('subscriptions', [])
logging.info(f"Number of subscriptions: {len(subscriptions)}")
if DEBUG: print(f"Number of subscriptions: {len(subscriptions)}")
for subscription in subscriptions:
if subscription.get('subscription') == subscription_data['subscription']:
logging.info(f"Deleting Subscription: {subscription['id']}")
if DEBUG: print(f"Deleting Subscription: {subscription['id']}")
del_subscription = delete_subscription(subscription['id'])
if del_subscription:
logging.info("Deleted Subscription")
if DEBUG: print("Deleted Subscription")
return False
else:
logging.info("Failed to delete Subscription")
if DEBUG: print("Failed to delete Subscription")
return True
except ValueError as e:
logging.error(f"Error parsing JSON response: {str(e)}")
return False
else:
logging.error(f"Failed to retrieve subscriptions. Status code: {response.status_code}")
return False
def get_subscriptions():
# response = requests.get(subscription_url, auth=(username, password))
response = requests.get(subscription_url, headers=headers)
if response.ok:
try:
response_dict = response.json()
subscriptions = response_dict.get('subscriptions', [])
logging.info(f"Number of subscriptions: {len(subscriptions)}")
if DEBUG: print(f"Number of subscriptions: {len(subscriptions)}")
return subscriptions
except ValueError as e:
logging.error(f"Error parsing JSON response: {str(e)}")
return []
else:
logging.error(f"Failed to retrieve subscriptions. Status code: {response.status_code}")
return []
def find_subscription_by_name(subscriptions, name):
for sub in subscriptions:
if sub.get('subscription') == name:
logging.info(f"Found subscription: {name}")
if DEBUG: print(f"Found subscription: {name}")
return sub
logging.info("Failed to find subscription:")
if DEBUG: print("Failed to find subscription:")
return None
def create_subscription():
# response = requests.post(subscription_url, json=subscription_data, auth=(username, password))
response = requests.post(subscription_url, json=subscription_data, headers=headers)
logging.info(f"Status code: {response.status_code}")
logging.info(f"Response content: {response.content}")
if DEBUG: print(f"Status code: {response.status_code}")
if DEBUG: print(f"Response content: {response.content}")
response_json = {"id": "Not Found"}
try:
print(response)
print(type(response))
# response_json = response.json()
logging.info(f"Subscription: {response_json}")
except ValueError:
logging.error(f"Response is not in JSON format | Response content: {response.content}")
logging.info(f"Response headers: {response.headers}")
if DEBUG: print(f"Response headers: {response.headers}")
logging.info(f"Response text: {response.text}")
if DEBUG: print(f"Response text: {response.text}")
if response.ok:
logging.info(f"Subscription ID: {response_json}")
else:
logging.error(f"Failed to create subscription. Status code: {response.status_code}")
def create_token():
response = requests.post(token_url, json=token_data, auth=(username, password))
if response.ok:
logging.info(f"Token created successfully. Status code: {response.text}")
if DEBUG: print(f"Token created successfully. Status code: {response.text}")
return response.json()
else:
logging.error(f"Failed to create token. Status code: {response.status_code}")
return None
def delete_subscription(subscription_id):
delete_url = f"{subscription_url}/{subscription_id}"
response = requests.delete(delete_url, auth=(username, password))
if response.ok:
logging.info(f"Status code: {response.status_code}")
if DEBUG: print(f"Status code: {response.status_code}")
return True
else:
logging.error(f"Failed to delete subscription. Status code: {response.status_code}")
return False
# LOAD CUMULOCITY MICROSERVICE VERSION NUMBER
with open('cumulocity.json') as f:
cumulocity_config = json.load(f)
version = cumulocity_config.get('version')
logging.info(f"Cumulocity Microservice Version: {version}")
if DEBUG: print(f"Cumulocity Microservice Version: {version}")
Full Log
DEBUG MODE: False <class 'bool'>
Password: sz5Oq5pBM43TOjMRf85lY4Y95KzsoSKc
SUBSCRIPTION
{'context': 'tenant',
'fragmentsToCopy': ['cropId',
'deviceId',
'payload',
'status',
'action',
'text',
'type',
'el_DemandManagement'],
'subscription': 't31727851eldemand',
'subscriptionFilter': {'apis': ['events'],
'typeFilter': 'el_DemandManagement'}}
ENVIRONMENT VARIABLES
{'subscription_url': 'http://cumulocity:8111/notification2/subscriptions',
'tenant': 't31727851',
'token_url': 'http://cumulocity:8111/notification2/token',
'user': 'service_eldemands',
'username': 't31727851/service_eldemands'}
Cumulocity Microservice Version: 0.0.6
* Serving Flask app 'application' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
WARNING:werkzeug: * Running on all addresses.
WARNING: This is a development server. Do not use it in a production deployment.
INFO:werkzeug: * Running on http://10.244.164.121:80/ (Press CTRL+C to quit)
retrieved subscriptions
Number of subscriptions: 5
Deleting Subscription: 779673
Status code: 204
Deleted Subscription
Creating Subscription
Status code: 201
Response content: b'{"nonPersistent":false,"subscriptionFilter":{"apis":["events"],"typeFilter":"el_DemandManagement"},"context":"tenant","self":"http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065","fragmentsToCopy":["cropId","deviceId","payload","status","action","text","type","el_DemandManagement"],"subscription":"t31727851eldemand","id":"792065","source":{"self":"http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851","id":"t31727851"}}'
<Response [201]>
<class 'requests.models.Response'>
Response headers: {'Date': 'Wed, 09 Oct 2024 07:34:00 GMT', 'Content-Type': 'application/vnd.com.nsn.cumulocity.subscription+json;charset=UTF-8;ver=0.9', 'Content-Length': '459', 'Connection': 'keep-alive', 'Location': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065', 'X-Content-Type-Options': 'nosniff', 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate', 'Pragma': 'no-cache', 'Expires': '0', 'X-Frame-Options': 'DENY'}
Response text: {"nonPersistent":false,"subscriptionFilter":{"apis":["events"],"typeFilter":"el_DemandManagement"},"context":"tenant","self":"http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065","fragmentsToCopy":["cropId","deviceId","payload","status","action","text","type","el_DemandManagement"],"subscription":"t31727851eldemand","id":"792065","source":{"self":"http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851","id":"t31727851"}}
Number of subscriptions: 5
Number of subscriptions: 5
Subscriptions: [{'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': "'el_Stocktake'"}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/687208', 'subscription': 'allevents', 'id': '687208', 'fragmentsToCopy': ['el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_Stocktake'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/688230', 'subscription': 'elStocktakeEvents', 'id': '688230', 'fragmentsToCopy': ['el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_Stocktake'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/693341', 'subscription': 'allStocktakeEvents', 'id': '693341', 'fragmentsToCopy': ['text', 'type', 'c8y_Position', 'el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_test_config'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/791358', 'subscription': 'allConfigurationTest', 'id': '791358', 'fragmentsToCopy': ['text', 'type', 'c8y_Position', 'el_test_config'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_DemandManagement'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065', 'subscription': 't31727851eldemand', 'id': '792065', 'fragmentsToCopy': ['cropId', 'deviceId', 'payload', 'status', 'action', 'text', 'type', 'el_DemandManagement'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}]
Token created successfully. Status code: {"token":""}
Token created successfully: {'token': ''}
Starting WebSocket Client:, http://cumulocity:8111/notification2/consumer/?token=...
START: ws.run_forever(ping_interval=30)
ERROR:root:Error receiving message: scheme http is invalid
### closed ###
Close status code: None
Close reason: None
END: ws.run_forever(ping_interval=30)