Spark streaming: simple example streaming data from HDFS

This is a small example of how to count words in files arriving in an HDFS directory. Note that Spark Streaming can read data not only from HDFS but also from Flume, Kafka, Twitter and ZeroMQ.
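As an aside (not used in the rest of this walkthrough), switching the input source is essentially a one-line change once a StreamingContext ssc exists, like the one created in the example below. A minimal sketch with the receiver-based Kafka connector shipped with Spark 1.x (it requires the spark-streaming-kafka artifact; the ZooKeeper address, group id and topic name are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

// Arguments: streaming context, ZooKeeper quorum, consumer group id,
// and a Map of topic -> number of receiver threads.
// The resulting DStream holds (key, message) pairs; keep only the message text.
val kafkaLines = KafkaUtils.createStream(ssc, "quickstart.cloudera:2181", "wordcount-group", Map("mytopic" -> 1)).map(_._2)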

For this example, the Cloudera QuickStart virtual machine (VM) was used (CDH 5.3).

From the command line, let’s open the spark shell with spark-shell.

Many Spark-with-Scala examples are available on GitHub (in the examples directory of the Spark repository). The following example is based on HdfsWordCount.scala, with just two modifications to make it work from the shell: commenting out the 'package' line and the StreamingExamples.setStreamingLogLevels() call, which gives:

// usage within spark-shell: HdfsWordCount.main(Array("hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/"))

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._


/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

From spark-shell, first stop the current Spark context:

sc.stop()
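Stopping the default context is needed because HdfsWordCount creates its own SparkContext from a SparkConf, and only one SparkContext can be active per JVM. Alternatively, instead of calling HdfsWordCount.main below, you could keep sc and build the StreamingContext directly from it; a minimal sketch:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse the spark-shell's existing SparkContext instead of creating a new one
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/")
val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
// awaitTermination() is omitted here so that the shell prompt stays usable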

Create an HDFS directory "/user/cloudera/sparkStreaming" where you will add your incoming files. This can be done from the unix command line (hadoop fs -mkdir /user/cloudera/sparkStreaming) or from the Hue web interface (available in the browser at http://quickstart.cloudera:8888).
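If you prefer to stay inside the shell, the directory can also be created with the Hadoop FileSystem API (a small sketch; the default Configuration picks up fs.defaultFS from the cluster configuration available on the VM's classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Create the HDFS input directory that the stream will watch
val fs = FileSystem.get(new Configuration())
fs.mkdirs(new Path("/user/cloudera/sparkStreaming"))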

Then, from spark-shell, run the program:

HdfsWordCount.main(Array("hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/"))

Note that the host and port may not be needed if the default filesystem is configured on the cluster, in which case it becomes:

HdfsWordCount.main(Array("hdfs:///user/cloudera/sparkStreaming/"))

At this point, you should see INFO lines showing up in the spark-shell, refreshing every 2 s (the batch interval set with Seconds(2)).
Now it is time to add a file into /user/cloudera/sparkStreaming. Again, you can do it from the command line (hadoop fs -put <localsrc> ... <HDFS_dest_Path>) or from Hue.
Once you have added a file to the HDFS directory, you should see the words of that file being counted in the spark-shell. As an illustration, here is what I got after adding a LICENSE file.

-------------------------------------------
Time: 1433418756000 ms
-------------------------------------------
(replaced,1)
(executed,1)
(For,3)
(Unless,3)
(offer,1)
(agree,1)
(reproduce,,1)
(NON-INFRINGEMENT,,1)
(event,1)
("Contributor",1)
...

To stop the streaming job, press Ctrl+C in the spark-shell (ssc.awaitTermination() blocks the shell until the streaming context is stopped).
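If instead you started the stream without awaitTermination() (as in the sketch further up), the job can be stopped from the same prompt while keeping the shell's SparkContext alive; for example:

// Stop the streaming context gracefully (let in-flight batches finish)
// but keep the underlying SparkContext usable in the shell
ssc.stop(stopSparkContext = false, stopGracefully = true)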


Of course, this code can also be run as a Scala script from a shell file (provided the Spark jars are on the classpath):

  #!/bin/sh
  exec scala "$0" "$@"
  !#
  // ---
  // Here goes the Scala code above (object HdfsWordCount)
  // ---
  HdfsWordCount.main(Array("hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/"))

Or it can be compiled and executed (scalac HdfsWordCount.scala, then scala HdfsWordCount, with the Spark jars on the classpath), or embedded in a project built with the sbt tool.
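For the sbt route, a minimal build.sbt might look like the following (a sketch; the versions are illustrative and should match your cluster, e.g. Spark 1.2.0 on CDH 5.3):

// build.sbt -- minimal project definition for the HdfsWordCount example
name := "hdfs-word-count"

version := "0.1.0"

scalaVersion := "2.10.4"  // Scala version matching the Spark 1.2.x builds

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided"
)

The packaged jar can then be launched with spark-submit --class HdfsWordCount <path-to-jar> hdfs:///user/cloudera/sparkStreaming/.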

Best data processing engine: Flink vs Spark

Flink recently graduated to a top-level Apache project.
Flink started as a research project at the Technical University of Berlin in 2009.
Spark was originally developed in the AMPLab at UC Berkeley in 2009 and became an Apache top-level project in February 2014.

Although Flink is less well known than Spark, especially outside Europe, it is promising.
One of the most striking differences between Flink and Spark is probably that Flink can deal with true real-time data,
whereas Spark (with Spark Streaming) processes micro-batches that are only near real time.
Flink is claimed to handle batch workloads just as efficiently as Spark.

Another great feature of Flink is that it allows iterative processing to take place on the same nodes rather than having the cluster run each iteration independently. For example, delta iterations can operate only on the parts of the data that are actually changing.

Flink can also run existing MapReduce jobs and can run on the Tez runtime.
It has its own memory management system, separate from Java's garbage collector, and provides an HTML-based plan viewer for debugging purposes.

Flink comes with extra libraries for machine learning (Flink ML) and graph processing (Gelly), just as Spark does with MLlib and GraphX. Future compatibility with Python (the Python API is in beta) or R will probably facilitate adoption by data scientists.

Flink is gaining interest from large companies such as Spotify and Huawei, and will likely soon be one of the big players in the big data world.