您的当前位置:首页正文

Flink 窗口函数(window function) 详解

2024-11-30 来源:个人技术集锦

一、概念

在定义好了窗口之后,需要指定对每个窗口的计算逻辑。

Window Function 有四种:

ReduceFunction

AggregateFunction

FoldFunction

ProcessWindowFunction

前面两个会执行的更加有效率,因为在元素到来时,Flink 可以增量的把元素聚合到每个窗口上。ProcessWindowFunction 提供了一个 Iterable 迭代器,可以获得一个窗口的所有元素以及元素的元数据信息。ProcessWindowFunction 执行效率不是很好,因为 Flink 内部需要缓存窗口所有元素。

可以结合 ReduceFunction 、 AggregateFunction、FoldFunction ,来增量的获取部分结果,结合 ProcessWindowFunction 提供的元数据信息做综合处理。

二、ReduceFunction

使用 reduce 函数,让两个元素结合起来,产生一个相同类型的元素,它是增量的

env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // reduce 返回的类型,应该和输入的类型一样
    // 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
    .reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
    .print()

三、AggregateFunction

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。

输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。

我们可以自己定义一个聚合器:

class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
    override def createAccumulator(): User = User("", 0, 0)

    override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)

    override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)

    override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
  }

然后应用到计算里:

env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 使用 aggregate 来计算
      .aggregate(new MyAggregateFunction)
      .print()

四、FoldFunction

五、ProcessWindowFunction

ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。

有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。

但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。

自己定义一个 ProcessWindowFunction

class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
      var count = 0
      // 遍历,获得窗口所有数据
      for (user <- elements) {
        println(user)
        count += 1
      }
      out.collect(s"Window ${context.window} , count : ${count}")
    }
  }

在算子中计算:

env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 ProcessFunction 来处理整个窗口数据
      .process(new MyProcessFunction())
      .print()

六、ProcessWindowFunction 结合 其他 函数一起计算

使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。

如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:

class MyReduceFunction extends ReduceFunction[User] {
    override def reduce(value1: User, value2: User): User = {
      if (value1.count > value2.count) value2
      else value1
    }
  }

  class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
      val min = elements.iterator.next
      out.collect((context.window.getStart, min))
    }
  }
env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 reduce 和 processWindowFunction
    .reduce(new MyReduceFunction, new MyProcessFunction)
    .print()

关注公众号,获得更多实时计算架构的文章

显示全文