Microservice Client that connect to multiple devices

I’m busy building a Cumulocity Microservice that will communicate with multiple devices.
Upon startup of the microservice (through a subscription added event) I am retrieving preprovisioned device configuration from the inventory for all devices that match the type “Legacy Siemens S7” to retrieve the connectivity details. Based on the connectivity details I want to make a connection to each of the Siemens PLC’s and poll them for data. I was thinking of creating a thread for each device and have them polling each PLC individually.
This however did not prove to be a successful approach as the newly created threads don’t have access to the credentials of Cumulocity.

Please advise the best approach to take in this situation.

Below an excerpt of one of the devices:

"id": "1136082",
"type": "Legacy Siemens S7",
"name": "SU_X5001",
"c8y_IsDevice": {},
"connectivity": {
    "ipaddress": "192.168.10.40",
    "rack": "0",
    "slot": "2"
},

Here is the simplified coded for the Cumulocity Agent that retrieves the devices and starts creating clients.

@Service
@Slf4j
public class C8YS7Agent {
	
	@EventListener
	public void init(MicroserviceSubscriptionAddedEvent event) {
		log.debug("init(microserviceSubscriptionAdded)");
		
		String myTenant = event.getCredentials().getTenant();
		//Register the agent for the newly subscribed tenant
		subscriptionService.runForTenant(myTenant, () -> {
			registerThisAgent();
                });

               ManagedObjectCollection myDeviceCollection = findS7Devices();
		
		myDeviceCollection.get(MAX_DEVICES).forEach((device) -> {
			log.debug("starting connection to device: " + device.getName());
			
			ManagedObjectRepresentation myDevice = new ManagedObjectRepresentation();
			myDevice.setId(device.getId());
			myDevice.set(device.get("connectivity"), "connectivity");
			myDevice.set(device.get("tags"), "tags");
			
			C8YS7Client myClient = new C8YS7Client(myDevice);
			//myClient.run();
			applicationContext.getAutowireCapableBeanFactory().autowireBean(myClient);
			new Thread(myClient).start();
		});
       }

And here is the code for the S7Client:

public class C8YS7Client implements Runnable {

	//=========================================================================
	//========================= Constructors ==================================
	//=========================================================================
	public C8YS7Client(ManagedObjectRepresentation aDevice) {
		log.debug("C8YS7Client(aDevice: {}", aDevice.getName());
		
		s7Device = aDevice;
		
		@SuppressWarnings("unchecked")
		HashMap<String, Object> myConnectivity = (HashMap<String,Object>)aDevice.get("connectivity");
		String myIPAddress = (String)myConnectivity.get("ipaddress");
		int myRack = Integer.parseInt((String)myConnectivity.get("rack"));
		int mySlot = Integer.parseInt((String)myConnectivity.get("slot"));
		
		ipAddress = myIPAddress;
		rack = myRack;
		slot = mySlot;
	}
	
	//=========================================================================
	//======================= Public functions ================================
	//=========================================================================
	@Override
	public void run() {
		
		
		startGateway();
	}
	
	public void startGateway() {
		log.debug("startGateway()");
		
		int myPollingInterval = 500;
		
		S7Client myS7Client = connectPLC(ipAddress, rack, slot);
		
		if (myS7Client == null) {
			log.error("Unable to connect to PLC {} at {}", s7Device.getName(), ipAddress);
			return;
		}
		
		log.info("Connected to PLC");
		s7Device = registerOrUpdateDevice(myS7Client);  // This is where the fault occurs
		
		while (true) {
			try {
				log.trace("execute Polling cycle");
				
				Thread.sleep(myPollingInterval);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private ManagedObjectRepresentation registerOrUpdateDevice(S7Client aClient) {
		
		S7CpuInfo myCPUInfo = new S7CpuInfo();
		int myStatus = aClient.GetCpuInfo(myCPUInfo);
		if (myStatus == 0 ) {
			
			String mySerialNumber = myCPUInfo.SerialNumber();
			
			s7Device.setName(myCPUInfo.ASName());
			
			Hardware myHardware = new Hardware();
			myHardware.setModel(myCPUInfo.ModuleName());
			myHardware.setSerialNumber(mySerialNumber);
			
			s7Device.set(new IsDevice());
			s7Device.set(myHardware);
			
			s7Device = c8yAgent.updateDevice(s7Device); // c8yAgent is auto wired and has been loaded as we ran the autowire functions for this bean. However the session scope is not available for this thread.
			//c8yAgent.updateSerialNumber(mySerialNumber, s7Device.getId());
			
			return s7Device;

		} else {
			log.error("Unable to retrieve Serial Number from CPU Info");
			throw new SDKException("Unable to retrieve CPUInfo");
		}
	}
	//=========================================================================
	//======================= Private attributes ==============================
	//=========================================================================
	@Autowired
	private C8YS7Agent c8yAgent;

The code above will throw the following exception:

[WARNING] 
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.inventoryApi': Scope 'tenant' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: Not within any context!
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:372)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
	at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
	at jdk.proxy3/jdk.proxy3.$Proxy126.update(Unknown Source)
	at com.softwareag.c8y.s7.agent.C8YS7Agent.updateDevice(C8YS7Agent.java:77)
	at com.softwareag.c8y.s7.client.C8YS7Client.registerOrUpdateDevice(C8YS7Client.java:123)
	at com.softwareag.c8y.s7.client.C8YS7Client.startGateway(C8YS7Client.java:66)
	at com.softwareag.c8y.s7.client.C8YS7Client.run(C8YS7Client.java:50)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: Not within any context!
	at com.cumulocity.microservice.context.ContextServiceImpl.getContext(ContextServiceImpl.java:41)
	at com.cumulocity.microservice.context.annotation.EnableContextSupportConfiguration$1.getContextId(EnableContextSupportConfiguration.java:64)
	at com.cumulocity.microservice.context.scope.BaseScope.doGetSynchronized(BaseScope.java:75)
	at com.cumulocity.microservice.context.scope.BaseScope.get(BaseScope.java:67)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:360)
	... 9 more

Hi Misja,

in case of creating new threads the tenant-context gets lost, so that the microservice does not know to which tenant to connect anymore and which credentials to use.
However there is an easy way to run a new thread within the tenant-context again.
Therefor you need to use the MicroserviceSubscriptionsService and pass the tenant into the new thread.
Hence I would suggest the following.

  1. When creating the runnable add the tenant as parameter

C8YS7Agent:

C8YS7Client myClient = new C8YS7Client(myDevice, myTenant);
  1. When invoking the C8YS7Agent use the MicroserviceSubscriptionsService

C8YS7Client:

@Autowired
private MicroserviceSubscriptionsService subscriptionsService;

[...]
subscriptionService.runForTenant(myTenant, () -> {
	s7Device = c8yAgent.updateDevice(s7Device);
});

Regards Kai