Analytic Block Parallel Input Out of Synch Issue

Apama Analytics Builder version: 10.16.0.4.20230518

Detailed explanation of the problem: We are seeing a synchronization issue in blocks that have multiple input ports. For example, with an output block of type Measurement Output, we want to send a measurement with its fragment along with a custom fragment that arrives on the properties port. If the input on the properties port arrives slightly delayed, the result is not as expected: the values from the properties port are inserted into the wrong measurement output.

What could be the cause of this issue, and how can it be resolved?
Please advise.


Hi,

If the delay is fixed, the most obvious solution would be to use the Delay block, similar to how you use it to simulate the problem. I guess that is most likely not the case, though.

I assume what you want to achieve is to always match the measurement value with the corresponding property and not miss any of either of them?

Then that is not easily doable with the built-in blocks, as it would require queuing functionality. If you know that you would only ever have to queue one value of each, you could probably do that queuing with a clever use of the AND block.

Otherwise you would need a custom block that queues values in its local state and outputs synchronized values whenever each queue contains at least one value. If custom block development is an option for you, I could share a high-level sketch of what this would look like.

Best regards,
Harald


Thank you for your prompt reply.
Yes, you are right: the Delay block is used only to simulate the issue.

Yes, we want to always match the measurement value with the corresponding property and not miss any of either of them.

The actual issue is similar to the model below:

MO: Managed Object.
Query MO, API Call, and Update MO are custom blocks.

Please share a sketch of how to achieve this; it would be very helpful.

Thanks

Hi Matiur,

I have not tested this, but it should work as shown below. Note that the two inputs are pulse inputs, so you should put a “Set Property” block before each input and put the value into a property called “value”. The reason is that only with a pulse input can you detect which input triggered the invocation of the “$process” action, and we need that in order to associate the correct input values. If an input triggered “$process”, we store its value in a buffer. Once we have received values on both inputs, we take the oldest value from each buffer and produce an output.

Note: instead of doing this in a separate block, you could incorporate this logic into your “Make API Call” block if that makes sense.

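using apama.analyticsbuilder.BlockBase;
using apama.analyticsbuilder.Activation;
using apama.analyticsbuilder.Value;
using com.apama.util.AnyExtractor;

/** Block state: one FIFO buffer per input. */
event SyncInputs_$State {
    sequence<float> buffer1;
    sequence<float> buffer2;
}

event SyncInputs {
    /** Required by the Block SDK for every block. */
    BlockBase $base;

    action $process(Activation $activation, Value $input_value1, Value $input_value2, SyncInputs_$State $blockState) {

        // Assumption: for pulse inputs, value is true only on the input that fired.
        boolean v1active := <boolean> $input_value1.value;
        boolean v2active := <boolean> $input_value2.value;

        // Buffer the value of whichever input triggered this call.
        if (v1active) {
            $blockState.buffer1.append(AnyExtractor($input_value1).getFloat("value"));
        }
        if (v2active) {
            $blockState.buffer2.append(AnyExtractor($input_value2).getFloat("value"));
        }

        // Once both buffers hold a value, output the oldest of each and remove them.
        if ($blockState.buffer1.size() > 0 and $blockState.buffer2.size() > 0) {
            $setOutput_output1($activation, $blockState.buffer1[0]);
            $blockState.buffer1.remove(0);
            $setOutput_output2($activation, $blockState.buffer2[0]);
            $blockState.buffer2.remove(0);
        }
    }

    constant string $INPUT_TYPE_value1 := "pulse";
    constant string $INPUT_TYPE_value2 := "pulse";

    action<Activation,float> $setOutput_output1;
    action<Activation,float> $setOutput_output2;

}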
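
The buffering idea in this block can also be tried outside Analytics Builder. The following is a minimal plain-Python simulation, not EPL and not the Block SDK API; the class name `SyncBuffers` and its `process` method are made up for illustration. It assumes, as described above, that each call knows which input fired.

```python
from collections import deque


class SyncBuffers:
    """Hypothetical stand-in for the SyncInputs block state (illustration only)."""

    def __init__(self):
        self.buffer1 = deque()
        self.buffer2 = deque()

    def process(self, value1=None, value2=None):
        """Queue whichever input fired; return a pair once both queues are non-empty."""
        if value1 is not None:
            self.buffer1.append(value1)
        if value2 is not None:
            self.buffer2.append(value2)
        if self.buffer1 and self.buffer2:
            # Always pair the oldest value of each input.
            return self.buffer1.popleft(), self.buffer2.popleft()
        return None


sync = SyncBuffers()
results = [
    sync.process(value1=10.0),  # only input 1 fired: nothing to pair yet
    sync.process(value1=20.0),  # input 1 again: queued behind 10.0
    sync.process(value2=0.5),   # late input 2: pairs with the oldest value, 10.0
    sync.process(value2=0.7),   # pairs with 20.0
]
print(results)  # [None, None, (10.0, 0.5), (20.0, 0.7)]
```

The late values on the second input are matched to the first-input values in arrival order, which is the behaviour the block is meant to provide.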

Thank you @Harald_Meyer,
I will try this.
The blocks are kept separate to make them generic and reusable.

I tried it as suggested, but have not succeeded yet. It works for a single source, but not yet for input from a group of devices.
Is there any way to handle inputs from a group of devices?

One major issue is that the $input_… parameters of $process hold their previous values if no pulse is received. I worked around this for a single input source by comparing Value.timestamp values, but I cannot get the expected result when the input comes from a device group.

SyncInputs.txt (4.7 KB)

Maintaining a separate buffer per source ID may work, but I am not sure, and it is becoming more and more tricky.
What would be the best viable solution?
Please suggest.


Hi Matiur,

“One major issue is, $process 's $input_… is holding previous values if pulse is not received.”

My assumption was that you could use the $input_.value field to check if it is true or not and it would only be true if the pulse was received. Is that not the case?

“not able to get expected result where input is from a Device Group.”

In case of device groups, the state should be split by individual device. So it is strange that this is not working for you. This is the explanation:

Let me also try to reproduce the issue.

Best regards,
Harald

It is a measurement input holding a float value. In subsequent iterations, when value1 has a new value, value2 and value3 still hold their old values. That is what is printed in the log.
When value1 receives a pulse in a later iteration, the other inputs should be empty or reset, but they hold their previous values. So I am comparing each input’s timestamp with the previous one and ignoring the input if the two are equal.

Let me dig deeper into the device group issue. I will update.

Interestingly, your block seems to work for me. I created a model similar to yours (also with delays) for a group of devices. I sent in a value of “200” for device 1 and then a value of “104” for device 2 before the delay occurred.

Input 1:

SyncInputs.process.Delay>>>$input_value1>>apama.analyticsbuilder.Value(any(float,200),1690542329.166,{"fragment":any(string,"input"),"id":any(string,"67950065"),"measurement_creationTime":any(float,1690542330.429),"measurement_source.name":any(),"measurement_source.type":any(string,"com_cumulocity_model_idtype_GId"),"measurement_source.value":any(string,"9266897646"),"measurement_tenantId":any(string,"t21106993"),"series":any(string,"i"),"source":any(string,"9266897646"),"time":any(float,1690542329.166),"type":any(string,"input"),"unit":any(string,""),"value":any(float,200)})>>>$input_value2>>apama.analyticsbuilder.Value(any(),0,{})>>>$input_value3>>apama.analyticsbuilder.Value(any(),0,{})

Input 2:

SyncInputs.process.Delay>>>$input_value1>>apama.analyticsbuilder.Value(any(float,104),1690542331.695,{"fragment":any(string,"input"),"id":any(string,"67933927"),"measurement_creationTime":any(float,1690542332.559),"measurement_source.name":any(),"measurement_source.type":any(string,"com_cumulocity_model_idtype_GId"),"measurement_source.value":any(string,"2318011"),"measurement_tenantId":any(string,"t21106993"),"series":any(string,"i"),"source":any(string,"2318011"),"time":any(float,1690542331.695),"type":any(string,"input"),"unit":any(string,""),"value":any(float,104)})>>>$input_value2>>apama.analyticsbuilder.Value(any(),0,{})>>>$input_value3>>apama.analyticsbuilder.Value(any(),0,{})

Output 1:

SyncInputs.process.Delay>>>$input_value1>>apama.analyticsbuilder.Value(any(float,200),1690542329.166,{"fragment":any(string,"input"),"id":any(string,"67950065"),"measurement_creationTime":any(float,1690542330.429),"measurement_source.name":any(),"measurement_source.type":any(string,"com_cumulocity_model_idtype_GId"),"measurement_source.value":any(string,"9266897646"),"measurement_tenantId":any(string,"t21106993"),"series":any(string,"i"),"source":any(string,"9266897646"),"time":any(float,1690542329.166),"type":any(string,"input"),"unit":any(string,""),"value":any(float,200)})>>>$input_value2>>apama.analyticsbuilder.Value(any(float,200),1690542339.166,{})>>>$input_value3>>apama.analyticsbuilder.Value(any(float,200),1690542344.166,{})
SyncInputs.process.Delay>>>$input_value1.timestamp>>1690542329.166/1690542329.166>>>$input_value2.timestamp>>1690542339.166/1690542339.166>>>$input_value3.timestamp>>1690542344.166/0
SyncInputs.process.Delay>>>$blockState.buffer1>>[200]>>>$blockState.buffer2>>[200]>>>$blockState.buffer3>>[]
SyncInputs.process.Delay>>>v1active>>false>>>v2active>>false>>>v3active>>true
SyncInputs.processSync.Delay>>>$input_value1>>200>>>$input_value2>>200>>>$input_value3>>200
SyncInputs.processSync.Delay>>>sum>>600

Output 2:

SyncInputs.process.Delay>>>$input_value1>>apama.analyticsbuilder.Value(any(float,104),1690542331.695,{"fragment":any(string,"input"),"id":any(string,"67933927"),"measurement_creationTime":any(float,1690542332.559),"measurement_source.name":any(),"measurement_source.type":any(string,"com_cumulocity_model_idtype_GId"),"measurement_source.value":any(string,"2318011"),"measurement_tenantId":any(string,"t21106993"),"series":any(string,"i"),"source":any(string,"2318011"),"time":any(float,1690542331.695),"type":any(string,"input"),"unit":any(string,""),"value":any(float,104)})>>>$input_value2>>apama.analyticsbuilder.Value(any(float,104),1690542341.695,{})>>>$input_value3>>apama.analyticsbuilder.Value(any(float,104),1690542346.695,{})
SyncInputs.process.Delay>>>$input_value1.timestamp>>1690542331.695/1690542331.695>>>$input_value2.timestamp>>1690542341.695/1690542341.695>>>$input_value3.timestamp>>1690542346.695/0
SyncInputs.process.Delay>>>$blockState.buffer1>>[104]>>>$blockState.buffer2>>[104]>>>$blockState.buffer3>>[]
SyncInputs.process.Delay>>>v1active>>false>>>v2active>>false>>>v3active>>true
SyncInputs.processSync.Delay>>>$input_value1>>104>>>$input_value2>>104>>>$input_value3>>104
SyncInputs.processSync.Delay>>>sum>>312
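
To make the per-device behaviour in these logs easier to follow, here is a rough plain-Python replay (not EPL). The device IDs, values and timestamps are taken from the log lines above; keeping one state object per device in a dictionary is only a stand-in for the per-device state that Analytics Builder maintains for device groups, and the `process` function is hypothetical.

```python
from collections import defaultdict, deque


def new_state():
    # One FIFO buffer per input, mirroring buffer1..buffer3 in the log output.
    return [deque(), deque(), deque()]


# Stand-in for per-device state: one independent set of buffers per device ID.
states = defaultdict(new_state)


def process(device_id, input_index, value):
    """Queue a value for one device; return the sum once all three inputs hold one."""
    buffers = states[device_id]
    buffers[input_index].append(value)
    if all(buffers):
        return sum(b.popleft() for b in buffers)
    return None


# (timestamp, device ID, input index, value) as seen in the logs above.
events = [
    (1690542329.166, "9266897646", 0, 200.0),
    (1690542331.695, "2318011", 0, 104.0),
    (1690542339.166, "9266897646", 1, 200.0),
    (1690542341.695, "2318011", 1, 104.0),
    (1690542344.166, "9266897646", 2, 200.0),
    (1690542346.695, "2318011", 2, 104.0),
]

sums = {}
for _, device_id, input_index, value in sorted(events):
    result = process(device_id, input_index, value)
    if result is not None:
        sums[device_id] = result

print(sums)  # {'9266897646': 600.0, '2318011': 312.0}
```

The two sums match the `sum>>600` and `sum>>312` lines in the logs, even though the events of the two devices are interleaved in time.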

I am checking further; maybe I made a mistake when analyzing the log.

Could you please also check the subsequent iterations (not just the first one)?
My simulator sends the values 0, 10, 20, 30, 40, 50, one every 5 seconds. In the model, the first delay is 5 seconds and the second delay is 7 seconds.

For me, the first iteration is OK, but in subsequent iterations some readings are missing.
I am expecting the results 0, 30, 60, 90, 120, 150.
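
As a sanity check on that expectation, here is a small plain-Python simulation (not EPL). It assumes the same reading reaches three inputs: directly, after the 5-second delay, and after the 7-second delay, and that the block sums one buffered value from each input. The exact wiring of the model is not shown here, so these offsets are assumptions.

```python
from collections import deque

PERIOD = 5                          # simulator sends a reading every 5 seconds
DELAYS = (0, 5, 7)                  # assumed arrival offsets for inputs 1, 2 and 3
READINGS = [0, 10, 20, 30, 40, 50]

# Build the arrival timeline as (arrival_time, input_index, value), ordered by time.
arrivals = sorted(
    (k * PERIOD + delay, index, value)
    for k, value in enumerate(READINGS)
    for index, delay in enumerate(DELAYS)
)

buffers = [deque() for _ in DELAYS]
outputs = []
max_depth = 0                       # deepest the first input's queue ever gets
for _, index, value in arrivals:
    buffers[index].append(value)
    max_depth = max(max_depth, len(buffers[0]))
    if all(buffers):
        # Every input has at least one value: sum the oldest of each.
        outputs.append(sum(b.popleft() for b in buffers))

print(outputs)    # [0, 30, 60, 90, 120, 150]
print(max_depth)  # 2
```

Under these assumptions the first input has two values queued at times, because the next reading arrives before the 7-second delay of the previous one has elapsed. That is why a queue is needed here rather than a single remembered value per input.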

Anyway, it seems this will work; I am testing it to make sure it is foolproof.

Thank you for your time.

@Harald_Meyer,
Thanks.
It seems to be working.
