In this post we will combine the streaming input with reference data from Azure Storage, process both in a query, and write the output events to a Service Bus queue. Azure Logic Apps will pick up these events and notify the person responsible for maintaining the pipe.
Let’s begin by adding a second input to the Job Topology. This input is reference data: a CSV file stored in Azure Blob Storage.
Unfortunately, testing the CSV input in the query window produced an error (see this link).
That does not block the experiment, though. After adding the CSV file as an input, we can add the Service Bus queue as the output.
Once the inputs and the output are wired up, we can write the CEP query.
The reference data input is named ‘maintenance’. You can download the sample file from this location.
Let’s write the CEP query. In the query blade we can specify a simple JOIN between the stream and the reference data. Note that the streaming data is in JSON format and the reference data is in CSV format, but Stream Analytics can still join them and produce the output.
Query in text:
SELECT T.PipeCode, M.Owner, AVG(T.Temperature) AVGTemp
INTO Spikes
FROM TemperatureInput T
JOIN Maintenance M ON T.PipeCode = M.PipeCode
GROUP BY T.PipeCode, M.Owner, TumblingWindow(second, 120)
HAVING AVGTemp >= 50
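To make the semantics of the query easier to follow, here is a minimal Python sketch that emulates it locally on a handful of readings. It is not how Stream Analytics runs the query. The CSV rows, pipe codes, owner addresses, and timestamps are made-up assumptions; only the column names PipeCode, Owner, and Temperature come from the query itself. The sketch also simplifies window boundaries by bucketing on integer division of the timestamp.

```python
import csv
import io
from collections import defaultdict

# Hypothetical reference data: column names match the query, rows are invented.
MAINTENANCE_CSV = """PipeCode,Owner
P-100,alice@example.com
P-200,bob@example.com
"""

# Hypothetical readings: (seconds since start, PipeCode, Temperature).
READINGS = [
    (5, "P-100", 48.0),
    (60, "P-100", 56.0),
    (90, "P-200", 30.0),
    (130, "P-100", 40.0),
    (200, "P-200", 70.0),
    (210, "P-999", 90.0),  # no reference row, so the inner JOIN drops it
]

WINDOW_SECONDS = 120  # TumblingWindow(second, 120)
THRESHOLD = 50.0      # HAVING AVGTemp >= 50


def load_owners(csv_text):
    """Build a PipeCode -> Owner lookup from the reference CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row["PipeCode"]: row["Owner"] for row in reader}


def find_spikes(readings, owners):
    """Emulate JOIN + GROUP BY tumbling window + HAVING on in-memory data."""
    groups = defaultdict(list)
    for timestamp, pipe_code, temperature in readings:
        if pipe_code not in owners:
            continue  # inner join: unmatched readings produce no output
        # Simplified bucketing; real window edge handling may differ.
        window = timestamp // WINDOW_SECONDS
        groups[(window, pipe_code, owners[pipe_code])].append(temperature)

    spikes = []
    for (window, pipe_code, owner), temps in sorted(groups.items()):
        average = sum(temps) / len(temps)
        if average >= THRESHOLD:
            spikes.append({"Window": window, "PipeCode": pipe_code,
                           "Owner": owner, "AVGTemp": average})
    return spikes


if __name__ == "__main__":
    for spike in find_spikes(READINGS, load_owners(MAINTENANCE_CSV)):
        print(spike)
```

With this sample data, only two groups pass the threshold: P-100 in the first window and P-200 in the second. Each such row corresponds to one message that the job would write to the output.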
The INTO clause directs the output to a specific Service Bus output (named Spikes here), so each result is posted as a message to the Service Bus queue.
To make the sample more interactive, let’s create an Event Hub simulator that posts messages (sensor readings) to Event Hubs. You can access the code for the simulator from this link.
Run the simulator on your local machine and start the job in the portal. You will see the Service Bus queue receiving messages from the Stream Analytics job. We can visualize the live job status in the Job Diagram blade in the portal.
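If you want to see what such a simulator produces without opening the linked code, the following Python sketch generates JSON sensor readings of the shape the query expects. It is not the linked simulator. The pipe codes, the temperature range, and the ReadingTime field are assumptions for illustration; only PipeCode and Temperature are taken from the query. Sending is left out on purpose: in practice you would hand each JSON string to an Event Hubs client library.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical pipe codes; they should match rows in the reference CSV.
PIPE_CODES = ["P-100", "P-200"]


def make_reading(pipe_code, rng):
    """Build one sensor reading as a dictionary."""
    return {
        "PipeCode": pipe_code,
        "Temperature": round(rng.uniform(20.0, 80.0), 1),
        # Assumed extra field; the query in this post does not use it.
        "ReadingTime": datetime.now(timezone.utc).isoformat(),
    }


def make_batch(count, seed=None):
    """Return `count` readings serialized as JSON strings."""
    rng = random.Random(seed)
    return [json.dumps(make_reading(rng.choice(PIPE_CODES), rng))
            for _ in range(count)]


if __name__ == "__main__":
    # Print the payloads instead of sending them to Event Hubs.
    for message in make_batch(5, seed=42):
        print(message)
```

Passing a seed makes the run repeatable, which helps when you want to compare the job output against a known set of inputs.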
Plug an Azure Logic App into the Service Bus to listen to the queue and send the alert to the owner. The owner’s email address comes from the reference data.
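The Logic App itself is configured in the portal, but the mapping it performs is simple enough to sketch in Python. The example below assumes the job writes each result as a JSON message with the fields PipeCode, Owner, and AVGTemp from the SELECT list. The function name build_alert and the wording of the email are hypothetical.

```python
import json


def build_alert(message_body):
    """Turn one queue message (a JSON string) into the fields of an alert email."""
    spike = json.loads(message_body)
    return {
        "to": spike["Owner"],  # email address that came from the reference data
        "subject": f"Temperature spike on pipe {spike['PipeCode']}",
        "body": f"Average temperature in the last window: {spike['AVGTemp']:.1f}",
    }


if __name__ == "__main__":
    sample = '{"PipeCode": "P-100", "Owner": "alice@example.com", "AVGTemp": 52.0}'
    print(build_alert(sample))
```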
In the next post we will see how to add custom functions to the Stream Analytics CEP query.