Refreshing access tokens in EPL apps without redeploying

EPL apps provide a way to add your own logic according to the use cases into Cumulocity IoT in the form of a monitor file. One such use case could be talking to HTTP endpoints secured with authorization protocols which involve access tokens. This post talks about the problems and the possible solution for this particular use case.

Usually these access tokens are only valid for a certain period of time and then need to be refreshed with the authorization server once expired. In those scenarios, hard coding the token in an EPL app demands the author to redeploy the app with the updated value of the access token every time the token expires. This post provides a possible solution about how we can refresh the access token in the EPL app without having to manually modify or redeploy the app. Furthermore, we can configure the app to provide an early notification of a token that is about to expire by logging a warning, triggering the refresh-token subroutine, or sending an SMS/email to the personnel responsible for managing the tokens.

To achieve this, we need to introduce a TokenFactory which will be responsible for providing token updates to all the consumers which are interested in various tokens. This TokenFactory itself will be listening for token updates that are POSTed by the token manager (personnel responsible for refreshing the token values). The following flow diagram shows the basic interaction between a token consumer and TokenFactory.

To refresh the access token in a running EPL app, the token manager needs to POST a Cumulocity IoT event with the updated value of the access token to the TokenFactory. To make sure that this event is not stored anywhere and is only forwarded to the Apama-ctrl microservice, we should add the following header to our POST request:

X-Cumulocity-Processing-Mode: CEP

For more information on processing modes, see the Cumulocity IoT – HTTP Usage/Processing modes.

The EPL implementation for the TokenFactory shown above is available as the following GitHub repository: working_with_access_tokens_in_eplapps. Here are some snippets from the repository code with explanation:

/**
 * @private
 * Internal event TokenRegistrationRequest
 */
event TokenRegistrationRequest {
    constant string SEND_CHANNEL := "TOKEN_FACTORY_IN_CHANNEL";
    string eventType;
    integer reqId;
}

/**
 * @private
 * Internal event TokenRegistrationResponse
 */
event TokenRegistrationResponse {
    constant string SUBSCRIBE_CHANNEL := "TOKEN_FACTORY_OUT_CHANNEL";
    integer reqId;
    string token;
}

/**
 * @private
 * Internal event TokenUpdate
 */
event TokenUpdate {
    constant string SUBSCRIBE_CHANNEL := "TOKEN_FACTORY_OUT_CHANNEL";
    string eventType;
    string token;
}

/**
 * @private
 * Internal event RegistrationCancellationReq
 */
event RegistrationCancellationReq {
    constant string SEND_CHANNEL := "TOKEN_FACTORY_IN_CHANNEL";
    string eventType;
}
/**
 * This monitor is responsible for providing token updates to all of the
 * Token consumers which have registered to updates using Token.registerInterest
 */
monitor TokenFactory {

    action onload() {
        log "Loaded monitor TokenFactory" at INFO;
        spawn serveTokenRelatedRequests() to context("new_context");
    }
    
    event CacheEntry {
        string token;
        float expiration;
        integer consumers;
    }

    event StopEventListener {
        string type;
    }

    dictionary<string, CacheEntry> tokenCache;
    
    /**
     * Listens for all Token related requests and keep sending token refresh
     * notifications to respective consumers using TokenUpdates
     */
    action serveTokenRelatedRequests() {
        monitor.subscribe(TokenRegistrationRequest.SEND_CHANNEL);
        monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

        on all TokenRegistrationRequest() as req {
            if not tokenCache.hasKey(req.eventType) {
                // Add a new cachedEntry in the caches with an invalid token
                tokenCache.add(req.eventType, CacheEntry("", 0.0, 1));

                on all Event(type=req.eventType) as e and not StopEventListener(type=req.eventType) {
                    // if "token" and/or "expirationTime" fields are missing, consider it as an expired token
                    string token := "";
                    float expirationTime := e.params.getOr("expirationTime", 0.0).valueToString().toFloat();
                    // Read the token only if it is still valid
                    if (expirationTime > currentTime) {
                        token := e.params.getOr("token", "").valueToString();
                    }
                    updateAndNotify(req.eventType, token, expirationTime);
                    startExpiryTimer(req.eventType);
                }
            } else {
                // Increment the number of consumers by 1
                tokenCache[req.eventType].consumers := tokenCache[req.eventType].consumers + 1;
            }
            // Always respond with the cached value even if it is invalid.
            // A new TokenUpdate notification will be sent on every future refresh
            send TokenRegistrationResponse(req.reqId, tokenCache[req.eventType].token)
                 to TokenRegistrationResponse.SUBSCRIBE_CHANNEL;
        }

        on all RegistrationCancellationReq() as req {
            string cacheKey := req.eventType;
            if not tokenCache.hasKey(cacheKey) {
                log "Trying to unregisterInterest with an invalid TokenRegistrar."
                    + " Use TokenRegistrar.registerInterest() to get a valid registrar" at WARN;
                return;
            }

            CacheEntry cachedEntry := tokenCache[cacheKey];
            cachedEntry.consumers := cachedEntry.consumers - 1;
            if (cachedEntry.consumers = 0) {
                send StopEventListener(req.eventType) to context.current();
                tokenCache.remove(req.eventType);
            }
        }
    }

    /** 
     * Starts an expiration timer on a cache key.
     */
    action startExpiryTimer(string key) {
        // we can configure an additional wait to log an early warning, trigger
        // a refresh-token subroutine or notify the person responsible via
        // SMS/EMAIL

        on wait(tokenCache[key].expiration - currentTime)
                and not StopEventListener(type=key) {
            // check if the token was refreshed during this wait
            CacheEntry cachedEntry := tokenCache[key];
            if (currentTime >= cachedEntry.expiration) {
                log "Token with key=" + key + " has expired." at WARN;
                // set token value as empty which implies invalid token
                cachedEntry.token := "";

                // Notify the consumers of this token
                send TokenUpdate(key, cachedEntry.token)
                     to TokenRegistrationResponse.SUBSCRIBE_CHANNEL;
            }
        }
    }

    /**
     * Updates the caches on any token refreshes received as events and
     * notifies the consumers by sending a TokenUpdate with eventType=key.
     */
    action updateAndNotify(string key, string tokenValue, 
                           float expirationTime) {
        CacheEntry cachedEntry := tokenCache[key];
        cachedEntry.token := tokenValue;
        cachedEntry.expiration := expirationTime;
        send TokenUpdate(key, cachedEntry.token)
             to TokenUpdate.SUBSCRIBE_CHANNEL;
    }
}

The above code expects a token consumer to TokenRegistrar.registerInterest in a particular token with eventType as the unique identifier for the Cumulocity IoT event which will carry the refreshed value of the token of interest. Multiple consumers can register for updates to the same token. Any updates to a particular token via a Cumulocity IoT event will trigger callback notifications to only the interested consumers. If a token does not receive an update and expires, a warning is logged and all consumers get a notification callback with an empty string as the token value, which implies an invalid token. Consumers can take appropriate actions for invalid tokens.

Here is an example of a token consumer:

/**
 * This monitor is a sample consumer of an access token. It registers its
 * interest in token and keeps getting updates callback for that token.
 */
monitor SampleConsumer {

    string token := "";
    TokenRegistrar registrar;

    event TokenRefreshed {}
    event LoopEvent {}

    integer loopCounter := 0;

    action onload() {
        log "Loaded monitor SampleConsumer" at INFO;
        spawn startConsumingToken() to context("some_context");
    }

    action refreshToken(string tokenValue) {
        token := tokenValue;
        if  (token != "") {
            send TokenRefreshed() to context.current();
        }
    }

    action startConsumingToken() {
        // Register for token updates
        registrar := TokenRegistrar.registerInterest("token_tag_for_uri_abc",
                                                     refreshToken);

        // Start using token
        on all LoopEvent() {
            log currentTime.toString() at INFO;
            if ("" = token) {
                log "Consumer: Cannot use token as it has expired and invalid."
                    + " I will wait for a Token refresh before using it"
                    at WARN;
                on TokenRefreshed() {
                    useToken();
                }
            } else {
                useToken();
            }
        }
        send LoopEvent() to context.current();
    }

    action useToken() {
        // we can use this valid token in our requests here
        log "Consumer: Found a valid token: " + token at INFO;

        loopCounter := loopCounter + 1;
        // Some condition after which we no longer need token and 
        // can stop getting updates
        if (loopCounter = 20) {
            stopGettingUpdates();
        } else {
            log currentTime.toString() at INFO;
            // Add a small delay before the next token use
            on wait(5.0) {
                send LoopEvent() to context.current();
            }
        }
    }

    action stopGettingUpdates() {
        // Avoid potential listener leaks.
        registrar.unregisterInterest();
    }
}

The above code expects the token publisher to send token updates via a Cumulocity IoT event which should have the following information in the event body:

  • event.type = "uniqueType_for_redirect_uri_ABC"
  • token = "some_token_value"
  • expirationTime = expiry_timestamp_for_token *

* Access tokens are usually provided with an expiration period, for example, 3600 seconds. The expirationTime in the event should be a value equal to the token generation time plus the expiration period. It should be provided as a timestamp value equal to the number of seconds and fractional seconds elapsed since midnight, January 1, 1970 UTC.

Sending a token update to the TokenFactory

Based on the authorization protocol used to secure the endpoint, we can generate an access token using protocol-specific request-response cycles. Then we need to create a Cumulocity IoT event to POST this token as an update to the TokenFactory . For example, we can use curl to create an event:

curl --location --request POST "https://<TENANT_DOMAIN>/event/events" \
--header "Authorization: Basic <AUTHORIZATION>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "X-Cumulocity-Processing-Mode: CEP" \
--data-raw '<Payload>

Note the header X-Cumulocity-Processing-Mode: CEP in the curl request. You should use this header to make sure that this event is not stored anywhere and is only forwarded to (or available to) the cep endpoint in the Apama-ctrl microservice.

The <Payload> in the above request is a JSON body. It should contain all required fields for a Cumulocity IoT event as shown below. The type of the event should be set to the eventType that was provided while registering interest in any token. The payload should also contain two extra fields: token and expirationTime which are read in TokenFactory .

{
    "source":
    {
        "id": <any_managed_object_id>
    },
    "text": "An event",
    "time": "2021-10-19T12:03:27.845Z",
    "type": "TOKEN_A_TAG",
    "token": "_SOME_VALID_TOKEN_",
    "expirationTime": <expiration_timestamp>
}

For more details on how to create a Cumulocity IoT event, see the Cumulocity IoT – OpenAPI Specification.

If the process of generating a new access token does not require human interaction like in a multi-factor authentication setup, then the process of generating and posting a token update to Apama can be entirely automated as some sort of small script.

Making the TokenFactory available to all EPL apps which use tokens

If we deploy EPL code as an EPL app, that code is not available to other EPL apps. So in order to share the TokenFactory with all other EPL apps, we must upload it as an extension. For details on how to upload an extension, see the section “Uploading a block to Analytics Builder” in the Knowlegde base article Analytics Builder Block SDK which is available from the Tech Community.

Links