Building an IoT solution with Azure Event Hubs and Stream Analytics – Part 1

In this blog series, I will cover Azure Service Bus EventHub as a technology and show how you can seamlessly integrate Event Hub with Microsoft Stream Analytics. We will then build an end-to-end Internet of Things (IoT) scenario leveraging these technologies.

EventHub Overview

Event Hub is a hyper-scale stream ingestion entity in Azure Service Bus. It allows multiple clients to publish events that are persisted within the event hub as streams of data; these events can then be consumed by technologies like Microsoft Stream Analytics and transformed into useful information.

The Service Bus team has already done an excellent job in creating a comprehensive EventHub developer guide, so we will skip the introduction to EventHub and instead focus on the design patterns and on implementing EventHub with Stream Analytics.

If you are looking for a deeper feature overview of EventHub, I would highly recommend going through the developer guide first.

I will be covering the following in this multi-part blog series:

  • Design principles behind EventHub (Part 1)
  • Building a publisher and consumer for EventHub using .NET (Part 2)
  • Processing EventHub data streams in real time with Stream Analytics (Part 3)

Design principles behind EventHub

Cloud computing has changed the paradigm of building scalable applications. It has enabled scenarios that were unrealistic in a privately owned data center.

The Internet of Things is the next challenge for the cloud. Think of it this way: until now, cloud hosting providers have focused on scaling applications (of course there are many other benefits of the cloud, but I focus on the “infinite” scale aspect for now). The distinctive thing about scaling applications is that their demand on resources is intermittent. For example, a shopping portal can manage massive traffic by allocating more resources during peak hours and scaling back during lean hours. This is the power and flexibility that the cloud gives us, so we can operate in a cost-effective way while still meeting customer expectations.

But what is a lean-scale scenario in the case of the Internet of Things? Well, in most cases (especially telemetry), there is none.

Think of a scenario: a vehicle designed to send telemetry data every 5 seconds will keep sending it unless it is interrupted by a network or some other failure. This means that it does not require a human to log in to the Telematics Unit of the vehicle and start the transfer of data; the vehicle may be transmitting all the time. It’s like a robot configured to send data non-stop!

Now extrapolate this scenario to a fleet of vehicles, and you have 1M devices sending continuous streams of data without interruption. So, technically, the servers always need to respond to these requests, and the back end must maintain adequate scale at all times.

The above may not apply to all IoT scenarios, but it is critical when capturing telemetry data from devices. In most cases, the devices are flashed with firmware that has a connection module reporting consistent streams of data at frequent intervals.

So the question is: how do we effectively manage this humongous scale?

You may say: add a pub-sub messaging layer like a Topic or a Queue, and that should take care of it. It certainly will, but unless you create multiple Topics or Queues, the pipeline will soon get saturated, or you will need hyper scale on the consumer side to ensure you don’t hit the thresholds for these entities. This also adds complexity to the development and cost of managing such systems.

Event Hubs is designed to solve such hyper-scale IoT scenarios and is specifically targeted at ingesting data from a large number of connected clients. Event Hubs implements some interesting concepts to achieve this ridiculous scale; let’s look at some of these design principles:

  • Event Stream: From a design perspective, Event Hub achieves high throughput by following a simple event stream log pattern. As events are sent, they are appended to an event sink in an ordered fashion. Think of it as a giant funnel that allows authorized traffic to enter the system and keeps appending data fragments to a commit log. Events are segregated by partitions (more on partitions later) and may be accessed using a time stamp or an offset. Simplifying the architecture also imposes some constraints: to enable high throughput, Event Hub sheds some of the complex features available in other messaging systems, like sequencing, dead lettering, transactions, etc. This, to me, seems a reasonable trade-off. Most IoT telemetry scenarios that I have seen are focused on achieving higher throughput, and some do not even care about loss of data. The main rationale behind this is that the device is usually transmitting data at frequent intervals, so if one packet is lost, the next update will provide the state of the device. (This of course does not apply to all telemetry scenarios.)

    Note that although the messaging features in EventHub are simplified, it is still an enterprise-grade messaging system and leverages the robust Service Bus and Azure infrastructure to meet its operational SLAs.

  • Scale units: The concept of a scale unit is not new to Azure Service Bus; the existing Service Bus entities such as Topics and Queues also follow a scale unit design pattern. A scale unit, in its simplified form, is a pre-allocation of resources to achieve deterministic scale targets. What this means is that the overall system is divided into groups of scale units, where each scale unit has defined thresholds. These thresholds have been tested based on the resources allocated to the scale unit, so it is somewhat guaranteed that the system will perform at optimum scale provided the ingress and egress stay within the scale unit thresholds. This is different from an auto-scale approach, where you keep adding resources dynamically as the load increases. While an auto-scale approach works well for scenarios like web front ends or backend worker roles, dealing with a group of resources (such as databases, backend nodes, etc. together) can make auto scaling complicated. Furthermore, a scale unit provides a degree of isolation, thus improving security: if a malicious user is able to hack into the system, only the specific scale unit is impacted. Another benefit of scale units is parallel deployment and testability of components; you may upgrade one scale unit in a smaller region with a beta release while the critical geographies continue using the stable version. Event Hubs enables the scale unit pattern through throughput units; a single throughput unit provides:
    • Ingress: Up to 1MB per second or 1000 events per second.
    • Egress: Up to 2MB per second.

    Throughput units are billed hourly; in the current release you can purchase up to 20 throughput units for a Service Bus namespace.

  • Partitioned Consumers: Partitions allow for efficient organization of data within EventHub and are used to build a log of ordered event streams. Think of partitions in EventHub as data shards. Partitions play a pivotal role in the classification of data within the event hub and also determine how load within the event hub is distributed. The number of partitions, however, does not correlate to the throughput of EventHub; partitions are focused on allowing consumers to retrieve data streams more efficiently, and throughput units should be used for improving the throughput of the system. An EventHub can have multiple data-isolated partitions; the GA release supports up to 32 partitions, but this can be increased by opening a support ticket with the Microsoft Azure team.

    EventHub employs a Partitioned Consumer pattern where consumers receive messages from a partition rather than from the entire message stream. This is different from a Service Bus Queue or Topic, which leverages a Competing Consumer pattern allowing multiple clients to read from a single message stream. The benefit of a Partitioned Consumer is that, since there is data isolation amongst partitions, you can direct consumers to specific partitions (data shards), reducing the overall load on the messaging layer. This approach also allows for segregating consumers by functionality (using Consumer Groups) and even scaling out based on partition load (a short code sketch after this list shows a partition key and a partitioned receiver in action).

  • Granular identity management: EventHub leverages the SAS (Shared Access Signature) model already available in Azure Service Bus to provide much more granular control over publishers and consumers. This is very relevant to an IoT scenario where you would want each sending device to have its own unique identity; EventHub achieves this through publisher policies for clients sending event streams.

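To make these principles concrete, here is a minimal sketch (not the production publisher we build in Part 2) that creates an event hub with an explicit partition count, publishes an event with a partition key, and reads from a single partition with a partitioned receiver. It assumes the WindowsAzure.ServiceBus NuGet package and a Service Bus connection string with Manage rights; the event hub name, partition count and sample payload are illustrative.

// A minimal sketch, assuming the WindowsAzure.ServiceBus NuGet package.
// Connection string, event hub name, partition count and payload are placeholders.
using System;
using System.Text;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class EventHubQuickStart
{
    static void Main()
    {
        var connectionString = "<service bus connection string with Manage rights>";
        var eventHubName = "vehicletelemetry";

        // Create the event hub with an explicit partition count; this is a scale
        // decision made up front, since it cannot be changed after creation.
        var manager = NamespaceManager.CreateFromConnectionString(connectionString);
        manager.CreateEventHubIfNotExists(
            new EventHubDescription(eventHubName) { PartitionCount = 16 });

        // Publish: the partition key is hashed to pick a partition, so all events
        // from one vehicle land on the same ordered stream.
        var client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
        var payload = Encoding.UTF8.GetBytes("{\"OdoMeterReading\":12000}");
        client.Send(new EventData(payload) { PartitionKey = "vehicle-001" });

        // Consume: a partitioned consumer reads a single partition rather than
        // competing for the entire stream.
        var receiver = client.GetDefaultConsumerGroup().CreateReceiver("0");
        var received = receiver.Receive(TimeSpan.FromSeconds(10));
        if (received != null)
        {
            Console.WriteLine(Encoding.UTF8.GetString(received.GetBytes()));
        }
    }
}
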
Now that we have an understanding of the EventHub design principles, let’s start working on building our scenario. In the next part, we will create a .NET-based publisher and consumer for EventHub to send and receive data.

Building an IoT solution with Azure Event Hubs and Stream Analytics – Part 3

In Part 2 of this blog series, we created an EventHub and a publisher to send Vehicle Stream data. We also created a consumer for testing our published messages. In this final post, we will look at Microsoft Stream Analytics and how it provides out-of-the-box capabilities for processing EventHub data streams in real time.

Introducing Stream Analytics

Stream Analytics is Microsoft’s answer to real-time event processing. It can be employed to enable Complex Event Processing (CEP) scenarios (in combination with EventHubs), allowing multiple inputs to be processed in real time to generate meaningful analytics. Technologies like Esper and Apache Storm provide similar capabilities, but with Stream Analytics you get out-of-the-box integration with EventHub, SQL Database and Storage, which makes it very compelling for development on Microsoft Azure.

Moreover, it exposes a query processing language that is very similar to SQL-92 syntax, so the learning curve is minimal. In fact, once you have a job created, you can simply use the Azure Management Portal to develop queries and run jobs, eliminating the need for coding in most use cases. For more information on Stream Analytics, refer here.

Let’s leverage Stream Analytics for the EventHub scenario we developed in the previous post:

Creating a Stream Analytics job

Stream Analytics is still in Preview, so the first task is to enable it as a feature for your Azure subscription.

For limitations of the preview release when creating jobs, refer here.

To do this, log in to your Azure subscription account administration. Select your subscription, then choose Preview Features, scroll down and click “Try it now” against the Stream Analytics option.


Once activated you should see a Stream Analytics extension in your management portal:


You can now start creating Stream Analytics jobs. A job in stream analytics allows you to define the inputs, query logic and outputs for a scenario.

  • Click Create a new analytics Job to start the job creation template.
  • Choose a unique job name
  • Choose a region; for the preview release, Stream Analytics is only available in Central US and West Europe.
  • Choose a monitoring storage account; this defines a storage account where Stream Analytics will capture monitoring and logging data. This is not the storage account for the output of stream processing; you will specify that later.
  • Once your job is created, it should be in a Not Started state.


    Defining Inputs

    The next step is to define inputs for our job. Click the job we just created and select Input -> Add an Input


     

    There are currently two types of Input that can be added:

  • Data Stream: This is the source from which Stream Analytics reads data streams. The data stream can be an EventHub or Blob storage. Since an EventHub can ingest data from multiple clients (publishers), it is the preferred choice for real-time processing of IoT devices. You may still use Blob storage if you have data populated in a persisted store, but the data must have some sort of timestamp for features like Stream Analytics Windowing to be employed.
  • Reference data: This can be used to provide lookup data like states, countries, etc. Currently, this can only be a Blob storage account.

    When building a query, both inputs appear as datasets that you can include in your query.

     

    For our scenario, we will use EventHub as the data stream:

  • Click Add an Input and choose Event Hub as the data stream option.


  • In the EventHub settings, provide a unique input alias; this name will appear as a data source in the query window.
  • From the EventHub dropdown, select the EventHub namespace that we created in Part 2 of this blog. You can also use an EventHub from a different subscription.
  • Select the EventHub you want to use for ingestion.
  • Select the EventHub policy; currently Stream Analytics requires the policy to have a Manage rule. If you don’t provide the Manage permission, query processing will fail later. Use the Manage policy for the EventHub that we created in the previous section. I really hope this will change so that a more granular permission can be specified.


  • Next, we specify the data serialization format. You can choose between JSON, Avro and CSV. Note that this is not the output format but the format of the ingested data stream. I used JSON.NET to serialize my VehicleStream type before publishing, so I will choose JSON here (a short serialization sketch follows these steps).


  • Click OK. The job will attempt to test connectivity by connecting to the EventHub. You should now see the status for the input as Connected.


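    To illustrate the serialization choice above, here is a minimal sketch of how a telemetry reading might be serialized with JSON.NET before being published. The VehicleStream type is the model defined in Part 2; the connection string and event hub name are placeholders, and this is not the full publisher from that post.

// A minimal sketch, assuming the Newtonsoft.Json and WindowsAzure.ServiceBus NuGet packages.
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

class TelemetryPublisher
{
    public static void Publish(VehicleStream reading)
    {
        // Serialize the event as JSON, matching the serialization format
        // selected for the Stream Analytics input.
        string json = JsonConvert.SerializeObject(reading);

        var client = EventHubClient.CreateFromConnectionString(
            "<service bus connection string>", "vehicletelemetry");

        // The event body carries the JSON payload.
        client.Send(new EventData(Encoding.UTF8.GetBytes(json)));
    }
}
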
    Defining Output

    Before we define a query, we will provide an output for our results

  • Click the Output tab, then click Add an Output.
  • The Stream Analytics preview supports three output destinations:
    • Blob Storage: push results into a blob storage container for permanent storage.
    • EventHub: transfer results to another EventHub; this is useful when you want to create a pipeline architecture where the results of one processing stage need to be the input of another.
    • SQL Database: push the results into a database. This will create a new table in the database.
  • Select Blob Storage, then provide details of the storage account that should contain the output data. Ensure that you follow appropriate storage guidelines depending on how long the job is going to run and the data that is going to be stored. You can also specify a storage account from a different subscription.


  • Finally, select the data output format. Supported formats are JSON, CSV and Avro. In this case, we select CSV. The preview feature only supports UTF-8 encoding.


  • Click OK. You now have an output in the Connected state.

    Defining the Query

    Now we come to the interesting part of Stream Analytics. As mentioned before, Stream Analytics allows users to write SQL-like queries for processing ingested data streams. The language that enables this is the Stream Analytics Query Language. Most of the syntax and constructs of SQL-92 are supported; however, there are some very interesting additions:

    Windowing

    As the name suggests, Windowing allows for processing the data stream within a window of time. It is mainly about handling events that occurred within a slice of the timeline. Stream Analytics performs aggregation over the duration of the specified window. Windowing is always used in the GROUP BY clause.

    There are three variations of Windowing that are supported (the corresponding GROUP BY forms are sketched after this list):

  • Tumbling: process events every n <units of time>; the time intervals do not overlap.
  • Hopping: process events for a window of size X <units of time>, then hop forward Y <units of time> and process another X <units of time>. Example: start by processing 10 minutes of the stream with a hop size of 5 minutes; the next window starts at the 5-minute mark with a window size of 10, so the next processing covers minutes 5-15 instead of 0-10.
  • Sliding: process events that occurred during the last X <units of time>; the window slides continuously along the timeline.

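    For reference, here is roughly how the three window types appear in a GROUP BY clause; the durations are illustrative, not prescriptive.

    GROUP BY TumblingWindow(minute, 5)       -- fixed, non-overlapping 5-minute windows
    GROUP BY HoppingWindow(minute, 10, 5)    -- 10-minute windows that hop forward every 5 minutes
    GROUP BY SlidingWindow(minute, 10)       -- events within the last 10 minutes, evaluated as events arrive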
     

    For our scenario, we will create a query that will process based on the following business logic:

    For the last five minutes, output the count of all vehicles where the average odometer reading is > 10000.

    This can be used by dealers to determine which vehicles are due for lease renewal.

     

    Don’t worry if this condition sounds unrealistic; the idea here is to show the simplicity of query development in Stream Analytics. You can build more powerful use cases using the query language.

    To define the query, we simply access the Query tab in the Management Portal and enter a query. The query window itself is very similar to a SQL query window and provides basic syntax validation. It does not, at least today, have features like running the query to preview results; you will have to execute the job to view the results. A sketch of the query for our scenario follows.

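    As a rough sketch, a query for our business logic might look like the following. I am assuming the input alias is vehicledatastream (use whatever alias you configured earlier); OdoMeterReading comes from the VehicleStream model in Part 2. A stricter per-vehicle interpretation would also group on a vehicle identifier.

    SELECT
        COUNT(*) AS VehicleCount
    FROM
        vehicledatastream
    GROUP BY
        TumblingWindow(minute, 5)
    HAVING
        AVG(OdoMeterReading) > 10000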

    Running our Job and results

    Let’s recap what we have done so far: we created a new Stream Analytics job and defined the input, output and query to be processed. The next step is to run the job. This can simply be done by pressing Start on the Dashboard tab. This will verify the input and output connections and also validate the query before execution.


     

    Once all validations are successfully completed, Stream Analytics will report the status of the job. Your job is now reading incoming streams from the EventHub. Think of it as a consumer of your EventHub that will process all incoming data streams.


     

    Now that we have a job running, the final step is to push some event data into the EventHub to validate our results. I will use the publisher that we created in Part 2 of this blog to publish messages to the EventHub. The publisher is a simulator that sends event streams for different devices and also repeats the status of devices that have already reported, to create a mix of incoming data streams.


     

    If you now go and look at the storage account and the blob container we specified when configuring the Stream Analytics output, you should see a CSV file created. Opening the CSV gives us the expected results.


    Great, we now have real-time results being processed from our device telemetry!

    If you want to monitor the requests being processed by the Stream Analytics job, you can view the Dashboard in the Management Portal, where you should see input and output events being processed.


    EventHub and Stream Analytics are really powerful technologies that can be used to create end-to-end IoT solutions. With support for protocols like AMQP and HTTPS, you can cater to a lot of new-generation devices and use these technologies in conjunction to ingest telemetry data from a variety of clients. In case your devices use a custom protocol or a protocol like MQTT, you may still be able to create a front end (protocol head) that accepts requests and transforms the packets into AMQP. From that point onwards, you can continue to use EventHub for ingestion and Stream Analytics for real-time processing.

Building an IoT solution with Azure Event Hubs and Stream Analytics – Part 2

In Part 1 of this multi-part blog series, we talked about EventHubs and the design patterns that enable them to perform at high throughput. In this blog post, we will use an EventHub to collect data from a publisher; we will also create a consumer that can receive the published events.

Problem Scenario

Our problem scenario involves a fictitious automotive company, Contoso Motor Works (CMW). CMW has built its next-generation telemetry system to collect frequent data streams from its vehicles. The data will be used for preventive maintenance and near real-time analytics, for example to notify the driver if the engine oil goes below a threshold level. Contoso has chosen EventHub and Stream Analytics to achieve the anticipated scale for its North America vehicle rollout.

The high-level design looks something like this:


Note that since this is a simplified scenario, we only output the results to a CSV file. In a real-world scenario, you can do more powerful things, such as pushing the results into another EventHub and having a consumer that sends push notifications using Notification Hubs, etc. I will cover some of those in the next part of this series.

Creating our Data Model

We need to represent the telemetry data from the vehicle that will be sent as the event data stream; the following is a simplistic model for our fictitious scenario (a consumer-side sketch follows the model):


public class VehicleStream : Entity
{
    public int TirePressure { get; set; } // Ignore using psi, using a standard int
    public int FuelGaugeLevel { get; set; }
    public int EngineOilLevel { get; set; }
    public int OdoMeterReading { get; set; }
    public bool VTUStatus { get; set; }
}

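On the consuming side, a receiver reads the events and deserializes them back into VehicleStream instances. Here is a rough sketch (not the consumer we build later in this post); the connection string, event hub name and partition id are placeholders.

// A rough sketch, assuming the Newtonsoft.Json and WindowsAzure.ServiceBus NuGet packages.
using System;
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

class TelemetryConsumer
{
    public static void ReceiveOne()
    {
        var client = EventHubClient.CreateFromConnectionString(
            "<service bus connection string>", "vehicletelemetry");

        // Read a single event from partition "0" of the default consumer group.
        var receiver = client.GetDefaultConsumerGroup().CreateReceiver("0");
        EventData message = receiver.Receive(TimeSpan.FromSeconds(10));
        if (message != null)
        {
            var reading = JsonConvert.DeserializeObject<VehicleStream>(
                Encoding.UTF8.GetString(message.GetBytes()));
            Console.WriteLine("Odometer: " + reading.OdoMeterReading);
        }
    }
}
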
Continue reading “Building an IoT solution with Azure Event Hubs and Stream Analytics – Part 2”