Similar to Spark SQL, Flink supports querying and processing data with both SQL and the Table API (the counterpart of DataFrame operations in Spark SQL: every SQL operation has a corresponding method call).
In the current version, the Table API and SQL are not yet feature-complete and are still under development.
First, add the required dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.1</version>
</dependency>
Environment test:
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import source.SensorReading
object example {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
val dataSource = env.readTextFile("resources/sensor_record.txt")
val dataStream = dataSource.map(
line => {
val fields = line.split(",").map(_.trim)
SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
}
)
// First, create the table execution environment
val tableEnv = StreamTableEnvironment.create(env)
// Create a table from the stream
val dataTable: Table = tableEnv.fromDataStream(dataStream)
// Transform the table using the Table API
val resultTable = dataTable
.select("id,temperature")
.filter("id == 'sensor_1'")
resultTable.toAppendStream[(String, Double)].print("result")
// Implement the same query directly with SQL: first register the table as a temporary view
tableEnv.createTemporaryView("dataTable", dataTable)
val sql: String = "select id,temperature from dataTable where id = 'sensor_1'"
val resultSqlTable = tableEnv.sqlQuery(sql)
resultSqlTable.toAppendStream[(String,Double)].print("SQL result")
env.execute("Table API example")
}
}
The code above outputs the sensor_1 readings via both the Table API and SQL; if the results print correctly, the execution environment is set up properly.
Flink's Table API and SQL run on a table environment (StreamTableEnvironment / TableEnvironment), which can be created in the following ways:
// Default creation
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val defaultTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Streaming environment with the old planner
val oldStreamingSetting: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val oldStreamingTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, oldStreamingSetting)
// Batch environment with the old planner
val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)
// Streaming environment with the new (Blink) planner
val newStreamingSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val newStreamingTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, newStreamingSettings)
// Batch environment with the new (Blink) planner
val newBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val newBatchTableEnv: TableEnvironment = TableEnvironment.create(newBatchSettings)
Creating a table (reading data from a CSV file):
// Create the table execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
// Pass a ConnectorDescriptor specifying the source; in this version only two implementations exist: FileSystem and Kafka
tableEnv.connect(new FileSystem().path("resources/sensor_record.txt"))
// Pass a FormatDescriptor specifying the CSV file format; CSV is the only FormatDescriptor subclass implemented in this version
.withFormat(new OldCsv)
// Pass a Schema describing the table layout: each field() call defines one column, taking the column name and its data type
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
// Create a temporary table from the definition above
).createTemporaryTable("sensor")
// Read the table data via from()
tableEnv.from("sensor").toAppendStream[(String,Long,Double)].print()
env.execute()
The CSV reading approach above is deprecated; to use the new CSV format, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
Once the dependency is imported, simply replace OldCsv with Csv in the code above.
Reading data from Kafka:
tableEnv.connect(new Kafka()
.version("0.11") // 设定kafka版本
.topic("sensor") // 读取消息的topic
.property("zookeeper.connect", "num01:9092") // 必配的两项,其他consumer配置视实际需求增加
.property("bootstrap.servers", "num01:9092")
).withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
).createTemporaryTable("sensor")
These are the only two source connectors implemented in the current version.
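Once registered, the Kafka-backed table can be queried like any other table; a minimal sketch, assuming the same env and tableEnv as in the earlier examples:
// Query the Kafka-backed table; new results are appended as messages arrive on the topic.
tableEnv.from("sensor")
  .select('id, 'temperature)
  .filter('id === "sensor_1")
  .toAppendStream[(String, Double)]
  .print("kafka")
env.execute("kafka source")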
Much like the DataFrame API in Spark SQL, Flink can express each SQL operation as a single method call, and it also supports expressions: importing org.apache.flink.table.expressions._ makes them available.
Example: find the maximum temperature of every sensor:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object TableOperator {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
tableEnv.connect(new FileSystem().path("resources/sensor_record.txt"))
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
).createTemporaryTable("sensor")
tableEnv.from("sensor")
.select('id, 'temperature)
.groupBy('id)
.aggregate('temperature.max.as('max))
.select('id, 'max)
.toRetractStream[(String, Double)].print()
tableEnv.execute("API_1")
}
}
'field denotes a column (a Scala symbol). Also note that inside aggregate you do not call max, min, sum, etc. as standalone functions; instead the aggregation is invoked on the column itself ('temperature.max). Apart from that, usage is very similar to Spark's DataFrame API.
Another point to note: only toRetractStream can be used here, not toAppendStream. The difference follows from the names: toAppendStream only appends newly arrived results to the existing output, whereas toRetractStream can update results that were already emitted. Each record of the retract stream is prefixed with a true/false flag: when a newly received record changes an existing result, two messages are emitted, false for the old (retracted) result and true for the new one. Since we only keep the maximum value per sensor, intermediate results are necessarily overwritten as data is read, so toRetractStream must be used.
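To make the flag explicit, here is a minimal sketch of consuming the retract stream by hand; it assumes the aggregation result above is bound to a value, hypothetically named maxTempTable:
// toRetractStream yields DataStream[(Boolean, T)]:
// false retracts a previously emitted result, true adds the updated one.
val retractStream: DataStream[(Boolean, (String, Double))] =
  maxTempTable.toRetractStream[(String, Double)]
retractStream
  .filter(_._1) // keep only the "add" (true) messages, i.e. the latest value of each update
  .map(_._2)    // unwrap the payload
  .print("latest")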
The same logic can also be implemented in SQL:
tableEnv.sqlQuery(
"""
|select id, max(temperature) as `max` from sensor group by id
|""".stripMargin)
.toRetractStream[(String,Double)].print()
Note that Flink's SQL parsing is strict here: the alias max must be written with backticks as above; using it unquoted raises an error because max is a reserved keyword.
Once a result is updated rather than only appended, it can no longer be written to the file system, because that sink only implements the append mode; such results can, however, be written to a database.
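For append-only results, writing to the file system still works through the same connector API. A minimal sketch, assuming appendResult holds an append-only query result (such as the select/filter query from the first example) and a hypothetical output path:
// Register a CSV file sink table and write the append-only result into it.
tableEnv.connect(new FileSystem().path("resources/output.txt"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temperature", DataTypes.DOUBLE())
  ).createTemporaryTable("outputTable")
// This fails for updating results (e.g. the max aggregation above); those need a retract/upsert-capable sink.
appendResult.insertInto("outputTable")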
Update modes: when a dynamic table is converted to a stream or written to an external system, Flink distinguishes append, retract, and upsert modes, and the chosen sink must support the mode the query requires.
|  | Relational algebra (tables) / SQL | Stream processing |
|---|---|---|
| Objects being processed | Bounded collections of field tuples | Unbounded sequences of field tuples |
| Data access during a query | The complete input is available | Cannot access all the data; must continuously "wait" for streaming input |
| Query termination | Terminates after producing a fixed-size result set | Never terminates; the result is continuously updated as data arrives |
Obtaining the processing time:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Set the time characteristic to processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
val source: DataStream[SensorReading] = env.readTextFile("resources/sensor_record.txt")
.map(line => {
val fields: Array[String] = line.split(",")
SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
})
// Append an extra column holding the processing time
tableEnv.fromDataStream(source,'id,'timeStamp,'temperature,'processingTimestamp.proctime)
.toAppendStream[Row]
.print()
tableEnv.execute("API_2")
Alternatively, rowtime can be used to obtain the event time:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
val source: DataStream[SensorReading] = env.readTextFile("resources/sensor_record.txt")
.map(line => {
val fields: Array[String] = line.split(",")
SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
// Assign the event time
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(0)) {
// Add 23 to the original timestamp to show that rowtime is in fact the event time extracted here
override def extractTimestamp(element: SensorReading): Long = element.timeStamp + 23
})
// Expose the event time via rowtime
tableEnv.fromDataStream(source, 'id, 'timeStamp, 'temperature, 'eventTime.rowtime)
.toAppendStream[Row]
.print()
tableEnv.execute("API_3")
The event time must first be assigned on the DataStream. In most cases the event time arrives together with the data and is simply one of its columns, so obtaining it this way is not very common. The time attribute can also be declared directly when defining the table with a connector:
tableEnv.connect(new FileSystem)
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
......
.proctime() // or call rowtime() to declare an event-time attribute
)
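If the event timestamp already arrives as a field of the data, the event-time attribute can also be declared in the connector schema with the Rowtime descriptor; a sketch under that assumption (field names follow the sensor example, and the 1-second watermark delay is an arbitrary choice):
import org.apache.flink.table.descriptors.Rowtime
tableEnv.connect(new FileSystem().path("resources/sensor_record.txt"))
  .withFormat(new Csv)
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temperature", DataTypes.DOUBLE())
    // Event-time column "ts": timestamps are taken from the "timestamp" field,
    // with bounded-out-of-orderness watermarks delayed by 1 second.
    .field("ts", DataTypes.TIMESTAMP(3))
    .rowtime(new Rowtime()
      .timestampsFromField("timestamp")
      .watermarksPeriodicBounded(1000))
  ).createTemporaryTable("sensorWithRowtime")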
Application example:
tableEnv.from("sensor")
.filter('id.isNotNull && 'timestamp.isNotNull && 'temperature.isNotNull)
.select('id.lowerCase() as 'id, 'temperature, 'rowtime)
.window(Tumble over 1.hour on 'rowtime as 'hourlywindow)
.groupBy('hourlywindow, 'id)
.select('id, 'hourlywindow.end as 'hour, 'temperature.avg as 'avg)
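The same hourly aggregation can be written in SQL using the TUMBLE group-window functions; a minimal sketch, assuming the sensor table is registered with a rowtime event-time column as in the Table API version above (Row comes from org.apache.flink.types.Row):
// TUMBLE groups rows into one-hour event-time windows; TUMBLE_END returns each window's end timestamp.
tableEnv.sqlQuery(
  """
    |select
    |  id,
    |  TUMBLE_END(`rowtime`, INTERVAL '1' HOUR) as hour_end,
    |  avg(temperature) as avg_temp
    |from sensor
    |group by TUMBLE(`rowtime`, INTERVAL '1' HOUR), id
    |""".stripMargin)
  .toAppendStream[Row]
  .print("hourly avg")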
Probably due to dependency and implicit-conversion conflicts, importing some additional dependencies of the same version caused previously working code to throw ClassNotFoundException and broke type inference; the usage is noted here for now, and the issue will be revisited on a newer version. Other APIs are not demonstrated here; see the documentation for more application examples and expression usage.
|  | SQL | Table API |
|---|---|---|
| Comparison functions | value1 = value2, value1 > value2 | any1 === any2, any1 > any2 |
| Logical functions | boolean1 OR boolean2, boolean IS FALSE, NOT boolean | boolean1 \|\| boolean2, boolean.isFalse, !boolean |
| Arithmetic functions | numeric1 + numeric2, POWER(numeric1, numeric2) | numeric1 + numeric2, numeric1.power(numeric2) |
| String functions | string1 \|\| string2, UPPER(string), CHAR_LENGTH(string) | string1 + string2, string.upperCase(), string.charLength() |
| Temporal functions | DATE string, TIMESTAMP string, CURRENT_TIME, INTERVAL string range | string.toDate, string.toTimestamp, currentTime(), numeric.days, numeric.minutes |
| Aggregate functions | COUNT(*), SUM(expression), RANK(), ROW_NUMBER() | field.count, field.sum |
UDAF example:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row
import source.SensorReading
object AggregateFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
val inputPath: String = "resources/sensor_record.txt"
val inputStream = env.readTextFile(inputPath)
val dataStream = inputStream
.map(data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
// table api
val avgTemp = new AvgTemp
val resultTable = sensorTable
.groupBy('id)
.aggregate(avgTemp('temperature) as 'avgTemp)
.select('id,'avgTemp)
// sql
tableEnv.createTemporaryView("sensor",sensorTable)
tableEnv.registerFunction("avgTemp",avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, avgTemp(temperature)
|from sensor
|group by id
""".stripMargin
)
resultTable.toRetractStream[Row].print("result")
resultSqlTable.toRetractStream[Row].print("resultSql")
env.execute("aggregateFunctions")
}
}
// A class representing the aggregation state (accumulator)
class AvgTempAcc {
var sum: Double = 0.0
var count: Int = 0
}
// Custom aggregate function: computes the average temperature of each sensor
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count
override def createAccumulator(): AvgTempAcc = new AvgTempAcc
// The concrete accumulation logic; similar to eval() for scalar functions, this method must be named accumulate
def accumulate(accumulator: AvgTempAcc, temp: Double): Unit = {
accumulator.sum += temp
accumulator.count += 1
}
}
Example: finding the top two temperatures:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import source.SensorReading
object TableAggregateFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
val inputPath: String = "resources/sensor_record.txt"
val inputStream = env.readTextFile(inputPath)
val dataStream = inputStream
.map(data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
// Table API
val top2Temp = new Top2Temp
val resultTable = sensorTable
.groupBy('id)
.flatAggregate(top2Temp('temperature) as ('temp,'rank))
.select('id,'temp,'rank)
resultTable.toRetractStream[Row].print()
env.execute("Table Aggregate Function Test")
}
}
// A class representing the state of the table aggregate function
class Top2TempAcc {
var highestTemp: Double = Double.MinValue
var secondaryHighestTemp: Double = Double.MinValue
}
// Custom table aggregate function: extracts the two highest temperatures and emits (temperature, rank)
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
override def createAccumulator(): Top2TempAcc = new Top2TempAcc
// The accumulation logic that computes the aggregate result
def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
// Check whether the current temperature exceeds the stored ones
if (temp > acc.highestTemp) {
acc.secondaryHighestTemp = acc.highestTemp
acc.highestTemp = temp
} else if (temp > acc.secondaryHighestTemp) {
acc.secondaryHighestTemp = temp
}
}
// A method that emits the result; called once the data of the table has been processed
def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
out.collect((acc.highestTemp, 1))
out.collect((acc.secondaryHighestTemp, 2))
}
}