MWM and IS Version: 9.12
Broker Version: 9.6
I am maintaining an app which records JMS Broker messages when added to Topics and Queues (among other things). This works fine for Topics since using the API I can register a callback using BrokerClient.registerCallbackForSubId() to be notified every time a message of interest is published. This same method does not seem to get called when I use a Queue. I know I could poll for messages sent to the Queue but that is not very efficient and not sufficient for what we are trying to do. So…
- Should messages sent to a Queue notify my client if I use BrokerClient.registerCallbackForSubId() and I am doing something wrong?
- If not, then is there an alternative aside from polling to achieve the same functionality?
Below is a paired down version of the code that works for topics but not queue. Thank you in advance.
package com.abc.wm.jms.queue;
import java.util.concurrent.atomic.AtomicInteger;
import COM.activesw.api.client.BrokerCallback;
import COM.activesw.api.client.BrokerClient;
import COM.activesw.api.client.BrokerConnectionCallback;
import COM.activesw.api.client.BrokerConnectionDescriptor;
import COM.activesw.api.client.BrokerEvent;
import COM.activesw.api.client.BrokerException;
public class MyBrokerClient {
private static final String CLIENT_GROUP = "SampleQueueConnectionGroup";
private static final String EVENT_TYPE = "JMS::Queues::SampleQueue";
// private static final String CLIENT_GROUP = "SampleTopicConnectionGroup";
// private static final String EVENT_TYPE = "Sample::Topic";
private BrokerClient client = null;
private BrokerConnectionDescriptor descriptor;
private AtomicInteger subid = new AtomicInteger(1);
public MyBrokerClient() throws BrokerException {
descriptor = new BrokerConnectionDescriptor();
descriptor.setAutomaticReconnect(true);
descriptor.setConnectionShare(false);
descriptor.setStateShare(false);
descriptor.setSharedEventOrdering(BrokerConnectionDescriptor.SHARED_ORDER_BY_PUBLISHER);
client = new BrokerClient("myserver:6849", "Broker #1", "", CLIENT_GROUP, "My App", descriptor);
// Required that I add one of these or I get nothing for Topic or Queue
client.registerCallback(new BrokerCallback() {
@Override
public boolean handleBrokerEvent(BrokerClient client, BrokerEvent event, Object client_data) {
System.out.println("Received event in generic broker event callback.");
return true;
}
}, null);
int subscriptionId = subid.getAndIncrement();
client.registerCallbackForSubId(subscriptionId, new BrokerCallback() {
@Override
public boolean handleBrokerEvent(BrokerClient client, BrokerEvent event, Object client_data) {
try {
System.out.format("***\nMessage added to queue, baseType:%s scopeTypeName=%s\n***\n",
event.getBaseTypeName(), event.getScopeTypeName());
return true;
} catch (Exception ex) {
ex.printStackTrace();
return true;
}
}
}, null);
// subscribe to event
String eventType = EVENT_TYPE;
if (client.canSubscribe(eventType)) {
client.newSubscription(subscriptionId, eventType, null);
System.out.format("Subscribed to event type [%s] on [%s://%s], client group [%s], client id [%s]\n",
eventType, client.getBrokerName(), client.getBrokerHost(), client.getClientGroup(),
client.getClientId());
} else {
System.out.format("ERROR: Unable to subscribe to event type: [%s]\n");
System.exit(2);
}
client.registerConnectionCallback(new BrokerConnectionCallback() {
@Override
public void handleConnectionChange(BrokerClient arg0, int arg1, Object arg2) {
System.out.println("The connection changed. What now?");
}
}, null);
BrokerClient.threadedCallbacks(true);
}
}