您的当前位置:首页正文

Scala计数器开发

2024-11-29 来源:个人技术集锦
package com.imooc.flink.course04
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object CounterApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("hadoop","spark","flink","strom")

    val info = data.map(new RichMapFunction[String,String]() {
      //1.定义计数器
      val counter = new LongCounter()
      override def open(parameters: Configuration): Unit = {
        //2.注册计数器
        getRuntimeContext.addAccumulator("ele-counts",counter)
      }
      override def map(in: String): String = {
        counter.add(1)
        in
      }
    })

    val filePath="file:///F://data/"
    info.writeAsText(filePath,WriteMode.OVERWRITE).setParallelism(2)
    val jobResult: JobExecutionResult = env.execute("CounterApp")
    
    //3.获取计数器
    val num = jobResult.getAccumulatorResult[Long]("ele-counts")
    println("num:"+num)

  }

}

显示全文