I. Code Example

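import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
        // batchDuration controls how often jobs are generated; the StreamingContext is the entry point
        val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
        // host, port, storage level ("localhost" and 9999 are placeholders; the listing omits them)
        val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(","))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()     // output operation
        ssc.start()            // start receiving and processing data
        ssc.awaitTermination() // block until the context is stopped
    }
}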

II. Source Code Analysis

  /**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) // ... (remainder of the class declaration omitted)

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  // ...
}

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  // ...
}

val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
       flatMap(), map(), and similar methods are transformation operations. Each one returns a corresponding new DStream. Take map as an example:
  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }
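       map returns a MappedDStream whose first constructor argument is the parent DStream, which shows that every DStream keeps a reference to the stream it depends on. print() is an output operation: it creates a ForEachDStream and registers it with the DStreamGraph: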
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()

  /**
   * Register this streaming as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    // ...
  }
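
The parent link and the output registration described here can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: ToyGraph, ToyStream, ToyInput, ToyMapped, ToyFlatMapped, ToyForEach, and foreachBatch are hypothetical names that do not exist in Spark, a batch is a plain Seq rather than an RDD, and having the input stream add itself to the graph is an assumption modelled on the inputStreams buffer of DStreamGraph.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DStreamGraph: it only remembers inputs and outputs.
final class ToyGraph {
  val inputStreams = ArrayBuffer.empty[ToyStream[_]]
  val outputStreams = ArrayBuffer.empty[ToyStream[_]]
}

// Hypothetical stand-in for DStream: a batch is a plain Seq instead of an RDD.
abstract class ToyStream[T](val graph: ToyGraph) {
  def parents: List[ToyStream[_]]
  def compute(): Seq[T]

  // Transformations only build a new node that points back at its parent.
  def map[U](f: T => U): ToyStream[U] = new ToyMapped(this, f)
  def flatMap[U](f: T => Iterable[U]): ToyStream[U] = new ToyFlatMapped(this, f)

  // Output operation: wrap the stream in a ToyForEach and register it.
  def foreachBatch(f: Seq[T] => Unit): Unit = new ToyForEach(this, f).register()
}

final class ToyInput[T](g: ToyGraph, data: Seq[T]) extends ToyStream[T](g) {
  g.inputStreams += this // assumption: inputs announce themselves to the graph
  def parents: List[ToyStream[_]] = Nil
  def compute(): Seq[T] = data
}

final class ToyMapped[T, U](parent: ToyStream[T], f: T => U)
    extends ToyStream[U](parent.graph) {
  def parents: List[ToyStream[_]] = List(parent)
  def compute(): Seq[U] = parent.compute().map(f)
}

final class ToyFlatMapped[T, U](parent: ToyStream[T], f: T => Iterable[U])
    extends ToyStream[U](parent.graph) {
  def parents: List[ToyStream[_]] = List(parent)
  def compute(): Seq[U] = parent.compute().flatMap(f)
}

final class ToyForEach[T](parent: ToyStream[T], f: Seq[T] => Unit)
    extends ToyStream[Unit](parent.graph) {
  def parents: List[ToyStream[_]] = List(parent)
  def compute(): Seq[Unit] = { f(parent.compute()); Seq.empty }
  def register(): Unit = { graph.outputStreams += this; () }
}

object ToyDStreamDemo {
  // Returns (number of inputs, number of outputs, the batch seen by the output operation).
  def run(): (Int, Int, Seq[(String, Int)]) = {
    val graph = new ToyGraph
    val lines = new ToyInput[String](graph, Seq("a,b", "a"))
    val pairs = lines.flatMap(_.split(",").toSeq).map(w => (w, 1))
    var seen: Seq[(String, Int)] = Seq.empty
    pairs.foreachBatch(batch => seen = batch)  // registers one output stream
    graph.outputStreams.foreach(_.compute())   // "run one batch" starting from the outputs
    (graph.inputStreams.size, graph.outputStreams.size, seen)
  }
}
```

Nothing is computed while the chain is being built. Work only happens when the graph walks back from a registered output stream through the parent links, which is why a stream with no output operation would never be evaluated in this model.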

/**
 * Start the execution of the streams.
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        try {
          // ...

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // ...
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            state = StreamingContextState.STOPPED
            throw e
        }
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(/* ... */)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      // ...
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

private[streaming] val scheduler = new JobScheduler(this)

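       In Spark Streaming, the class responsible for dynamic job scheduling as a whole is JobScheduler. Every StreamingContext instance owns a JobScheduler instance, and when ssc.start() runs it calls the start() method of that JobScheduler.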
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    // ...

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    // ...
    logInfo("Started JobScheduler")
  }

       JobScheduler is the top-level scheduler for Spark Streaming jobs. Its start() method first creates an EventLoop[JobSchedulerEvent], which handles the various JobSchedulerEvent messages.
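
To make the event-loop idea concrete, here is a minimal sketch of a queue-backed loop with onReceive and onError hooks. It is not Spark's EventLoop implementation: ToyEventLoop, ToyEvent, JobStarted, JobCompleted, and ToyEventLoopDemo are hypothetical names chosen for illustration, and the single daemon thread reading from a blocking queue is an assumption about how such a loop can be built.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical event loop: one daemon thread drains a queue and dispatches events.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts the blocking take()
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped = true; thread.interrupt(); thread.join() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical event types for the demo.
sealed trait ToyEvent
final case class JobStarted(batch: Int) extends ToyEvent
final case class JobCompleted(batch: Int) extends ToyEvent

object ToyEventLoopDemo {
  def run(): List[String] = {
    // Written only by the loop thread and read after stop() has joined it.
    val log = ListBuffer.empty[String]
    val done = new CountDownLatch(2)
    val loop = new ToyEventLoop[ToyEvent]("toy-scheduler") {
      override protected def onReceive(event: ToyEvent): Unit = {
        event match {
          case JobStarted(b)   => log += s"started $b"
          case JobCompleted(b) => log += s"completed $b"
        }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = {
        log += s"error ${e.getMessage}"
      }
    }
    loop.start()
    loop.post(JobStarted(1))
    loop.post(JobCompleted(1))
    done.await(5, TimeUnit.SECONDS) // wait until both events were handled
    loop.stop()
    log.toList
  }
}
```

Callers only post events and return immediately, while all handling happens on one thread in arrival order. That keeps the handler code free of locking, at the cost of slow handlers delaying every later event.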

       JobScheduler has two very important members: JobGenerator and ReceiverTracker. It delegates the work of generating each batch's RDD DAG to JobGenerator, and the bookkeeping of the incoming source data to ReceiverTracker.
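
The division of labour between the two members can be sketched with a toy model. Everything below is hypothetical and heavily simplified: ToyReceiverTracker, ToyJobGenerator, ToyJobScheduler, ToyJob, and their methods are illustrative names, blocks are plain strings, and a "job" is just a record of which blocks belong to a batch instead of an RDD DAG.

```scala
// Hypothetical stand-in for ReceiverTracker: remembers which blocks belong to which batch.
final class ToyReceiverTracker {
  private var unallocated = Vector.empty[String]
  private var byBatch = Map.empty[Long, Vector[String]]

  def addBlock(blockId: String): Unit = {
    unallocated = unallocated :+ blockId
  }

  // Assign every block received so far to the given batch.
  def allocateBlocksToBatch(batchTime: Long): Unit = {
    byBatch = byBatch.updated(batchTime, unallocated)
    unallocated = Vector.empty
  }

  def blocksOfBatch(batchTime: Long): Vector[String] =
    byBatch.getOrElse(batchTime, Vector.empty)
}

// A "job" here is only the batch time plus the blocks it would process.
final case class ToyJob(batchTime: Long, blocks: Vector[String])

// Hypothetical stand-in for JobGenerator: turns one batch tick into one job.
final class ToyJobGenerator(tracker: ToyReceiverTracker, submit: ToyJob => Unit) {
  def generateJobs(batchTime: Long): Unit = {
    tracker.allocateBlocksToBatch(batchTime)
    submit(ToyJob(batchTime, tracker.blocksOfBatch(batchTime)))
  }
}

// Hypothetical stand-in for JobScheduler: owns both helpers and collects submitted jobs.
final class ToyJobScheduler {
  private var submitted = Vector.empty[ToyJob]
  val receiverTracker = new ToyReceiverTracker
  val jobGenerator =
    new ToyJobGenerator(receiverTracker, job => submitted = submitted :+ job)
  def submittedJobs: Vector[ToyJob] = submitted
}

object ToySchedulerDemo {
  def run(): Vector[ToyJob] = {
    val scheduler = new ToyJobScheduler
    scheduler.receiverTracker.addBlock("block-1")
    scheduler.receiverTracker.addBlock("block-2")
    scheduler.jobGenerator.generateJobs(5000L)  // first batch gets block-1 and block-2
    scheduler.receiverTracker.addBlock("block-3")
    scheduler.jobGenerator.generateJobs(10000L) // second batch gets only block-3
    scheduler.submittedJobs
  }
}
```

In this model the tracker never decides when a batch ends and the generator never stores data. The scheduler simply wires the two together, which is the delegation the paragraph above describes.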
