Spark streaming: simple example streaming data from HDFS

This is a little example how to count words from incoming files that are stored in HDFS. Note that Spark streaming can read data from HDFS but also from Flume, Kafka, Twitter and ZeroMQ.

For our example, the virtual machine (VM) from Cloudera was used (CDH5.3).

From the command line, let’s open the spark shell with spark-shell.

Many spark-with-scala examples are available on github (see here). The following example is based on HdfsTest.scala with just 2 modifications for making it work: commenting out the ‘package‘ line and the StreamingExamples.setStreamingLogLevels(), which gives:

// usage within spark-shell: HdfsWordCount.main(Array("hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/"))

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._


/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <directory>
 *   <directory> is the directory that Spark Streaming will use to find and read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    $ bin/run-example \
 *       org.apache.spark.examples.streaming.HdfsWordCount localdir
 *
 * Then create a text file in `localdir` and the words in the file will get counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

From spark-shell, first stop the current spark context

sc.stop()

Create an HDFS directory “/user/cloudera/sparkStreaming” where you will add your incoming files (this can be done from the unix command line (hadoop fs -mkdir /user/cloudera/sparkStreaming) or from the Hue web interface (available from the browser at http://quickstart.cloudera:8888).

Then, from spark-shell run the program.

HdfsWordCount.main(Array('hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/'))

Note that the host and port may not be needed if the namenode is set, then it becomes

HdfsWordCount.main(Array('hdfs:///user/cloudera/sparkStreaming/'))

At this point, you should see INFO lines showing up in the spark-shell and refreshing every 2s.
Now it is time to add a file into /user/cloudera/sparkStreaming. Again you can do it from the command line (hadoop fs -put <localsrc> ... <HDFS_dest_Path>) or from Hue.
Once you have added a file into the HDFS directory, you should see in the spark shell the words of the file you just added being counted. As illustration, here is what I got after adding some LICENSE file.

-------------------------------------------
Time: 1433418756000 ms
-------------------------------------------
(replaced,1)
(executed,1)
(For,3)
(Unless,3)
(offer,1)
(agree,1)
(reproduce,,1)
(NON-INFRINGEMENT,,1)
(event,1)
("Contributor",1)
...

To stop streaming, use from the spark-shell.


Of course this script can be executed from a bash file

  #!/bin/sh
  exec scala "$0" "$@"
  !#
  # ---
  # Here the scala code above 
  # --- 
  HdfsWordCount.main(Array('hdfs://quickstart.cloudera:8020/user/cloudera/sparkStreaming/'))

Or compiled and executed: scalac HdfsWordCount.scala and scala HdfsWordCount, or embedded in a project using the sbt tool.

Written by Jean-Baptiste Poullet

Data analyst – consultant – freelancer
Expert in Bigdata
Founder of RBelgium – R community in Belgium
Owner of the company Stat’Rgy
Contact me at jeanbaptistepoullet@statrgy.com

Posted in Uncategorized.

2 Comments

  1. Hi there,

    I ran this exactly as you stated, and I still get no output. Cloudera VM. Pretty standard.

    I am wondering what could be wrong.

    Any idea?

Leave a Reply

Your email address will not be published. Required fields are marked *