Introduction

In this tutorial, we will explore Apache Storm and use it with Apache Kafka to develop a multi-stage event processing pipeline.

image01

In an event processing pipeline, each stage is a purpose-built step that performs some real-time processing against upstream event streams for downstream analysis. This produces increasingly rich event streams as data flows through the pipeline.

Prerequisite

In the previous tutorial, we explored collecting and transporting data using Apache Kafka.

Complete the tutorial Transporting Realtime Event Stream with Apache Kafka before starting the next steps of this tutorial.

Scenario

Trucking is a high-risk business in which truck drivers venture into remote areas daily, often in harsh weather conditions and chaotic traffic. Using this solution, which illustrates a Modern Data Architecture with the Hortonworks Data Platform, we have developed a centralized management system that can help reduce risk and lower the total cost of operations.

This system takes into consideration adverse weather conditions, the driver’s driving patterns, current traffic conditions and other criteria to alert and inform the management staff and the drivers themselves so they can mitigate risks.

In this tutorial, you will learn the following topics:

- Starting and configuring Apache Storm
- Creating a Storm spout to consume the Kafka truck events generated in the previous tutorial
- Walking through the Storm topology code

Concepts

Apache Storm is a distributed real-time computation system for processing large volumes of high-velocity data in parallel and at scale. With its simple programming interface, Storm allows application developers to write applications that analyze streams comprised of tuples of data; a tuple may contain objects of any type.

At the core of Storm’s data stream processing is a computational topology. This topology of nodes dictates how tuples are processed, transformed, aggregated, stored, or re-emitted to other nodes in the topology for further processing.

Storm Cluster Components

A Storm cluster has three sets of nodes:

- Nimbus node: the master node, which distributes code around the cluster, assigns tasks to machines, and monitors for failures
- ZooKeeper nodes: coordinate the Storm cluster
- Supervisor nodes: communicate with Nimbus through ZooKeeper, and start and stop worker processes as directed by Nimbus

Five key abstractions help to understand how Storm processes data:

- Tuples: ordered lists of elements; for example, a “4-tuple” might be (7, 1, 3, 7)
- Streams: unbounded sequences of tuples
- Spouts: sources of streams in a computation (for example, a Kafka topic or the Twitter API)
- Bolts: process input streams and produce output streams; they can run functions, filter, aggregate, or join data, or talk to databases
- Topologies: the overall calculation, represented as a network of spouts and bolts

Storm users define topologies describing how to process the data as it comes streaming in from the spout. When the data comes in, it is processed and the results are passed on to other bolts or stored in Hadoop.

Streams

The core abstraction in Storm is the “stream.” It is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are “spouts” and “bolts.” Spouts and bolts have interfaces that you, as an application developer, implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kafka Topic and emit them as a stream, or a spout may even connect to the Twitter API and emit a stream of tweets.

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can run functions, filter tuples, perform streaming aggregations and joins, talk to databases, and more.
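
To make the bolt interface concrete, here is a minimal sketch of a custom bolt. It is illustrative only: the class name, field names, and uppercasing logic are assumptions, and it uses the pre-1.0 backtype.storm package names that the Storm version in this tutorial is assumed to ship with (newer releases use org.apache.storm).

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical bolt: reads a "word" field from each incoming tuple and emits it uppercased.
public class UppercaseBolt extends BaseRichBolt {

  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple tuple) {
    String word = tuple.getStringByField("word");
    collector.emit(tuple, new Values(word.toUpperCase()));  // anchor the new tuple to the input tuple
    collector.ack(tuple);                                   // acknowledge successful processing
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("upper-word"));
  }
}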

Storm Topologies

A Storm cluster is similar to a Hadoop cluster. Whereas on Hadoop you run “MapReduce jobs,” on Storm you run “topologies.” “Jobs” and “topologies” are different in that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you terminate it).

Networks of spouts and bolts are packaged into a “topology,” which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Links between nodes in your topology indicate how tuples should be passed around. For example, if there is a link between Spout A and Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A emits a tuple, it will send the tuple to both Bolt B and Bolt C. All of Bolt B’s output tuples will go to Bolt C as well.

Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.
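
To make this concrete, here is a hedged sketch of how the Spout A / Bolt B / Bolt C wiring above might be declared with Storm’s TopologyBuilder. SpoutA, BoltB, and BoltC are hypothetical components, the component ids and parallelism hints are arbitrary, and shuffle grouping is just one of the grouping options Storm offers.

// Sketch only: SpoutA, BoltB and BoltC are hypothetical spout/bolt implementations.
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout-a", new SpoutA(), 2);    // run Spout A with 2 executor threads

builder.setBolt("bolt-b", new BoltB(), 4)        // run Bolt B with 4 executor threads
       .shuffleGrouping("spout-a");              // Bolt B subscribes to Spout A's stream

builder.setBolt("bolt-c", new BoltC(), 3)
       .shuffleGrouping("spout-a")               // Bolt C subscribes to Spout A
       .shuffleGrouping("bolt-b");               // and to Bolt B's output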

A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.

### Steps

#### Step 1: Start and Configure Storm

1. View the Storm Services page

Start by logging into Ambari as admin/admin. From the Dashboard page of Ambari, click on Storm in the list of installed services. (If you do not see Storm listed under Services, click on Actions -> Add Service, select Storm, and deploy it.)

image11

2. Start Storm

From the Storm page, click on Service Actions -> Start

image20

Check the box and click on Confirm Start:

image19

Wait for Storm to start.

image17

3. Configure Storm

You can check the configurations shown below by pasting their names into the Filter text box under the Service Actions dropdown:

image21

image00

image07

image13

image04

image03

Now you can see the UI:

image14

4. Storm User View: Alternatively, you can use the Storm User View to see the topologies you have created.

image01

image02

#### Step 2: Creating a Storm Spout to consume the Kafka truck events generated in Tutorial #1

From Tutorial #1 you already have the required New York City truck routes KML file. If needed, you can download the latest copy of the file with the following command.

wget http://www.nyc.gov/html/dot/downloads/misc/all_truck_routes_nyc.kml --directory-prefix=/opt/TruckEvents/Tutorials-master/src/main/resources/

Recall that the source code is under the /opt/TruckEvents/Tutorials-master/src directory and the pre-compiled jars are under the /opt/TruckEvents/Tutorials-master/target directory.

Verify that Kafka is running using the Ambari dashboard. If it is not, follow the steps in Tutorial #1 to start it.

image10

We now have the Storm ‘supervisor’ daemon and the Kafka processes running.

Running a topology is straightforward. First, you package all your code and dependencies into a single jar. Then you run a command like the following, which starts a new Storm topology for TruckEvents:

cd /opt/TruckEvents/Tutorials-master/
storm jar target/Tutorial-1.0-SNAPSHOT.jar com.hortonworks.tutorials.tutorial2.TruckEventProcessingTopology

image16

It should complete with “Finished submitting topology” as shown below.

image06

This runs the class TruckEventProcessingTopology. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.

Refresh the Storm UI browser window to see the new topology ‘truck-event-processor’.

image12

image22

The TruckEvents producer can now be executed as we did in Tutorial #1 from the same directory:

java -cp target/Tutorial-1.0-SNAPSHOT.jar com.hortonworks.tutorials.tutorial1.TruckEventsProducer sandbox.hortonworks.com:6667 sandbox.hortonworks.com:2181

image24

Go back to the Storm UI and click on the “truck-event-processor” topology to drill into it. Under Spouts, you should see that the number of emitted and transferred tuples is increasing, which shows that the messages are being processed in real time by the spout.

image08

You can press Ctrl-C to stop the Kafka producer.

**Under Storm User View:** You should be able to see the topology you created in the Storm User View.

image18

image09

image23

#### Step 3: Code description

Let us review the code used in this tutorial. The source files are under the /opt/TruckEvents/Tutorials-master/src/main/java/com/hortonworks/tutorials/tutorial2/ directory:

[root@sandbox Tutorials-master]# ls -l src/main/java/com/hortonworks/tutorials/tutorial2/
      total 16
      -rw-r--r-- 1 root root  861 Jul 24 23:34 BaseTruckEventTopology.java
      -rw-r--r-- 1 root root 1205 Jul 24 23:34 LogTruckEventsBolt.java
      -rw-r--r-- 1 root root 2777 Jul 24 23:34 TruckEventProcessingTopology.java
      -rw-r--r-- 1 root root 2233 Jul 24 23:34 TruckScheme.java

BaseTruckEventTopology.java is the base class, where the topology configuration is initialized from the /resources/truck_event_topology.properties file:

topologyConfig.load(ClassLoader.getSystemResourceAsStream(configFileLocation));

TruckEventProcessingTopology.java is the Storm topology class, where the Kafka spout and the LogTruckEventsBolt are initialized. The Kafka spout is configured in the following method:

private SpoutConfig constructKafkaSpoutConf()
{
  // hosts, topic, zkRoot and consumerGroupId are loaded from the topology properties file
  SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

  // TruckScheme deserializes the raw Kafka bytes into named tuple fields
  spoutConfig.scheme = new SchemeAsMultiScheme(new TruckScheme());

  return spoutConfig;
}
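
For orientation, here is a hedged sketch of how such a SpoutConfig is typically handed to the storm-kafka KafkaSpout and registered with the topology builder. The KAFKA_SPOUT_ID constant also appears in the bolt wiring below, but the parallelism hint of 1 is an assumption rather than the tutorial's exact value.

// Sketch only: attach the configured Kafka spout to the topology builder.
KafkaSpout kafkaSpout = new KafkaSpout(constructKafkaSpoutConf());
builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout, 1);   // parallelism hint of 1 is assumed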

A logging bolt that prints the messages from the Kafka spout was created for debugging purposes, just for this tutorial:

public void configureLogTruckEventBolt(TopologyBuilder builder)
{
  LogTruckEventsBolt logBolt = new LogTruckEventsBolt();

  // subscribe the logging bolt to the Kafka spout's stream
  builder.setBolt(LOG_TRUCK_BOLT_ID, logBolt).globalGrouping(KAFKA_SPOUT_ID);
}

The topology is built and submitted in the following method:

private void buildAndSubmit() throws Exception
{
  ...

  // submit the assembled topology to the cluster under the name "truck-event-processor"
  StormSubmitter.submitTopology("truck-event-processor",
      conf, builder.createTopology());
}
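
As an aside (not part of the tutorial's code), the same topology graph could also be run in Storm's local mode for quick testing instead of being submitted to Nimbus. A minimal sketch using the LocalCluster API, with an arbitrary run duration:

// Sketch only: run the topology in-process for local testing instead of submitting it to the cluster.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("truck-event-processor", conf, builder.createTopology());
Utils.sleep(60000);                                // let the topology run for about a minute
cluster.killTopology("truck-event-processor");
cluster.shutdown();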

TruckScheme.java is the deserializer provided to the Kafka spout to convert the raw Kafka byte stream into Values objects:

public List<Object> deserialize(byte[] bytes)
{
  try
  {
    // each Kafka message is a pipe-delimited string of truck event attributes
    String truckEvent = new String(bytes, "UTF-8");
    String[] pieces = truckEvent.split("\\|");

    Timestamp eventTime = Timestamp.valueOf(pieces[0]);
    String truckId = pieces[1];
    String driverId = pieces[2];
    String eventType = pieces[3];
    String longitude = pieces[4];
    String latitude = pieces[5];

    // emit the parsed attributes as a Storm tuple
    return new Values(cleanup(driverId), cleanup(truckId),
        eventTime, cleanup(eventType), cleanup(longitude), cleanup(latitude));
  }
  catch (UnsupportedEncodingException e)
  {
    LOG.error(e);
    throw new RuntimeException(e);
  }
}
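
A Kafka scheme also declares the names of the fields it emits. Below is a hedged sketch of what TruckScheme's getOutputFields() method presumably looks like, assuming the FIELD_* constants referenced by the logging bolt below hold the field names in the same order as the Values built above:

// Sketch only: declare output field names in the same order as the Values returned by deserialize().
public Fields getOutputFields()
{
  return new Fields(FIELD_DRIVER_ID, FIELD_TRUCK_ID, FIELD_EVENT_TIME,
                    FIELD_EVENT_TYPE, FIELD_LONGITUDE, FIELD_LATITUDE);
}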

LogTruckEventsBolt logs the Kafka messages received from the Kafka spout to the log files under /var/log/storm/worker-*.log:

public void execute(Tuple tuple)
{
  // log the fields of each truck event tuple received from the Kafka spout
  LOG.info(tuple.getStringByField(TruckScheme.FIELD_DRIVER_ID) + "," +
      tuple.getStringByField(TruckScheme.FIELD_TRUCK_ID) + "," +
      tuple.getValueByField(TruckScheme.FIELD_EVENT_TIME) + "," +
      tuple.getStringByField(TruckScheme.FIELD_EVENT_TYPE) + "," +
      tuple.getStringByField(TruckScheme.FIELD_LATITUDE) + "," +
      tuple.getStringByField(TruckScheme.FIELD_LONGITUDE));
}

### Summary

In this tutorial, we learned how to capture data from a Kafka producer into a Storm spout so that it can be processed in real time. In the next tutorial, you will see how to use Storm bolts to persist this data to multiple stores.

Saptak Sen

If you enjoyed this post, you should check out my book: Starting with Spark.
