Multi-threading from scheduler

IS 10.5
Hello,

We have an old scheduler implementation in our department where the scheduler calls an API to fetch a list of SFTP login details from another system. Once it gets the list, it logs on to the destination SFTP sites sequentially in a loop to copy the files over from the source SFTP site to the destination server.

Over time, the list kept growing, and we now experience some slowness in the service due to the sequential polling in a loop. This solution was built back then to mimic a sort of MFT solution, as we were not licensed for the MFT components. We are still in the lab PoC phase with our MFT solution, but in the meantime we have been tasked with making this scheduler more efficient.

To achieve this, we are thinking of introducing multithreading using MQ: the scheduler will fetch the SFTP login credentials using the same API and will publish a trigger message on MQ for each SFTP site, sequentially in a loop. This way, each SFTP site will have its own MQ listener service fetching and copying the files for it.

The only challenge is: how do we ensure that if two trigger messages are published at the same time for the SAME SFTP site, they don’t process over each other? The later thread must wait for the first one to finish, while the threads for the other SFTP sites can still run in parallel without waiting.

Any suggestion will be highly appreciated. If there is a better way to achieve this, please do share.

Thanks

Hi Dhiraj,

What type of MQ are you planning to use?
When triggers are running in a cluster, they need to be set to serial with “Duplicate Detection” enabled so that they behave as one single trigger.

How are the SFTP connections configured?
Via the built-in SFTP solution?
In that case, provide the aliases (server and user) in the published/queued messages and use them as inputs when invoking pub.sftp:login et al. where applicable. Please make sure to encapsulate all the SFTP steps in the Try block of a Try/Catch sequence construct for easier handling of any connection issues with the SFTP servers.
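
If it helps, here is a minimal Java-service sketch of that pattern, with the SFTP calls wrapped in a try/finally that mirrors the Try/Catch sequence. The pub.client.sftp namespace (referenced as pub.sftp above) and the aliasName input field are assumptions; please verify both against the built-in services reference for your IS release:

import com.wm.app.b2b.server.Service;
import com.wm.data.IData;
import com.wm.data.IDataCursor;
import com.wm.data.IDataFactory;
import com.wm.data.IDataUtil;
import com.wm.lang.ns.NSName;

public class SftpPollSketch {
    // Polls one SFTP site whose user alias arrives in the published/queued message.
    public static void pollSite(String aliasName) throws Exception {
        IData pipeline = IDataFactory.create();
        IDataCursor idc = pipeline.getCursor();
        IDataUtil.put(idc, "aliasName", aliasName); // input field name is an assumption
        idc.destroy();
        try {
            // the "Try block": login, list, transfer
            pipeline = Service.doInvoke(NSName.create("pub.client.sftp:login"), pipeline);
            pipeline = Service.doInvoke(NSName.create("pub.client.sftp:ls"), pipeline);
            // ... pub.client.sftp:get / put per file ...
        } finally {
            // always release the session, even on connection errors
            Service.doInvoke(NSName.create("pub.client.sftp:logout"), pipeline);
        }
    }
}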

Can you show your current outline for further investigation, please?

I think it might be worth rethinking your logic so it works without a scheduler and looping.

Regards,
Holger

I would like to understand the challenge in having multiple threads log into the same site at the same time. If they are dealing with different files, it shouldn’t be an issue. If they are dealing with the same file, then you should dedup the list prior to looping.

By the way, I’m not sure pub/sub is really necessary if you’re simply trying to achieve multithreading. I’m not saying it’s wrong, and it does have some additional benefits (e.g. load balancing), but for basic multithreading you can simply do a threaded invoke for each server (or file group).
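
For reference, a threaded invoke can look roughly like the sketch below. The two-argument Service.doThreadInvoke and the my.folder:processSite service with its site input are assumptions to verify against the Javadoc of your IS version:

import java.util.ArrayList;
import java.util.List;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceThread;
import com.wm.data.IData;
import com.wm.data.IDataCursor;
import com.wm.data.IDataFactory;
import com.wm.data.IDataUtil;
import com.wm.lang.ns.NSName;

// Fire one service thread per SFTP site, then wait for all of them.
List<ServiceThread> threads = new ArrayList<>();
for (String site : sites) {
    IData input = IDataFactory.create();
    IDataCursor c = input.getCursor();
    IDataUtil.put(c, "site", site); // hypothetical input of my.folder:processSite
    c.destroy();
    threads.add(Service.doThreadInvoke(NSName.create("my.folder:processSite"), input));
}
for (ServiceThread t : threads) {
    IData output = t.getIData(); // blocks until that thread finishes (the "join")
}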

HTH,
Percio

Thank you for the reply @Percio_Castro1.

Sorry, we have never implemented multithreading from any of our services, as it was never needed in our environment until now. If we had to do basic multithreading and implement the pattern below, could you please point me to a document or example showing the latest approach to achieve it? This is what we want to implement:

  1. Parent thread is started every 2 mins from a scheduler with a list of SFTP sites
  2. Parent invokes multiple child threads, each with an argument holding SFTP site login details
  3. Any new child thread should check whether another child thread is active and polling the same SFTP site; if so, it should exit the flow after raising an informational log message, e.g. “Another thread is polling the same site”.

The other approach we are thinking of is to stop a duplicate child-thread submission at the parent level itself: the parent submits all the child threads and waits for their responses, and the scheduler is not allowed to run another instance of the parent until the previous parent thread has finished running.

Essentially, we want to avoid parallel threads polling the same SFTP site together: each thread does an “LS .” of the whole directory rather than listing specific files, so running them in parallel will have them contending for the same files.

Hope it clarifies the requirement further.

Thanks


Normal concepts of semaphoring to make your code thread-safe will help. :smiley:
That leaves the question of how to implement your semaphores, especially when you picture a (stateless) clustered setup and need to control your connections across the hosts.
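
As a single-node illustration only, a per-site lock table can look like the sketch below (pollIfFree is a hypothetical helper; a clustered setup would need a shared store, e.g. a database lock, instead):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class SiteLocks {
    // One lock per SFTP site, created on first use. Single IS node only!
    private static final Map<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    public static boolean pollIfFree(String site, Runnable poll) {
        ReentrantLock lock = LOCKS.computeIfAbsent(site, k -> new ReentrantLock());
        if (!lock.tryLock()) {
            return false; // another thread is polling this site: log and exit
        }
        try {
            poll.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}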

When we want to introduce parallel processing we use pub/sub. It is an almost trivial way to do so and avoid all the “fun” of managing async service invocations and joins. We never drop to Java to do multi-threading in IS.

Quick terminology glossary to aid communication:

  • Scheduler/scheduling - facility within IS to define scheduled tasks. There is only one of these in IS. It manages execution of tasks.
  • Task - configuration that identifies a service to be run at specified times or intervals. (Many people refer to these as “schedulers” for some reason but the docs and UI call these tasks or “user tasks”.)
  • Service - any IS-hosted service can be defined in a task to be executed.

Assuming you have Universal Messaging (or Broker, if you are still using it), here is a way that may fit the scenario of interacting with multiple SFTP servers at once, but with just one thread for each.

  • Parent service executed periodically via a scheduled task. It obtains the list of SFTP servers and, for each, publishes a document identifying the SFTP “source” in the publishable doc type (a sketch of this publish step follows the list).
  • Define a trigger for each SFTP source server, using a filter. Set each to use serial processing.
  • Depending upon how the interactions with each SFTP server are defined (hard-coded or configuration driven), each trigger invokes either a common service (assuming configuration driven) that uses the published doc input to determine what to do, i.e. multiple triggers, one common service for all; or a specific service for the given SFTP server, i.e. a trigger and a service per SFTP server.
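
For the publish step in the first bullet, a rough sketch (the doc type my.docs:sftpPollEvent and its source field are hypothetical):

import com.wm.app.b2b.server.Service;
import com.wm.data.IData;
import com.wm.data.IDataCursor;
import com.wm.data.IDataFactory;
import com.wm.data.IDataUtil;
import com.wm.lang.ns.NSName;

// One published event per SFTP source; each serial trigger filters on "source".
for (String source : sftpSources) {
    IData event = IDataFactory.create();
    IDataCursor ec = event.getCursor();
    IDataUtil.put(ec, "source", source);
    ec.destroy();

    IData pipeline = IDataFactory.create();
    IDataCursor pc = pipeline.getCursor();
    IDataUtil.put(pc, "documentTypeName", "my.docs:sftpPollEvent");
    IDataUtil.put(pc, "document", event);
    pc.destroy();
    Service.doInvoke(NSName.create("pub.publish:publish"), pipeline);
}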

The serial setting will make sure just one thread is active on just one of the IS instances in the cluster at a time (edit: “cluster” here does not mean an IS cluster, but multiple identical IS instances connected to the same UM). Each published doc will be processed serially.

That said, there is a caveat. I am not 100% certain that using filters in triggers for the same doc type will distribute multiple events to run in parallel. You’ll need to confirm the behavior.

A potentially big downside to this approach, depending upon the specifics of your implementation, is that every time you add an SFTP target, you would need to create a trigger (with a filter).

Another possibility is to skip fetching the list of SFTP servers from somewhere else altogether, and instead define a scheduled task for each SFTP server. One advantage is that the polling interval would not need to be the same for all SFTP servers: some can run once a day, others every 5 minutes.

My advice: keep it as simple as possible. The “stop a duplicate…” and “Scheduler is not allowed to run another instance…” ideas are getting a bit involved and error-prone.

Here is yet another possibility that may be workable and allow any number of simultaneous threads against a single SFTP server.

  • Get list of files from SFTP server. If none, exit.
  • For each file
  1. Rename the file to “reserve” it for this thread. Either add a suffix to the file (something the “list of files” step is not looking for) or rename it to another directory (a move is a rename).
  2. If the rename fails, ignore it and move on to the next file, because another thread has already processed it.
  3. Retrieve the renamed file. Move/delete it when done.

We use this approach frequently. Works fine.
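
A minimal local-filesystem sketch of the reserve-by-rename step (over SFTP you would use the equivalent rename service; the ".processing" suffix is just an example):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileReserver {
    // Try to "reserve" a file by renaming it to a name the file-listing step ignores.
    public static boolean reserve(Path file) {
        try {
            Files.move(file, file.resolveSibling(file.getFileName() + ".processing"));
            return true; // ours alone: safe to retrieve and process
        } catch (IOException e) {
            return false; // another thread got there first: skip this file
        }
    }
}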

Another technique that should almost always be used: when writing a file for some other process to pick up, write the file to a name that the other process is not looking for, e.g. use .tmp instead of .csv, or write to another directory (on the same volume). When done, rename the file to the needed name. This prevents the other process from picking up files that are still being written, or files that are partial because the app writing the file failed.
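
Sketched on a local filesystem (the same idea applies via an SFTP rename), assuming source and target sit on the same volume:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SafeWriter {
    // Write under a name the consumer is not polling for, then rename into place,
    // so the final name only ever appears once the content is complete.
    public static void write(Path target, byte[] content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content);
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}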

HTH!


Hi all,

After going through all the replies, I now have an idea of what can be done here.
Thanks again for your help and guidance. Much appreciated.

Cheers!

I’m still not sure I understand how this is going to happen. You’re retrieving a list of sFTP sites (and credentials) from an API, right? Is the API returning duplicates? If so, why?

If it is returning duplicates and there is no way to fix that, then I’d recommend dedup’ing in the parent before spawning the threads because it seems wasteful to me to spawn a thread just to have it quit if that sFTP server is already being processed by another thread. Deduping the list could be as simple as adding the entries into a Java Set first (e.g. HashSet) and then reading out of the Set, or perhaps even simpler, you could simply sort the list first and then process it in order ensuring that you haven’t already processed the “current” sFTP site like in the pseudocode below:

import java.util.Arrays;

String[] sites = fetchSites(); // hypothetical helper: call API to retrieve sFTP sites
Arrays.sort(sites);            // after sorting, duplicate entries sit next to each other
String previousSite = null;
for (String currentSite : sites) {
    if (!currentSite.equals(previousSite)) { // current sFTP site has not yet been processed
        process(currentSite); // spawn a thread however you see fit, via messaging or a threaded invoke
    }
    previousSite = currentSite;
}
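
For the Set-based variant mentioned above, the equivalent sketch is even shorter (process() is the same hypothetical call as above):

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// A LinkedHashSet drops duplicates while preserving the original order.
Set<String> uniqueSites = new LinkedHashSet<>(Arrays.asList(sites));
for (String site : uniqueSites) {
    process(site);
}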

Having said this, the suggestion @reamon made about renaming the file(s) prior to doing any processing is something I use regularly in all my file integrations, and I recommend it as well. It is helpful in ensuring files aren’t read before they are completely written (whether you’re the receiver or the sender). You can even use this technique to enable multiple threads to process a given directory (if you ever find the need). For example, the suffix you add to a file name could be a UUID that is unique to that thread (e.g. a combination of host IP, port, and thread ID, or an actually generated UUID). Given that the suffix is unique to your thread, you’re guaranteed that if the rename succeeds, no other thread will accidentally pick up the file.
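
A sketch of such a thread-unique suffix; the rename itself then works exactly like the reserve-by-rename example earlier in the thread, with this suffix in place of ".processing":

import java.util.UUID;

// Unique to this thread; alternatively combine host IP, port, and thread ID.
String suffix = "." + UUID.randomUUID();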

Percio
