您的当前位置:首页正文

Flink基于EventTime和WaterMark处理乱序事件和晚到的数据

2024-12-01 来源:个人技术集锦

在实际的业务中,我们经常会遇到数据迟到的情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.

我们先来看一下官网给出的一张图,非常形象地展示了Process Time、Event Time、Ingestion Time这三个时间分别所处的位置,如下图所示:

Event Time

事件时间:事件时间是每条事件在它产生的时候记录的时间,该时间记录在事件中,在处理的时候可以被提取出来。小时的时间窗处理将会包含事件时间在该小时内的所有事件,而忽略事件到达的时间和到达的顺序事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结果。事件时间依赖于事件本身,而跟物理时钟没有关系。利用事件时间编程必须指定如何生成事件时间的watermark,这是使用事件时间处理事件的机制。机制是这样描述的:事件时间处理通常存在一定的延时,因此自然的需要为延时和无序的事件等待一段时间。因此,使用事件时间编程通常需要与处理时间相结合。

Ingestion Time

摄入时间:摄入时间是事件进入flink的时间,在source operator中,每个事件拿到当前时间作为时间戳,后续的时间窗口基于该时间。摄入时间在概念上处于事件时间和处理时间之间,与处理时间相比稍微昂贵一点,但是能过够给出更多可预测的结果。因为摄入时间使用的是source operator产生的不变的时间,后续不同的operator都将基于这个不变的时间进行处理,但是处理时间使用的是处理消息当时的机器系统时钟的时间。与事件时间相比,摄入时间无法处理延时和无序的情况,但是不需要明确执行如何生成watermark。在系统内部,摄入时间采用更类似于事件时间的处理方式进行处理,但是有自动生成的时间戳和自动的watermark。

Process Time

处理时间:当前机器处理该条事件的时间流处理程序使用该时间进行处理的时候,所有的操作(类似于时间窗口)都会使用当前机器的时间,例如按照小时时间窗进行处理,程序将处理该机器一个小时内接收到的数据。处理时间是最简单的概念,不需要协调机器时间和流中事件相关的时间。他提供了最小的延时和最佳的性能。但是在分布式和异步环境中,处理时间不能提供确定性,因为它对事件到达系统的速度和数据流在系统的各个operator之间处理的速度很敏感

基于处理时间的系统

对于这个例子,我们期望消息具有格式值,timestamp,其中value是消息,timestamp是在源生成此消息的时间。由于我们正在构建基于处理时间的系统,因此以下代码忽略了时间戳部分。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)
counts.print
senv.execute("ProcessingTime processing example")

情况1:消息到达不间断

假设源分别在时间13秒,第13秒和第16秒产生类型a的三个消息。(小时和分钟不重要,因为窗口大小只有10秒)。

这些消息将落入Windows中,如下所示。在第13秒产生的前两个消息将落入窗口1 [5s-15s]和window2 [10s-20s],第16个时间生成的第三个消息将落入window2 [ 10s-20s]和window3 [15s-25s] ]。每个窗口发出的最终计数分别为(a,2),(a,3)和(a,1)。

该输出可以被认为是预期的行为。现在我们将看看当一个消息到达系统的时候会发生什么。

情况2:消息到达延迟

现在假设其中一条消息(在第13秒生成)到达延迟6秒(第19秒),可能是由于某些网络拥塞。你能猜测这个消息会落入哪个窗口?

延迟的消息落入窗口2和3,因为19在10-20和15-25之间。在window2中计算没有任何问题(因为消息应该落入该窗口),但是它影响了window1和window3的结果。那怎么办呢?我们现在将尝试使用EventTime处理来解决这个问题。

基于EventTime的系统

我们现在需要设置这个时间戳提取器,并将TimeCharactersistic设置为EventTime。其余的代码与ProcessingTime的情况保持一致。

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

要启用EventTime处理,我们需要一个时间戳提取器,从消息中提取事件时间信息。请记住,消息是格式值,时间戳。该extractTimestamp方法获取时间戳部分并将其作为一个长期。现在忽略getCurrentWatermark方法,我们稍后再回来。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}

运行上述代码的结果如下图所示

结果看起来更好,窗口2和3现在发出正确的结果,但是window1仍然是错误的。Flink没有将延迟的消息分配给窗口3,因为它现在检查了消息的事件时间,并且理解它不在该窗口中。但是为什么没有将消息分配给窗口1?原因是在延迟的信息到达系统时(第19秒),窗口1的评估已经完成了(第15秒)。现在让我们尝试通过使用水印来解决这个问题。请注意,在窗口2中,延迟的消息仍然位于第19秒,而不是第13秒(事件时间)。该图中的描述是故意表示窗口中的消息不会根据事件时间进行排序。

Watermark

watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

水印本质上是一个时间戳。当Flink中的运算符接收到水印时,它明白(假设)它不会看到比该时间戳更早的消息。因此,在“EventTime”中,水印也可以被认为是一种告诉Flink它有多远的一种方式。

为了这个例子的目的,把它看作是一种告诉Flink一个消息延迟多少的方式。在最后一次尝试中,我们将水印设置为当前系统时间。因此,不要指望任何延迟的消息。我们现在将水印设置为当前时间-5秒,这告诉Flink希望消息最多有5s的延迟,这是因为每个窗口仅在水印通过时被评估。由于我们的水印是当前时间-5秒,所以第一个窗口[5s-15s]将仅在第20秒被评估。类似地,窗口[10s-20s]将在第25秒进行评估,依此类推。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

通常最好保持接收到的最大时间戳,并创建具有最大预期延迟的水印,而不是从当前系统时间减去。
进行上述更改后运行代码的结果是:

最后我们得到了正确的结果,所有这三个窗口现在都按照预期的方式发射计数,这是(a,2),(a,3)和(a,1)。

allowedLateness

allowedLateness也是Flink处理乱序事件的一个特别重要的特性,默认情况下,当wartermark通过window后,再进来的数据,也就是迟到或者晚到的数据就会别丢弃掉了,但是有的时候我们希望在一个可以接受的范围内,迟到的数据,也可以被处理或者计算,这就是allowedLateness产生的原因了,简而言之呢,allowedLateness就是对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。

默认情况下,如果不指定allowedLateness,其值是0,即对于watermark超过end-of-window之后,还有此window的数据到达时,这些数据被删除掉了。

注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。再次计算就是DataFlow模型中的Accumulating的情况。

同时,对于sessionWindow的情况,当late element在allowedLateness范围之内到达时,可能会引起窗口的merge,这样,之前窗口的数据会在新窗口中累加计算,这就是DataFlow模型中的AccumulatingAndRetracting的情况。

下面看一下完整的代码:

package flink
 
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.slf4j.LoggerFactory
 
/**
  * flinkstreaming消费kafka的数据实现exactly-once的语义;
  */
object flinkStreamingJason {
  private val zk = "192.168.17.142:2181,192.168.17.145:2181,192.168.17.147:2181"
  private val broker = "192.168.17.142:9092,192.168.17.145:9092,192.168.17.147:9092"
  private val group_id = "jason_"
  private val topic = "jason_1027"
  def main(args: Array[String]): Unit = {
    lazy val logger  = LoggerFactory.getLogger(classOf[Nothing])
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置时间Time Notion;默认使用的是ProcessTime;
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableObjectReuse()
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    val properties = new Properties()
    properties.setProperty("zookeeper.connect", zk)
    properties.setProperty("bootstrap.servers", broker)
    properties.setProperty("group.id", group_id)
    val consumer = new FlinkKafkaConsumer010[String](topic,new SimpleStringSchema, properties)
    val stream = env.addSource(consumer)
    val wordcount = stream
      .assignTimestampsAndWatermarks(new TimestampExtractor)
      .filter(_.nonEmpty)
      .filter(_.contains(","))
      .map(x=>{
        val arr = x.split(",")
        val code = arr(0)
        val time = arr(1).toLong
        (code,time)
      }) // 指派时间戳,并生成WaterMark
      .keyBy(0)
      .timeWindow(Time.seconds(10),Time.seconds(5))
      .trigger(CountTrigger.of(2)) //决定了一个窗口何时能被窗口函数计算或者清除,每个窗口都有自己的trigger;
      .sum(1)
      .map(x=> (x._1,x._2.toString))
    val conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.17.142").setPort(6379).build()
    val sink = new RedisSink[(String,String)](conf, new RedisExampleMapperJason)
    wordcount.addSink(sink)
    env.execute("flink streaming Event Time And WaterMark Jason")
  }
}
 
class RedisExampleMapperJason extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET, null)
  }
  override def getKeyFromData(data: (String, String)): String = data._1
  override def getValueFromData(data: (String, String)): String = data._2
}
 
/**
  * 时间戳提取器需要实现AssignerWithPeriodicWatermarks;
  *
  */
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  private val maxOutOfOrderness = 5000L //允许数据晚到的最大时间5s;
  private var wm: Watermark = null
 
  override def getCurrentWatermark: Watermark = {
    wm = new Watermark(System.currentTimeMillis() - maxOutOfOrderness)
    wm
  }
  override def extractTimestamp(t: String, l: Long): Long = {
    l
  }
}

提交后到web,ui上面可以看到watermark的如下图所示:

转载自: https:///xianpanjia4616/article/details/84971274

https://juejin.im/post/5bf95810e51d452d705fef33 这篇文章的多线程测试 watermark 也讲的很好
keyBy的时候会按照key把数据分在不同线程的窗口里面,比如 8个key,3个线程,1个key可能放到不同的线程里面,只有所有线程都满足窗口的watermark时触发窗口统计,可以看到每个key的窗口都是固定的。窗口统计的时候是按照key进行统计的。

这里可以看到key:0003放到了线程49和线程52里面了

public class StreamingWindowWatermark2 {

    public static void main(String[] args) throws Exception {
        //定义socket的端口号
        int port = 9010;
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置使用eventtime,默认是使用processtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //设置并行度为1,默认并行度是当前机器的cpu数量
        env.setParallelism(8);

        //连接socket获取输入的数据
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        //解析输入的数据
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        //抽取timestamp和生成watermark
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            /**
             * 定义生成watermark的逻辑
             * 默认100ms被调用一次
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            //定义如何提取timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                long id = Thread.currentThread().getId();
                System.out.println("键值 :"+element.f0+"线程验证 :"+  id   +" , 事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
                    sdf.format(currentMaxTimestamp)+" ],水印时间:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");                return timestamp;
            }
        });

        //保存被丢弃的数据

        TypeInformation<Tuple2<String, Long>> typeInformation = new TypeInformation<Tuple2<String, Long>>() {
            @Override public boolean isBasicType() {
                return false;
            }

            @Override public boolean isTupleType() {
                return false;
            }

            @Override public int getArity() {
                return 0;
            }

            @Override public int getTotalFields() {
                return 0;
            }

            @Override public Class<Tuple2<String, Long>> getTypeClass() {
                return null;
            }

            @Override public boolean isKeyType() {
                return false;
            }

            @Override
            public TypeSerializer<Tuple2<String, Long>> createSerializer(ExecutionConfig config) {
                return null;
            }

            @Override public String toString() {
                return null;
            }

            @Override public boolean equals(Object obj) {
                return false;
            }

            @Override public int hashCode() {
                return 0;
            }

            @Override public boolean canEqual(Object obj) {
                return false;
            }
        };
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late", typeInformation);
        //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
        SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
            //.allowedLateness(Time.seconds(2))//允许数据迟到2秒
            .sideOutputLateData(outputTag)
            .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                /**
                 * 对window内的数据进行排序,保证数据的顺序
                 * @param tuple
                 * @param window
                 * @param input
                 * @param out
                 * @throws Exception
                 */
                @Override
                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                    String key = tuple.toString();
                    List<Long> arrarList = new ArrayList<Long>();
                    Iterator<Tuple2<String, Long>> it = input.iterator();
                    while (it.hasNext()) {
                        Tuple2<String, Long> next = it.next();
                        arrarList.add(next.f1);
                    }
                    Collections.sort(arrarList);
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                    String result = "\n  键值 : "+ key + "\n              触发窗内数据个数 : " + arrarList.size() + "\n              触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n              触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))
                        + "\n              实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";
                    out.collect(result);
                }
            });
        //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
        DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
        sideOutput.print();
        //测试-把结果打印到控制台即可
        window.print();

        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("eventtime-watermark");

    }


}
显示全文