Basics of programming Apache Spark
Concepts
RDD or Resilient Distributed Dataset is an immutable collection of objects that is usually partitioned and distributed across multiple physical nodes of the YARN cluster. So once a RDD is instantiated, it cannot be changed.
Typically, RDDs are instantiated by loading data from HDFS, HBASE, etc on a YARN cluster.
Once a RDD is instantiated you can apply a series of operations. All operations fall into one of two types, transformations or actions. Transformation operations build out the processing graph which can then be applied on the partitioned dataset across the YARN cluster once the Action operation is invoked. Each transformation creates a new RDD.
Let’s try it out.
Hands-On
Let’s open a shell to our Sandbox through SSH:
The default password is hadoop
or in case of Sandbox on Azure, what ever you’ve set it to.
Then let’s get some data with the command below in your shell prompt:
wget http://en.wikipedia.org/wiki/Hortonworks
Copy the data over to HDFS on Sandbox:
hadoop fs -put ~/Hortonworks /user/guest/Hortonworks
Let’s start the PySpark shell and work through a simple example of counting the lines in a file. PySpark shell let’s us interact with out data using Spark and Python:
pyspark
In case you do not want such verbose logging, you can change the verbosity in the $SPARK_HOME/conf/log4j.properties
file.
First exit pyspark
console by pressing CTRL+D
.
In case you do not already have the log4j.properties
file, make a copy of the file using the command below
cp /usr/hdp/current/spark-client/conf/log4j.properties.template /usr/hdp/current/spark-client/conf/log4j.properties
Then edit the file /usr/hdp/current/spark-client/conf/log4j.properties
to change the line
log4j.rootCategory=INFO, console
to
log4j.rootCategory=WARN, console
Now relaunch pyspark
Now it’s much cleaner. Let’s start programming Spark.
As discussed above, the first step is to instantiate the RDD using the Spark Context sc
with the file Hortonworks
on HDFS.
myLines = sc.textFile('hdfs://sandbox.hortonworks.com/user/guest/Hortonworks')
Now that we have instantiated the RDD, it’s time to apply some transformation operations on the RDD. In this case, I am going to apply a simple transformation operation filter(f)
using a Python lambda expression to filter out all the empty lines. The filter(f)
method is a data-parallel operation that creates a new RDD from the input RDD by applying filter function f
to each item in the parent RDD and only passing those elements where the filter function returns True
. Elements that do not return True
will be dropped. Like map(), filter can be applied individually to each entry in the dataset, so is easily parallelized using Spark.
myLines_filtered = myLines.filter( lambda x: len(x) > 0 )
Note that the previous Python statement returned without any output. This lack of output signifies, that the transformation operation did not touch the data in any way so far, but has only modified the processing graph.
Let’s make this transformation real, with an Action operation like ‘count()’, which will execute all the transformation actions before and apply this aggregate function.
myLines_filtered.count()
The final result of this little Spark Job is the number you see at the end. In this case it is 341
.
Hope this little example whets up your appetite for more ambitious data science projects on the Hortonworks Data Platform.