avatar

目录
spark系列之spark-streaming

SparkStreaming概述

Spark Streaming是什么

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。

Spark Streaming架构

整体架构图

spark-streaming架构图

背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

通过属性spark.streaming.backpressure.enabled来控制是否启用backpressure机制,默认值false,即不启用。

DStream入门

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

maven依赖

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.5</version>
</dependency>

WordCount案例代码

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
object StreamWordCount {

def main(args: Array[String]): Unit = {

//1.初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

//2.初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))

//3.通过监控端口创建DStream,读进来的数据为一行行
val lineStreams = ssc.socketTextStream("linux1", 9999)

//将每一行数据做切分,形成一个个单词
val wordStreams = lineStreams.flatMap(_.split(" "))

//将单词映射成元组(word,1)
val wordAndOneStreams = wordStreams.map((_, 1))

//将相同的单词次数做统计
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)

//打印
wordAndCountStreams.print()

//启动SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}

启动程序并通过netcat发送数据:

Code
1
2
nc -lk 9999
hello ysss

WordCount解析

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。

DStream创建/数据源

RDD队列

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

  • 需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_DStream_Queue {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

// TODO 环境对象
val ssc = new StreamingContext(sparkConf, Seconds(3))

// TODO 数据处理
val que = new mutable.Queue[RDD[String]]()
val queDS: InputDStream[String] = ssc.queueStream(que)
queDS.print()

// TODO 关闭连接环境
ssc.start()

println("queue append item")
for ( i <- 1 to 5 ) {
val rdd = ssc.sparkContext.makeRDD(List("1","2"))
que += rdd
Thread.sleep(2000)
}

// block
ssc.awaitTermination()
}
}

结果

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
-------------------------------------------
Time: 1539075280000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)

-------------------------------------------
Time: 1539075284000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)

-------------------------------------------
Time: 1539075288000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)

-------------------------------------------
Time: 1539075292000 ms
-------------------------------------------

扩展,从文件中读取

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming03_DStream_File {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

// TODO 环境对象
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// 从文件夹中读取新的文件数据,功能不稳定 ,所以不推荐使用
// flume更加专业,所以生产环境,监控文件或目录的变化,采集数据都使用flume
val fileDS: DStream[String] = ssc.textFileStream("in")
val wordDS: DStream[String] = fileDS.flatMap(_.split(" "))
val wordToOneDS: DStream[(String, Int)] = wordDS.map( (_, 1) )
val wordToCountDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_+_)
wordToCountDS.print()

// TODO 关闭连接环境
ssc.start()
ssc.awaitTermination()
}
}

自定义数据源

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

  • 需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.ysss.bigdata.spark.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming04_DStream_DIY {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

// TODO 环境对象
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// 自定义数据采集器
val myDS = ssc.receiverStream( new MyReceiver( "localhost", 9999 ) )
val wordDS: DStream[String] = myDS.flatMap(_.split(" "))
val wordToOneDS: DStream[(String, Int)] = wordDS.map( (_, 1) )
val wordToCountDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_+_)
wordToCountDS.print()

// TODO 关闭连接环境
ssc.start()
ssc.awaitTermination()
}
/*
自定义数据采集器
模仿spark自带的socket采集器

1. 继承Receiver ,设定泛型(采集数据的类型), 传递参数
2. 重写方法
*/
// rdd cache, checkpoint
class MyReceiver(host:String, port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var socket: Socket = _

// 接收数据
def receive(): Unit = {

val reader = new BufferedReader(
new InputStreamReader(
socket.getInputStream,
"UTF-8"
)
)

var s : String = null
// 网络编程中,获取的数据没有null的概念
// 如果网络编程中,需要明确告知服务器,客户端不再传数据,需要发送特殊的指令
// 文件读取时,如果读到结束的时候,获取的结果为null
while ( (s = reader.readLine()) != null ) {
// 采集到数据后,进行封装(存储)
if ( s != "-END-" ) {
store(s)
} else {
// stop
// close
// 重启
//restart("")
}
}
}
// 启动采集器
// 采集 & 封装
override def onStart(): Unit = {

socket = new Socket(host, port)

new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}

override def onStop(): Unit = {

if ( socket != null ) {
socket.close()
}
}
}
}

kakfa数据源[重点]

概述

ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。

DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

kafka 0-8 Receiver 模式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.ysss.bigdata.spark.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_DStream_Kafka {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

// TODO 环境对象
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// 使用0.8版本的kafka - 接收器方式
// 访问kakfa会有相应的工具类
val kafkaDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
ssc,
"linux1:2181,linux2:2181,linux3:2181",
"ysss191125",
Map("ysss191125" -> 3)
)

// Kafka消息传递的时候以k-v对
// k - 传值的时候提供的,默认为null,主要用于分区
// v - message
kafkaDS.map(_._2).print()

// TODO 关闭连接环境
ssc.start()
ssc.awaitTermination()
}
}

kafka 0-8 Direct 模式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

自动维护 offset

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.ysss.bigdata.spark.streaming

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_DStream_Kafka_Direct1 {

def main(args: Array[String]): Unit = {

// TODO 这种方式,可以保证数据不丢失,但是可能会出现数据重复消费

// TODO 环境对象 - 从checkpoint中读取数据偏移量
// checkpoint还保存了计算逻辑,不适合扩展功能
// checkpoint会延续计算,但是可能会压垮内存
// checkpoint一般的存储路径为HDFS,所以会导致小文件过多。性能受到影响
// 不推荐使用
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("scp", () => getStreamingContext)
// TODO 关闭连接环境
ssc.start()
ssc.awaitTermination()
}
def getStreamingContext () = {
// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("scp")

// TODO 数据处理
// 使用0.8版本的kafka - Direct方式 - 自动维护Offset
// TODO 默认情况下,SparkStreaming采用checkpoint来保存kafka的数据偏移量
// 访问kakfa会有相应的工具类
val kafkaParamMap = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "ysss191125new"
)
val kafkaDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParamMap,
Set("ysss191125new")
)
kafkaDS.map(_._2).print()
kafkaDS.print()

ssc
}
}

手动维护 offset

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.ysss.bigdata.spark.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming08_DStream_Kafka_Direc2 {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// 使用0.8版本的kafka - Direct方式 - 手动维护Offset
// 所谓的手动维护,其实就是开发人员自己获取偏移量,并进行保存处理。
// 通过保存的偏移量,可以动态获取kafka中指定位置的数据
// offset会保存到kakfa集群的系统主题中__consumer_offsets
val kafkaMap = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "ysss191125123"
)
val fromOffsets = Map(
(TopicAndPartition("ysss191125new", 0), 0L),
(TopicAndPartition("ysss191125new", 1), 1L),
(TopicAndPartition("ysss191125new", 2), 2L)
)
// TODO 从kafka中获取指定topic中指定offset的数据
val kafkaDS: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
ssc,
kafkaMap,
fromOffsets,
(m:MessageAndMetadata[String, String]) => m.message()
)

var offsetRanges = Array.empty[OffsetRange]

// 转换
// 获取偏移量,一定要在最初的逻辑中获取,防止数据处理完毕后,无偏移量信息
kafkaDS.transform(rdd => {
// 获取RDD中的偏移量范围
// 默认Spark中的RDD是没有offsetRanges方法,所以必须转换类型后才能使用
// RDD 和 HasOffsetRanges有关系
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}).foreachRDD(rdd=>{
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.foreach(println)
})

ssc.start()
ssc.awaitTermination()
}
}

kafka 0-10 Direct 模式

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ysss.bigdata.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming08_DStream_Kafka_Direc2 {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// 使用0.10版本的kafka - Direct方式 - 自动维护Offset
// LocationStrategy : 位置策略
// ConsumerStrategies : 消费策略
// TODO sealed : 用于修饰类的关键字,表示密封类
// 要求子类如果是样例类,必须全部在同一个源码文件中
val kafkaMap = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "ysss191125123",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](
Set("ysss191125new"), kafkaMap
)
)
kafkaDS.map(_.value()).print()

ssc.start()
ssc.awaitTermination()
}
}

手动维护可参考官网,和0-8手动维护类似。

spark-streaming如何保证数据精准一次性处理呢?

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.ysss.bigdata.spark.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming09_DStream_Kafka_Direc3 {

def main(args: Array[String]): Unit = {

// TODO 配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// TODO 数据处理
// SparkStreaming消费Kafka数据时,手动维护offset的思路

// TODO 1. 从指定的位置获取当前业务中保存的数据偏移量
// mysql => message offset => 5
// TODO 2. 从kafka中对应的分区里根据偏移量获取数据
// topicAndPartition => topic : xxx, partition : 0, offset : 5

// TODO 3. 消费数据时,需要将消费数据的偏移量拿到。
// KafkaRDD => offsetRange => (5, 100)

// TODO 4. 执行业务操作。要求,偏移量的更新和业务要求在同一个事务中
// Tx start
// service
// commit - offset -> mysql
// Tx commit
// TODO 4.1 如果不使用事务,那么可能业务成功,但是offset提交失败
// 会导致数据重复消费
// TODO 4.2 如果不使用事务,那么可能offset提交成功,但是业务失败
// 会导致数据丢失
// TODO 4.3 分布式事务, 如果中间出现shuffle,怎么办?
// 所以需要将数据拉取到driver端进行事务操作,保证数据不会出现问题。
// 这样会导致driver的性能下降,所以其实不是一个好的选择。
// SparkStreaming => 基本要求: 不丢失数据
// Flink => 数据精准一次性处理。

ssc.start()
ssc.awaitTermination()
}
}

消费kafka数据模式总结

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- 0-8 ReceiverAPI:
1) 专门的Executor读取数据,速度不统一
2) 跨机器传输数据
3) Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union
4) offset存储在zookeeper中

- 0-8 DirectAPI:
1) Executor读取数据并计算
2) 增加Executor个数来增加消费的并行度
3) offset存储
a. CheckPoint(getActiveOrCreate方式创建StreamingContext)
b. 手动维护(有事务的存储系统)
4) 获取offset必须在第一个调用的算子中:
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

- 0-10 DirectAPI:
1) Executor读取数据并计算
2) 增加Executor个数来增加消费的并行度
3) offset存储
a. __consumer_offsets系统主题中
b. 手动维护(有事务的存储系统)

DStream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

transform

transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming10_DStream_WordCount {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// TODO 可以将DStream转换为RDD进行操作。
// DStream => old RDD => new RDD => new DStream
val resultDS: DStream[(String, Int)] = socketDS.transform(
rdd => {
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map((_, 1))
val reduceRDD = mapRDD.reduceByKey(_ + _)
reduceRDD
}
)
resultDS.print()

ssc.start()
ssc.awaitTermination()

}
}

相比直接在DStream上进行操作,transform的优势

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming11_DStream_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// TODO transform可以获取底层的RDD进行处理
// TODO transform可以周期性的执行driver的代码逻辑

// Code => Driver
// val newDS: DStream[String] = socketDS.map(
// dataString => {
// // Code = Executor
// "string : " + dataString
// }
// )

// Code = Driver
// JDBC.getData();
val newDS1: DStream[String] = socketDS.transform(
rdd => {
// Code = Driver
// JDBC.getData();
println(Thread.currentThread().getName)
rdd.map(
dataString => {
// Code = Executor
"string : " + dataString
// JDBC.updateData();
}
)
}
)

newDS1.print()

ssc.start()
ssc.awaitTermination()

}
}

join

两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming12_DStream_Join {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val socketDS1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val socketDS2: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

val ds1: DStream[(String, Int)] = socketDS1.map((_,1))
val ds2: DStream[(String, Int)] = socketDS2.map((_,1))

val joinDS: DStream[(String, (Int, Int))] = ds1.join(ds2)

joinDS.print()

ssc.start()
ssc.awaitTermination()

}
}

有状态转化操作

UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

updateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。

updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

  • 定义状态,状态可以是一个任意的数据类型。

  • 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming13_DStream_State {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("scp")
val socketDS = ssc.socketTextStream("localhost", 9999)

val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))

val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))
// TODO 使用有状态操作updateStateByKey保存数据
// SparkStreaming的状态保存依赖的是checkpoint,所以需要设定相关路径
val wordToCountDS: DStream[(String, Long)] = wordToOneDS.updateStateByKey[Long](
// 累加器 = 6
// UDAF = 8
// TODO 第一个参数表示相同key的value数据集合
// TODO 第二个参数表示相同key的缓冲区的数据
(seq: Seq[Int], buffer: Option[Long]) => {
// TODO 返回值表示更新后的缓冲区的值
val newBufferValue = buffer.getOrElse(0L) + seq.sum
Option(newBufferValue)
}
)
wordToCountDS.print()

ssc.start()
ssc.awaitTermination()

}
}

WindowOperations

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围;

  • 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

【回顾】scala语言中的window

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming14_DStream_Window {

def main(args: Array[String]): Unit = {

val list = List(1,2,3,4,5,6,7,8)

// overflow : 滚动 -> StackOverflowError -> 栈溢出
// 滑动
// flatMap => 整体->个体
// sliding => 整体连续部分(3) -> 整体
// 将sliding中的范围称之为窗口,其中的数据就称之为窗口数据
// 窗口可以动态调整,向后滑动。
val iterator: Iterator[List[Int]] = list.sliding(3, 2)
while ( iterator.hasNext ) {
println(iterator.next())
}
}
}

window(windowLength, slideInterval):

基于对源DStream窗化的批次进行计算返回一个新的Dstream;

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming15_DStream_Window1 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(3))

// 滑窗
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// 设定窗口。将2个采集周期的数据当成一个整体进行处理
// 默认窗口是可以滑动的。滑动的幅度为一个采集周期
// 可以动态改变滑动幅度
// 如果两个窗口移动过程中,没有重合的数据,称之为滚动窗口
// window方法的第一个参数表示窗口的范围大小,以采集周期为单位
// window方法的第二个参数表示窗口的滑动幅度,也表示计算的周期
val windowDS: DStream[String] = socketDS.window(
Seconds(6), Seconds(3))
windowDS
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()

ssc.start()
ssc.awaitTermination()

}
}

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming17_DStream_Window3 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(3))

// 滑窗
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val wordToOneDS: DStream[(String, Int)] = socketDS
.flatMap(_.split(" "))
.map((_, 1))
val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
(x: Int, y: Int) => x + y, Seconds(6), Seconds(3)
)
windowDS.print()

ssc.start()
ssc.awaitTermination()

}
}

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):

这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming19_DStream_Window5 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("scp")
// 滑窗
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val wordToOneDS: DStream[(String, Int)] = socketDS.map(num=>("a", 1))
val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
(x: Int, y: Int) => {
val sum = x + y
println( sum + "=" + x + "+" + y )
sum
},
(x:Int, y:Int) => {
val diff = x - y
println( diff + "=" + x + "-" + y )
diff
},
Seconds(6), Seconds(3)
)
windowDS.print()

ssc.start()
ssc.awaitTermination()

}
}

countByWindow(windowLength, slideInterval):

返回一个滑动窗口计数流中的元素个数;

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.ysss.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming18_DStream_Window4 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("scp")

// 滑窗
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// 对窗口的数据进行计数,会使用checkpoint进行保存
val countDS: DStream[Long] = socketDS.countByWindow(Seconds(6), Seconds(3))

countDS.print()

ssc.start()
ssc.awaitTermination()

}
}

DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

输出操作如下:

  • print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

  • saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。

  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”. Python中目前不可用。

  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。

  • foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。

注意:

1) 连接不能写在driver层面(序列化)

2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

3) 增加foreachPartition,在分区创建(获取)。

方法一:性能低,每个RDD要连接一次

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.ysss.bigdata.spark.streaming

import java.sql.{DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming20_DStream_Output {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val socketDS = ssc.socketTextStream("localhost", 9999)

// 将数据保存到MySQL数据库中
// id, name, age
socketDS.foreachRDD(rdd=>{
rdd.foreach(data=>{
// 解决性能问题
val datas = data.split(",")
val id = datas(0).toInt
val name = datas(1)
val age = datas(2).toInt

// TODO 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver")
// TODO 建立链接和操作对象
val conn =
DriverManager.getConnection(
"jdbc:mysql://linux1:3306/rdd",
"root","000000")
val sql = "insert into user (id ,name, age) values (?, ?, ?)"
val statement: PreparedStatement = conn.prepareStatement(sql)
statement.setInt(1, id)
statement.setString(2, name)
statement.setInt(3, age)
// TODO 操作数据
statement.executeUpdate()
// TODO 关闭连接
statement.close()
conn.close()
println("数据保存成功!!!")
})
})

ssc.start()
ssc.awaitTermination()

}
}

方法二:把连接放到foreachRDD外面,但是根本执行不了,因为所有的连接对象都不支持序列化操作

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.ysss.bigdata.spark.streaming

import java.sql.{DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming21_DStream_Output1 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val socketDS = ssc.socketTextStream("localhost", 9999)

// 将数据保存到MySQL数据库中
// id, name, age

// TODO 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver")
// TODO 建立链接和操作对象
// TODO 所有的连接对象都不支持序列化操作
val conn =
DriverManager.getConnection(
"jdbc:mysql://linux1:3306/rdd",
"root","000000")
val sql = "insert into user (id ,name, age) values (?, ?, ?)"
val statement: PreparedStatement = conn.prepareStatement(sql)

socketDS.foreachRDD(rdd=>{
// TODO RDD的方法称之为算子,存在分布式计算,需要进行闭包检测
rdd.foreach(data=>{
// 解决性能问题
val datas = data.split(",")
val id = datas(0).toInt
val name = datas(1)
val age = datas(2).toInt

statement.setInt(1, id)
statement.setString(2, name)
statement.setInt(3, age)
// TODO 操作数据
//statement.addBatch()
//statement.executeBatch()
statement.executeUpdate()

println("数据保存成功!!!")
})
})

// SparkException : Task not serializable
// TODO 关闭连接
statement.close()
conn.close()

ssc.start()
ssc.awaitTermination()

}
}

方法三:rdd.foreachPartition,以分区为单位进行遍历

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.ysss.bigdata.spark.streaming

import java.sql.{DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming22_DStream_Output2 {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val socketDS = ssc.socketTextStream("localhost", 9999)

// 将数据保存到MySQL数据库中
// id, name, age

socketDS.foreachRDD(rdd=>{
//【注意】mapPartitions和foreachPartition的区别:
// 以分区为单位进行转换 => 返回
//rdd.mapPartitions()
// 以分区为单位进行遍历 => 不需要返回
rdd.foreachPartition(
datas => {
// TODO 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver")
// TODO 建立链接和操作对象
// TODO 所有的连接对象都不支持序列化操作
val conn =
DriverManager.getConnection(
"jdbc:mysql://linux1:3306/rdd",
"root","000000")
val sql = "insert into user (id ,name, age) values (?, ?, ?)"
val statement: PreparedStatement = conn.prepareStatement(sql)

// datas 其实是scala的集合,所以不存在分布式计算的概念
datas.foreach(
data => {
// 解决性能问题
val datas = data.split(",")
val id = datas(0).toInt
val name = datas(1)
val age = datas(2).toInt

statement.setInt(1, id)
statement.setString(2, name)
statement.setInt(3, age)
// TODO 操作数据
//statement.addBatch()
//statement.executeBatch()
statement.executeUpdate()

println("数据保存成功!!!")
}
)

// TODO 关闭连接
statement.close()
conn.close()
}
)
})


ssc.start()
ssc.awaitTermination()

}
}

优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。

使用外部文件系统来控制内部程序关闭。

spark.streaming.stopGracefullyOnShutdown参数设置成ture,Spark会在JVM关闭时正常关闭StreamingContext,而不是立马关闭

scala
1
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

案例:

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.ysss.bigdata.spark.streaming

import java.sql.{DriverManager, PreparedStatement, ResultSet}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SparkStreaming23_Stop {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
// TODO 配置优雅地关闭
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(sparkConf, Seconds(5))

val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
val wordToOneDS: DStream[(String, Int)] = wordDS.map( (_, 1) )
val wordToCountDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_+_)

wordToCountDS.print()

ssc.start()

new Thread(
new Runnable {
override def run(): Unit = {
// TODO SparkStreaming是可以停止。但是停止的逻辑代码的位置?
// TODO stop方法不能放置在driver的主线程中。
// TODO 直接调用ssc的stop方法是不可以的。需要循环判断sparkStreaming是否应该关闭
while ( true ) {
// TODO 在Driver端应该设置标记,让当前关闭线程可以访问。可以动态改变状态。
// TODO 但是Driver端的标记何时更新,由谁更新都是不确定的。
// TODO 所以一般标记不是放置在Driver端,而是在第三方软件中:redis,zk,mysql,hdfs

Class.forName("com.mysql.jdbc.Driver")
// TODO 建立链接和操作对象
// TODO 所有的连接对象都不支持序列化操作
val conn =
DriverManager.getConnection(
"jdbc:mysql://linux1:3306/rdd",
"root","000000")
val sql = "select age from user where id = 1"
val statement: PreparedStatement = conn.prepareStatement(sql)
val rs: ResultSet = statement.executeQuery()
rs.next()
val age: Int = rs.getInt(1)
if ( age <= 20 ) {

// TODO 判断SSC的状态
val state: StreamingContextState = ssc.getState()
if ( state == StreamingContextState.ACTIVE ) {
println("SparkStreaming的环境准备关闭...")
// TODO 优雅地关闭SSC
// 将现有的数据处理完再关闭就是优雅地关闭
ssc.stop(true, true)
System.exit(0)
}
}

Thread.sleep(1000 * 5)
}

}
}
).start()

ssc.awaitTermination()
// TODO Thread 线程停止的方式?run方法执行完毕
// 为什么不调用stop方法停止线程?因为会出现数据安全问题
// i++ => 1), 2)
// new Thread().stop()

}
}
文章作者: Yang4
文章链接: https://masteryang4.github.io/2020/06/17/spark%E7%B3%BB%E5%88%97%E4%B9%8Bspark-streaming/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MasterYangBlog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论