package com.tzb.streaming;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @Description window 增量聚合
* @Author tzb
* @Date 2020/9/27 10:50
* @Version 1.0
**/
/**
 * Window incremental-aggregation demo.
 *
 * <p>Reads newline-delimited integers from a socket on host {@code master}, keys every record by
 * the constant {@code 1} (so all records share a single key), and sums the values of each
 * 5-second time window with a {@link ReduceFunction}. A reduce function is applied as elements
 * arrive, so the window holds a running partial result rather than every element.
 *
 * @Author tzb
 * @Date 2020/9/27 10:50
 * @Version 1.0
 **/
public class SocketDemoIncrAgg {

    /** Port used when {@code --port} is absent or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments; an optional {@code --port <n>} selects the socket port
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        int port = resolvePort(args);
        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "master";
        String delimiter = "\n";
        // Connect to the socket to read the input lines.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> intData =
                text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String s) throws Exception {
                        // Trim first: a trailing '\r' (CRLF clients) or stray whitespace would
                        // otherwise make parseInt throw and fail the whole job.
                        return new Tuple2<>(1, Integer.parseInt(s.trim()));
                    }
                });
        // Key by field 0, which is always 1, so every record ends up under a single key.
        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                        System.out.println("执行reduce操作: " + v1 + "," + v2);
                        // Keep the key, add the values.
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                }).print();
        // The calls above only build the job graph; nothing runs until execute() is called.
        env.execute("Sockt window count");
    }

    /**
     * Reads the {@code --port} argument, falling back to {@link #DEFAULT_PORT}.
     *
     * <p>A missing key is the expected case, so it is reported with a short message instead of a
     * stack trace. Unparseable arguments (bad command-line syntax or a non-integer port) also fall
     * back to the default instead of aborting.
     *
     * @param args raw command-line arguments
     * @return the configured port, or {@link #DEFAULT_PORT}
     */
    private static int resolvePort(String[] args) {
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.has("port")) {
                return parameterTool.getInt("port");
            }
            System.err.println("No port set, use default port " + DEFAULT_PORT);
        } catch (IllegalArgumentException e) {
            // Covers fromArgs parse errors and NumberFormatException (a subclass) from getInt.
            System.err.println("Invalid arguments (" + e.getMessage() + "), use default port " + DEFAULT_PORT);
        }
        return DEFAULT_PORT;
    }

    /**
     * Simple word/count POJO (public fields plus a no-arg constructor).
     *
     * <p>NOTE(review): not referenced by {@link #main}; kept because it is public and may be used
     * elsewhere.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
/**
 * Full-window (non-incremental) aggregation demo.
 *
 * <p>Reads newline-delimited integers from a socket on host {@code master}, keys every record by
 * the constant {@code 1}, and for each 5-second time window emits a line with the window and the
 * number of elements it received. A {@link ProcessWindowFunction} sees all of the window's
 * elements at once, so they are buffered until the window fires.
 *
 * <p>NOTE(review): as a second public top-level class this must live in its own
 * {@code SocketDemoFullCount.java} to compile.
 */
public class SocketDemoFullCount {

    /** Port used when {@code --port} is absent or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments; an optional {@code --port <n>} selects the socket port
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        int port = resolvePort(args);
        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "master";
        String delimiter = "\n";
        // Connect to the socket to read the input lines.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> intData =
                text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String s) throws Exception {
                        // Trim first: a trailing '\r' (CRLF clients) or stray whitespace would
                        // otherwise make parseInt throw and fail the whole job.
                        return new Tuple2<>(1, Integer.parseInt(s.trim()));
                    }
                });
        // ProcessWindowFunction<IN, OUT, KEY, W>: input Tuple2<Integer, Integer>, output String,
        // key Tuple (keyBy with a field position yields a Tuple key), window TimeWindow.
        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)
                            throws Exception {
                        // Called once per key when the window fires, with all buffered elements.
                        System.out.println("执行proces...");
                        long count = 0;
                        for (Tuple2<Integer, Integer> ignored : elements) {
                            count++;
                        }
                        out.collect("window: " + context.window() + ",count: " + count);
                    }
                }).print();
        // The calls above only build the job graph; nothing runs until execute() is called.
        env.execute("Sockt window count");
    }

    /**
     * Reads the {@code --port} argument, falling back to {@link #DEFAULT_PORT}.
     *
     * <p>A missing key is the expected case, so it is reported with a short message instead of a
     * stack trace. Unparseable arguments (bad command-line syntax or a non-integer port) also fall
     * back to the default instead of aborting.
     *
     * @param args raw command-line arguments
     * @return the configured port, or {@link #DEFAULT_PORT}
     */
    private static int resolvePort(String[] args) {
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.has("port")) {
                return parameterTool.getInt("port");
            }
            System.err.println("No port set, use default port " + DEFAULT_PORT);
        } catch (IllegalArgumentException e) {
            // Covers fromArgs parse errors and NumberFormatException (a subclass) from getInt.
            System.err.println("Invalid arguments (" + e.getMessage() + "), use default port " + DEFAULT_PORT);
        }
        return DEFAULT_PORT;
    }
}