avatar

目录
flink系列05Flink DataStream API

第五章,Flink DataStream API

本章介绍了Flink DataStream API的基本知识。我们展示了典型的Flink流处理程序的结构和组成部分,还讨论了Flink的类型系统以及支持的数据类型,还展示了数据和分区转换操作。窗口操作符,基于时间语义的转换操作,有状态的操作符,以及和外部系统的连接器将在接下来的章节进行介绍。阅读完这一章后,我们将会知道如何去实现一个具有基本功能的流处理程序。我们的示例程序采用Scala语言,因为Scala语言相对比较简洁。但Java API也是十分类似的(特殊情况,我们将会指出)。在我们的Github仓库里,我们所写的应用程序具有Scala和Java两种版本。

你好,Flink!

让我们写一个简单的例子来获得使用DataStream API编写流处理应用程序的粗浅印象。我们将使用这个简单的示例来展示一个Flink程序的基本结构,以及介绍一些DataStream API的重要特性。我们的示例程序摄取了一条(来自多个传感器的)温度测量数据流。

首先让我们看一下表示传感器读数的数据结构:

scala
1
2
3
4
case class SensorReading(
id: String,
timestamp: Long,
temperature: Double)

示例程序5-1将温度从华氏温度读数转换成摄氏温度读数,然后针对每一个传感器,每5秒钟计算一次平均温度纸。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Scala object that defines
// the DataStream program in the main() method.
object AverageSensorReadings {
// main() defines and executes the DataStream program
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// create a DataStream[SensorReading] from a stream source
val sensorData: DataStream[SensorReading] = env
// ingest sensor readings with a SensorSource SourceFunction
.addSource(new SensorSource)
// assign timestamps and watermarks (required for event time)
val avgTemp: DataStream[SensorReading] = sensorData
// convert Fahrenheit to Celsius with an inline lambda function
.map( r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
})
// organize readings by sensor id
.keyBy(_.id)
// group readings in 5 second tumbling windows
.timeWindow(Time.seconds(5))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager)
// print result stream to standard out
avgTemp.print()
// execute application
env.execute("Compute average sensor temperature")
}
}

你可能已经注意到Flink程序的定义和提交执行使用的就是正常的Scala或者Java的方法。大多数情况下,这些代码都写在一个静态main方法中。在我们的例子中,我们定义了AverageSensorReadings对象,然后将大多数的应用程序逻辑放在了main()中。

Flink流处理程序的结构如下:

  1. 创建Flink程序执行环境。
  2. 从数据源读取一条或者多条流数据
  3. 使用流转换算子实现业务逻辑
  4. 将计算结果输出到一个或者多个外部设备(可选)
  5. 执行程序

接下来我们详细的学习一下这些部分。

搭建执行环境

编写Flink程序的第一件事情就是搭建执行环境。执行环境决定了程序是运行在单机上还是集群上。在DataStream API中,程序的执行环境是由StreamExecutionEnvironment设置的。在我们的例子中,我们通过调用静态getExecutionEnvironment()方法来获取执行环境。这个方法根据调用方法的上下文,返回一个本地的或者远程的环境。如果这个方法是一个客户端提交到远程集群的代码调用的,那么这个方法将会返回一个远程的执行环境。否则,将返回本地执行环境。

也可以用下面的方法来显式的创建本地或者远程执行环境:

scala
1
2
3
4
5
6
7
8
9
10
// create a local stream execution environment
val localEnv = StreamExecutionEnvironment
.createLocalEnvironment()
// create a remote stream execution environment
val remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // hostname of JobManager
1234, // port of JobManager process
"path/to/jarFile.jar"
) // JAR file to ship to the JobManager

接下来,我们使用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)来将我们程序的时间语义设置为事件时间。执行环境提供了很多配置选项,例如:设置程序的并行度和程序是否开启容错机制。

读取输入流

一旦执行环境设置好,就该写业务逻辑了。StreamExecutionEnvironment提供了创建数据源的方法,这些方法可以从数据流中将数据摄取到程序中。数据流可以来自消息队列或者文件系统,也可能是实时产生的(例如socket)。

在我们的例子里面,我们这样写:

scala
1
2
val sensorData: DataStream[SensorReading] = env
.addSource(new SensorSource)

这样就可以连接到传感器测量数据的数据源并创建一个类型为SensorReadingDataStream了。Flink支持很多数据类型,我们将在接下来的章节里面讲解。在我们的例子里面,我们的数据类型是一个定义好的Scala样例类。SensorReading样例类包含了传感器ID,数据的测量时间戳,以及测量温度值。assignTimestampsAndWatermarks(new SensorTimeAssigner)方法指定了如何设置事件时间语义的时间戳和水位线。有关SensorTimeAssigner我们后面再讲。

转换算子的使用

一旦我们有一条DataStream,我们就可以在这条数据流上面使用转换算子了。转换算子有很多种。一些转换算子可以产生一条新的DataStream,当然这个DataStream的类型可能是新类型。还有一些转换算子不会改变原有DataStream的数据,但会将数据流分区或者分组。业务逻辑就是由转换算子串起来组合而成的。

在我们的例子中,我们首先使用map()转换算子将传感器的温度值转换成了摄氏温度单位。然后,我们使用keyBy()转换算子将传感器读数流按照传感器ID进行分区。接下来,我们定义了一个timeWindow()转换算子,这个算子将每个传感器ID所对应的分区的传感器读数分配到了5秒钟的滚动窗口中。

scala
1
2
3
4
5
6
7
8
val avgTemp: DataStream[SensorReading] = sensorData
.map(r => {
val celsius = (r.temperature - 32) * (5.0 / 9.0)
SensorReading(r.id, r.timestamp, celsius)
})
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.apply(new TemperatureAverager)

窗口转换算子将在“窗口操作符”一章中讲解。最后,我们使用了一个UDF函数来计算每个窗口的温度的平均值。我们稍后将会讨论UDF函数的实现。

输出结果

流处理程序经常将它们的计算结果发送到一些外部系统中去,例如:Apache Kafka,文件系统,或者数据库中。Flink提供了一个维护的很好的sink算子的集合,这些sink算子可以用来将数据写入到不同的系统中去。我们也可以实现自己的sink算子。也有一些Flink程序并不会向第三方外部系统发送数据,而是将数据存储到Flink系统内部,然后可以使用Flink的可查询状态的特性来查询数据。

在我们的例子中,计算结果是一个DataStream[SensorReading]数据记录。每一条数据记录包含了一个传感器在5秒钟的周期里面的平均温度。计算结果组成的数据流将会调用print()将计算结果写到标准输出。

scala
1
avgTemp.print()

要注意一点,流的Sink算子的选择将会影响应用程序端到端(end-to-end)的一致性,具体就是应用程序的计算提供的到底是at-least-once还是exactly-once的一致性语义。应用程序端到端的一致性依赖于所选择的流的Sink算子和Flink的检查点算法的集成使用。

执行

当应用程序完全写好时,我们可以调用StreamExecutionEnvironment.execute()来执行应用程序。在我们的例子中就是我们的最后一行调用:

scala
1
env.execute("Compute average sensor temperature")

Flink程序是惰性执行的。也就是说创建数据源和转换算子的API调用并不会立刻触发任何数据处理逻辑。API调用仅仅是在执行环境中构建了一个执行计划,这个执行计划包含了执行环境创建的数据源和所有的将要用在数据源上的转换算子。只有当execute()被调用时,系统才会触发程序的执行。

构建好的执行计划将被翻译成一个JobGraph并提交到JobManager上面去执行。根据执行环境的种类,一个JobManager将会运行在一个本地线程中(如果是本地执行环境的化)或者JobGraph将会被发送到一个远程的JobManager上面去。如果JobManager远程运行,那么JobGraph必须和一个包含有所有类和应用程序的依赖的JAR包一起发送到远程JobManager

产生传感器读数代码编写

从批读取数据

scala
1
2
3
4
5
6
7
val stream = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.80018327300259),
SensorReading("sensor_6", 1547718199, 15.402984393403084),
SensorReading("sensor_7", 1547718199, 6.720945201171228),
SensorReading("sensor_10", 1547718199, 38.101067604893444)
))

从文件读取数据

scala
1
val stream = env.readTextFile(filePath)

以Kafka消息队列的数据为数据来源

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
// source为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems
.addSource(
new FlinkKafkaConsumer[String](
"hotitems",
new SimpleStringSchema(),
properties
)
)

注意,Kafka的版本为2.2

自定义数据源

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.util.Random

// 传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// 需要extends RichParallelSourceFunction, 泛型为SensorReading
class SensorSource
extends RichParallelSourceFunction[SensorReading] {

// flag indicating whether source is still running.
// flag: 表示数据源是否还在正常运行
var running: Boolean = true

// run()函数连续的发送SensorReading数据,使用SourceContext
// 需要override
override def run(srcCtx: SourceContext[SensorReading]): Unit = {

// initialize random number generator
// 初始化随机数发生器
val rand = new Random()
// look up index of this parallel task
// 查找当前运行时上下文的任务的索引
val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask

// initialize sensor ids and temperatures
// 初始化10个(温度传感器的id, 温度值)元组
var curFTemp = (1 to 10).map {
// nextGaussian产生高斯随机数
i => ("sensor_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20))
}

// emit data until being canceled
// 无限循环,产生数据流
while (running) {

// update temperature
// 更新温度
curFTemp = curFTemp.map(t => (t._1, t._2 + (rand.nextGaussian() * 0.5)) )
// get current time
// 获取当前时间戳
val curTime = Calendar.getInstance.getTimeInMillis

// emit new SensorReading
// 发射新的传感器数据, 注意这里srcCtx.collect
curFTemp.foreach(t => srcCtx.collect(SensorReading(t._1, curTime, t._2)))

// wait for 100 ms
Thread.sleep(100)
}

}

// override cancel函数
override def cancel(): Unit = {
running = false
}

}

使用方法

scala
1
2
3
4
// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)

注意,在我们本教程中,我们一直会使用这个自定义的数据源。

转换算子

在这一小节我们将大概看一下DataStream API的基本转换算子。与时间有关的操作符(例如窗口操作符和其他特殊的转换算子)将会在后面的章节叙述。一个流的转换操作将会应用在一个或者多个流上面,这些转换操作将流转换成一个或者多个输出流。编写一个DataStream API简单来说就是将这些转换算子组合在一起来构建一个数据流图,这个数据流图就实现了我们的业务逻辑。

大部分的流转换操作都基于用户自定义函数UDF。UDF函数打包了一些业务逻辑并定义了输入流的元素如何转换成输出流的元素。像MapFunction这样的函数,将会被定义为类,这个类实现了Flink针对特定的转换操作暴露出来的接口。

scala
1
2
3
class MyMapFunction extends MapFunction[Int, Int] {
override def map(value: Int): Int = value + 1
}

函数接口定义了需要由用户实现的转换方法,例如上面例子中的map()方法。

大部分函数接口被设计为Single Abstract Method(单独抽象方法)接口,并且接口可以使用Java 8匿名函数来实现。Scala DataStream API也内置了对匿名函数的支持。当讲解DataStream API的转换算子时,我们展示了针对所有函数类的接口,但为了简洁,大部分接口的实现使用匿名函数而不是函数类的方式。

DataStream API针对大多数数据转换操作提供了转换算子。如果你很熟悉批处理API、函数式编程语言或者SQL,那么你将会发现这些API很容易学习。我们会将DataStream API的转换算子分成四类:

  • 基本转换算子:将会作用在数据流中的每一条单独的数据上。
  • KeyedStream转换算子:在数据有key的情况下,对数据应用转换算子。
  • 多流转换算子:合并多条流为一条流或者将一条流分割为多条流。
  • 分布式转换算子:将重新组织流里面的事件。

基本转换算子

基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。我们将解释这些算子的语义并提供示例代码。

MAP

map算子通过调用DataStream.map()来指定。map算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图5-1展示了一个map算子,这个map将每一个正方形转化成了圆形。

img

MapFunction的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction接口来定义。接口包含map()函数,这个函数将一个输入事件恰好转换为一个输出事件。

scala
1
2
3
4
// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
> map(T): O

下面的代码实现了将SensorReading中的id字段抽取出来的功能。

scala
1
2
3
4
5
6
val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new MyMapFunction)

class MyMapFunction extends MapFunction[SensorReading, String] {
override def map(r: SensorReading): String = r.id
}

当然我们更推荐匿名函数的写法。

scala
1
2
val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(r => r.id)

FILTER

filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream.filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图5-2展示了只产生白色方框的filter操作。

img

布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。

scala
1
2
3
// T: the type of elements
FilterFunction[T]
> filter(T): Boolean

下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。

scala
1
2
val readings: DataStream[SensorReading] = ...
val filteredSensors = readings.filter(r => r.temperature >= 25)

FLATMAP

flatMap算子和map算子很类似,不同之处在于针对每一个输入事件flatMap可以生成0个、1个或者多个输出元素。事实上,flatMap转换算子是filtermap的泛化。所以flatMap可以实现mapfilter算子的功能。图5-3展示了flatMap如何根据输入事件的颜色来做不同的处理。如果输入事件是白色方框,则直接输出。输入元素是黑框,则复制输入。灰色方框会被过滤掉。

img

flatMap算子将会应用在每一个输入事件上面。对应的FlatMapFunction定义了flatMap()方法,这个方法返回0个、1个或者多个事件到一个Collector集合中,作为输出结果。

scala
1
2
3
4
// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
> flatMap(T, Collector[O]): Unit

下面的例子展示了在数据分析教程中经常用到的例子,我们用flatMap来实现。这个函数应用在一个语句流上面,将每个句子用空格切分,然后把切分出来的单词作为单独的事件发送出去。

scala
1
2
3
val sentences: DataStream[String] = ...
val words: DataStream[String] = sentences
.flatMap(id => id.split(" "))

键控流转换算子

很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。DataStream API提供了一个叫做KeyedStream的抽象,此抽象会从逻辑上对DataStream进行分区,分区后的数据拥有同样的Key值,分区后的流互不相关。

针对KeyedStream的状态转换操作可以读取数据或者写入数据到当前事件Key所对应的状态中。这表明拥有同样Key的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。

要小心使用状态转换操作和基于Key的聚合操作。如果Key的值越来越多,例如:Key是订单ID,我们必须及时清空Key所对应的状态,以免引起内存方面的问题。稍后我们会详细讲解。

KeyedStream可以使用map,flatMap和filter算子来处理。接下来我们会使用keyBy算子来将DataStream转换成KeyedStream,并讲解基于key的转换操作:滚动聚合和reduce算子。

KEYBY

keyBy通过指定key来将DataStream转换成KeyedStream。基于不同的key,流中的事件将被分配到不同的分区中去。所有具有相同key的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同key的事件可以在同一个任务中处理。但是算子只能访问当前事件的key所对应的状态。

如图5-4所示,把输入事件的颜色作为key,黑色的事件输出到了一个分区,其他颜色输出到了另一个分区。

img

keyBy()方法接收一个参数,这个参数指定了key或者keys,有很多不同的方法来指定key。我们将在后面讲解。下面的代码声明了id这个字段为SensorReading流的key。

scala
1
2
3
val readings: DataStream[SensorReading] = ...
val keyed: KeyedStream[SensorReading, String] = readings
.keyBy(r => r.id)

匿名函数r => r.id抽取了传感器读数SensorReading的id值。

滚动聚合

滚动聚合算子由KeyedStream调用,并生成一个聚合以后的DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的key保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。DataStream API提供了以下滚动聚合方法。

滚动聚合算子只能用在滚动窗口,不能用在滑动窗口。

  • sum():在输入流上对指定的字段做滚动相加操作。
  • min():在输入流上对指定的字段求最小值。
  • max():在输入流上对指定的字段求最大值。
  • minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
  • maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

下面的例子根据第一个字段来对类型为Tuple3[Int, Int, Int]的流做分流操作,然后针对第二个字段做滚动求和操作。

scala
1
2
3
4
5
6
val inputStream: DataStream[(Int, Int, Int)] = env.fromElements(
(1, 2, 2), (2, 3, 1), (2, 2, 4), (1, 5, 3))

val resultStream: DataStream[(Int, Int, Int)] = inputStream
.keyBy(0) // key on first field of the tuple
.sum(1) // sum the second field of the tuple in place

在这个例子里面,输入流根据第一个字段来分流,然后在第二个字段上做计算。对于key 1,输出结果是(1,2,2),(1,7,2)。对于key 2,输出结果是(2,3,1),(2,5,1)。第一个字段是key,第二个字段是求和的数值,第三个字段未定义。

滚动聚合操作会对每一个key都保存一个状态。因为状态从来不会被清空,所以我们在使用滚动聚合算子时只能使用在含有有限个key的流上面。

REDUCE

reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()方法,此方法接收两个输入事件,输入一个相同类型的事件。

scala
1
2
3
// T: the element type
ReduceFunction[T]
> reduce(T, T): T

下面的例子,流根据语言这个key来分区,输出结果为针对每一种语言都实时更新的单词列表。

scala
1
2
3
4
5
6
val inputStream: DataStream[(String, List[String])] = env.fromElements(
("en", List("tea")), ("fr", List("vin")), ("en", List("cake")))

val resultStream: DataStream[(String, List[String])] = inputStream
.keyBy(0)
.reduce((x, y) => (x._1, x._2 ::: y._2))

reduce匿名函数将连续两个tuple的第一个字段(key字段)继续发送出去,然后将两个tuple的第二个字段List[String]连接。

reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。

多流转换算子

许多应用需要摄入多个流并将流合并处理,还可能需要将一条流分割成多条流然后针对每一条流应用不同的业务逻辑。接下来,我们将讨论DataStream API中提供的能够处理多条输入流或者发送多条输出流的操作算子。

UNION

DataStream.union()方法将两条或者多条DataStream合并成一条具有与输入流相同类型的输出DataStream。接下来的转换算子将会处理输入流中的所有元素。图5-5展示了union操作符如何将黑色和白色的事件流合并成一个单一输出流。

img

事件合流的方式为FIFO方式。操作符并不会产生一个特定顺序的事件流。union操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

下面的例子展示了如何将三条类型为SensorReading的数据流合并成一条流。

scala
1
2
3
4
5
val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
.union(tokyoStream, rioStream)

CONNECT, COMAP和COFLATMAP

联合两条流的事件是非常常见的流处理需求。例如监控一片森林然后发出高危的火警警报。报警的Application接收两条流,一条是温度传感器传回来的数据,一条是烟雾传感器传回来的数据。当两条流都超过各自的阈值时,报警。

DataStream API提供了connect操作来支持以上的应用场景。DataStream.connect()方法接收一条DataStream,然后返回一个ConnectedStreams类型的对象,这个对象表示了两条连接的流。

scala
1
2
3
4
5
6
7
// first stream
val first: DataStream[Int] = ...
// second stream
val second: DataStream[String] = ...

// connect streams
val connected: ConnectedStreams[Int, String] = first.connect(second)

ConnectedStreams提供了map()flatMap()方法,分别需要接收类型为CoMapFunctionCoFlatMapFunction的参数。

以上两个函数里面的泛型是第一条流的事件类型和第二条流的事件类型,以及输出流的事件类型。还定义了两个方法,每一个方法针对一条流来调用。map1()flatMap1()会调用在第一条流的元素上面,map2()flatMap2()会调用在第二条流的元素上面。

scala
1
2
3
4
5
6
7
8
9
10
// IN1: 第一条流的事件类型
// IN2: 第二条流的事件类型
// OUT: 输出流的事件类型
CoMapFunction[IN1, IN2, OUT]
> map1(IN1): OUT
> map2(IN2): OUT

CoFlatMapFunction[IN1, IN2, OUT]
> flatMap1(IN1, Collector[OUT]): Unit
> flatMap2(IN2, Collector[OUT]): Unit

函数无法选择读某一条流。我们是无法控制函数中的两个方法的调用顺序的。当一条流中的元素到来时,将会调用相对应的方法。

对两条流做连接查询通常需要这两条流基于某些条件被确定性的路由到操作符中相同的并行实例里面去。在默认情况下,connect()操作将不会对两条流的事件建立任何关系,所以两条流的事件将会随机的被发送到下游的算子实例里面去。这样的行为会产生不确定性的计算结果,显然不是我们想要的。为了针对ConnectedStreams进行确定性的转换操作,connect()方法可以和keyBy()或者broadcast()组合起来使用。我们首先看一下keyBy()的示例。

scala
1
2
3
4
5
6
7
8
9
10
11
12
val one: DataStream[(Int, Long)] = ...
val two: DataStream[(Int, String)] = ...

// keyBy two connected streams
val keyedConnect1: ConnectedStreams[(Int, Long), (Int, String)] = one
.connect(two)
.keyBy(0, 0) // key both input streams on first attribute

// alternative: connect two keyed streams
val keyedConnect2: ConnectedStreams[(Int, Long), (Int, String)] = one
.keyBy(0)
.connect(two.keyBy(0))

无论使用keyBy()算子操作ConnectedStreams还是使用connect()算子连接两条KeyedStreams,connect()算子会将两条流的含有相同Key的所有事件都发送到相同的算子实例。两条流的key必须是一样的类型和值,就像SQL中的JOIN。在connected和keyed stream上面执行的算子有访问keyed state的权限。

下面的例子展示了如何连接一条DataStream和广播过的流。

scala
1
2
3
4
5
6
7
val first: DataStream[(Int, Long)] = ...
val second: DataStream[(Int, String)] = ...

// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
// broadcast second input stream
.connect(second.broadcast())

一条被广播过的流中的所有元素将会被复制然后发送到下游算子的所有并行实例中去。未被广播过的流仅仅向前发送。所以两条流的元素显然会被连接处理。

SPLIT和SELECT

Split是Union的反函数。Split将输入的流分成两条或者多条流。每一个输入的元素都可以被路由到0、1或者多条流中去。所以,split可以用来过滤或者复制元素。图5-6展示了split操作符将所有的白色事件都路由到同一条流中去了,剩下的元素去往另一条流。

img

DataStream.split()方法接受OutputSelector类型,此类型定义了输入流中的元素被分配到哪个名字的流中去。OutputSelector定义了select()方法,此方法将被每一个元素调用,并返回java.lang.Iterable[String]类型的数据。返回的String类型的值将指定元素将被路由到哪一条流。

Code
1
2
3
// IN: the type of the split elements
OutputSelector[IN]
> select(IN): Iterable[String]

DataStream.split()方法返回SplitStream类型,此类型提供select()方法,可以根据分流后不同流的名字,将某个名字对应的流提取出来。

例5-2将一条整数流分成了不同的流,大的整数一条流,小的整数一条流。

scala
1
2
3
4
5
6
7
8
val inputStream: DataStream[(Int, String)] = ...

val splitted: SplitStream[(Int, String)] = inputStream
.split(t => if (t._1 > 1000) Seq("large") else Seq("small"))

val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large")

不推荐使用split方法,推荐使用Flink的侧输出(side-output)特性。

分布式转换算子

分区操作对应于我们之前讲过的“数据交换策略”这一节。这些操作定义了事件如何分配到不同的任务中去。当我们使用DataStream API来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。有些时候,我们需要在应用程序的层面控制分区策略,或者自定义分区策略。例如,如果我们知道会发生数据倾斜,那么我们想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,我们需要自定义分区策略的时候。在这一小节,我们将展示DataStream的一些方法,可以使我们来控制或者自定义数据分区策略。

keyBy()方法不同于分布式转换算子。所有的分布式转换算子将产生DataStream数据类型。而keyBy()产生的类型是KeyedStream,它拥有自己的keyed state。

Random

随机数据交换由DataStream.shuffle()方法实现。shuffle方法将数据随机的分配到下游算子的并行任务中去。

Round-Robin

rebalance()方法使用Round-Robin负载均衡算法将输入流平均分配到随后的并行运行的任务中去。图5-7为round-robin分布式转换算子的示意图。

Rescale

rescale()方法使用的也是round-robin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时,rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时,rescale操作将会效率更高。

rebalance()rescale()的根本区别在于任务之间连接的机制不同。 rebalance()将会针对所有发送者任务和所有接收者任务之间建立通信通道,而rescale()仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。rescale的示意图为图5-7。

img

Broadcast

broadcast()方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。

Global

global()方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个task,将会对应用程序造成很大的压力。

Custom

当Flink提供的分区策略都不适用时,我们可以使用partitionCustom()方法来自定义分区策略。这个方法接收一个Partitioner对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者key来进行分区。下面的例子将一条整数流做partition,使得所有的负整数都发送到第一个任务中,剩下的数随机分配。

scala
1
2
3
4
5
6
7
8
9
10
val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)

object myPartitioner extends Partitioner[Int] {
val r = scala.util.Random

override def partition(key: Int, numPartitions: Int): Int = {
if (key < 0) 0 else r.nextInt(numPartitions)
}
}

设置并行度

Flink应用程序在一个像集群这样的分布式环境中并行执行。当一个数据流程序提交到JobManager执行时,系统将会创建一个数据流图,然后准备执行需要的操作符。每一个操作符将会并行化到一个或者多个任务中去。每个算子的并行任务都会处理这个算子的输入流中的一份子集。一个算子并行任务的个数叫做算子的并行度。它决定了算子执行的并行化程度,以及这个算子能处理多少数据量。

算子的并行度可以在执行环境这个层级来控制,也可以针对每个不同的算子设置不同的并行度。默认情况下,应用程序中所有算子的并行度都将设置为执行环境的并行度。执行环境的并行度(也就是所有算子的默认并行度)将在程序开始运行时自动初始化。如果应用程序在本地执行环境中运行,并行度将被设置为CPU的核数。当我们把应用程序提交到一个处于运行中的Flink集群时,执行环境的并行度将被设置为集群默认的并行度,除非我们在客户端提交应用程序时显式的设置好并行度。

通常情况下,将算子的并行度定义为和执行环境并行度相关的数值会是个好主意。这允许我们通过在客户端调整应用程序的并行度就可以将程序水平扩展了。我们可以使用以下代码来访问执行环境的默认并行度。

我们还可以重写执行环境的默认并行度,但这样的话我们将再也不能通过客户端来控制应用程序的并行度了。

算子默认的并行度也可以通过重写来明确指定。在下面的例子里面,数据源的操作符将会按照环境默认的并行度来并行执行,map操作符的并行度将会是默认并行度的2倍,sink操作符的并行度为2。

scala
1
2
3
4
5
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defaultP = env.getParallelism
val result = env.addSource(new CustomSource)
.map(new MyMapper).setParallelism(defaultP * 2)
.print().setParallelism(2)

当我们通过客户端将应用程序的并行度设置为16并提交执行时,source操作符的并行度为16,mapper并行度为32,sink并行度为2。如果我们在本地环境运行应用程序的话,例如在IDE中运行,机器是8核,那么source任务将会并行执行在8个任务上面,mapper运行在16个任务上面,sink运行在2个任务上面。

并行度是动态概念,任务槽数量是静态概念。并行度<=任务槽数量。一个任务槽最多运行一个并行度。

类型

Flink程序所处理的流中的事件一般是对象类型。操作符接收对象输出对象。所以Flink的内部机制需要能够处理事件的类型。在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。为了高效的进行此类操作,Flink需要流中事件类型的详细信息。Flink使用了Type Information的概念来表达数据类型,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。

有点像泛型。

Flink也能够通过分析输入数据和输出数据来自动获取数据的类型信息以及序列化器和反序列化器。尽管如此,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。

在这一节中,我们将讨论Flink支持的类型,以及如何为数据类型创建相应的类型信息,还有就是在Flink无法推断函数返回类型的情况下,如何帮助Flink的类型系统去做类型推断。

支持的数据类型

Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类:

  • Primitives(原始数据类型)
  • Java和Scala的Tuples(元组)
  • Scala的样例类
  • POJO类型
  • 一些特殊的类型

接下来让我们一探究竟。

Primitives

Java和Scala提供的所有原始数据类型都支持,例如Int(Java的Integer),String,Double等等。下面举一个例子:

scala
1
2
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map(n => n + 1)

Tuples

元组是一种组合数据类型,由固定数量的元素组成。

DataStream的Scala API直接使用Scala内置的Tuple。举个例子:

scala
1
2
3
4
5
6
7
val persons: DataStream[(String, Integer)] =
env.fromElements(
("Adam", 17),
("Sarah", 23)
)

persons.filter(p => p._2 > 18)

Flink为Java的Tuple同样提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。

我们可以将上面的例子用Java的DataStream API重写:

scala
1
2
3
4
5
6
7
DataStream<Tuple2<String, Integer>> persons = env
.fromElements(
Tuple2.of("Adam", 17),
Tuple2.of("Sarah", 23)
);

persons.filter(p -> p.f1 > 18);

Tuple的元素可以通过它们的public属性访问–f0,f1,f2等等。或者使用getField(int pos)方法来访问,元素下标从0开始:

scala
1
2
3
4
import org.apache.flink.api.java.tuple.Tuple2

Tuple2<String, Integer> personTuple = Tuple2.of("Alex", 42);
Integer age = personTuple.getField(1); // age = 42

不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子:

scala
1
2
personTuple.f1 = 42; // set the 2nd field to 42
personTuple.setField(43, 1); // set the 2nd field to 43

Scala case classes

scala
1
2
3
4
5
6
7
8
case class Person(name: String, age: Int)

val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23)
)

persons.filter(p => p.age > 18)

POJO

POJO类的定义:

  • 公有类
  • 无参数的公有构造器
  • 所有的字段都是公有的,可以通过getters和setters访问。
  • 所有字段的数据类型都必须是Flink支持的数据类型。

举个例子:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Person {
public String name;
public int age;

public Person() {}

public Person(String name, int age) {
this.name = name;
this.age = age;
}
}

DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23)
);

其他数据类型

  • Array, ArrayList, HashMap, Enum
  • Hadoop Writable types
  • Either, Option, Try

为数据类型创建类型信息

Flink类型系统的核心类是TypeInformation。它为系统在产生序列化器和比较操作符时,提供了必要的类型信息。例如,如果我们想使用某个key来做联结查询或者分组操作,TypeInformation可以让Flink做更严格的类型检查。

Flink针对Java和Scala分别提供了类来产生类型信息。在Java中,类是

scala
1
org.apache.flink.api.common.typeinfo.Types

举个例子:

scala
1
2
3
4
5
6
7
TypeInformation<Integer> intType = Types.INT;

TypeInformation<Tuple2<Long, String>> tupleType = Types
.TUPLE(Types.LONG, Types.STRING);

TypeInformation<Person> personType = Types
.POJO(Person.class);

在Scala中,类是 org.apache.flink.api.scala.typeutils.Types ,举个例子:

scala
1
2
3
4
5
6
// TypeInformation for primitive types
val stringType: TypeInformation[String] = Types.STRING
// TypeInformation for Scala Tuples
val tupleType: TypeInformation[(Int, Long)] = Types.TUPLE[(Int, Long)]
// TypeInformation for case classes
val caseClassType: TypeInformation[Person] = Types.CASE_CLASS[Person]

别忘了导入import org.apache.flink.streaming.api.scala._

定义Key以及引用字段

在Flink中,我们必须明确指定输入流中的元素中的哪一个字段是key。

使用字段位置进行keyBy

scala
1
2
val input: DataStream[(Int, String, Long)] = ...
val keyed = input.keyBy(1)

注意,要么明确写清楚类型注释,要么让Scala去做类型推断,不要用IDEA的类型推断功能。

如果我们想要用元组的第2个字段和第3个字段做keyBy,可以看下面的例子。

scala
1
val keyed2 = input.keyBy(1, 2)

使用字段表达式来进行keyBy

对于样例类:

scala
1
2
3
4
5
6
7
8
case class SensorReading(
id: String,
timestamp: Long,
temperature: Double
)

val sensorStream: DataStream[SensorReading] = ...
val keyedSensors = sensorStream.keyBy("id")

对于元组:

scala
1
2
3
4
5
6
val input: DataStream[(Int, String, Long)] = ...
val keyed1 = input.keyBy("2") // key by 3rd field
val keyed2 = input.keyBy("_1") // key by 1st field

DataStream<Tuple3<Integer, String, Long>> javaInput = ...
javaInput.keyBy("f2") // key Java tuple by 3rd field

对于存在嵌套的样例类:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
case class Address (
address: String,
zip: String,
country: String
)

case class Person (
name: String,
birthday: (Int, Int, Int), // year, month, day
address: Address
)

val persons: DataStream[Person] = ...
persons.keyBy("address.zip") // key by nested POJO field
persons.keyBy("birthday._1") // key by field of nested tuple
persons.keyBy("birthday._") // key by all fields of nested tuple

Key选择器

方法类型

Code
1
2
KeySelector[IN, KEY]
> getKey(IN): KEY

两个例子

scala
1
2
3
4
val sensorData: DataStream[SensorReading] = ...
val byId: KeyedStream[SensorReading, String] = sensorData.keyBy(r => r.id)
val input: DataStream[(Int, Int)] = ...
val keyedStream = input.keyBy(value => math.max(value._1, value._2))

实现UDF函数,更细粒度的控制流

函数类(Function Classes)

Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

例子实现了FilterFunction接口

scala
1
2
3
4
5
6
7
class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}

val flinkTweets = tweets.filter(new FlinkFilter)

还可以将函数实现成匿名类

scala
1
2
3
4
5
6
7
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
)

我们filter的字符串“flink”还可以当作参数传进去。

scala
1
2
3
4
5
6
7
8
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}

匿名函数(Lambda Functions)

匿名函数可以实现一些简单的逻辑,但无法实现一些高级功能,例如访问状态等等。

scala
1
2
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))

富函数(Rich Functions)

我们经常会有这样的需求:在函数处理数据之前,需要做一些初始化的工作;或者需要在处理数据时可以获得函数执行上下文的一些信息;以及在处理完数据时做一些清理工作。而DataStream API就提供了这样的机制。

DataStream API提供的所有转换操作函数,都拥有它们的“富”版本,并且我们在使用常规函数或者匿名函数的地方来使用富函数。例如下面就是富函数的一些例子,可以看出,只需要在常规函数的前面加上Rich前缀就是富函数了。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

当我们使用富函数时,我们可以实现两个额外的方法:

  • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。open()函数通常用来做一些只需要做一次即可的初始化工作。
  • close()方法是生命周期中的最后一个调用的方法,通常用来做一些清理工作。

另外,getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,当前子任务的索引,当前子任务的名字。同时还它还包含了访问分区状态的方法。下面看一个例子:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0

override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 做一些初始化工作
// 例如建立一个和HDFS的连接
}

override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}

override def close(): Unit = {
// 清理工作,断开和HDFS的连接。
}
}

Sink

Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。

scala
1
stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。

Kafka

Kafka版本为0.11

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>

Kafka版本为2.0以上

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>

主函数中添加sink:

scala
1
2
3
4
5
6
7
8
9
10
11
val union = high
.union(low)
.map(_.temperature.toString)

union.addSink(
new FlinkKafkaProducer011[String](
"localhost:9092",
"test",
new SimpleStringSchema()
)
)

Redis

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>

定义一个redis的mapper类,用于定义保存到redis时调用的命令:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
class MyRedisMapper extends RedisMapper[SensorReading] {

override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
}

override def getValueFromData(t: SensorReading): String = {
t.temperature.toString
}

override def getKeyFromData(t: SensorReading): String = t.id

}

ElasticSearch

在主函数中调用:

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.10.0</version>
</dependency>

在主函数中调用:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(t: SensorReading,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit = {
println("saving data: " + t)
val json = new util.HashMap[String, String]()
json.put("data", t.toString)
val indexRequest = Requests
.indexRequest()
.index("sensor")
.`type`("readingData")
.source(json)
requestIndexer.add(indexRequest)
println("saved successfully")
}
})
dataStream.addSink(esSinkBuilder.build())

JDBC自定义sink

xml
1
2
3
4
5
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>

添加MyJdbcSink

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/test",
"root",
"123456")
insertStmt = conn.prepareStatement(
"INSERT INTO temperatures (sensor, temp) VALUES (?, ?)"
)
updateStmt = conn.prepareStatement(
"UPDATE temperatures SET temp = ? WHERE sensor = ?"
)
}
// 调用连接,执行sql
override def invoke(value: SensorReading,
context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()

if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}

override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}

在main方法中增加,把明细保存到mysql中

scala
1
dataStream.addSink(new MyJdbcSink())
文章作者: Yang4
文章链接: https://masteryang4.github.io/2020/06/27/flink%E7%B3%BB%E5%88%9705Flink-DataStream-API/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MasterYangBlog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论