In this tutorial, we will explore how you can access and analyze data on Hive from Spark.

Spark SQL uses the Spark engine to execute SQL queries either on data sets persisted in HDFS or on existing RDDs. It allows you to manipulate data with SQL statements within a Spark program.

Prerequisite

The only prerequisite for this tutorial is the latest Hortonworks Sandbox installed on your computer or in the cloud.

If you are running a Hortonworks Sandbox with an earlier version of Apache Spark, you will need to install Apache Spark 1.3.1 for the instructions in this tutorial to work.

Getting the dataset

To begin, log in to the Hortonworks Sandbox through SSH.
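A typical invocation, assuming the Sandbox’s default port forwarding to localhost (adjust the host and port for your environment), is:

ssh root@127.0.0.1 -p 2222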

The default password is hadoop.

Now let’s download the dataset with the command below:

wget http://hortonassets.s3.amazonaws.com/tutorial/data/yahoo_stocks.csv

and copy the downloaded file to HDFS:

hadoop fs -put ./yahoo_stocks.csv /tmp/

Starting the Spark shell

Use the command below to launch the Scala REPL for Apache Spark:

./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

Notice that it starts with Hive integration already enabled, since we have preconfigured it on the Hortonworks Sandbox.

Before we get started with the actual analytics, let’s import some of the libraries we are going to use:

import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._

Creating HiveContext

HiveContext is an instance of the Spark SQL execution engine that integrates with data stored in Hive; it reads its Hive configuration from hive-site.xml on the classpath. The more basic SQLContext provides a subset of the Spark SQL support that does not depend on Hive.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Creating ORC tables

ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, with integrated support for finding required rows quickly. Storing data in a columnar format lets the reader read, decompress, and process only the values required for the current query. Because ORC files are type aware, the writer chooses the most appropriate encoding for the type and builds an internal index as the file is persisted.

Predicate pushdown uses those indexes to determine which stripes in a file need to be read for a particular query and the row indexes can narrow the search to a particular set of 10,000 rows. ORC supports the complete set of types in Hive, including the complex types: structs, lists, maps, and unions.
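Spark SQL can take advantage of these indexes through predicate pushdown. In newer Spark releases this is controlled by a configuration flag; a hedged example (the spark.sql.orc.filterPushdown property comes from later Spark versions and may not be recognized by the Spark 1.3.1 build on the Sandbox):

hiveContext.setConf("spark.sql.orc.filterPushdown", "true")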

Specifying stored as orc at the end of the SQL statement below ensures that the Hive table is stored in the ORC format.

hiveContext.sql("create table yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")

Loading the file and creating an RDD

A Resilient Distributed Dataset (RDD) is an immutable collection of objects that is partitioned and distributed across multiple physical nodes of a YARN cluster and that can be operated on in parallel.

Once an RDD is instantiated, you can apply a series of operations. All operations fall into one of two types: transformations or actions. Transformation operations, as the name suggests, create new datasets from an existing RDD and build out the processing DAG that can then be applied to the partitioned dataset across the YARN cluster. Action operations, on the other hand, execute the DAG and return a value.
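As a quick illustration with a small throwaway RDD (the numbers below are hypothetical and not part of the tutorial’s dataset):

val numbers = sc.parallelize(1 to 10)
val doubled = numbers.map(_ * 2) // transformation: lazy, only extends the DAG
doubled.count() // action: triggers execution and returns the number of elements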

Normally, we would load the data directly into the ORC table we created above and then create an RDD from it, but to cover a little more of Spark’s surface in this tutorial we will create an RDD directly from the CSV file on HDFS, apply a schema to it, and then write it back out as ORC.

With the command below we instantiate an RDD:

val yahoo_stocks = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/yahoo_stocks.csv")

Separating the header from the data

Let’s assign the first row of the RDD above to a new variable:

val header = yahoo_stocks.first

Let’s dump this value in the console to see what we have:

header

Now we need to separate the data into a new RDD that does not contain the header row:

val data = yahoo_stocks.filter(_ != header)

Check the first row to see that the RDD indeed contains only the data:

data.first

Creating a schema

There are two ways of doing this: we can infer the schema using reflection with a Scala case class, or we can specify it programmatically. Here we take the case class route:

case class YahooStockPrice(date: String, open: Float, high: Float, low: Float, close: Float, volume: Integer, adjClose: Float)
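For completeness, here is a minimal sketch of the programmatic alternative using Spark SQL’s StructType API. It is not used in the rest of this tutorial and assumes the data RDD and hiveContext defined elsewhere in this walkthrough:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Describe the columns explicitly instead of relying on case class reflection
val schema = StructType(Array(
  StructField("date", StringType, true),
  StructField("open", FloatType, true),
  StructField("high", FloatType, true),
  StructField("low", FloatType, true),
  StructField("close", FloatType, true),
  StructField("volume", IntegerType, true),
  StructField("adjClose", FloatType, true)))

// Convert each CSV line into a Row and attach the schema
val rowRDD = data.map(_.split(",")).map(r => Row(r(0), r(1).trim.toFloat, r(2).trim.toFloat, r(3).trim.toFloat, r(4).trim.toFloat, r(5).trim.toInt, r(6).trim.toFloat))
val stockpriceBySchema = hiveContext.createDataFrame(rowRDD, schema)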

Attaching the schema to the parsed data

Map each row of the parsed data to a YahooStockPrice object and convert the result to a DataFrame:

val stockprice = data.map(_.split(",")).map(row => YahooStockPrice(row(0), row(1).trim.toFloat, row(2).trim.toFloat, row(3).trim.toFloat, row(4).trim.toFloat, row(5).trim.toInt, row(6).trim.toFloat)).toDF()
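A note in case the toDF call is not found: it comes from the Spark SQL implicits. The spark-shell normally imports these for you; in a standalone program (or if your shell complains), bring them in explicitly, for example via the hiveContext created earlier:

import hiveContext.implicits._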

Let’s verify that the statement above parsed the data correctly by dumping the first row:

stockprice.first

If we want to dump more rows, we can use:

stockprice.show

To verify the schema, let’s print it:

stockprice.printSchema

Registering a temporary table

Now let’s give this RDD a name, so that we can use it in Spark SQL statements:

stockprice.registerTempTable("yahoo_stocks_temp")

Querying against the table

Now that our RDD with its attached schema has a name, we can use Spark SQL commands to query it. Remember that the table below is not a Hive table; it is just an RDD we are querying with SQL.

val results = sqlContext.sql("SELECT * FROM yahoo_stocks_temp")

The result set returned from the Spark SQL query is now loaded in the results RDD. Let’s pretty-print it on the command line:

results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)
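As an optional variation, you can run more selective SQL against the same temporary table; for example, the ten highest closing prices (the column names follow the case class defined earlier):

sqlContext.sql("SELECT date, close FROM yahoo_stocks_temp ORDER BY close DESC LIMIT 10").collect.foreach(println)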

Saving as an ORC file

Now let’s persist the results back to HDFS as an ORC file:

results.saveAsOrcFile("yahoo_stocks_orc")
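Note that saveAsOrcFile writes the ORC data to a path (here a directory named yahoo_stocks_orc, typically under your HDFS home directory) rather than into the yahoo_orc_table Hive table we created earlier. If you wanted to populate that table instead, one hedged alternative (assuming the yahoo_stocks_temp temporary table is visible to hiveContext) would be:

hiveContext.sql("INSERT INTO TABLE yahoo_orc_table SELECT * FROM yahoo_stocks_temp")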

Reading the ORC file

Let’s now read the ORC file we just created back into an RDD. Before we do so, we need a hiveContext; if you are continuing in the same spark-shell session, you can reuse the one created earlier, otherwise create it again:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Now we can read the ORC file with:

val yahoo_stocks_orc = hiveContext.orcFile("yahoo_stocks_orc")

Let’s register it as a temporary table so that we can query it with SQL:

yahoo_stocks_orc.registerTempTable("orcTest")

Now we can verify whether we can query it back:

hiveContext.sql("SELECT * from orcTest").collect.foreach(println)

Voila! We just did a round trip of persisting and reading data to and from Hive ORC using Spark SQL.

Hope this tutorial illustrated some of the ways you can integrate Hive and Spark.

Saptak Sen

If you enjoyed this post, you should check out my book: Starting with Spark.
