Codit Blog

Posted on Wednesday, January 21, 2015 2:19 PM

by Sam Vanhoutte

Azure Stream Analytics is a very interesting service on the Azure platform that allows developers and data analysts to get interesting information or events out of a stream of incoming events.  The service is part of the typical IoT value chain and is positioned in the transformation part.  The following picture is taken from a Microsoft presentation on IoT and shows Stream Analytics, connected with Event Hubs.

 

This post explains how you can start using Azure Stream Analytics and covers some troubleshooting tips that should save you some time. My scenario shows how you can send data through Azure Event Hubs and how you can apply the standing queries of Azure Stream Analytics to get some added value out of the incoming stream.

In my scenarios and in this post, I focus solely on Azure Event Hubs as the input for my jobs. You can use blobs as well, but I believe Event Hubs is the closest match with the integration world.

Typical scenarios for Complex Event Processing

Below, I describe some of the typical scenarios where Stream Analytics can be used. At a high level, any scenario that needs to get intelligence out of incoming streams by correlating events should be a good fit for ASA (the abbreviation of Azure Stream Analytics).

Noise reduction

When talking with customers about the frequency of incoming events (sensor data, for example), we often see a tendency to send data at a higher frequency than is really needed. While this has some advantages (better accuracy), it is not always necessary. The higher the frequency of ingested events, the higher the ingestion costs (potential extra data transfer costs on the device side and a higher number of incoming events). Typically the ingestion costs are not that high (with Azure Event Hubs, for example, you pay around 2 cents per 1 million events), but the long-term storage cost is always higher, as the volume of data grows each month and you pay for the same data again every month.

That's why it is a good idea to reduce the high-frequency incoming stream into an aggregated (averages/sums) output stream for long-term storage. You still benefit from the higher accuracy, but you save on the expensive cold storage.

Enrich events with reference data

It is possible to join the incoming data with reference tables/data. This allows you to build in decision logic. For example, you could have a table with all the devices and their region or customer. You could then join that extra information with the telemetry (using the DeviceId) and aggregate per region, customer, etc.

Leverage multiple outputs (pub-sub style)

When using Azure Event Hubs, it is perfectly possible to create multiple ASA jobs that share the same Event Hub as their input. This lets you get multiple results, calculations or events out of the incoming stream. For example, one job could extract data for archiving, while another job just looks for anomalies or detects events.

Detect special events, anomalies or trends

The canonical example in complex event processing is the TollBooth or traffic speed scenario. Every car passing a toll booth or a traffic camera results in a new event. But the added value is in detecting special cases in the stream of events.

  • Maybe an event needs to be triggered automatically if a car travels from one booth to another in less than the allowed time, because the driver is speeding. This event can then be processed by the backend.
  • The same stream can also be used to detect traffic jams, if the average time between booths increases within a window of time. Here it is important to take the average values, in order to avoid the extremes of cars that are very slow, or cars that speed.
  • And at the same time, license plates could be matched against reference data of wanted cars: cars that are being sought because they are stolen, or because they are driven by crime suspects.
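The speeding case from the first bullet can be sketched in plain Python. This is only an illustration of the correlation logic, not ASA query syntax; the event shape, the booth names "A" and "B", the license plates and the 10-minute threshold are all assumptions made up for this example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


# Hypothetical event shape: one event per car passing a booth.
@dataclass
class BoothEvent:
    plate: str
    booth: str
    time: datetime


# Assumed minimum legal travel time from booth A to booth B.
MIN_ALLOWED = timedelta(minutes=10)


def find_speeders(events):
    """Return plates whose trip from booth A to booth B took less than MIN_ALLOWED."""
    entered = {}  # plate -> time the car passed booth A
    speeders = []
    for ev in sorted(events, key=lambda e: e.time):
        if ev.booth == "A":
            entered[ev.plate] = ev.time
        elif ev.booth == "B" and ev.plate in entered:
            if ev.time - entered.pop(ev.plate) < MIN_ALLOWED:
                speeders.append(ev.plate)
    return speeders


t0 = datetime(2015, 1, 21, 14, 0)
events = [
    BoothEvent("1-ABC-123", "A", t0),
    BoothEvent("1-XYZ-789", "A", t0 + timedelta(minutes=1)),
    BoothEvent("1-ABC-123", "B", t0 + timedelta(minutes=6)),   # 6 minutes: too fast
    BoothEvent("1-XYZ-789", "B", t0 + timedelta(minutes=13)),  # 12 minutes: fine
]
print(find_speeders(events))
```

In a real job this correlation would be expressed as a standing query over the stream rather than as a loop over a list.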

Getting started with Azure Stream Analytics

We will start with a very basic scenario, where we will send events to an Event Hub and we will execute standing queries against that stream, using ASA. This is a quick step-by-step guide to create an Analysis job.

Sending data to Azure Event Hubs

There are a lot of blog posts and samples that explain how to create an event hub.

Sending the data is probably the most important part, as it defines the data structure that will be processed by the ASA job. It is therefore important that the event sent to the Event Hub uses supported structures. At the end of this post, I list some troubleshooting tips and some limitations that I encountered.

The following code snippet sends my telemetry-event to the Event Hub, serializing it as JSON.

As you can see, we are defining a TelemetryInfo object that we send multiple times (in parallel threads) to the event hub. We just have 10 devices here to simulate data entry. These JSON-serialized objects get UTF-8 encoded and are sent in the EventData object of the Event Hubs SDK.
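As a rough illustration of the payload described here, the following Python sketch builds JSON-serialized, UTF-8 encoded telemetry for 10 simulated devices. It is not the sender from this post: the field names are taken from the output listed later in the post, but the value types and ranges are assumptions, and the actual send call is left out because it depends on the Event Hubs SDK you use.

```python
import json
import random


def build_event(device_id: int) -> bytes:
    """Serialize one simulated telemetry reading as UTF-8 encoded JSON."""
    telemetry = {
        "DeviceId": device_id,
        "ErrorReading": random.randint(0, 1),           # assumed: numeric flag
        "MainValue": round(random.uniform(0, 100), 2),  # assumed: value range
        "Type": "Temperature",                          # assumed: sample type
    }
    return json.dumps(telemetry).encode("utf-8")


# Ten simulated devices, as in the scenario above.
payloads = [build_event(device_id) for device_id in range(1, 11)]
print(payloads[0].decode("utf-8"))
```

Each element of `payloads` is the byte array that would go into the body of one event.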

Creating the Analysis job

In order to create the job, you have to make sure the subscription is enabled for Azure Stream Analytics. To do this, log on to http://account.windowsazure.com as the subscription owner and click on preview features to enable ASA.

Log on to the Azure portal and click the ASA icon. There, you can create a new job. The job has to have a name, a region and a monitoring account (there is one monitoring account for every region).

Once this is done, it is required to specify one or more inputs, the actual query and the output to which the job has to write.

Create the inputs

Select the inputs tab in your job.

Create a new input and define the input as Data Stream (constant inflow of data). Select Event Hub in the second window of the wizard, as that's the input we want to work with. In the third part of the wizard, we have to select the event hub we are sending our events to. Ideally this is in the same subscription.

In the last part, we have to specify the encoding and serialization. We will use JSON, encoded as UTF-8.

Create the output

One job can have only one output at this moment, which is unfortunate. If you would also love to see this changed, don't hesitate to vote on my suggestion on uservoice.

In this step, we will create a blob output that takes our stream events and writes them to a CSV file.

For that, we click on the Output tab and create a new one.  To start, we select Blob storage.  As you can see, we can also output our data to SQL Azure, or to another Event Hub.  In the wizard, we have to provide our blob settings and the serialization/encoding, as you can see in the following screenshots.

Creating the query

The scenario we are now implementing is really like the "hello world" example of Stream Analytics.  We just take all the incoming data from the event hub and will output it to the blob storage.

Therefore, we just create the following query: SELECT * FROM TelemetryStream, where TelemetryStream is the name of our input.  

Starting and testing the job

Now that everything is configured, we can start the job. If everything goes well, we can run the producer (see above) and we should see the data being written to our blob container, based on the properties we specified. This gives us an easy way to get data and a basic test from which we can start refining our query to the level we need. If you have issues, please refer to the troubleshooting section of this post.

Testing queries

The query window also allows users to test their queries against sample files, which has been a big improvement over the last few months. The only challenge is getting a good sample file. For that, I usually change the output of my job to JSON/UTF8 and apply the SELECT * query again, resulting in a perfectly good test file. I then upload that JSON file in the test wizard. If the data is valid, you can easily test the query and see its output at the bottom of the query screen.

Writing Analytics queries

As you can see in the source code (and in the output on our blob), the data we are sending contains the following fields:

DeviceId, ErrorReading, MainValue, Type, EventProcessedUtcTime, PartitionId, EventEnqueuedUtcTime.  The last 3 fields are added by ASA and can be used in the query as well.

Now we will dive a little deeper into the query language of ASA, which really feels like plain old SQL.

Time-based aggregation

One of the most commonly needed aggregation methods (explained in the noise reduction scenario) is the grouping of events by a window of time. ASA provides 3 types of windows, of which the tumbling window (fixed time intervals) is the most common. The other windowing methods are Hopping (overlapping time windows) and Sliding (flexible windowing) windows. The details of these windows are explained well in the documentation.

If we now want to aggregate the data from our sample, we can easily update the query to aggregate the sensor data per minute interval, by writing the following query:

The documentation outlines good examples and other functions.
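To make the per-minute tumbling window concrete, here is a minimal Python sketch of what such an aggregation computes. It is not ASA query syntax and not the query from this post; the sample events are made up, the choice of averaging MainValue per DeviceId is an assumption, and window boundary handling is simplified compared to the service.

```python
from collections import defaultdict
from datetime import datetime, timezone


def tumbling_average(events, window_seconds=60):
    """Average MainValue per (window start, DeviceId) over fixed, non-overlapping windows."""
    buckets = defaultdict(list)
    for ev in events:
        ts = ev["EventEnqueuedUtcTime"].timestamp()
        # Every timestamp falls into exactly one window of window_seconds.
        start = datetime.fromtimestamp(ts - ts % window_seconds, tz=timezone.utc)
        buckets[(start, ev["DeviceId"])].append(ev["MainValue"])
    return {key: sum(vals) / len(vals) for key, vals in sorted(buckets.items())}


def at(minute, second):
    """Helper for the made-up sample data: a UTC time on 21 January 2015, 14:mm:ss."""
    return datetime(2015, 1, 21, 14, minute, second, tzinfo=timezone.utc)


events = [
    {"DeviceId": 1, "MainValue": 10.0, "EventEnqueuedUtcTime": at(0, 10)},
    {"DeviceId": 1, "MainValue": 20.0, "EventEnqueuedUtcTime": at(0, 40)},
    {"DeviceId": 2, "MainValue": 5.0, "EventEnqueuedUtcTime": at(0, 30)},
    {"DeviceId": 1, "MainValue": 30.0, "EventEnqueuedUtcTime": at(1, 5)},
]
averages = tumbling_average(events)
for (start, device_id), avg in averages.items():
    print(start.isoformat(), device_id, avg)
```

Four incoming events are reduced to three output rows here; with real sensor frequencies the reduction is far larger, which is the point of the noise reduction scenario.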

Using reference data

It is also possible to add reference data to your query, so that this data can be used to enrich the query or to apply extra filters or decisions. For that, you need to add a new input of type 'Reference data' and browse to an existing blob (I always use CSV files for this). In this sample, I am uploading the SensorList.csv that you can view on our GitHub account.

Now, you can use this data to make SQL-like joins and enrich your query.  I am using the SensorList.csv that is part of the Gist I created for this blog:

This allows me to write the following join query, which adds the name of the sensor to the output on my blob.
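The effect of such a join can be sketched in Python as follows. This is an illustration only, not ASA query syntax: the layout of SensorList.csv is not shown in this post, so the two columns and the sensor names below are hypothetical.

```python
import csv
import io

# Hypothetical contents; the real SensorList.csv layout may differ.
SENSOR_LIST_CSV = """DeviceId,SensorName
1,Boiler room
2,Loading dock
"""


def load_reference(csv_text):
    """Build a DeviceId -> SensorName lookup from the reference CSV."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {int(row["DeviceId"]): row["SensorName"] for row in reader}


def enrich(events, reference):
    """Inner join: keep events with a matching DeviceId and add the sensor name."""
    return [
        {**ev, "SensorName": reference[ev["DeviceId"]]}
        for ev in events
        if ev["DeviceId"] in reference
    ]


events = [
    {"DeviceId": 1, "MainValue": 21.5},
    {"DeviceId": 3, "MainValue": 19.0},  # no match in the reference data, dropped
]
enriched = enrich(events, load_reference(SENSOR_LIST_CSV))
print(enriched)
```

The sketch uses inner-join behaviour, so events from unknown devices disappear from the output; whether that is desirable depends on the scenario.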

Troubleshooting and diagnostics

I had some issues in the past days, trying to get things to work. I hope this post helps others avoid the same issues. I also want to thank the ASA team for the help they gave in fixing them.

For logging information, you should fall back on the Operation Logs of Azure, which you can reach through the dashboard of your ASA job. But (it's a preview, don't forget that!) there were some errors that were not visible in the logs and that required support from the team. I'll try to list some of the limitations that I encountered here.

Data Conversion Errors

When you are getting Data Conversion errors in the log, chances are that you are sending an unsupported object type in your event. I had sent booleans and arrays in my event and they were causing issues.

I created the ToJson() method in my sender (see above), where I JSON-serialize a dynamic object containing only the allowed types. This still allows my local application to work with all properties; I just make sure to remove or change the incorrect data types before sending them (by using the dynamic object).
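The idea behind such a ToJson() method can be sketched in Python. This is not the method from my sender, just a minimal illustration under assumptions: booleans are changed to 0/1, arrays are removed, and the "History" field is a hypothetical example of an array property.

```python
import json


def to_json(telemetry: dict) -> str:
    """Serialize only the values that caused no conversion issues.

    Assumed policy for this sketch: booleans become 0/1, arrays are dropped.
    """
    safe = {}
    for key, value in telemetry.items():
        if isinstance(value, bool):
            safe[key] = int(value)  # change the type instead of dropping the field
        elif isinstance(value, (list, tuple)):
            continue                # remove array-valued properties
        else:
            safe[key] = value
    return json.dumps(safe)


reading = {
    "DeviceId": 7,
    "ErrorReading": True,
    "MainValue": 42.5,
    "Type": "Temperature",
    "History": [41.9, 42.1],  # hypothetical array property
}
print(to_json(reading))
```

The local object keeps all its properties; only the serialized copy that goes on the wire is restricted.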

However, the intention is that the value of the unsupported fields will be set to NULL in an update that is expected shortly.  That way, you can still process the events, even if they contain invalid data types.

There is also the possibility to declare the data structure and data types in the query. That way, you describe the data types of the different fields (without creating something specific).

The following query is an example of that

The details of the issue I had, and its resolution, can be found on the MSDN forum.

Next blog post

In a next blog post, I'm planning to explore the following things:

  • Output data to SQL Azure
  • ASA - Event Hub specifics
  • Dynamic reference data
  • Combining multiple input streams

Sam.

Categories: Azure
written by: Sam Vanhoutte

Posted on Friday, July 28, 2017 4:59 PM

by Toon Vanhoutte

Recently I was asked for an intervention on an IoT project. Sensor data was sent via Azure IoT Hubs towards Azure Stream Analytics. Events detected in Azure Stream Analytics resulted in a Service Bus message that had to be handled by a Logic App. However, the Logic App failed to parse the JSON message taken from the Service Bus queue. Let's have a more detailed look at the issue!

Explanation

This diagram reflects the IoT scenario:

We encountered an issue in Logic Apps the moment we tried to parse the message into a JSON object. After some investigation, we realized that the Service Bus message wasn't actually a JSON message. It was preceded by string serialization "overhead".

Thanks to our favourite search engine, we came across this blog that nicely explains the cause of the issue. The problem is situated at the sender of the message. The issue occurs because the BrokeredMessage is created with a string object:

If you control the sender side code, you can resolve the problem by passing a stream object instead.

Solution

Unfortunately we cannot change the way Azure Stream Analytics behaves, so we need to deal with it on the receiver side. I've found several blogs and forum answers suggesting to clean up the "serialization garbage" with an Azure Function. Although this is a valid solution, I always tend to avoid additional components if they are not really needed. Introducing Azure Functions comes with additional cost, storage, deployment complexity, maintenance, etc…

As this is actually pure string manipulation, I had a look at the available string functions in the Logic Apps Workflow Definition Language. The following expression removes the unwanted "serialization overhead":

If you use this, in combination with the Parse JSON action, you have a user-friendly way to extract data from the JSON in the next steps of the Logic App. In the sample below, I just used the Terminate action for testing purpose. You can now easily use SensorId, Temperature, etc…
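The string manipulation itself is simple, and the idea can be sketched in Python for readers who want to see it outside Logic Apps. This is not the Workflow Definition Language expression, only the same principle: keep everything from the first opening brace to the last closing brace. The wrapper text and the field values in the sample are invented for this example.

```python
import json


def extract_json(raw: str) -> dict:
    """Strip anything before the first '{' and after the last '}', then parse."""
    start = raw.index("{")
    end = raw.rindex("}")
    return json.loads(raw[start:end + 1])


# Invented sample: a JSON document wrapped in serialization overhead.
raw = '@\x06string\x08wrapper{"SensorId": "S1", "Temperature": 31.5}\x01'
message = extract_json(raw)
print(message["SensorId"], message["Temperature"])
```

This approach assumes the overhead itself never contains a brace; if it did, the substring would no longer be valid JSON and parsing would fail.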

Conclusion

It's a pity that Azure Stream Analytics doesn't behave as expected when sending messages to Azure Service Bus. Luckily, we were able to fix it easily in the Logic App. Before reaching out to Azure Functions as an extensibility option, it's advisable to have an in-depth look at the available Logic Apps Workflow Definition Language functions.

Hope this was a timesaver!
Cheers,
Toon

Categories: Azure
written by: Toon Vanhoutte