How to do average on a derived event from incoming event

Hi,
I have a use case, where I am creating an event (derived event) from an incoming event and I have to do a an average of an integer value placed in this derived event at the end of the day. I have used stream but not able to get two things :
a) scheduling. i.e., at the end of the day trigger to calculate the average.
b) average of value placed in the derived event.

For ex: my events are

incoming event is
event Product{
string productId;
integer retailSell;
integer onlineSell;
}

Derived event
event TotalSell{
string productId;
intered totalSell; // totalSell for a product will be Product.retailSell + Product.onlineSell
}

Scenario :: On every 10 mins data will arrive as Product event saying number of retailSell and number of onlineSell. On that data packet TotalSell event will be created saying the total sell for the last ten minutes for a particular product is TotalSell.totalSell = Product.retailSell + Product.onlineSell.
At the end of the day we need to take the average of the total sell.

Can any one help with this scenario?
Thanks

Hi Sush,

The ‘at’ operator in a listener expression is ideal for your use case. It is used to specify a time at which some code should be run, more information can be found in the doc under the ‘Triggering event listeners at specific times’ topic

The below monitor solves your scenario I believe


using com.apama.aggregates.mean;

event Sentinel {}

event Averages
{
	string productId;
	float average;
}

monitor DailySellAverageMonitor
{
	action onload()
	{
		// every day at midnight, send a trigger to report the average
		on all at(*, 23, *, *, *) { route Sentinel(); }
		
		// create a stream of TotalSell events out of incoming Product events
		stream<TotalSell> tss := from p in all Product() select TotalSell(p.productId, p.retailSell + p.onlineSell);
		// create a stream of Averages for each product out of generated TotalSells in tss
		// we use the mean aggregate to calculate the running average
		// this stream has a window of the past 24 hours
		stream<Averages> avgs := from ts in tss within 24.0*60.0*60.0 group by ts.productId select Averages(ts.productId, mean(ts.totalSell.toFloat()));
		// when we receive a Sentinel, for each productId pull an Averages event out of the stream
		from a in avgs partition by a.productId retain 1 from s in all Sentinel() select a as avg
		{
			// we now have an event 'avg' containing the productId and todays average
		}
	}

Please feel free to ask further if any of the above is unclear

Callum