您的当前位置:首页正文

Flink-6(Table API和SQL)

2024-11-25 来源:个人技术集锦

类似于Spark SQL,Flink同样支持使用SQL和Table API(类似于Spark SQL中DataFrame的操作,SQL的每个操作都有相对应的函数调用)对数据进行查询和处理

目前的TableAPI和SQL还不是完整的功能,还在开发之中

首先导入相关依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

环境测试:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import source.SensorReading

object example {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    val dataSource = env.readTextFile("resources/sensor_record.txt")
    val dataStream = dataSource.map(
      line => {
        val fields = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      }
    )
    //首先创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)
    //基于流创建一张表
    val dataTable: Table = tableEnv.fromDataStream(dataStream)
    //调用tableapi进行转换
    val resultTable = dataTable
      .select("id,temperature")
      .filter("id == 'sensor_1'")
    resultTable.toAppendStream[(String, Double)].print("result")

    //直接使用SQL实现
    tableEnv.createTemporaryView("dataTable", dataTable)
    //注册表
    val sql: String = "select id,temperature from dataTable where id = 'sensor_1'"
    val resultSqlTable = tableEnv.sqlQuery(sql)
    resultSqlTable.toAppendStream[(String,Double)].print("SQL result")

    env.execute("Table API example")
  }
}

以上分别使用Table API和SQL输出了sensor_1的数据,可以正常输出结果,则说明可以正常构建执行环境

Flink Table API和SQL的执行依赖于StreamTableEnvironment环境,该环境有四种创建方式:

// 默认创建方式
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val defaultTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 旧流处理环境创建
val oldStreamingSetting: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val oldStreamingTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, oldStreamingSetting)

// 旧批处理环境创建
val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

// 新流处理环境创建
val newStreamingSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val newStreamingTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, newStreamingSettings)

// 新批处理环境创建
val newBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val newBatchTableEnv: TableEnvironment = TableEnvironment.create(newBatchSettings)

  • TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表
  • 表(Table)是由一个”标识符“(identifier)来制定的,由三个部分组成:Catalog名、数据库(database)名和对象名
  • 表可以是常规的,也可以使虚拟的(View视图)
  • 常规表(Table)一般可以用来描述外部数据,比如文件,数据库表或消息队列的数据,也可以直接从DataStream转换而来
  • 视图(View)可以从现有的表中创建,通常是tableAPI或者SQL查询的一个结果集

创建表(从CSV文件中读取数据)

// 创建表执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

// 需要传入一个ConnectorDescriptor对象指定信息源,当前版仅有两个实现类:FileSystem和Kafka
tableEnv.connect(new FileSystem().path("resources/sensor_record.txt"))
  // 传入一个FormatDescriptor对象指定文件格式为CSV,当前版本已实现的FormatDescriptor子类仅有CSV格式
  .withFormat(new OldCsv)
  // 传入一个Schema对象指定表格格式,格式为一个field函数对应一个列,传入一个列名和对应的一个数据类型
  .withSchema(new Schema()
      .field("id",DataTypes.STRING())
      .field("timestamp",DataTypes.BIGINT())
      .field("temperature",DataTypes.DOUBLE())
  // 根据以上结果创建临时表
  ).createTemporaryTable("sensor")

// 通过from函数读取表格数据
tableEnv.from("sensor").toAppendStream[(String,Long,Double)].print()
env.execute()

上面读取csv的方法已过时,如果要使用新的csv方法,需要导入新的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>

依赖导入成功后,将原来代码中的OldCsv改为Csv即可

从Kafka读取数据

tableEnv.connect(new Kafka()
  .version("0.11") // 设定kafka版本
  .topic("sensor") // 读取消息的topic
  .property("zookeeper.connect", "num01:9092") // 必配的两项,其他consumer配置视实际需求增加
  .property("bootstrap.servers", "num01:9092")
).withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  ).createTemporaryTable("sensor")

当前版本已实现的读取方式仅有以上两种

TableAPI

类似于SparkSQL的DataFrame用法,Flink也可以使用单个函数代替SQL中的一个操作,同时Flink也支持使用表达式expression,导入import org.apache.flink.table.expressions._即可使用。

案例:求所有传感器温度的最大值:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object TableOperator {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    tableEnv.connect(new FileSystem().path("resources/sensor_record.txt"))
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      ).createTemporaryTable("sensor")

    tableEnv.from("sensor")
      .select('id, 'temperature)
      .groupBy('id)
      .aggregate('temperature.max.as('max))
      .select('id, 'max)
      .toRetractStream[(String, Double)].print()

    tableEnv.execute("API_1")
  }
}

使用'field表示一个列,另外,在aggregate中,并不是直接调用max、min、sum等聚合函数,而是通过列调用相应函数,其余功能的使用与SparkDataFrame的使用很类似

另外一点需要注意的是:这里只可以使用toRetractStream,而不能使用toAppendStream。两者的区别从名字中也可以得出,toAppendStream会将接收到的数据追加到原有的结果上,而toRetractStream可以对已有结果进行更新(可以看到toRetractStream的结果输出前面多了一个true/flase,意为当新接受的数据导致现有数据更新时,会输出两条数据:false(旧的数据),true(新的数据)),这里我们只取最大的一条数据,因此在读取过程中必然会修改中间的结果,因此使用toRetractStream

上述逻辑也可以通过SQL实现:

tableEnv.sqlQuery(
  """
    |select id, max(temperature) as `max` from sensor group by id
    |""".stripMargin)
    .toRetractStream[(String,Double)].print()

这里注意下,Flink中解析比较严格,别名为max必须使用以上格式,直接写max会报错

输出表

  • 表的输出,是通过将数据写入TableSink来实现的
  • TableSink是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列

如果数据实现了更新,就不能再输出到文件系统,因为只实现了append…方法

可以输出到数据库

更新模式

  • 追加模式(add):只做插入操作
  • 撤回模式(retract):表和外部连接器交换添加和撤回消息,插入操作编码为add消息,删除编码为delete消息,更新表为上一条的retract和下一条的add消息
  • 更新插入模式(upsert):更新和插入都被编码为upsert消息,删除编码为delete消息
关系代数(表)/SQL流处理
处理的数据对象字段元组的有界集合字段元组的无限序列
查询(Query)对数据的访问可以访问到完整的数据输入无法访问所有数据,必须持续“等待”流式输入
查询终止条件生成固定大小的结果集合后终止永不停止,根据持续受到的数据不断更新查询结果

动态表(Dynamic Tables)

  • 动态表是Flink对流数据的Table API和SQL支持的核心概念
  • 与表示批处理数据的静态表不同,动态表是随时间变化的
  • 持续查询(Continuous Query)
    • 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
    • 持续查询永远不会停止,并会生成另一个动态表
    • 查询会不断更新其动态结果表,以反映其动态输入表上的更改
  • 流式表查询的处理过程

时间特性

  • 基于时间的操作,需要定义相关的时间语义和时间数据来源的信息
  • Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间的访问相应的时间戳
  • 时间属性,可以是每个表schema的一部分。一旦定义了时间属性,他就可以作为一个字段引用,并且可以基于事件的操作中使用
  • 时间属性的行为类似于常规时间戳,可以访问,并且进行计算

获取处理时间:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 指定时间语义为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

val source: DataStream[SensorReading] = env.readTextFile("resources/sensor_record.txt")
  .map(line => {
    val fields: Array[String] = line.split(",")
    SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
  })

// 增加一列表示处理时间
tableEnv.fromDataStream(source,'id,'timeStamp,'temperature,'processingTimestamp.proctime)
  .toAppendStream[Row]
  .print()

tableEnv.execute("API_2")

另外,也可以使用rowtime来获取事件事件:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 时间语义设为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

val source: DataStream[SensorReading] = env.readTextFile("resources/sensor_record.txt")
  .map(line => {
    val fields: Array[String] = line.split(",")
    SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
  // 指定事件时间
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(0)) {
  // 在指定时间戳的基础上加上23,证明rowtime是从这里抽取的事件时间
  override def extractTimestamp(element: SensorReading): Long = element.timeStamp + 23
})

// 使用rowtime获取事件时间
tableEnv.fromDataStream(source, 'id, 'timeStamp, 'temperature, 'eventTime.rowtime)
  .toAppendStream[Row]
  .print()

tableEnv.execute("API_3")

必须要在DataStream中指定事件时间,一般情况下事件事件都可以随数据传入,本身将作为数据中的一列,因此使用该方法获取事件事件的情况并不多见

定义事件时间(Event Time)

  • 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果
  • 为了处理无序事件,并区分流中的准时和迟到事件,Flink需要从事件数据中,提取时间戳,并用来推进事件时间的进展
  • 定义事件时间,有三种方法
    • 由DataStream转换成表时指定(上面刚才指定的形式)
    • 定义TableSchema时指定
    • 在创建表的DDL中定义(定义结构,使用系统函数获取处理时间或者抽取数据中的时间时间)
tableEnv.connect(new FileSystem)
 .withFormat(new Csv)
 .withSchema(new Schema()
   .field("id", DataTypes.STRING())
   ......
   .proctime() // 或者调用rowtime指定事件时间
 )

窗口

  • 时间语义,要配合窗口操作才能发挥作用
  • 在Table API和SQL中,主要有两种窗口
    • Group Windows:根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
    • Over Windows:针对每个输入行,计算相邻行范围内的聚合
  • SQL中的Group Windows
    • TUMBLE(time_attr,interval):定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
    • HOP(time_attr,interval,interval):定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度
    • SESSION(time_attr,interval):定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔

应用案例:

tableEnv.from("sensor")
  .filter('id.isNotNull && 'timestamp.isNotNull && 'temperature.isNotNull)
  .select('id.lowerCase() as 'id, 'temperature, 'rowtime)
  .window(Tumble over 1.hour on 'rowtime as 'hourlywindow)
  .groupBy('hourlywindow, 'id)
  .select('id, 'hourlywindow.end as 'hour, 'temperature.avg as 'avg)

可能是依赖和隐式转换的问题,相同版本的部分依赖导入后,会导致原本可以正常运行的代码报ClassNotFoundException,并且会导致类型识别错误,暂且记下用法,换高版本后再来解决这个问题。其他API就不再演示了,可以查看更多的应用案例和expresiion用法

函数

SQLTableAPI
比较函数value1=value2
value1>value2
any1 === any2
any1 > any2
逻辑函数boolean1 or boolean2
boolean is false
not boolean
boolean1 || boolean2
boolean.isFalse
!boolean
算术函数numeric1 +numeric2
POWER(numeric1,numeric2)
NUMERIC1 + NUMERIC2
NUMERIC1.power(NUMERIC2)
字符串函数string1 || string2
upper(string)
char_length(string)
string1 + string2
string.upperCase()
string.charLength()
时间函数date string
timestamp string
current_time
interval string range
string.toDate
string.toTimestamp
currentTime()
NUMERIC.days
NUMERIC.minuts
聚合函数count(*)
sum(expression)
rank()
row_number()
field.count
field.sum

UDF

  • 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
  • 函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,他被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确解析

标量函数(Scalar Functions)

  • 用户定义的标量函数,可以将0、1或者多个标量值,映射到新的标量值
  • 为了定义标量函数,必须在org.apache.flink.table.functions中拓展基类Scalar Function,并实现求值(eval)方法
  • 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval

表函数(Table Function)

  • 用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值
  • 为了定义表函数,必须拓展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法
  • 表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval

UDAF案例:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row
import source.SensorReading

object AggregateFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputPath: String = "resources/sensor_record.txt"
    val inputStream = env.readTextFile(inputPath)

    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // table api
    val avgTemp = new AvgTemp
    val resultTable = sensorTable
      .groupBy('id)
      .aggregate(avgTemp('temperature) as 'avgTemp)
        .select('id,'avgTemp)

    // sql
    tableEnv.createTemporaryView("sensor",sensorTable)
    tableEnv.registerFunction("avgTemp",avgTemp)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id, avgTemp(temperature)
        |from sensor
        |group by id
      """.stripMargin
    )

    resultTable.toRetractStream[Row].print("result")
    resultSqlTable.toRetractStream[Row].print("resultSql")

    env.execute("aggregateFunctions")
  }
}

// 定义一个类 用于表示聚合的状态
class AvgTmepAcc {
  var sum: Double = 0.0
  var count: Int = 0
}

//自定义一个聚合函数类,求每个传感器的平均温度值
class AvgTemp extends AggregateFunction[Double, AvgTmepAcc] {
  override def getValue(acc: AvgTmepAcc): Double = acc.sum / acc.count

  override def createAccumulator(): AvgTmepAcc = new AvgTmepAcc

  // 实现一个具体处理计算函数,类似eval函数,名称叫做 accumulate
  def accumulate(accumulator: AvgTmepAcc, temp: Double): Unit = {
    accumulator.sum += temp
    accumulator.count += 1
  }
}

表聚合函数

案例:求top2

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import source.SensorReading

object TableAggregateFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputPath: String = "resources/sensor_record.txt"
    val inputStream = env.readTextFile(inputPath)

    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // Table API
    val top2Temp = new Top2Temp
    val resultTable = sensorTable
      .groupBy('id)
      .flatAggregate(top2Temp('temperature) as ('temp,'rank))
      .select('id,'temp,'rank)

    resultTable.toRetractStream[Row].print()

    env.execute("Table Aggregate Function Test")

  }
}

// 定义一个类,用来表示表聚合函数的状态
class Top2TempAcc {
  var highestTemp: Double = Double.MinValue
  var secondaryHighestTemp: Double = Double.MinValue
}

//自定义表聚合函数,提取所有温度值中最高的两个温度,输出(temperature,rank)
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc

  // 实现计算聚合结果的函数
  def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
    // 判断当前温度是否大于保存的温度
    if (temp > acc.highestTemp) {
      acc.secondaryHighestTemp = acc.highestTemp
      acc.highestTemp = temp
    } else if (temp > acc.secondaryHighestTemp) {
      acc.secondaryHighestTemp = temp
    }
  }

  // 实现一个输出结果的方法,最终处理完表中所有数据时调用
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    out.collect((acc.highestTemp, 1))
    out.collect((acc.secondaryHighestTemp, 2))
  }
}
显示全文