Microservice: scheme http is invalid

Microservice Issue

  • I am creating a micrservice in python flask.
  • I have successfully deployed one in the past using the same code with different topic names.
{'subscription_url': 'http://cumulocity:8111/notification2/subscriptions',
 'tenant': 't7944',
 'token_url': 'http://cumulocity:8111/notification2/token',
 'user': 'service_elmmlmanagement',
 'username': 't7944/service_elmmlmanagement'} 
  • Most importantly, it is not catching event with topic el_DemandManagement
    • tedge mqtt pub 'te/device/main///e/el_DemandManagement' '{"text": "LocUpdate"}'
  • The standard routs work fine service/eldemand/environemnt

Any Assistance would be wonderful as the microservice work locally.

cumulocity.json

{
    "apiVersion": "1",
    "version": "0.0.6",
    "provider": {
        "name": "Cumulocity"
    },
    "isolation": "PER_TENANT",
    "requiredRoles": [
        "ROLE_NOTIFICATION_2_ADMIN",
        "ROLE_INVENTORY_ADMIN",
        "ROLE_INVENTORY_CREATE",
        "ROLE_INVENTORY_READ",
        "ROLE_MANAGED_OBJECT_ADMIN",
        "ROLE_MANAGED_OBJECT_CREATE",
        "ROLE_MANAGED_OBJECT_READ",
        "ROLE_MEASUREMENT_ADMIN",
        "ROLE_MEASUREMENT_READ",
        "ROLE_ALARM_ADMIN",
        "ROLE_ALARM_READ",
        "ROLE_DEVICE_CONTROL_READ",
        "ROLE_EVENT_ADMIN",
        "ROLE_EVENT_READ",
        "ROLE_GENERIC_MQTT_ADMIN",
        "ROLE_IDENTITY_READ"
    ],
    "roles": [
    ]
}

Microservice Error

START: ws.run_forever(ping_interval=30)
ERROR:root:Error receiving message: scheme http is invalid
### closed ###
Close status code: None
Close reason: None
END: ws.run_forever(ping_interval=30)

Microservice Implementation

bearer_token = os.getenv("C8Y_BEARER_TOKEN")
tenant = os.getenv('C8Y_TENANT')
user = os.getenv('C8Y_USER', "")
password = os.getenv('C8Y_PASSWORD')
username = f"{tenant}/{user}"
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
auth = HTTPBasicAuth(user, password)
BASE_URL = os.getenv("BASE_URL", "http://cumulocity:8111")

# MICROSERVICE SETTINGS AND NAMES

subscription_url = f"{BASE_URL}/notification2/subscriptions"
token_url = f"{BASE_URL}/notification2/token"
tenant = os.getenv('C8Y_TENANT')
user = os.getenv('C8Y_USER')
password = os.getenv('C8Y_PASSWORD')
username = f"{tenant}/{user}"

subscription = "eldemand"
subscriber = f"{tenant}{subscription}microservice"
subscription_name = f"{tenant}{subscription}"

topics = "el_DemandManagement"
fragmentsToCopy = ["cropId", "deviceId", "payload", "status", "action", "text", "type", "el_DemandManagement"]
object_type = ["events"]


subscription_data = {
    "context": "tenant",
    "subscription": subscription_name,
    "subscriptionFilter": {
        "apis": object_type,
        "typeFilter": topics
    },
    "fragmentsToCopy": fragmentsToCopy
}

token_data = {
    "subscriber": subscriber,
    "subscription": subscription_name
}

print("SUBSCRIPTION")
pprint.pprint(subscription_data)

env_vars = {
    "subscription_url": subscription_url,
    "token_url": token_url,
    "tenant": tenant,
    "user": user,
    "username": username
}
print("ENVIRONMENT VARIABLES")
pprint.pprint(env_vars)

# MESSAGE MANAGEMENT
def on_message(ws, event):
  
    # PRINT MESSAGE DETAILS

    if DEBUG: print("----------------------------------------------------")
    if DEBUG: print("Message Received: ", type(event))
    if DEBUG: pprint.pprint(event)
    if DEBUG: print("----------------------------------------------------")
    # CHECK MESSAGE CONTENTS
    event = message_check(event)
    if event is False: return False

    # IDENTIFY EVENT TYPE
    data = event
    demand = Demands(data["payload"], data["source"]["id"])
    validate = demand.validate()
    if "error" in validate and validate["error"] is True:
        logging.error(f"Failed to validate demand: {validate}")
        return validate
    investigate = demand.investigate()
    if "error" in investigate and investigate["error"] is True:
        logging.error(f"Failed to investigate demand: {investigate}")
        return investigate
    analyse = demand.analyse()
    if "error" in analyse and analyse["error"] is True:
        logging.error(f"Failed to analyse demand: {analyse}")
        return analyse
    process = demand.process()
    if "error" in process and process["error"] is True:
        logging.error(f"Failed to process demand: {process}")
        return process

    if DEBUG: print("----------------------------------------------------")
    if DEBUG: print("####################################################")


# WEB SOCKET MANAGEMENT 
def on_error(ws, error):
    logging.error(f"Error receiving message: {error}")


def on_close(ws, close_status_code, close_reason):
    logging.info("### closed ###")
    if DEBUG: print("### closed ###")
    logging.info("Close status code:", close_status_code)
    if DEBUG: print("Close status code:", close_status_code)
    logging.info("Close reason:", close_reason)
    if DEBUG: print("Close reason:", close_reason)



def on_open(ws):
    logging.info("### websocket on_open ###")
    if DEBUG: print("### websocket on_open ###")


def run_websocket_client():
    websocket.enableTrace(True)
    cumulocity_url = f"{BASE_URL}/notification2/consumer/"

    if not check_subscription():
        logging.info("Creating Subscription")
        if DEBUG: print("Creating Subscription")
        create_subscription()
    else:
        logging.info("Not Creating Subscription, Subscription may already exists")
        if DEBUG: print("Not Creating Subscription, Subscription may already exists")
    logging.info(f"Subscriptions: {get_subscriptions()}")
    if DEBUG: print(f"Subscriptions: {get_subscriptions()}")
    token_info = create_token()
    if token_info:
        logging.info(f"Token created successfully: {token_info}")
        if DEBUG: print(f"Token created successfully: {token_info}")
        ws_url = cumulocity_url + "?token=" + urllib.parse.quote(token_info['token'])
        logging.info(f"Starting WebSocket Client:, {ws_url}")
        if DEBUG: print(f"Starting WebSocket Client:, {ws_url}")
        ws = websocket.WebSocketApp(ws_url,
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close
        )
        try:
            logging.info("START: ws.run_forever(ping_interval=30)")
            if DEBUG: print("START: ws.run_forever(ping_interval=30)")
            ws.run_forever(ping_interval=30)
            logging.info("END: ws.run_forever(ping_interval=30)")
            if DEBUG: print("END: ws.run_forever(ping_interval=30)")
        except Exception as e:
            logging.error(f"Error running WebSocket Client: {e}")
    else:
        logging.error("Failed to obtain token.")


# SUBSCRIPTION MANAGEMENT 
def check_subscription():
    # response = requests.get(subscription_url, headers=headers, auth=(username, password))
    response = requests.get(subscription_url, headers=headers)
    if response.ok:
        logging.info("retrieved subscriptions")
        if DEBUG: print("retrieved subscriptions")
        try:
            response_dict = response.json()
            subscriptions = response_dict.get('subscriptions', [])
            logging.info(f"Number of subscriptions: {len(subscriptions)}")
            if DEBUG: print(f"Number of subscriptions: {len(subscriptions)}")
            for subscription in subscriptions:
                if subscription.get('subscription') == subscription_data['subscription']:
                    logging.info(f"Deleting Subscription: {subscription['id']}")
                    if DEBUG: print(f"Deleting Subscription: {subscription['id']}")
                    del_subscription = delete_subscription(subscription['id'])
                    if del_subscription:
                        logging.info("Deleted Subscription")
                        if DEBUG: print("Deleted Subscription")
                        return False
                    else:
                        logging.info("Failed to delete Subscription")
                        if DEBUG: print("Failed to delete Subscription")
                        return True
        except ValueError as e:
            logging.error(f"Error parsing JSON response: {str(e)}")
            return False
    else:
        logging.error(f"Failed to retrieve subscriptions. Status code: {response.status_code}")
        return False


def get_subscriptions():
    # response = requests.get(subscription_url, auth=(username, password))
    response = requests.get(subscription_url, headers=headers)
    if response.ok:
        try:
            response_dict = response.json()
            subscriptions = response_dict.get('subscriptions', [])
            logging.info(f"Number of subscriptions: {len(subscriptions)}")
            if DEBUG: print(f"Number of subscriptions: {len(subscriptions)}")
            return subscriptions
        except ValueError as e:
            logging.error(f"Error parsing JSON response: {str(e)}")
            return []
    else:
        logging.error(f"Failed to retrieve subscriptions. Status code: {response.status_code}")
        return []


def find_subscription_by_name(subscriptions, name):
    for sub in subscriptions:
        if sub.get('subscription') == name:
            logging.info(f"Found subscription: {name}")
            if DEBUG: print(f"Found subscription: {name}")
            return sub
        logging.info("Failed to find subscription:")
        if DEBUG: print("Failed to find subscription:")
    return None


def create_subscription():
    # response = requests.post(subscription_url, json=subscription_data, auth=(username, password))
    response = requests.post(subscription_url, json=subscription_data, headers=headers)
    logging.info(f"Status code: {response.status_code}")
    logging.info(f"Response content: {response.content}")
    if DEBUG: print(f"Status code: {response.status_code}")
    if DEBUG: print(f"Response content: {response.content}")
    response_json = {"id": "Not Found"}
    try:
        print(response)
        print(type(response))
        # response_json = response.json()
        logging.info(f"Subscription: {response_json}")
    except ValueError:
        logging.error(f"Response is not in JSON format | Response content: {response.content}")

    logging.info(f"Response headers: {response.headers}")
    if DEBUG: print(f"Response headers: {response.headers}")
    logging.info(f"Response text: {response.text}")
    if DEBUG: print(f"Response text: {response.text}")
    if response.ok:
        logging.info(f"Subscription ID: {response_json}")
    else:
        logging.error(f"Failed to create subscription. Status code: {response.status_code}")


def create_token():
    response = requests.post(token_url, json=token_data, auth=(username, password))
    if response.ok:
        logging.info(f"Token created successfully. Status code: {response.text}")
        if DEBUG: print(f"Token created successfully. Status code: {response.text}")
        return response.json()
    else:
        logging.error(f"Failed to create token. Status code: {response.status_code}")
        return None


def delete_subscription(subscription_id):
    delete_url = f"{subscription_url}/{subscription_id}"
    response = requests.delete(delete_url, auth=(username, password))
    if response.ok:
        logging.info(f"Status code: {response.status_code}")
        if DEBUG: print(f"Status code: {response.status_code}")
        return True
    else:
        logging.error(f"Failed to delete subscription. Status code: {response.status_code}")
        return False


# LOAD CUMULOCITY MICROSERVICE VERSION NUMBER
with open('cumulocity.json') as f:
    cumulocity_config = json.load(f)
    version = cumulocity_config.get('version')
    logging.info(f"Cumulocity Microservice Version: {version}")
    if DEBUG: print(f"Cumulocity Microservice Version: {version}")

Full Log

DEBUG MODE: False <class 'bool'>
Password: sz5Oq5pBM43TOjMRf85lY4Y95KzsoSKc
SUBSCRIPTION
{'context': 'tenant',
 'fragmentsToCopy': ['cropId',
                     'deviceId',
                     'payload',
                     'status',
                     'action',
                     'text',
                     'type',
                     'el_DemandManagement'],
 'subscription': 't31727851eldemand',
 'subscriptionFilter': {'apis': ['events'],
                        'typeFilter': 'el_DemandManagement'}}
ENVIRONMENT VARIABLES
{'subscription_url': 'http://cumulocity:8111/notification2/subscriptions',
 'tenant': 't31727851',
 'token_url': 'http://cumulocity:8111/notification2/token',
 'user': 'service_eldemands',
 'username': 't31727851/service_eldemands'}
Cumulocity Microservice Version: 0.0.6
 * Serving Flask app 'application' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
WARNING:werkzeug: * Running on all addresses.
   WARNING: This is a development server. Do not use it in a production deployment.
INFO:werkzeug: * Running on http://10.244.164.121:80/ (Press CTRL+C to quit)
retrieved subscriptions
Number of subscriptions: 5
Deleting Subscription: 779673
Status code: 204
Deleted Subscription
Creating Subscription
Status code: 201
Response content: b'{"nonPersistent":false,"subscriptionFilter":{"apis":["events"],"typeFilter":"el_DemandManagement"},"context":"tenant","self":"http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065","fragmentsToCopy":["cropId","deviceId","payload","status","action","text","type","el_DemandManagement"],"subscription":"t31727851eldemand","id":"792065","source":{"self":"http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851","id":"t31727851"}}'
<Response [201]>
<class 'requests.models.Response'>
Response headers: {'Date': 'Wed, 09 Oct 2024 07:34:00 GMT', 'Content-Type': 'application/vnd.com.nsn.cumulocity.subscription+json;charset=UTF-8;ver=0.9', 'Content-Length': '459', 'Connection': 'keep-alive', 'Location': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065', 'X-Content-Type-Options': 'nosniff', 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate', 'Pragma': 'no-cache', 'Expires': '0', 'X-Frame-Options': 'DENY'}
Response text: {"nonPersistent":false,"subscriptionFilter":{"apis":["events"],"typeFilter":"el_DemandManagement"},"context":"tenant","self":"http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065","fragmentsToCopy":["cropId","deviceId","payload","status","action","text","type","el_DemandManagement"],"subscription":"t31727851eldemand","id":"792065","source":{"self":"http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851","id":"t31727851"}}
Number of subscriptions: 5
Number of subscriptions: 5
Subscriptions: [{'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': "'el_Stocktake'"}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/687208', 'subscription': 'allevents', 'id': '687208', 'fragmentsToCopy': ['el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_Stocktake'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/688230', 'subscription': 'elStocktakeEvents', 'id': '688230', 'fragmentsToCopy': ['el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_Stocktake'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/693341', 'subscription': 'allStocktakeEvents', 'id': '693341', 'fragmentsToCopy': ['text', 'type', 'c8y_Position', 'el_Stocktake'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_test_config'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/791358', 'subscription': 'allConfigurationTest', 'id': '791358', 'fragmentsToCopy': ['text', 'type', 'c8y_Position', 'el_test_config'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}, {'nonPersistent': False, 'subscriptionFilter': {'apis': ['events'], 'typeFilter': 'el_DemandManagement'}, 'context': 'tenant', 'self': 'http://t31727851.apj.cumulocity.com/notification2/subscriptions/792065', 'subscription': 't31727851eldemand', 'id': '792065', 'fragmentsToCopy': ['cropId', 'deviceId', 'payload', 'status', 'action', 'text', 'type', 'el_DemandManagement'], 'source': {'self': 'http://t31727851.apj.cumulocity.com/inventory/managedObjects/t31727851', 'id': 't31727851'}}]
Token created successfully. Status code: {"token":""}
Token created successfully: {'token': ''}
Starting WebSocket Client:, http://cumulocity:8111/notification2/consumer/?token=...
START: ws.run_forever(ping_interval=30)
ERROR:root:Error receiving message: scheme http is invalid
### closed ###
Close status code: None
Close reason: None
END: ws.run_forever(ping_interval=30)

Try to replace http://with ws://
This will only work when deployed to C8Y. Locally you need to use the full tenant domain.

Thanks for your reply.

I made the changes BASE_URL = os.getenv("BASE_URL", "ws://cumulocity:8111"); however, I get a new error where no subscriptions were found or created:

File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 732, in get_adapter
raise InvalidSchema("No connection adapters were found for {!r}".format(url))
requests.exceptions.InvalidSchema: No connection adapters were found for 'ws://cumulocity:8111/notification2/subscriptions'

Ok, for connecting with a websocket client you need to use ws://. For REST clients you need to use http://

1 Like