Real-Time Data Ingestion in HBase and Hive Using Storm


In this tutorial, we will build a solution to ingest real-time streaming data into HBase and HDFS. In the previous tutorial we explored generating and processing streaming data with Apache Kafka and Apache Storm. In this tutorial we will create an HDFS Bolt and an HBase Bolt to read the streaming data from the Kafka Spout and persist it in Hive and HBase tables.

Prerequisites: Labs 1 and 2 should be completed successfully, with a functional Storm topology and a Kafka Spout reading data from the Kafka queue.



Apache HBase

HBase provides near real-time, random read and write access to tables (or, to be more accurate, ‘maps’) storing billions of rows and millions of columns. Once we store this rapidly and continuously growing dataset from the Internet of Things (IoT), we will be able to do very fast lookups for analytics, irrespective of the data size.

Apache Storm

Apache Storm is an open-source distributed, reliable, fault-tolerant system for real-time processing of large volumes of data. Spouts and Bolts are the two main components in Storm, which work together to process streams of data.

  1. View the HBase service page. Start by logging into Ambari as admin/admin. From the previous tutorials, HDFS, Hive, YARN, Kafka and Storm should already be running, but HBase may be down. From the Dashboard page of Ambari, click on HBase in the list of installed services.

  2. **Start HBase.** From the HBase page, click on Service Actions -> Start. Check the box and click on Confirm Start. Wait for HBase to start (it may take a few minutes to turn green). You can use the Ambari dashboard to check the status of other components too. If HDFS, Hive, YARN, Kafka, Storm or HBase are down, you can start them in the same way: select the service and then use Service Actions to start it. The remaining components do not have to be up. (Oozie can be stopped to save memory, as it is not needed for this tutorial.)

Step 2: Create tables in HDFS & HBase

  1. Creating HBase tables. We will work with two HBase tables in this tutorial. The first table stores all events generated, and the second stores the ‘driverId’ and the count of non-normal events.

[hbase@sandbox root]$ hbase shell
hbase(main):001:0> create 'truck_events', 'events'
hbase(main):002:0> create 'driver_dangerous_events', 'count'
hbase(main):003:0> list

Next, we will create Hive tables.

  2. Creating Hive tables. Open the Hive view in Ambari in a browser (http://localhost:8080/#/main/views/HIVE/0.2.0/MyHive), copy the script below into the query editor, and click Execute.

create table truck_events_text_partition
(driverId string,
truckId string,
eventTime timestamp,
eventType string,
longitude double,
latitude double)
partitioned by (eventDate string);

This creates the Hive table to persist all events generated. The table is partitioned by date. Verify that the table has been created properly by refreshing the Database Explorer: under Databases, click default to expand it, and the new table should appear. Clicking on the List icon next to truck_events_text_partition shows that the table was created but is empty.

  3. Creating the ORC ‘truckevent’ Hive table. The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome limitations of the other Hive file formats, and using ORC files improves performance when Hive is reading, writing, and processing data. Syntax for ORC tables:

CREATE TABLE … STORED AS ORC
ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT ORC

Note: the ALTER TABLE statement only works on partitioned tables; if you apply it to flat tables, it may cause query errors. Next, let’s create the ‘truckevent’ table as per the above syntax. Paste the script below into the worksheet of the Hive view and click Execute:

create table truck_events_text_partition_orc
(driverId string,
truckId string,
eventTime timestamp,
eventType string,
longitude double,
latitude double)
partitioned by (eventDate string)
stored as orc tblproperties ("orc.compress"="NONE");

Refresh the Database Explorer and you should see the new table appear under default. The data in the ‘truck_events_text_partition_orc’ table can be stored with the ZLIB, Snappy, or LZO compression options; this can be set by changing the tblproperties ("orc.compress"="NONE") option in the query above.

  4. Set permissions on /tmp/hive

chmod -R 777 /tmp/hive/



Step 3: Launch new Storm topology

Recall that the source code is under the /opt/TruckEvents/Tutorials-master/src directory and the pre-compiled jars are under the /opt/TruckEvents/Tutorials-master/target directory. (Optional) If you would like to modify and rebuild the code, see Appendix A.

After submitting the topology, you should see that it was successfully submitted. The topology should also show up in the Storm User View, where you will see that the Kafka Spout has started writing to HDFS and HBase.



Step 4: Generate Events and Verify Data in HDFS and HBase

  1. Start the ‘TruckEventsProducer’ Kafka producer and verify that the data has been persisted by using the Storm topology view.
[root@sandbox Tutorials-master]# java -cp target/Tutorial-1.0-SNAPSHOT.jar com.hortonworks.tutorials.tutorial1.TruckEventsProducer

Verify in the Storm UI or Storm User View, under the Bolt section, that the HDFS/HBase tuples are being executed and acked.

  2. Verify that the data is in HDFS by opening the Ambari Files view: http://localhost:8080/#/main/views/FILES/0.1.0/MyFiles With the default settings for HDFS, the data is written to HDFS once every few minutes. Drill down into the /truck-events-v4/staging directory in HDFS, click on one of the txt files, and confirm that it contains the events.

The ‘driver_dangerous_events’ table is updated upon every violation.

Notice that querying the Hive table launches a Tez job in the background. You can get more details on this using the YARN ResourceManager UI; the link can be found under Ambari -> YARN -> Quick Links and will be similar to http://localhost:8088/cluster. Now query the ORC table by clicking the List icon next to it under Databases, and notice that it is also now populated.

Code Description

  1. This is the base class, where the topology configuration is initialized from the files under /resource/.

  2. This implements the file rotation policy, rotating files after a certain duration:

    public FileTimeRotationPolicy(float count, Units units) {
        this.maxMilliSeconds = (long) (count * units.getMilliSeconds());
    }

    public boolean mark(Tuple tuple, long offset) {
        // The offset is not used here, as we are rotating based on time
        long diff = (new Date()).getTime() - this.lastCheckpoint;
        return diff >= this.maxMilliSeconds;
    }
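To see how such a time-based rotation check behaves, here is a minimal, self-contained sketch in plain Java, with no Storm dependencies. The class name, the simplified Units enum, and the mark(long) signature are illustrative assumptions, not the tutorial's actual FileTimeRotationPolicy code:

```java
// Minimal sketch of a time-based rotation policy (illustrative only).
public class RotationSketch {
    public enum Units {
        SECONDS(1000L), MINUTES(60L * 1000L), HOURS(60L * 60L * 1000L);
        private final long millis;
        Units(long millis) { this.millis = millis; }
        public long getMilliSeconds() { return millis; }
    }

    private final long maxMilliSeconds;
    private final long lastCheckpoint;

    public RotationSketch(float count, Units units) {
        this.maxMilliSeconds = (long) (count * units.getMilliSeconds());
        this.lastCheckpoint = System.currentTimeMillis();
    }

    // Returns true once enough time has passed since the last checkpoint,
    // signalling that the current output file should be rotated.
    public boolean mark(long nowMillis) {
        return (nowMillis - this.lastCheckpoint) >= this.maxMilliSeconds;
    }

    public static void main(String[] args) {
        RotationSketch policy = new RotationSketch(0.5f, Units.MINUTES); // rotate every 30s
        long start = System.currentTimeMillis();
        System.out.println(policy.mark(start + 10_000)); // false: only ~10s elapsed
        System.out.println(policy.mark(start + 40_000)); // true: past the 30s threshold
    }
}
```

Passing the clock in as a parameter (rather than calling new Date() inside) just makes the sketch easy to test; the behavior is the same.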
  3. The LogTruckEvent bolt logs the Kafka messages received from the Kafka Spout to the log files under /var/log/storm/worker-*.log:

    public void execute(Tuple tuple) {
        LOG.info(tuple.getStringByField(TruckScheme.FIELD_DRIVER_ID)
            + "," + tuple.getStringByField(TruckScheme.FIELD_TRUCK_ID)
            + "," + tuple.getValueByField(TruckScheme.FIELD_EVENT_TIME)
            + "," + tuple.getStringByField(TruckScheme.FIELD_EVENT_TYPE)
            + "," + tuple.getStringByField(TruckScheme.FIELD_LATITUDE)
            + "," + tuple.getStringByField(TruckScheme.FIELD_LONGITUDE));
    }

  4. This is the deserializer provided to the Kafka Spout to deserialize Kafka’s byte message streams into Values objects:

    public List<Object> deserialize(byte[] bytes) {
        try {
            String truckEvent = new String(bytes, "UTF-8");
            String[] pieces = truckEvent.split("\\|");

            Timestamp eventTime = Timestamp.valueOf(pieces[0]);
            String truckId = pieces[1];
            String driverId = pieces[2];
            String eventType = pieces[3];
            String longitude = pieces[4];
            String latitude = pieces[5];
            return new Values(cleanup(driverId), cleanup(truckId), eventTime,
                cleanup(eventType), cleanup(longitude), cleanup(latitude));
        } catch (UnsupportedEncodingException e) {
            LOG.error(e);
            throw new RuntimeException(e);
        }
    }
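The parsing step can be exercised outside Storm. Here is a minimal, Storm-free sketch of the same pipe-delimited split; the TruckEvent holder class and the sample message are made up for illustration:

```java
import java.sql.Timestamp;

// Storm-free sketch of the pipe-delimited deserialization.
// The TruckEvent holder class and sample message are illustrative only.
public class TruckEventParser {
    public static final class TruckEvent {
        public final Timestamp eventTime;
        public final String truckId, driverId, eventType, longitude, latitude;
        TruckEvent(Timestamp t, String truck, String driver, String type,
                   String lon, String lat) {
            this.eventTime = t; this.truckId = truck; this.driverId = driver;
            this.eventType = type; this.longitude = lon; this.latitude = lat;
        }
    }

    // Split on the literal '|' delimiter; note the regex escape "\\|",
    // since String.split takes a regular expression, and '|' alone means
    // alternation.
    public static TruckEvent parse(String message) {
        String[] pieces = message.split("\\|");
        return new TruckEvent(Timestamp.valueOf(pieces[0]), pieces[1], pieces[2],
                pieces[3], pieces[4], pieces[5]);
    }

    public static void main(String[] args) {
        TruckEvent e = parse("2016-01-20 12:00:00|truck_42|driver_7|Normal|-94.58|39.09");
        System.out.println(e.driverId + " " + e.eventType); // prints "driver_7 Normal"
    }
}
```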


  5. This creates Hive partitions based on the timestamp and loads the data by executing Hive DDL statements:

    public void loadData(String path, String datePartitionName, String hourPartitionName) {
        String partitionValue = datePartitionName + "-" + hourPartitionName;
        LOG.info("About to add file[" + path + "] to a partition[" + partitionValue + "]");

        StringBuilder ddl = new StringBuilder();
        ddl.append(" load data inpath ")
           .append(" '").append(path).append("' ")
           .append(" into table ")
           .append(tableName)
           .append(" partition ").append(" (date='").append(partitionValue).append("')");

    The data is stored in the partitioned ORC tables using the following statement:

        String ddlORC = "INSERT OVERWRITE TABLE " + tableName + "_orc SELECT * FROM " + tableName;

        try {
            execHiveDDL("use " + databaseName);
            execHiveDDL(ddl.toString());
            execHiveDDL(ddlORC.toString());
        } catch (Exception e) {
            String errorMessage = "Error executing query[" + ddl.toString() + "]";
            LOG.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
    }
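The DDL strings built here can be checked in isolation. A small sketch in plain Java that assembles the same statements without executing anything against Hive; the table name and partition values are example data:

```java
// Builds Hive DDL strings like the loadData() method above, without
// executing them. Table name and partition values are example data.
public class HiveDdlSketch {
    // "load data inpath" moves a staged HDFS file into a table partition.
    public static String loadDataDdl(String path, String tableName, String partitionValue) {
        StringBuilder ddl = new StringBuilder();
        ddl.append("load data inpath ")
           .append("'").append(path).append("' ")
           .append("into table ").append(tableName)
           .append(" partition (date='").append(partitionValue).append("')");
        return ddl.toString();
    }

    // Copies the text-format table's rows into its ORC twin.
    public static String orcInsertDdl(String tableName) {
        return "INSERT OVERWRITE TABLE " + tableName + "_orc SELECT * FROM " + tableName;
    }

    public static void main(String[] args) {
        System.out.println(loadDataDdl("/truck-events-v4/staging/file1.txt",
                "truck_events_text_partition", "2016-01-20-12"));
        System.out.println(orcInsertDdl("truck_events_text_partition"));
    }
}
```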

  6. This creates a connection to the HBase tables and accesses the data within the prepare() function:

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        ...
        this.connection = HConnectionManager.createConnection(constructConfiguration());
        this.eventsCountTable = connection.getTable(EVENTS_COUNT_TABLE_NAME);
        this.eventsTable = connection.getTable(EVENTS_TABLE_NAME);
        ...
    }

The data to be stored is prepared in the constructRow() function using put.add().

    private Put constructRow(String columnFamily, String driverId, String truckId,
            Timestamp eventTime, String eventType, String latitude, String longitude) {
        String rowKey = constructKey(driverId, truckId, eventTime);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(CF_EVENTS_TABLE, COL_DRIVER_ID, Bytes.toBytes(driverId));
        put.add(CF_EVENTS_TABLE, COL_TRUCK_ID, Bytes.toBytes(truckId));
        ...
    }
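A composite row key like the one produced by constructKey() can be sketched in plain Java. The "driverId|truckId|eventTime" layout below is an assumption for illustration; the tutorial's actual key format may differ:

```java
import java.sql.Timestamp;

// Illustrative sketch of building a composite HBase row key from the event
// fields. The "driverId|truckId|eventTime" layout is an assumption, not
// necessarily the tutorial's actual constructKey() format.
public class RowKeySketch {
    public static String constructKey(String driverId, String truckId, Timestamp eventTime) {
        // HBase stores rows sorted lexicographically by key, so with this
        // layout all events for a given driver and truck sit adjacent,
        // ordered by time.
        return driverId + "|" + truckId + "|" + eventTime.getTime();
    }

    public static void main(String[] args) {
        Timestamp t = Timestamp.valueOf("2016-01-20 12:00:00");
        System.out.println(constructKey("driver_7", "truck_42", t));
    }
}
```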


This executes getInfractionCountForDriver() to get the count of events for a driver using the driverId, and stores the data in HBase with the constructRow() function:

    public void execute(Tuple tuple) {
        ...
        long incidentTotalCount = getInfractionCountForDriver(driverId);
        ...
        Put put = constructRow(EVENTS_TABLE_NAME, driverId, truckId, eventTime, eventType,
            latitude, longitude);
        ...
        incidentTotalCount = this.eventsCountTable.incrementColumnValue(Bytes.toBytes(driverId),
            CF_EVENTS_COUNT_TABLE, ...);
        ...
    }
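The increment-and-read pattern used against the counts table can be illustrated without HBase. In HBase, incrementColumnValue atomically adds to a cell and returns the new total; the single-threaded stand-in below mimics that contract with a plain map (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the driver_dangerous_events counter table.
// HBase's incrementColumnValue is atomic per cell; this single-threaded
// sketch only mimics the add-and-return-new-total contract.
public class DangerousEventCounter {
    private final Map<String, Long> countsByDriver = new HashMap<>();

    public long incrementColumnValue(String driverId, long amount) {
        long updated = countsByDriver.getOrDefault(driverId, 0L) + amount;
        countsByDriver.put(driverId, updated);
        return updated;
    }

    public static void main(String[] args) {
        DangerousEventCounter counts = new DangerousEventCounter();
        counts.incrementColumnValue("driver_7", 1L); // first violation
        long total = counts.incrementColumnValue("driver_7", 1L); // second violation
        System.out.println(total); // prints 2
    }
}
```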
  7. The HDFS and HBase Bolt configurations are created within configureHDFSBolt() and configureHBaseBolt() respectively:

    public void configureHDFSBolt(TopologyBuilder builder) {
        ...
        HdfsBolt hdfsBolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .addRotationAction(hivePartitionAction);
        ...
    }

    public void configureHBaseBolt(TopologyBuilder builder) {
        TruckHBaseBolt hbaseBolt = new TruckHBaseBolt(topologyConfig);
        builder.setBolt(HBASE_BOLT_ID, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT_ID);
    }

Appendix A: Updating Tutorials-master Project

The maven build should succeed.

Appendix B: Enabling remote desktop on sandbox and setting up Storm topology as Eclipse project

  1. Set up the Ambari VNC service on the sandbox to enable remote desktop via VNC, and install Eclipse using the steps here.
  2. Import the code as an Eclipse project using the steps here.

Saptak Sen

If you enjoyed this post, you should check out my book: Starting with Spark.
