
Codit Blog

Posted on Wednesday, January 21, 2015 2:19 PM

by Sam Vanhoutte

Azure Stream Analytics is a very interesting service on the Azure platform that allows developers and data analysts to extract valuable information or events from a stream of incoming events. The service is part of the typical IoT value chain and is positioned in the transformation part. This post describes how to get started.

Azure Stream Analytics is a service on the Azure platform that allows developers and data analysts to extract valuable information or events from a stream of incoming events. The service is part of the typical IoT value chain and is positioned in the transformation part. The following picture is taken from a Microsoft presentation on IoT and shows Stream Analytics connected with Event Hubs.


This post explains how you can start using Azure Stream Analytics and covers some troubleshooting tips that should save you time. My scenario shows how you can send data through Azure Event Hubs and how you can apply the standing queries of Azure Stream Analytics to get added value out of the incoming stream.

In my scenarios and this post, I will solely focus on Azure Event Hubs as the input for my jobs.  You can use blobs as well, but I believe Event Hubs is the closest match with the integration world.

Typical scenarios for Complex Event Processing

I have tried to describe some of the typical scenarios where Stream Analytics can be used below. At a high level, any scenario that needs to extract intelligence from an incoming stream by correlating events is a good fit for ASA (short for Azure Stream Analytics).

Noise reduction

When talking with customers about the frequency of incoming events (for example sensor data), we often see a tendency to send data at a higher frequency than is really needed.  While this has some advantages (better accuracy), it is not always necessary.  The higher the frequency of ingested events, the higher the ingestion costs (potential extra data transfer costs on the device side and a higher number of incoming events).  Typically the ingestion costs are not that high (with Azure Event Hubs, for example, you pay around 2 cents per million events), but the long-term storage cost is always higher, as the stored data grows each month and you pay for the same data again every month.

That's why it is a good idea to reduce the high-frequency incoming stream to an aggregated (averages/sums) output stream for long-term storage.  You still benefit from the higher accuracy upstream, but you save on the expensive cold storage.

Enrich events with reference data

It is possible to join the incoming data with reference tables/data.  This allows you to build in decision logic.  For example, you could have a table with all the devices and their region or customer, and use that extra information to join with the telemetry data (using the DeviceId) and aggregate per region, customer, etc.

Leverage multiple outputs (pub-sub style)

When using Azure Event Hubs, it is perfectly possible to create multiple ASA jobs that share the same Event Hub as their input.  This allows you to get multiple results, calculations or events out of the same incoming stream.  For example, one job could extract data for archiving, while another job looks for anomalies or detects specific events.

Detect special events, anomalies or trends

The canonical example used in complex event processing is the toll booth or traffic speed scenario.  Every car passing a toll booth or a traffic camera results in a new event.  The added value is in detecting special cases in the stream of events.

  • Maybe an event needs to be triggered automatically when a car covers the distance between two booths in less than the allowed time, because the driver is speeding.  That event can then be processed by the back end.
  • The same stream can also be used to detect traffic jams, when the average travel time between booths increases within a window of time.  Here it is important to take average values, in order to avoid the extremes of cars that drive very slowly or that speed.
  • And at the same time, license plates could be matched against reference data of suspicious cars: cars that are being sought because they were stolen, or because they are driven by crime suspects.

Getting started with Azure Stream Analytics

We will start with a very basic scenario, where we will send events to an Event Hub and we will execute standing queries against that stream, using ASA. This is a quick step-by-step guide to create an Analysis job.

Sending data to Azure Event Hubs

There are a lot of blog posts and samples that explain how to create an event hub, so I won't repeat that here.

The sending of data is probably the most important part, as it defines the data structure that will be processed by the ASA job.  Therefore, it is important that the event sent to the Event Hub uses supported data structures.  At the end of this post, I list some troubleshooting tips and limitations that I encountered.

The following code snippet sends my telemetry-event to the Event Hub, serializing it as JSON.
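The snippet boils down to something like the sketch below (the TelemetryInfo class and its ToJson() helper are the ones referred to in the text; the property types and the connection string placeholders are assumptions):

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;   // EventHubClient, EventData
    using Newtonsoft.Json;

    public class TelemetryInfo
    {
        public string DeviceId { get; set; }
        public string Type { get; set; }
        public double MainValue { get; set; }
        public double ErrorReading { get; set; }

        // Serialize a projection of the object so only ASA-supported types end up in the event.
        public string ToJson()
        {
            return JsonConvert.SerializeObject(new { DeviceId, Type, MainValue, ErrorReading });
        }
    }

    public class TelemetrySender
    {
        public static void Main()
        {
            var client = EventHubClient.CreateFromConnectionString(
                "<event hub connection string>", "<event hub name>");

            // Simulate 10 devices pushing events in parallel.
            Parallel.For(0, 10, device =>
            {
                var random = new Random(device);
                var telemetry = new TelemetryInfo
                {
                    DeviceId = "device-" + device,
                    Type = "temperature",
                    MainValue = random.NextDouble() * 100,
                    ErrorReading = 0
                };

                // UTF8-encode the JSON payload and wrap it in an EventData object.
                client.Send(new EventData(Encoding.UTF8.GetBytes(telemetry.ToJson())));
            });
        }
    }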

As you can see, we define a TelemetryInfo object that we send multiple times (in parallel threads) to the event hub.  We just have 10 devices here to simulate data entry.  These JSON-serialized objects get UTF8-encoded and are sent in the EventData object of the Event Hub SDK.

Creating the Analysis job

In order to create the job, you have to make sure the subscription is enabled for Azure Stream Analytics.  To do this, log on to http://account.windowsazure.com with the subscription owner and click on preview features to enable ASA.

Log on to the Azure portal and click the ASA icon.  There, you can create a new job.  The job needs a name, a region and a monitoring account (there is one monitoring account for every region).

Once this is done, it is required to specify one or more inputs, the actual query and the output to which the job has to write.

Create the inputs

Select the inputs tab in your job.

Create a new input and define it as a Data Stream (constant inflow of data).  Select Event Hub in the second window of the wizard, as that's the input we want to work with.  In the third part of the wizard, we select the event hub we are sending our events to.  Ideally this is in the same subscription.

In the last part, we have to specify the encoding and serialization.  We will use JSON with UTF8 encoding.

Create the output

A job can have only one output at this moment, which is unfortunate.  If you would also like to see this changed, don't hesitate to vote for my suggestion on UserVoice.

In this step, we will create a blob output that takes our stream events and writes them to a CSV file.

For that, we click on the Output tab and create a new one.  To start, we select Blob storage.  As you can see, we can also output our data to SQL Azure, or to another Event Hub.  In the wizard, we have to provide our blob settings and the serialization/encoding, as you can see in the following screenshots.

Creating the query

The scenario we are implementing now is really the "hello world" example of Stream Analytics.  We just take all the incoming data from the event hub and output it to blob storage.

Therefore, we just create the following query: SELECT * FROM TelemetryStream, where TelemetryStream is the name of our input.  

Starting and testing the job

Now that everything is configured, we can start the job.  If everything goes well, we can run the producer (see above) and we should see the data being written to our blob container, based on the properties we specified.  This gives us data and a basic test from which we can refine our query to the level we need.  If you have issues, please refer to the troubleshooting section of this post.

Testing queries

The query window also allows users to test their queries against sample files, which is a big improvement compared to a few months ago.  The only challenge is to get a good sample file.  For that, I usually change the output of my job to JSON/UTF8 and apply the SELECT * query again, resulting in a perfectly good test file.  I then upload that JSON file in the test wizard.  If the data is valid, you can easily test the query and see its output at the bottom of the query screen.

Writing Analytics queries

As you can see in the source code (and in the output on our blob), the data we are sending contains the following fields:

DeviceId, ErrorReading, MainValue, Type, EventProcessedUtcTime, PartitionId, EventEnqueuedUtcTime.  The last 3 fields are added by ASA and can be used in the query as well.

Now we will dive a little deeper into the query language of ASA, which really feels like plain old SQL.

Time-based aggregation

One of the most common aggregation methods needed (explained in the noise reduction scenario) is the grouping of events by a window of time.  ASA provides three types of windows, of which the tumbling window (fixed time intervals) is the most common.  The details of these windows are explained well in the documentation.  The other windowing methods are hopping (overlapping time windows) and sliding (flexible) windows.

If we now want to aggregate the data from our sample, we can easily update the query to aggregate the sensor data per minute interval, by writing the following query:
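A possible version of that query, using the input alias TelemetryStream and the field names from our sample (the exact aggregates are just an illustration):

    SELECT
        DeviceId,
        System.Timestamp AS WindowEnd,
        AVG(MainValue)   AS AverageValue,
        COUNT(*)         AS EventCount
    FROM TelemetryStream
    GROUP BY DeviceId, TumblingWindow(minute, 1)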

The documentation outlines good examples and other functions.

Using reference data

It is also possible to add reference data to your query, so that this data can be used to enrich the query or to apply extra filters or decisions.  For that, you need to add a new input of type 'Reference data' to the job and browse to an existing blob (I always use CSV files for this).  In this sample, I am uploading the SensorList.csv that you can view on our github account.

Now you can use this data to make SQL-like joins and enrich your query.  I am using the SensorList.csv that is part of the Gist I created for this blog.

This allows me to write the following join-query that adds the name of the sensor to the output on my blob.
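Assuming the reference input is called SensorReference and SensorList.csv contains a DeviceId and a SensorName column (check the Gist for the real column names), the join could look like this:

    SELECT
        stream.DeviceId,
        ref.SensorName,
        System.Timestamp      AS WindowEnd,
        AVG(stream.MainValue) AS AverageValue
    FROM TelemetryStream stream
    JOIN SensorReference ref
        ON stream.DeviceId = ref.DeviceId
    GROUP BY stream.DeviceId, ref.SensorName, TumblingWindow(minute, 1)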

Troubleshooting and diagnostics

I had some issues in the past days trying to get things to work.  I hope this post helps others avoid the same issues.  I also want to thank the ASA team for their help in fixing them.

For logging information, you have to fall back on the Azure Operation Logs, which you can reach through the dashboard of your ASA job.  But (it's a preview, don't forget that!) there were some errors that were not visible in the logs and that required support from the team.  I'll try to list some of the limitations that I encountered here.

Data Conversion Errors

When you are getting Data Conversion errors in the log, chances are that you are sending an unsupported data type in your event.  I had sent booleans and arrays in my events and they were causing issues.

I created the ToJson() method in my sender (see above), where I JSON-serialize a projection of the object with only the allowed types.  This still allows my local application to work with all properties; I just make sure to remove or change the incorrect data types before sending (by using that projection).

However, the intention is that the value of the unsupported fields will be set to NULL in an update that is expected shortly.  That way, you can still process the events, even if they contain invalid data types.

It is also possible to declare the data structure and data types in the query.  That way, you describe the data types of the different fields explicitly.

The following query is an example of that
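A sketch of such a query, with the field types assumed from the sample data:

    CREATE TABLE TelemetryStream (
        DeviceId nvarchar(max),
        Type nvarchar(max),
        MainValue float,
        ErrorReading float
    )

    SELECT * FROM TelemetryStream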

The details of the issue and its resolution can be found on the MSDN forum.

Next blog post

In an upcoming blog post, I'm planning to explore the following topics:

  • Output data to SQL Azure
  • ASA - Event Hub specifics
  • Dynamic reference data
  • Combining multiple input streams

Sam.

Categories: Azure IoT Troubleshooting
written by: Sam Vanhoutte

Posted on Monday, January 12, 2015 11:28 AM

by Tom Kerkhove

Automatically expiring messages in Service Bus is a nice feature, but you have to be aware that it does not "automatically" move expired messages to the dead-letter queue. In some scenarios this can cause a loss of reliability.

Note: This content was originally released on my personal blog http://tomkerkhove.ghost.io/.

Lately my team and I have been using Service Bus for Windows Server in our projects. We are hosting it on our on-premises infrastructure.
Besides the name, everything is pretty much the same as Azure Service Bus, except that you're in charge of the installation & administration. Next to that, you're stuck with an "old" version of Azure Service Bus - currently v2.1 - which has a more limited feature set than the latest and greatest Azure Service Bus, e.g. no auto-forwarding from the dead-letter queue to another entity.

Our scenario was built around the concept of queues and messages that should only live for a certain amount of time, also called time-to-live (TTL).
Once a message has exceeded its time-to-live it needs to expire and be moved to the deadletter queue. This is a common messaging scenario.

Getting started with Message expiration & dead-lettering

When you create a queue in Service Bus, you are actually creating two queues - your requested queue called 'YourQueueName' and a dead-letter queue. Once a message expires, it will automatically be moved to this dead-letter queue, if you enable that option.

Dead-lettering scenario

Enabling expiration of messages on your queue

Enabling this functionality on your queue is very easy - You create a QueueDescription based on your queue name and you enable EnableDeadLetteringOnMessageExpiration, that's it!

But hold on, by default it uses the default Time-to-Live for each message in the queue, which is 10675199 days and thus almost infinite. You can specify this time window yourself by changing the DefaultMessageTimeToLive property.
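A minimal sketch of that setup with the Service Bus SDK could look like this (the queue name and the one-hour window are just examples):

    using System;
    using Microsoft.ServiceBus;            // NamespaceManager
    using Microsoft.ServiceBus.Messaging;  // QueueDescription

    public class QueueSetup
    {
        public static void Main()
        {
            var namespaceManager = NamespaceManager.CreateFromConnectionString("<connection string>");

            var queueDescription = new QueueDescription("YourQueueName")
            {
                // Move messages to the dead-letter queue as soon as they expire.
                EnableDeadLetteringOnMessageExpiration = true,

                // Override the almost-infinite default Time-to-Live, e.g. one hour.
                DefaultMessageTimeToLive = TimeSpan.FromHours(1)
            };

            if (!namespaceManager.QueueExists(queueDescription.Path))
            {
                namespaceManager.CreateQueue(queueDescription);
            }
        }
    }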

Specifying Time-to-Live to a message

You can also assign a Time-to-Live window to your individual messages. This gives you more control on expiring individual messages.

You can achieve this by assigning a window to the TimeToLive property of your message.
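For example, a sketch along these lines (the message body and the 30-minute window are illustrative):

    using System;
    using Microsoft.ServiceBus.Messaging;  // QueueClient, BrokeredMessage

    public class MessageSender
    {
        public static void Main()
        {
            var queueClient = QueueClient.CreateFromConnectionString("<connection string>", "YourQueueName");

            var message = new BrokeredMessage("Invoice payload")
            {
                // This individual message expires 30 minutes after it is enqueued.
                TimeToLive = TimeSpan.FromMinutes(30)
            };

            queueClient.Send(message);
        }
    }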

Queue Time-to-Live vs Message Time-to-Live

As we've just seen, we can specify the Time-to-Live on a queue and on a message, but you can also combine them. This allows you to define a default on the queue level and exceptionally assign one to a specific message.

Bear in mind that the shortest Time-to-Live window will be applied, whether it is your queue's default or the message's Time-to-Live!

Here is a small example - Imagine you're in a scenario where you send different types of messages to one queue but certain messages need to expire sooner than the others.
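Sticking to the snippets above, a sketch of that scenario could look as follows: the queue gets a default of one day, while a specific type of message gets its own, shorter window (the names and windows are illustrative).

    // Queue level: by default, messages live for one day before being dead-lettered.
    var queueDescription = new QueueDescription("NotificationQueue")
    {
        EnableDeadLetteringOnMessageExpiration = true,
        DefaultMessageTimeToLive = TimeSpan.FromDays(1)
    };

    // Message level: this urgent notification should expire after one hour.
    // The shortest window wins, so it overrides the queue default.
    var urgentMessage = new BrokeredMessage("Urgent notification")
    {
        TimeToLive = TimeSpan.FromHours(1)
    };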

Processing expired messages from the dead-letter queue

As mentioned before all your expired messages will be moved to the dead-letter queue of your queue and sit there until you process them.

Processing them is the same as processing messages from your normal queues, except that the name, also called the entity path, is different and follows this pattern:

QueueName/$DeadLetterQueue

If you are using the Azure SDK, you can even generate the name for your dead-letter queue with one line of code -
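That line could look like this (the queue name is a placeholder):

    // Builds the entity path of the dead-letter queue for a given queue name.
    string deadLetterPath = QueueClient.FormatDeadLetterPath("YourQueueName");
    // deadLetterPath now equals "YourQueueName/$DeadLetterQueue"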

Note - It is important to know that there could also be other messages in the DLQ, e.g. poison messages, but this is out of scope for this article.

How does Service Bus expiration work?

Now that we've seen how we can enable expiration and assign the time your messages should be alive, we can take a look at how it works based on a fictional scenario.

We have a back-end system that sends invoices to our client application over a Service Bus queue. This allows the customer to determine when he wants to receive the next invoice, but to make sure that each invoice is paid in the required amount of time, we want to expire the invoice and receive it back at the back end.

Periodic Receive-scenario

As I was building this scenario I noticed that Service Bus does not monitor your queue for expired messages; instead it waits until you perform a Receive. At that point it moves all messages that have expired to your dead-letter queue. It is important to know that a Peek is not sufficient, as messages will not expire!

In our scenario this means that when the customer doesn't request a new invoice for a very long time, the messages will not expire and thus will not be processed by the back end.

Although this is a big problem in our scenario, since we are relying on Service Bus, it also makes sense: it avoids Service Bus permanently monitoring our messages, which would result in a higher load and lower scalability.

Implementing the "Monitoring" pattern

We had to redesign our scenario because of this behavior. Since we couldn't rely on the automatic expiration of messages, we stepped away from TTL and, thanks to Clemens Vasters, we implemented a pattern I call the "Monitoring" pattern.

Instead of only using one queue we are now using two queues - Our InvoiceQueue and a MonitorQueue.

Every time we send a message to our InvoiceQueue, we also send a dummy message to the MonitorQueue. That dummy message contains metadata about the original invoice and also has a ScheduledEnqueueTimeUtc based on the current UTC time plus the required TTL timespan. This enqueues our dummy message, but it will not be visible to the receivers until the specified time.

Once our back-end system picks up one of the dummy messages from the MonitorQueue, it uses the metadata to check whether the message still exists on the InvoiceQueue.
If it is still present, the back end removes it from the InvoiceQueue and performs the required logic, because the customer failed to process it in time. If the message is already gone, it just removes the dummy from the MonitorQueue and moves on to the next one.
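A sketch of the sending side of this pattern (the queue names, the property name InvoiceSessionId and the TTL parameter are assumptions; both queues are created up front and the InvoiceQueue has sessions enabled):

    using System;
    using Microsoft.ServiceBus.Messaging;

    public class InvoiceSender
    {
        public static void Send(string connectionString, TimeSpan timeToLive)
        {
            var invoiceQueue = QueueClient.CreateFromConnectionString(connectionString, "InvoiceQueue");
            var monitorQueue = QueueClient.CreateFromConnectionString(connectionString, "MonitorQueue");

            var invoiceId = Guid.NewGuid().ToString();

            // The real invoice, sent to the queue the customer reads from.
            var invoice = new BrokeredMessage("Invoice payload")
            {
                MessageId = invoiceId,
                SessionId = invoiceId   // lets the back end fetch this exact invoice later
            };
            invoiceQueue.Send(invoice);

            // The dummy message: invisible to the back end until the TTL has elapsed.
            var monitorMessage = new BrokeredMessage
            {
                ScheduledEnqueueTimeUtc = DateTime.UtcNow.Add(timeToLive)
            };
            monitorMessage.Properties["InvoiceSessionId"] = invoiceId;
            monitorQueue.Send(monitorMessage);
        }
    }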

Alternative Monitoring pattern

One small side note: we used Service Bus sessions on the messages in the InvoiceQueue. This allows the back-end system to retrieve the session id from the dummy message's metadata and request that specific session on the InvoiceQueue.

Don't believe me? Try it yourself!

I've prepared a sample application that shows you the behavior with Azure Service Bus and Service Bus for Windows Server.

You can download it here.

Conclusion

Automatically expiring messages in Service Bus is a nice feature, but you have to be aware that it does not "automatically" move expired messages to the dead-letter queue. Once you know a Receive() is required, you can build your solution around this and increase the reliability of your expired-message handling.

Thanks for reading,

Tom.

Categories: Azure Service Bus
written by: Tom Kerkhove

Posted on Thursday, December 18, 2014 8:06 AM

by Sam Vanhoutte

This post describes how you can secure specific Azure Service Bus relay endpoints with Shared Access Signatures and move away from the ACS shared-secret credentials.

For a few months now, it has been clear that the Azure Service Bus team is moving away from ACS (but still supports it!) as the main authentication mechanism.  This has been detailed in various blog posts and news items, and the intention of this blog post is not to repeat what has been said before.

Instead, we will focus on how you can secure relay endpoints with Shared Access Signatures, something that is currently not well documented.  This blog post describes how you can easily secure a relay service using SAS.

Concept

Too often, we see the usage of the RootManageSharedAccessKey in demo and sample code (mine sometimes included).  And earlier, a lot of those samples were using the 'owner' SecretKey.  What's worse, I've seen these things in production.  The main reason is probably that it is too much work for people to create specific access rights on messaging entities or on relay endpoints.  But ignoring security is never good, right?

This is especially true when deploying services in the field, in remote data centers, on computers and even on devices.  There it is crucial to ensure that the credentials only allow that specific service, device or client to perform the actions it is entitled to.  By doing so, it is also easy to revoke one of those clients if needed, without impacting the other existing connections.

Secure on path-level (Uri)

Just like we can enable authorization rules on messaging entities, such as queues or topics, we can also enable specific access rules for the hierarchical paths on relay endpoints.  When we deploy several of our cloud connectors for Integration Cloud (these are on-premises agents exposing a relay endpoint), we always provision a unique path per connector endpoint, and we make sure that the credentials deployed with each connector have the minimum rights to listen on that exact URI (path) and not on other locations.

Security validation with Relay Services

When using the WCF bindings for relay services, you specify service credentials and the path where the service will be registered.  When a service wants to register on such an endpoint, the Azure Service Bus Relay service verifies whether the following conditions are met:

  • Is the service registering with valid credentials? (authentication)
  • Is the service allowed to Listen on that specific Uri/Path with those credentials? (authorization)

The same check happens when a client calls a relay endpoint with specific credentials, except that now it is verified whether the client has the Send access right.  The next section lists the various access rights.

Access rights

The following access rights are available to secure the hierarchy of a Service Bus namespace.  Their meaning differs slightly between Relay Services and Messaging entities.

  • Listen - Relay Services: the right to open a service endpoint on the Uri.  Messaging entities: the right to receive messages from the entities under that Uri.
  • Send - Relay Services: the right to call a service endpoint on the Uri.  Messaging entities: the right to send messages to the entities under that Uri.
  • Manage - Relay Services: the right to change settings of relay endpoints.  Messaging entities: the right to create, change or delete entities under that Uri.

Create SAS authorization rules 

In order to deploy a relay service in the wild, we want to use security credentials that have the minimum required Access Rights.  The following code sample shows how to do this.
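A sketch of what that could look like (the relay path and key names are examples; the namespace manager is created with a connection string that has Manage rights):

    using System.Threading.Tasks;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    public class RelayProvisioning
    {
        public static async Task ProvisionAsync(string manageConnectionString)
        {
            var namespaceManager = NamespaceManager.CreateFromConnectionString(manageConnectionString);

            // Describe the relay endpoint on a specific path of the namespace.
            var relayDescription = new RelayDescription("connectors/connector01", RelayType.NetTcp);

            // One rule for the service that will listen on this path...
            relayDescription.Authorization.Add(new SharedAccessAuthorizationRule(
                "ListenAccessKey",
                SharedAccessAuthorizationRule.GenerateRandomKey(),
                new[] { AccessRights.Listen }));

            // ...and one for every client that is allowed to call it.
            relayDescription.Authorization.Add(new SharedAccessAuthorizationRule(
                "SendAccessKey",
                SharedAccessAuthorizationRule.GenerateRandomKey(),
                new[] { AccessRights.Send }));

            await namespaceManager.CreateRelayAsync(relayDescription);
        }
    }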

What's important to notice here is that we explicitly define the RelayDescription for a specific path, and that on that path we create one or more SharedAccessAuthorizationRules.  These can be used to grant Listen, Send or Manage rights.

In practice, there will be a SharedAccessAuthorizationRule for the service (ListenAccessKey) and one for every client that needs to call the service (in this case we created one: SendAccessKey).  This way we have full flexibility in revoking clients, and when the SAS key of the service gets compromised, the potential damage is restricted to the subpath of our namespace.

Using the keys in the web service or web clients

To use the keys in the web client or the web service, the following app/web.config settings can be specified.  There is one thing that is crucial here: the binding has to specify that it is not a dynamic binding!  If this is not done, you can get a System.ServiceModel.AddressAlreadyInUseException ("System error").  Therefore, use the isDynamic="false" setting.

Note: This took me a really long time to find out and it was Dan Rosanova who pointed that one out to me.  Thanks for that!

The web service web.config
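A trimmed-down sketch of the service-side configuration (the contract, service and namespace names are placeholders; the Service Bus NuGet package registers the netTcpRelayBinding and transportClientEndpointBehavior config extensions):

    <system.serviceModel>
      <services>
        <service name="MyCompany.EchoService">
          <endpoint contract="MyCompany.IEchoService"
                    binding="netTcpRelayBinding"
                    bindingConfiguration="sasRelayBinding"
                    behaviorConfiguration="sasListenCredentials"
                    address="sb://mynamespace.servicebus.windows.net/connectors/connector01" />
        </service>
      </services>
      <bindings>
        <netTcpRelayBinding>
          <!-- Crucial: make the binding non-dynamic to avoid the AddressAlreadyInUseException -->
          <binding name="sasRelayBinding" isDynamic="false" />
        </netTcpRelayBinding>
      </bindings>
      <behaviors>
        <endpointBehaviors>
          <behavior name="sasListenCredentials">
            <transportClientEndpointBehavior>
              <tokenProvider>
                <sharedAccessSignature keyName="ListenAccessKey" key="[listen key]" />
              </tokenProvider>
            </transportClientEndpointBehavior>
          </behavior>
        </endpointBehaviors>
      </behaviors>
    </system.serviceModel>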

The web client app.config
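And a matching sketch for the client side, this time with the Send key:

    <system.serviceModel>
      <client>
        <endpoint name="relayClient"
                  contract="MyCompany.IEchoService"
                  binding="netTcpRelayBinding"
                  bindingConfiguration="sasRelayBinding"
                  behaviorConfiguration="sasSendCredentials"
                  address="sb://mynamespace.servicebus.windows.net/connectors/connector01" />
      </client>
      <bindings>
        <netTcpRelayBinding>
          <binding name="sasRelayBinding" isDynamic="false" />
        </netTcpRelayBinding>
      </bindings>
      <behaviors>
        <endpointBehaviors>
          <behavior name="sasSendCredentials">
            <transportClientEndpointBehavior>
              <tokenProvider>
                <sharedAccessSignature keyName="SendAccessKey" key="[send key]" />
              </tokenProvider>
            </transportClientEndpointBehavior>
          </behavior>
        </endpointBehaviors>
      </behaviors>
    </system.serviceModel>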

These settings are also available through the latest update of the ServiceBusExplorer tool by Paolo Salvatori.

Posted on Saturday, December 6, 2014 12:19 AM

by Peter Borremans

Today Microsoft went into more detail on the Host Integration roadmap. Read all about it in this article.

As integration specialists, Codit has developed numerous integration projects involving host systems.

After the announcements during Integrate 2014, I was very interested to see how BizTalk, BizTalk Services and Host Integration would cope with these changes.

The Host Integration Team, represented by Paul Larsen, published a clear roadmap of how Host Integration Server will evolve.
The following Host Integration Server features will become available as Microservices:

  • CICS, IMS and i programs application integration
  • DB2 and Informix databases
  • WebSphere MQ messages (using MS client)

The Host Integration Team will also provide connectors to DB2 and Informix databases for use in Power BI for Office (Power Query, Power Pivot).

Host Integration Server vNext will support Informix databases for both ADO.NET and BizTalk Server.

I was very pleased to see the clear and concrete roadmap and hope to see this from the other product teams as well.

Posted on Friday, December 5, 2014 1:43 PM

by Peter Borremans

A first glance and closer look at the Azure Microservices technology, the new integration platform within the Microsoft stack.

Azure Microservices – first glance

As my colleague Sam promised in his ‘initial thoughts’ blog post about Azure Microservices, we are coming back to you with as many details about Azure Microservices as we can share right now.

The Azure Microservices technology will be leveraged as the core for building the new integration platform within the Microsoft stack. As this will be the technology underneath the integration platform, it deserves a closer look.

Let’s have a look at the Azure Microservices platform, which contains the following parts:

  • Hosting
  • Development
  • Gateway
  • Workflow engine
  • Gallery

Hosting

Azure Microservices will be hosted in Azure App Containers, which run as Azure Websites. The choice of Azure Websites as a hosting environment is not a coincidence: it is an enterprise-grade cloud that supports global scale and already runs millions of websites and Web APIs.

Each Microservice exists as an independently deployable unit of logic/functionality that is exposed via a RESTful API.

Development

The Azure Microservices technology is open to a wide range of developers. Microservices can be written in one of the following languages: .Net, Java, PHP, Python, Node. This makes it possible for developers to use the language they are most productive in.

Gateway

The gateway will handle calls between Microservices; Microservices will never call each other directly.
Having the gateway in between Microservice calls allows you to implement security, monitoring and governance in a centralized manner.

Workflow engine

The workflow engine will orchestrate the execution of the APIs or Microservices in a workflow. The workflow definition will be JSON based!

Each of the Microservices in a workflow will be monitored closely. The workflow engine will allow you to monitor parameters like installed applications, number of calls to components, network traffic, detailed performance data, up-time and crashes.

Gallery

The gallery will contain Microservices developed by you, Microsoft or third-party organizations. Microservices you create can be kept private to your organization or made public for Azure users. The gallery will allow you to reuse existing functionality and be more productive when delivering complete workflows.

A Microservice author who publishes a Microservice to the gallery will also get feedback about its performance. Crash logs will be communicated to the author.

Integration platform on Microservices

To make integration possible on this technology, the integration concepts we use today will be implemented on the Microservices platform. Transformation, trading partner management, connectors, rules engine, validation, batching (…) will all be made available as Microservices that can be plugged into your workflows.


Categories: Integration Cloud
written by: Peter Borremans