您的当前位置:首页正文

Flink入门及实战(15)- Flink Window

2024-11-10 来源:个人技术集锦

1 window

1.1 window 类型

  • 针对以上每一种 window(TimeWindow 和 CountWindow),又可以分为滚动窗口(tumbling window)和滑动窗口(sliding window)

1.1.1 滚动窗口

1.1.2 滑动窗口

1.2 TimeWindow 应用

1.3 CountWindow 的应用

2 window 聚合分类

2.1 增量聚合


package com.tzb.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


/**
 * Window incremental-aggregation demo.
 *
 * <p>Reads integers, one per line, from a socket and sums them per 5-second tumbling window with
 * a {@link ReduceFunction}. The reduce function is applied as each element arrives, so the window
 * keeps a single running {@code Tuple2} per key instead of buffering every element.
 *
 * <p>Arguments: {@code --port <port>} (default 9000) and optional {@code --hostname <host>}
 * (default {@code master}).
 *
 * @Author tzb
 * @Date 2020/9/27 10:50
 * @Version 1.0
 **/
public class SocketDemoIncrAgg {

    /** Port used when {@code --port} is missing or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    /** Host used when {@code --hostname} is not supplied. */
    private static final String DEFAULT_HOSTNAME = "master";

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, parsed with {@link ParameterTool}
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        String hostname = DEFAULT_HOSTNAME;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            hostname = parameterTool.get("hostname", DEFAULT_HOSTNAME);
            port = parameterTool.getInt("port");
        } catch (RuntimeException e) {
            // Missing or malformed --port is an expected case for this demo: fall back to the
            // default rather than failing, and report why without a full stack trace.
            System.err.println("No port set, use default port " + DEFAULT_PORT + ": " + e.getMessage());
        }

        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket; each newline-delimited line is one record.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, "\n");

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> intData = text
                // Integer.parseInt("") throws, so drop blank lines instead of failing the job.
                .filter(line -> !line.trim().isEmpty())
                .map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String s) throws Exception {
                        // trim() removes a trailing '\r' sent by CRLF clients; non-numeric input
                        // still throws NumberFormatException and fails the job.
                        // Field f0 is the constant key 1; f1 is the parsed value.
                        return new Tuple2<>(1, Integer.parseInt(s.trim()));
                    }
                });

        // keyBy(0) keys on tuple field f0, which is always 1, so every record shares one key.
        // The processing-time assigner is explicit because timeWindow(...) follows the
        // environment's time characteristic, which defaults to event time from Flink 1.12 on;
        // these records carry no timestamps or watermarks, so event-time windows would not work.
        intData.keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1,
                                                           Tuple2<Integer, Integer> v2) throws Exception {
                        // Incremental: invoked as elements arrive, merging the new one into the running sum.
                        System.out.println("执行reduce操作: " + v1 + "," + v2);
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                }).print();

        // execute() submits the job; without it the pipeline above never runs.
        env.execute("Sockt window count");
    }

    /**
     * Simple word/count holder with public fields and a no-arg constructor.
     *
     * <p>Not used by the pipeline in {@link #main(String[])}.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}


2.2 全量聚合





/**
 * Window full-aggregation demo.
 *
 * <p>Reads integers, one per line, from a socket and, for every 5-second tumbling window, emits
 * how many elements the window received. Unlike an incremental {@code ReduceFunction}, the
 * {@code ProcessWindowFunction} receives all buffered elements of the window at once when it fires.
 *
 * <p>Arguments: {@code --port <port>} (default 9000) and optional {@code --hostname <host>}
 * (default {@code master}).
 *
 * <p>Besides the imports of the incremental demo, this class uses
 * {@code org.apache.flink.api.java.tuple.Tuple},
 * {@code org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction},
 * {@code org.apache.flink.streaming.api.windowing.windows.TimeWindow} and
 * {@code org.apache.flink.util.Collector}.
 */
public class SocketDemoFullCount {

    /** Port used when {@code --port} is missing or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    /** Host used when {@code --hostname} is not supplied. */
    private static final String DEFAULT_HOSTNAME = "master";

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, parsed with {@code ParameterTool}
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        String hostname = DEFAULT_HOSTNAME;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            hostname = parameterTool.get("hostname", DEFAULT_HOSTNAME);
            port = parameterTool.getInt("port");
        } catch (RuntimeException e) {
            // Missing or malformed --port is an expected case for this demo: fall back to the
            // default rather than failing, and report why without a full stack trace.
            System.err.println("No port set, use default port " + DEFAULT_PORT + ": " + e.getMessage());
        }

        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket; each newline-delimited line is one record.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, "\n");

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> intData = text
                // Integer.parseInt("") throws, so drop blank lines instead of failing the job.
                .filter(line -> !line.trim().isEmpty())
                .map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String s) throws Exception {
                        // trim() removes a trailing '\r' sent by CRLF clients; non-numeric input
                        // still throws NumberFormatException and fails the job.
                        // Field f0 is the constant key 1; f1 is the parsed value.
                        return new Tuple2<>(1, Integer.parseInt(s.trim()));
                    }
                });

        // ProcessWindowFunction<IN, OUT, KEY, W>: input Tuple2<Integer, Integer>, output String,
        // key Tuple (keyBy(0) keys by field position), window TimeWindow.
        // The processing-time assigner is explicit because timeWindow(...) follows the
        // environment's time characteristic, which defaults to event time from Flink 1.12 on;
        // these records carry no timestamps or watermarks, so event-time windows would not work.
        intData.keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements,
                                        Collector<String> out) throws Exception {
                        // Full aggregation: called once per key when the window fires,
                        // with every element buffered for that window.
                        System.out.println("执行proces...");
                        long count = 0;
                        for (Tuple2<Integer, Integer> ignored : elements) {
                            count++;
                        }
                        out.collect("window: " + context.window() + ",count: " + count);
                    }
                }).print();

        // execute() submits the job; without it the pipeline above never runs.
        env.execute("Sockt window count");
    }

}


显示全文