How to INVOKE Integration Server services from "outside of a service context", but still from the same JVM?

Product/components used and version/fix level:

Integration Server 10.11 and 10.15

Detailed explanation of the problem:

I’m playing with RabbitMQ (surely many of you have already played with it). However, when it comes to subscribing to a queue, there is a tricky part: when a message arrives, it triggers a callback (a DeliverCallback), which can be customized to INVOKE a service published in Integration Server.

However, just calling Service.doInvoke(serviceCName, session, inputIData); is not enough; it seems the whole “context” has to be created as well.

Both Kellton’s blog (How to Integrate RabbitMQ with webMethods? | kellton) and the thread “Java Service → ext Java library (rabbit MQ client library) → Service.doInvoke fails” shed some light on this subject, but I had no luck making it work.

Here is the portion of the code built so far:

        ...
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String properties = delivery.getProperties().getHeaders().toString();
            String message = new String(delivery.getBody(), "UTF-8");
            
            com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser(username);
            
            Session session = StateManager.createContext(0x7fffffffL, "system", user);
            session.setUser(user);
            session.clearModified();
            
            IData inputIData = IDataFactory.create();
            ...
            cursor.destroy();
            
            try {
                Service.doInvoke(triggerService, session, inputIData);
            } catch (Exception e) {
                e.printStackTrace();
            }
            
            StateManager.deleteContext(session.getSessionID());
            
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        ...

…and I also tried merging in the following:

    InvokeState is = new InvokeState();
              
    session = Service.getSession();
    User olduser = session.getUser();
    InvokeState.setCurrentSession(session);
    InvokeState.setSessionUser(olduser);
		  
    output = Service.doInvoke(nsServiceName, is.getSession(), input);

I’ve made several tests, and here is what I’ve found so far:

  • the messages are being drained - autoAck is set to true (refer to Channel.basicConsume()), and on the “Queues and Streams” page (see the management plugin) the number of “messages ready” and “messages total” increments as new messages are published;
  • but the “trigger service” is not actually invoked; even if the trigger service is pub.flow:debugLog, nothing gets logged in the server.log;
  • I’ve used a sample JavaApp as subscriber using almost the same code, and it shows messages flowing through.

Therefore, what else am I missing [in order to be able to invoke a service from the callback]?

Error messages / full error message screenshot / log file:

Actually, there is no error message at all.

Hi @Feng_Sian5 -

Interesting topic and I’m almost certain we can get to the bottom of it. But before we go down this rabbit hole (pun intended), let me ask: why not use JMS to integrate with RabbitMQ?

Percio

Did you check the topic below? It uses the AMQP protocol on Azure, but the configuration should be the same.

Thanks for your comments, @Percio_Castro1.
The main reason for not going down the JMS + Apache Qpid route is that - in the current case - the IS is not in a position to make the rules. (Whether this approach is correct or appropriate is out of scope…)
Actually, it is going to be only “a new kid on the block” wanting to play along with the others. Hence, it needs to adhere to the rules already in place. I bet there are advantages to using the JMS + Apache Qpid approach (limiting the # of threads executing the service, for example).

@engin_arlak, thanks for pointing this.
I’d also come across this post, but I was looking for an integration that could leverage what RabbitMQ has to offer.
It looks like the essential idea behind message routing in RabbitMQ is that:

  • publishers always publish to exchanges (for simpler cases they can publish directly to queues)
  • subscribers always subscribe to queues
  • and the routing logic (how to route from the exchanges to the [client]queues) is something that “can be defined on-the-fly” - perhaps usually defined by the subscribing parties.
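To make the three points above concrete, here is a toy in-memory model of a direct exchange. It is only a sketch of the idea and deliberately does not use the RabbitMQ client library; the class name ToyDirectExchange, its methods, and the queue and routing-key names in the usage note are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; NOT the RabbitMQ client API.
class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> pending messages
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // A subscriber declares its own queue...
    void declareQueue(String queueName) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>());
    }

    // ...and binds it to the exchange; bindings can be added at any time.
    void bind(String queueName, String routingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // A publisher only names a routing key, never a queue.
    // Returns the number of queues that received a copy of the message.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // A subscriber reads from its queue; returns null when nothing is pending.
    String poll(String queueName) {
        Queue<String> pending = queues.get(queueName);
        return pending == null ? null : pending.poll();
    }
}
```

For example, after bind("orders.is", "order.created") and bind("orders.audit", "order.created"), a single publish("order.created", ...) lands in both queues, while a publish with a routing key nobody has bound reaches no queue at all.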

Taking the 2nd point (subscribers and queues) - and considering another point of view - do you mean that “using this approach (JMS + Apache Qpid), we can use JMS triggers subscribing to RabbitMQ queues as if they were simple JMS queues”?

If you use them as JMS queues, the implementation should be the same as for any other JMS trigger on IS, since what you implement against is the protocol, not the broker implementation. I haven’t consumed from or published to RabbitMQ using IS before, but they both seem to support this protocol, so unless there is a technical limitation, this should be the easiest way.

This is weird: after scratching my head and conducting many tests, I decided to go back to basics - meaning copying exactly what was posted … … … and, to my surprise, it worked.

Definitely, I must have done something wrong.


UPDATE: I claimed to have fixed it, but I WAS WRONG.
The signs I saw were of the callback code working, not of the actual “IS service outcome”…

Update:
(I’m not sure why it happens, but it works this way.)

In a callback context (in my current case, when RabbitMQ calls the handleDelivery of a DefaultConsumer), Service.doInvoke() doesn’t work, but setting up the User and Session in exactly the same way and using Service.doThreadInvoke() works (note the word “Thread”). I really don’t have the faintest idea how to explain this.
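One possible explanation, which nothing in this thread confirms, is that part of the invocation “context” is bound to the calling thread. The RabbitMQ Java client runs consumer callbacks on its own threads, not on the thread that registered the consumer, so anything stored per thread would be missing there. The sketch below uses only the JDK to show that effect; ThreadBoundContextDemo and CURRENT_USER are hypothetical stand-ins and say nothing about how Integration Server actually stores its state.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadBoundContextDemo {
    // Hypothetical stand-in for per-thread invocation state (an assumption, not IS internals).
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Binds a user to the calling thread only.
    static void setCurrentUser(String user) {
        CURRENT_USER.set(user);
    }

    // Reads the context from whichever thread calls it.
    static String whoAmI() {
        return CURRENT_USER.get();
    }

    // Runs whoAmI() on a separate thread, the way a client library runs a callback,
    // and returns what that thread saw.
    static String whoAmIOnCallbackThread() {
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            Callable<String> callback = ThreadBoundContextDemo::whoAmI;
            return callbackThread.submit(callback).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        setCurrentUser("Administrator");
        System.out.println("registering thread sees: " + whoAmI());
        System.out.println("callback thread sees:    " + whoAmIOnCallbackThread());
    }
}
```

The registering thread reads back the user it set, while the callback thread reads null. If Integration Server behaves similarly, handing the work to a thread the server itself creates (which is what the name doThreadInvoke suggests) would sidestep the problem, but that remains a guess.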

To explain the process:

  1. Register a subscriber by handing over a callback (look for the channel.basicConsume())
    1.1. The callback is a DefaultConsumer-derived anonymous class;
  2. Issue a publish (channel.basicPublish())

Here is the code for 1.

	public static final void subscribeQueue_Consumer_doThreadInvoke(IData pipeline) throws ServiceException {
		...
		try {
			connection = __borrowConnection(alias);
			Channel channel = connection.createChannel();
			...
			consumerTag = channel.basicConsume(queueName, automaticAcknowledgement, new DefaultConsumer(channel) /* 1.1. */ { ... });
			outcome = "Consumer registered: " + consumerTag + ".";
		} catch (Exception e) {
			outcome = "Consumer not registered: " + e.toString() + ".";
			try {
				__invalidateConnection(alias, connection);
			} catch (Exception e1) {
				e1.printStackTrace();
			}
			throw new ServiceException(e);
		
		} finally {
			if (connection != null) {
				__returnConnection(alias, connection);
			}
			...		
		}
	}


… and for the anonymous class (1.1.)

/* new DefaultConsumer(channel) */ {
		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			...
			try {
				com.wm.lang.ns.NSName triggerService = NSName.create(RABBITMQ_CALLBACK_WRAPPER_SERVICE);
				com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser(username);
				Session session = StateManager.createContext(0x7fffffffL, "system", user);
				session.setUser(user);
				session.clearModified();
				...
				Service.doThreadInvoke(triggerService, session, inputIData);
				StateManager.deleteContext(session.getSessionID());
			} catch (Exception e) {
				ServerAPI.logError(e);
				...
				}
			}
		}
	}

A word of caution: this code works, but be careful when changing it. In the many experiments I did, moving things around could break it without any sign. I realized it wasn’t working only much later - for example, after posting the prior message.
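One ordering hazard that might contribute to this fragility (an assumption, not something verified here): if Service.doThreadInvoke() returns before the service has finished, as its name suggests, then the StateManager.deleteContext() call right after it can run while the service is still using the session. The JDK-only sketch below reproduces that kind of race in a deterministic way. AsyncCleanupOrderDemo, SESSIONS, and invoke() are hypothetical names, and the latch exists only to force the interleaving.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncCleanupOrderDemo {
    // Hypothetical session registry: session id -> user name.
    private static final Map<String, String> SESSIONS = new ConcurrentHashMap<>();

    // Starts an asynchronous "service" that looks up its session, then deletes the
    // session either after the service has finished (waitFirst == true) or before it
    // has run (waitFirst == false). Returns what the service saw.
    static String invoke(boolean waitFirst) {
        String sessionId = "s-" + System.nanoTime();
        SESSIONS.put(sessionId, "Administrator");
        CountDownLatch mayStart = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> service = () -> {
                mayStart.await(); // forces the interleaving so the demo is deterministic
                return SESSIONS.getOrDefault(sessionId, "no session");
            };
            Future<String> running = pool.submit(service);
            if (waitFirst) {
                mayStart.countDown();
                String seen = running.get(5, TimeUnit.SECONDS); // wait for completion...
                SESSIONS.remove(sessionId);                      // ...then clean up
                return seen;
            }
            SESSIONS.remove(sessionId); // cleanup runs ahead of the service
            mayStart.countDown();
            return running.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Here invoke(true) yields "Administrator" and invoke(false) yields "no session". In a real callback the outcome would depend on thread timing rather than on a latch, which would fit the observation that small rearrangements sometimes break things silently.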