Java service to monitor durable queues in UM

Product/components used and version/fix level:

Integration server and UM - V 10.7
IS/UM_10.7_Core_Fix8

Detailed explanation of the problem:

I have a Java service that lists the durables of a channel and returns the values shown below. Most of the time it does not return the correct results: the timestamps come back as 12/31/1969 (the Unix epoch, i.e. an unset value) and the depth comes back as 0, even though we have unprocessed messages. When I change the code and run it again, it produces the desired results.

Expected results:

<durableList>
<depth>0</depth>
<lastReadTime>09/07/2023 10:27:027</lastReadTime>
<lastWriteTime>09/07/2023 10:27:027</lastWriteTime>
<durableName>fullyQualifiedDurableName_YYY</durableName>
</durableList>
<durableList>
<depth>247</depth>
<lastReadTime>08/30/2023 03:07:050</lastReadTime>
<lastWriteTime>09/06/2023 02:27:027</lastWriteTime>
<durableName>fullyQualifiedDurableName_XXX</durableName>
</durableList>
<durableList>

Actual results:

<durableList>
<depth>0</depth>
<lastReadTime>12/31/1969 07:00:000</lastReadTime>
<lastWriteTime>12/31/1969 07:00:000</lastWriteTime>
<durableName>fullyQualifiedDurableName_YYY</durableName>
</durableList>
<durableList>
<depth>0</depth>
<lastReadTime>12/31/1969 07:00:000</lastReadTime>
<lastWriteTime>12/31/1969 07:00:000</lastWriteTime>
<durableName>fullyQualifiedDurableName_XXX</durableName>
</durableList>
<durableList>

JAVA CODE:

public static final void getListDurableByChannel(IData pipeline) throws ServiceException {
IDataCursor pipelineCursor = pipeline.getCursor();
// recovery input parameter for realmURL
String realmName = IDataUtil.getString( pipelineCursor, “realmName” );
// recovery input parameter for nodeName
String nodeName = IDataUtil.getString( pipelineCursor, “nodeName” );
Boolean umRunning = true;
//Definition for dataformat
SimpleDateFormat sdf = new SimpleDateFormat(“MM/dd/yyyy hh:mm:sss”);
// recovery input parameter for user
String user = IDataUtil.getString( pipelineCursor, “user” );
// recovery input parameter for password
String password = IDataUtil.getString( pipelineCursor, “password” );

	//This class represents a Nirvana session, the logical connection between the client API and the Nirvana realm
	nSession mySession = getSession(realmName, user, password);
	if (mySession== null){
		umRunning = false;
		IDataUtil.put( pipelineCursor, "umRunning", "NOK" );
	}
	
	// If UM is running execute the capture the realmNode
	if (umRunning){
		//This object allows the user to access the current status of the RealmServer, the current configuration parameters
		nRealmNode node = getNode(mySession);
	    //This class represents the Nirvana Channel or Queue on the Nirvana Realm.
		nLeafNode leafNode = null;
		try {
			node.waitForDurableInformation();
			leafNode = (nLeafNode) node.findNode(nodeName);
		} catch (nAdminIllegalArgumentException e) {
			throw new ServiceException("Channel/queue not found!");
		}
		//Verify if node is found
		if (leafNode!=null){
		    nTopicNode topicNode = (nTopicNode) leafNode;
		    //Iterator in Dubrable Topic List
			Iterator<nDurableNode> iteratorListDurable = topicNode.getDurableList().iterator();
			// durableList
			IData[]	durableList = new IData[topicNode.getDurableList().size()];
			int i = 0;
			while (iteratorListDurable.hasNext()) {
				nDurableNode durableNode = (nDurableNode) iteratorListDurable.next();
				
				
				durableList[i] = IDataFactory.create();
				IDataCursor durableListCursor = durableList[i].getCursor();
				String lastReadTime = "";
				String lastWriteTime = "";
				lastReadTime = sdf.format(new Date(durableNode.getLastReadTime()));
				lastWriteTime = sdf.format(new Date(durableNode.getLastWriteTime()));
				
				IDataUtil.put( durableListCursor, "depth", durableNode.getDepth() );
				IDataUtil.put( durableListCursor, "lastReadTime", lastReadTime );
				IDataUtil.put( durableListCursor, "lastWriteTime", lastWriteTime );
				IDataUtil.put( durableListCursor, "durableName", durableNode.getName() );
				durableListCursor.destroy();
				i++;
			}
			IDataUtil.put( pipelineCursor, "umRunning", "OK" );
			IDataUtil.put( pipelineCursor, "durableList", durableList );
			
	
		}else {
			// In case not found the nodeName using the variable umRunning to notice.
			IDataUtil.put( pipelineCursor, "umRunning", "NOK - informed node was not found!" );
		}
		//Closes this session, disconnecting from the server and killing all threads.
		mySession.close();
		//Closes the session with the remote Realm and all realms from this point in the namespace.
		node.close();
		
	}
	pipelineCursor.destroy();			
		
}

// --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---

// Time in milliseconds that getNode sleeps after creating the realm node,
// to wait for connection information from UM.
public static final int WAIT_FOR_CONN = 200;

/**
 * Creates and initialises a Universal Messaging session for the given realm.
 *
 * When either user or password is non-empty the session is created with those
 * credentials; otherwise it is created without credentials.
 *
 * NOTE(review): the truststore location and password are hard-coded placeholder values
 * ("/location", "certificate") - confirm they are replaced per environment.
 *
 * @param realmName the realm URL (RNAME) to connect to
 * @param user      the user name, may be null or empty
 * @param password  the password, may be null or empty
 * @return the initialised session, or null when the realm is unreachable
 * @throws ServiceException if the session attributes or session cannot be created, or
 *         initialisation fails for a reason other than the realm being unreachable
 */
public static final nSession getSession(String realmName, String user, String password) throws ServiceException{
	// 1. Create the session attributes for the requested realm.
	nSessionAttributes nsa;
	try {
		nsa = new nSessionAttributes(realmName);
		nsa.setTruststore("/location", "certificate");
	} catch (nIllegalArgumentException e) {
		throw new ServiceException("Unable to create session object");
	}

	// 2. Create the session, with credentials when at least one of them was supplied.
	boolean hasUser = user != null && !user.isEmpty();
	boolean hasPassword = password != null && !password.isEmpty();
	nSession mySession;
	try {
		if (hasUser || hasPassword) {
			mySession = nSessionFactory.create(nsa, user, password);
		} else {
			mySession = nSessionFactory.create(nsa);
		}
	} catch (nIllegalArgumentException e) {
		throw new ServiceException("Error connecting to UM");
	}

	// 3. Initialise the session to open the connection to the realm.
	try {
		mySession.init();
	} catch (nRealmUnreachableException e) {
		// Deliberately not an error: callers treat null as "UM is not running".
		return null;
	} catch (nSecurityException e) {
		throw new ServiceException("Cannot initialize the session object (nSecurityException)");
	} catch (nSessionNotConnectedException e) {
		throw new ServiceException("Cannot initialize the session object (nSessionNotConnectedException)");
	} catch (nSessionAlreadyInitialisedException e) {
		throw new ServiceException("Cannot initialize the session object (nSessionAlreadyInitialisedException)");
	}
	return mySession;
}

/**
 * Creates a realm node from the attributes of the given session and then sleeps for
 * WAIT_FOR_CONN milliseconds to wait for connection information.
 *
 * @param mySession an initialised session; must not be null
 * @return the realm node; the caller is responsible for closing it
 * @throws ServiceException if the realm node cannot be created or the wait is interrupted
 */
public static final nRealmNode getNode(nSession mySession) throws ServiceException{
	nRealmNode node;
	try {
		node = new nRealmNode(mySession.getAttributes());
	} catch (nBaseAdminException e1) {
		throw new ServiceException("Cannot create Realm node object (nBaseAdminException)");
	}

	// Wait to receive connection information.
	try {
		Thread.sleep(WAIT_FOR_CONN);
	} catch (InterruptedException e) {
		// The node is never handed to the caller on this path, so release it here.
		node.close();
		// Restore the interrupt flag so code further up the stack can see the interruption.
		Thread.currentThread().interrupt();
		throw new ServiceException("Thread interrupted trying to get connection information (InterruptedException)");
	}
	return node;
}

public static final Map<String, IData> getDataCursorForNodes(List <nRealmNode> listOfNodes){
	Map<String, IData> mapNodeInfoList = new HashMap<String, IData>();
	for (Iterator<nRealmNode> iterator = listOfNodes.iterator(); iterator.hasNext();) {
		nRealmNode node = (nRealmNode) iterator.next();
		
		IData	nodeInfo = IDataFactory.create();
		IDataCursor nodeInfoCursor = nodeInfo.getCursor();
		
		//Size memory setup only to presentation the information (MB-MegaBytes)
		long sizeCount = 1024 * 1024;
		//Capture all variables to put in pipeline
		//Nnumber of channels within the realm
		String getNoOfChannels		=String.valueOf(node.getNoOfChannels());
		//Number of queues within the realm
		String getNoOfQueues		=String.valueOf(node.getNoOfQueues());
		//Number of thread the JVM has allocated for the Realm Server
		String getNoOfThreads		=String.valueOf(node.getNoOfThreads());
		//Physical memory of the machine tha hosts UM Server (Used size Count to return MB size)
		String getPhysicalMemory	=String.valueOf(node.getPhysicalMemory() /sizeCount);
		//Total number of connections served by this realm from the time it started
		String getTotalConnections	=String.valueOf(node.getTotalConnections());
		//Total allocatable number of bytes that the JVM can use before receiving a Out Of Memmory Exception
		String getTotalDirectMemory	=String.valueOf(node.getTotalDirectMemory()/sizeCount);
		//Total number of bytes that the JVM has allocated from the unerlying OS.
		String getTotalMemory		=String.valueOf(node.getTotalMemory() / sizeCount);
		//Amount of free memory the Realm Server has within the JVM
		String getFreeMemory		=String.valueOf(node.getFreeMemory()/sizeCount);
		//Maximum on-heap memory available
		String getMaxHeapMemory		=String.valueOf(node.getMaxHeapMemory()/sizeCount);
		//Total number of events published on this realm from the time it has started
		String getTotalPublished	=String.valueOf(node.getTotalPublished());
		//Total number of events that this realm from the time it has started
		String getTotalSubscribed	=String.valueOf(node.getTotalSubscribed());
		//Number of connection on this realm.
		String getCurrentConnections=String.valueOf(node.getCurrentConnections());			
	
		IDataUtil.put( nodeInfoCursor, "name",  node.getName());
		IDataUtil.put( nodeInfoCursor, "noOfChannels",  getNoOfChannels);
		IDataUtil.put( nodeInfoCursor, "noOfQueues", getNoOfQueues );
		IDataUtil.put( nodeInfoCursor, "noOfThreads", getNoOfThreads );
		IDataUtil.put( nodeInfoCursor, "physicalMemory", getPhysicalMemory );
		IDataUtil.put( nodeInfoCursor, "totalConnections", getTotalConnections );
		IDataUtil.put( nodeInfoCursor, "totalDirectMemory", getTotalDirectMemory );
		IDataUtil.put( nodeInfoCursor, "totalMemory", getTotalMemory );
		IDataUtil.put( nodeInfoCursor, "freeMemory", getFreeMemory );
		IDataUtil.put( nodeInfoCursor, "maxHeapMemory", getMaxHeapMemory );
		IDataUtil.put( nodeInfoCursor, "totalPublished", getTotalPublished );
		IDataUtil.put( nodeInfoCursor, "totalSubscribed", getTotalSubscribed );
		IDataUtil.put( nodeInfoCursor, "currentConnections", getCurrentConnections );
		mapNodeInfoList.put(node.getName(), nodeInfo);
		nodeInfoCursor.destroy();

	}
	
	return mapNodeInfoList;

This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.