
Flink Series 12: E-commerce User Behavior Analysis

The Datasets

The Taobao Dataset

We have prepared a Taobao user behavior dataset, stored as a CSV file. It contains all behaviors (clicks, purchases, add-to-cart and favorite events) of a random sample of one million Taobao users on a single day. Each row of the dataset represents one user behavior event, consisting of user ID, item ID, item category ID, behavior type and timestamp, separated by commas. The columns are described in detail below:

Field name   Type    Description
userId       Long    Anonymized user ID
itemId       Long    Anonymized item ID
categoryId   Int     Anonymized ID of the item's category
behavior     String  User behavior type, one of ('pv', 'buy', 'cart', 'fav')
timestamp    Long    Timestamp of the behavior, in seconds
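
For example, a row of the dataset looks something like this (field order: userId, itemId, categoryId, behavior, timestamp):

543462,1715,1464116,pv,1511658000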

The Apache Server Log Dataset

Here we use a log file from an Apache server as an example. Each log line records the visitor's IP, userId, access time, HTTP method and the url visited:

Field name  Type    Description
ip          String  IP address of the visitor
userId      Long    userId of the visitor
eventTime   Long    Time of the visit
method      String  HTTP method: GET/POST/PUT/DELETE
url         String  The url visited
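
For example, a log line compatible with the parser used later (fields split on single spaces; positions 0, 2, 3, 5 and 6 are consumed) might look like:

83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/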

Real-Time Hot Items Statistics

The first thing to implement is real-time hot items statistics, based on the UserBehavior dataset.

Requirements

  • Every 5 minutes, output the top N items by click count within the last hour
  • Click count is measured by page views ("pv")

Approach

1. Filter the user behavior stream down to page-view ("pv") events
2. Build a sliding window: 1 hour long, sliding every 5 minutes
3. Compute each window by combining an incremental aggregate function with a full-window function
4. keyBy() the resulting DataStream on the window end time
5. Buffer the elements of each KeyedStream in ListState; once the watermark passes the window end time, sort and emit

Data Preparation

Copy the data file UserBehavior.csv into the resource directory src/main/resources.

Main Program

scala
// Imports covering this and the following snippets
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// The raw data is ETL'd into the UserBehavior type
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)

// Output type of the full-window aggregate function
case class ItemViewCount(itemId: Long,
                         windowEnd: Long,
                         count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    // Create a StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the global parallelism to 1 so the printed results are not
    // interleaved; changing the parallelism does not affect correctness
    env.setParallelism(1)
    val stream = env
      // Windows-style path shown; replace with the absolute path of the dataset
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong,
                     linearray(1).toLong,
                     linearray(2).toInt,
                     linearray(3),
                     linearray(4).toLong)
      })
      // Keep only click (page view) events
      .filter(_.behavior == "pv")
      // Assign timestamps and watermarks; we know the dataset's
      // timestamps are monotonically increasing
      .assignAscendingTimestamps(_.timestamp * 1000)
      // Partition the stream by item id
      .keyBy(_.itemId)
      // Sliding window: 1 hour long, sliding every 5 minutes
      .timeWindow(Time.minutes(60), Time.minutes(5))
      // Window computation: incremental aggregation plus a full-window function
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Partition by window end time
      .keyBy(_.windowEnd)
      // Compute the top 3 items by click count
      .process(new TopNHotItems(3))

    // Print the results
    stream.print()

    // Don't forget to execute
    env.execute("Hot Items Job")
  }
}

Real-world streams are almost always out of order, so in production assignAscendingTimestamps is rarely appropriate; use a BoundedOutOfOrdernessTimestampExtractor instead.
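
A minimal sketch of the swap, reusing the UserBehavior type and the Time import from the main program above; the 1-second out-of-orderness bound is an assumption and should be tuned to the actual lateness of your data:

scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

// Replace .assignAscendingTimestamps(_.timestamp * 1000) with:
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[UserBehavior](Time.seconds(1)) {
    // The dataset's timestamps are in seconds; Flink expects milliseconds
    override def extractTimestamp(element: UserBehavior): Long =
      element.timestamp * 1000
  }
)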

The Incremental Aggregate Function

scala
// COUNT aggregate: add one for every record seen
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}

The Full-Window Function

It simply wraps the incrementally aggregated count with the window metadata and the key.

The code:

scala
// Emits the result for each window
// (the key produced by keyBy(_.itemId) is a Long, so the key type here must be Long)
class WindowResultFunction
    extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long,
                       context: Context,
                       elements: Iterable[Long],
                       out: Collector[ItemViewCount]): Unit = {
    out.collect(ItemViewCount(key, context.window.getEnd, elements.iterator.next()))
  }
}

We now have a stream of per-item click counts for every window.
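For example, an element ItemViewCount(1715, 1511661600000, 35) (illustrative values) would mean that item 1715 was viewed 35 times in the window ending at epoch millisecond 1511661600000.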

Computing the Top N Hot Items

scala
class TopNHotItems(topSize: Int)
    extends KeyedProcessFunction[Long, ItemViewCount, String] {
  // Lazily initialized state holding the items of the current window
  lazy val itemState = getRuntimeContext.getListState(
    new ListStateDescriptor[ItemViewCount]("items", Types.of[ItemViewCount])
  )

  // Called once per incoming element
  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long,
                                ItemViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    itemState.add(value)
    // windowEnd + 1: once the watermark passes the window end, all
    // ItemViewCount results for this window have been collected
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  // Timer callback
  override def onTimer(
      ts: Long,
      ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    val allItems: ListBuffer[ItemViewCount] = ListBuffer()
    // Implicit conversions between Java and Scala collections
    import scala.collection.JavaConversions._
    for (item <- itemState.get) {
      allItems += item
    }

    // Clear the state to free space
    itemState.clear()

    // Sort in descending order of count and keep the top N
    val sortedItems = allItems.sortBy(-_.count).take(topSize)
    val result = new StringBuilder
    result.append("====================================\n")
    result.append("Time: ").append(new Timestamp(ts - 1)).append("\n")
    for (i <- sortedItems.indices) {
      val currentItem = sortedItems(i)
      result.append("No")
        .append(i + 1)
        .append(":")
        .append(" ItemId=")
        .append(currentItem.itemId)
        .append(" Views=")
        .append(currentItem.count)
        .append("\n")
    }
    result.append("====================================\n\n")
    // Throttle the output to simulate a rolling real-time display
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

Switching the Source to Kafka

In real production environments the data stream usually comes from Kafka. To bring the code closer to production, we only need to swap the source for Kafka:

Note: use Kafka version 2.2 here!

Add the dependency:

xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

The code:

scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty("auto.offset.reset", "latest")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val stream = env
  .addSource(new FlinkKafkaConsumer[String](
    "hotitems",
    new SimpleStringSchema(),
    properties)
  )

Of course, depending on the actual requirements, the sink can also point at Kafka, Elasticsearch, Redis or other storage; we won't implement each of them here.
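
As one hedged example, a string sink back to Kafka could look like the sketch below. Both the output topic name "hotitems-output" and the resultStream variable are made up for illustration (resultStream stands for the DataStream[String] produced by TopNHotItems above); FlinkKafkaProducer comes from the same flink-connector-kafka dependency:

scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// resultStream: DataStream[String] — e.g. the output of TopNHotItems above
resultStream.addSink(new FlinkKafkaProducer[String](
  "localhost:9092",         // broker list
  "hotitems-output",        // target topic (illustrative name)
  new SimpleStringSchema()  // element serialization
))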

Kafka Producer Program

Add the dependency:

xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

The code:

scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerUtil {

  def main(args: Array[String]): Unit = {
    writeToKafka("hotitems")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put(
      "key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer"
    )
    props.put(
      "value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer"
    )
    val producer = new KafkaProducer[String, String](props)
    // Replay the dataset line by line into the topic
    val bufferedSource = io.Source.fromFile("absolute path to UserBehavior.csv")
    for (line <- bufferedSource.getLines) {
      val record = new ProducerRecord[String, String](topic, line)
      producer.send(record)
    }
    producer.close()
  }
}
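
With a local Kafka broker running, start the Flink job with the Kafka source first and then run this producer (the consumer sets auto.offset.reset to latest, so a fresh consumer group only sees events published after it starts); the job should then print the same kind of top-N output as the file-based version.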

Real-Time Traffic Statistics

  • Requirements
    • Compute real-time traffic statistics from the web server logs
    • Count visits per IP per minute, output the 5 busiest addresses, refreshed every 5 seconds
  • Approach
    • Convert the time in the Apache server log into a timestamp and use it as the event time
    • Build a sliding window, 1 minute long, sliding every 5 seconds

Data Preparation

Copy the Apache server log file apache.log into the resource directory src/main/resources; we will read the data from there.

Implementation

The module we implement now is "real-time traffic statistics". For an e-commerce platform, the entry traffic from user logins and the visit traffic of individual pages are both important data worth analyzing, and they can be extracted straightforwardly from the web server log. Here we implement the most basic "page view" statistic: read each line of the server log and count how often users visit each url within a period of time.

Concretely: every 5 seconds, output the top N URLs by number of visits within the last 10 minutes (note this refines the requirement above: the code keys by URL and widens the window to 10 minutes). The need is clearly very similar to the earlier "real-time hot items" statistic, so we can reuse much of that code.

The complete code:

scala
package com.ysss.project

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object ApacheLogAnalysis {

  case class ApacheLogEvent(ip: String,
                            userId: String,
                            eventTime: Long,
                            method: String,
                            url: String)

  case class UrlViewCount(url: String,
                          windowEnd: Long,
                          count: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env
      // Replace with the absolute path of the file
      .readTextFile("absolute path to apache.log")
      .map(line => {
        val linearray = line.split(" ")
        // ETL the timestamp into epoch milliseconds
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0),
                       linearray(2),
                       timestamp,
                       linearray(5),
                       linearray(6))
      })
      // The log is out of order, so use a bounded out-of-orderness extractor
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](
          Time.milliseconds(1000)
        ) {
          override def extractTimestamp(t: ApacheLogEvent): Long = {
            t.eventTime
          }
        }
      )
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))
      .print()

    env.execute("Traffic Analysis Job")
  }

  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  class WindowResultFunction
      extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Long],
                         out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, context.window.getEnd, elements.iterator.next()))
    }
  }

  class TopNHotUrls(topSize: Int)
      extends KeyedProcessFunction[Long, UrlViewCount, String] {

    lazy val urlState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount](
        "urlState-state",
        Types.of[UrlViewCount]
      )
    )

    override def processElement(
        input: UrlViewCount,
        context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
        collector: Collector[String]
    ): Unit = {
      // Save every element into the state
      urlState.add(input)
      context
        .timerService
        .registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(
        timestamp: Long,
        ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
        out: Collector[String]
    ): Unit = {
      // Collect all URL view counts received for this window
      val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (urlView <- urlState.get) {
        allUrlViews += urlView
      }
      // Clear the state early to free space
      urlState.clear()
      // Sort by view count in descending order
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
        .take(topSize)
      // Format the ranking as a String for printing
      val result: StringBuilder = new StringBuilder
      result
        .append("====================================\n")
        .append("Time: ")
        .append(new Timestamp(timestamp - 1))
        .append("\n")

      for (i <- sortedUrlViews.indices) {
        val currentUrlView: UrlViewCount = sortedUrlViews(i)
        // e.g. No1: URL=/blog/tags/firefox?flav=rss20 Traffic=55
        result
          .append("No")
          .append(i + 1)
          .append(": ")
          .append(" URL=")
          .append(currentUrlView.url)
          .append(" Traffic=")
          .append(currentUrlView.count)
          .append("\n")
      }
      result
        .append("====================================\n\n")
      // Throttle the output to simulate a rolling real-time display
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

UV Statistics with a Bloom Filter

Counting unique visitors (UV) naively means keeping every userId in memory. Instead, the code below hashes each userId into a Redis bitmap keyed by the window end time and keeps the per-window UV count in a Redis hash; a custom trigger fires and purges on every element, so Flink itself holds almost no window state.

Dependency:

xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.1</version>
</dependency>

The complete code:

scala
package com.ysss

// UserAction is assumed to be defined elsewhere in the project
// (string fields plus a millisecond timestamp field `ts`)
import com.ysss.UserBehavior.UserAction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloomFilter {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env
      .readTextFile("absolute path to UserBehavior.csv")
      .map(line => {
        val arr = line.split(",")
        UserAction(arr(0), arr(1), arr(2), arr(3), arr(4).toLong * 1000)
      })
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior == "pv")
      // All events share one dummy key; deduplication happens in Redis
      .map(r => ("dummyKey", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.minutes(60), Time.minutes(5))
      // Fire and purge on every element so the window never buffers data
      .trigger(new MyTrigger123)
      .process(new MyProcess)

    stream.print()
    env.execute()
  }

  class MyProcess
      extends ProcessWindowFunction[(String, String),
        (Long, Long), String, TimeWindow] {
    lazy val jedis = new Jedis("localhost", 6379)
    lazy val bloom = new Bloom(1 << 29)

    override def process(key: String,
                         context: Context,
                         vals: Iterable[(String, String)],
                         out: Collector[(Long, Long)]): Unit = {
      // The window end time doubles as the Redis bitmap key
      val storeKey = context.window.getEnd.toString
      var count = 0L

      // Current UV count for this window, kept in a Redis hash
      if (jedis.hget("UvCountHashTable", storeKey) != null) {
        count = jedis.hget("UvCountHashTable", storeKey).toLong
      }

      // The trigger purges after every element, so `vals` holds exactly one
      val userId = vals.last._2
      val offset = bloom.hash(userId, 61)

      // If the bit is unset, this userId has not been seen in this window
      val isExist = jedis.getbit(storeKey, offset)
      if (!isExist) {
        jedis.setbit(storeKey, offset, true)
        jedis.hset("UvCountHashTable", storeKey, (count + 1).toString)
      }

      // out.collect((count, storeKey.toLong))
    }
  }

  class MyTrigger123 extends Trigger[(String, String), TimeWindow] {
    override def onEventTime(time: Long,
                             window: TimeWindow,
                             ctx: TriggerContext): TriggerResult = {
      if (ctx.getCurrentWatermark >= window.getEnd) {
        // Print the final UV count once the window has closed
        val jedis = new Jedis("localhost", 6379)
        val key = window.getEnd.toString
        println(key, jedis.hget("UvCountHashTable", key))
      }
      // Every element already fires and purges in onElement
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(
        time: Long,
        window: TimeWindow,
        ctx: TriggerContext
    ): TriggerResult = {
      TriggerResult.CONTINUE
    }

    override def clear(
        window: TimeWindow,
        ctx: Trigger.TriggerContext
    ): Unit = {}

    override def onElement(element: (String, String),
                           timestamp: Long,
                           window: TimeWindow,
                           ctx: TriggerContext): TriggerResult = {
      TriggerResult.FIRE_AND_PURGE
    }
  }

  class Bloom(size: Long) extends Serializable {
    private val cap = size

    def hash(value: String, seed: Int): Long = {
      // Use Long arithmetic to avoid Int overflow
      var result = 0L
      for (i <- 0 until value.length) {
        result = result * seed + value.charAt(i)
      }
      // Mask into the bitmap's range
      (cap - 1) & result
    }
  }
}
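
A note on sizing: with size = 1 << 29 the bitmap spans 2^29 bits, i.e. 2^26 bytes = 64 MB per window key in Redis, which comfortably covers the dataset's roughly one million users. Since hash() applies only a single hash function, this is effectively a one-hash Bloom filter: collisions map distinct users to the same bit, so the UV count can undercount slightly.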

App Marketing Statistics by Channel

The complete code:

scala
package com.ysss

import java.util.{Calendar, UUID}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object AppMarketingByChannel {
  case class MarketingUserBehavior(userId: String,
                                   behavior: String,
                                   channel: String,
                                   ts: Long)

  // Simulated source that emits a random marketing event every 10 ms
  class SimulatedEventSource
      extends RichParallelSourceFunction[MarketingUserBehavior] {

    var running = true

    val channelSet = Seq("AppStore", "XiaomiStore")
    val behaviorTypes = Seq("BROWSE", "CLICK")
    val rand = new Random

    override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
      while (running) {
        val userId = UUID.randomUUID().toString
        val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
        val channel = channelSet(rand.nextInt(channelSet.size))
        val ts = Calendar.getInstance().getTimeInMillis

        ctx.collect(MarketingUserBehavior(userId, behaviorType, channel, ts))

        Thread.sleep(10)
      }
    }

    override def cancel(): Unit = running = false
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env
      .addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior != "UNINSTALL")
      // Key by (channel, behavior), counting each event once
      .map(r => {
        ((r.channel, r.behavior), 1L)
      })
      .keyBy(_._1)
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .process(new MarketingCountByChannel)
    stream.print()
    env.execute()
  }

  class MarketingCountByChannel
      extends ProcessWindowFunction[((String, String), Long),
        (String, Long, Long), (String, String), TimeWindow] {
    override def process(key: (String, String),
                         context: Context,
                         elements: Iterable[((String, String), Long)],
                         out: Collector[(String, Long, Long)]): Unit = {
      // Emit (channel, event count in window, window end timestamp)
      out.collect((key._1, elements.size, context.window.getEnd))
    }
  }
}

App Marketing Statistics (All Channels)

The complete code:

scala
package com.ysss

import com.ysss.AppMarketingByChannel.SimulatedEventSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketingStatistics {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env
      .addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior != "UNINSTALL")
      // All events share one dummy key, giving a single global count
      .map(r => {
        ("dummyKey", 1L)
      })
      .keyBy(_._1)
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .process(new MarketingCountTotal)
    stream.print()
    env.execute()
  }

  class MarketingCountTotal
      extends ProcessWindowFunction[(String, Long),
        (String, Long, Long), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long, Long)]): Unit = {
      out.collect((key, elements.size, context.window.getEnd))
    }
  }
}

Detecting Malicious Logins

scala
package com.ysss

// LoginEvent is assumed to be defined in FlinkCepExample
// (userId, ip, loginStatus, ts — all strings)
import com.ysss.FlinkCepExample.LoginEvent
import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object LoginFailWithoutCEP {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .fromElements(
        LoginEvent("1", "0.0.0.0", "fail", "1"),
        LoginEvent("1", "0.0.0.0", "success", "2"),
        LoginEvent("1", "0.0.0.0", "fail", "3"),
        LoginEvent("1", "0.0.0.0", "fail", "4")
      )
      .assignAscendingTimestamps(_.ts.toLong * 1000)
      .keyBy(_.userId)
      .process(new MatchFunction)

    stream.print()
    env.execute()
  }

  class MatchFunction extends KeyedProcessFunction[String, LoginEvent, String] {

    // Buffers the login failures seen so far
    lazy val loginState = getRuntimeContext.getListState(
      new ListStateDescriptor[LoginEvent]("login-fail", Types.of[LoginEvent])
    )

    // Remembers the timestamp of the currently registered timer
    lazy val timestamp = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("ts", Types.of[Long])
    )

    override def processElement(
        value: LoginEvent,
        ctx: KeyedProcessFunction[String, LoginEvent, String]#Context,
        out: Collector[String]
    ): Unit = {
      if (value.loginStatus == "fail") {
        loginState.add(value)
        // Only register a timer for the first failure in a run
        // (an empty Long state reads as 0; the original `!timestamp.value()`
        // does not compile on a Long)
        if (timestamp.value() == 0L) {
          timestamp.update(value.ts.toLong * 1000 + 5000L)
          ctx
            .timerService()
            .registerEventTimeTimer(value.ts.toLong * 1000 + 5000L)
        }
      }

      if (value.loginStatus == "success") {
        // A success resets everything: clear the failures and the timer
        loginState.clear()
        ctx
          .timerService()
          .deleteEventTimeTimer(timestamp.value())
        timestamp.clear()
      }
    }

    override def onTimer(
        ts: Long,
        ctx: KeyedProcessFunction[String, LoginEvent, String]#OnTimerContext,
        out: Collector[String]
    ): Unit = {
      val allLogins = ListBuffer[LoginEvent]()
      import scala.collection.JavaConversions._
      for (login <- loginState.get) {
        allLogins += login
      }
      loginState.clear()

      if (allLogins.length > 1) {
        out.collect("Two consecutive login failures within 5 seconds")
      }
    }
  }
}
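
With the sample data and the null/reset fixes above, the success event at second 2 clears the first failure, and the two failures at seconds 3 and 4 share one timer (registered for 8000 ms); when it fires, the job should emit "Two consecutive login failures within 5 seconds" once.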

Real-Time Order Payment Monitoring

  • Requirements
    • After a user places an order, an expiration time should be set, to encourage payment and reduce system risk
    • If an order is not paid within 15 minutes of creation, emit a monitoring alert
  • Approach
    • Use the CEP library to pattern-match the event stream, with a time bound on the match

On an e-commerce platform, revenue is ultimately created when users place orders, or more precisely, when they actually complete payment. Placing an order signals demand, but in reality not every order is paid immediately; the longer a user delays, the less likely they are to pay. To give users a sense of urgency (and so raise the payment conversion rate), and to guard against risks in the payment step, e-commerce sites monitor order state and set an expiration time (for example 15 minutes): an order still unpaid past that time is cancelled.

We will implement this with the CEP library. First key the event stream by orderId, then define a pattern in which a "create" event is strictly followed by a "pay" event within the time bound (the requirement says 15 minutes; the demo below uses 5 seconds so the sample data triggers quickly):

scala
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
  .where(_.eventType == "create")
  .next("next")
  .where(_.eventType == "pay")
  .within(Time.seconds(5))

When we then call .select (or .flatSelect), we can retrieve both the matched events and the events that timed out without matching. Create OrderTimeout.scala under src/main/scala and add a new singleton object. Define the case class OrderEvent for the incoming order event stream (the original text also mentions an OrderResult type for displaying order status; this demo simply emits the timed-out OrderEvent itself to a side output). Since there is no ready-made data, we again demonstrate with a few hand-written sample records. The complete code:

scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.util.Collector

import scala.collection.Map

case class OrderEvent(orderId: String, eventType: String, eventTime: String)

object OrderTimeout {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orderEventStream = env.fromCollection(List(
      OrderEvent("1", "create", "1558430842"),
      OrderEvent("2", "create", "1558430843"),
      OrderEvent("2", "pay", "1558430844"),
      OrderEvent("3", "pay", "1558430942"),
      OrderEvent("4", "pay", "1558430943")
    )).assignAscendingTimestamps(_.eventTime.toLong * 1000)

    // Alternative: read the events from a socket instead
    // val orders: DataStream[String] = env
    //   .socketTextStream("localhost", 9999)
    //
    // val orderEventStream = orders
    //   .map(s => {
    //     val slist = s.split("\\|")
    //     OrderEvent(slist(0), slist(1), slist(2))
    //   })
    //   .assignAscendingTimestamps(_.eventTime.toLong * 1000)

    // "create" strictly followed by "pay", within 5 seconds
    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
      .where(_.eventType.equals("create"))
      .next("next")
      .where(_.eventType.equals("pay"))
      .within(Time.seconds(5))

    // Side output for orders that timed out without being paid
    val orderTimeoutOutput = OutputTag[OrderEvent]("orderTimeout")

    val patternStream = CEP.pattern(
      orderEventStream.keyBy("orderId"), orderPayPattern)

    // Called for partial matches that exceeded the `within` interval
    val timeoutFunction = (
        map: Map[String, Iterable[OrderEvent]],
        timestamp: Long,
        out: Collector[OrderEvent]
    ) => {
      val orderStart = map.get("begin").get.head
      out.collect(orderStart)
    }

    // Fully matched create->pay pairs are ignored in this demo
    val selectFunction = (
        map: Map[String, Iterable[OrderEvent]],
        out: Collector[OrderEvent]
    ) => {}

    val timeoutOrder = patternStream
      .flatSelect(orderTimeoutOutput)(timeoutFunction)(selectFunction)

    timeoutOrder.getSideOutput(orderTimeoutOutput).print()

    env.execute
  }
}
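
With this sample data, order 1 is created but never paid, order 2 completes within the bound, and orders 3 and 4 have no preceding create, so the side output should print something like OrderEvent(1,create,1558430842).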

Implementing Order Timeout with a Process Function

scala
package com.ysss.project

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object OrderTimeoutWIthoutCep {

  case class OrderEvent(orderId: String,
                        eventType: String,
                        eventTime: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .fromElements(
        OrderEvent("1", "create", "2"),
        OrderEvent("2", "create", "3"),
        OrderEvent("2", "pay", "4")
      )
      .assignAscendingTimestamps(_.eventTime.toLong * 1000L)
      .keyBy(_.orderId)
      .process(new OrderMatchFunc)

    stream.print()
    env.execute()
  }

  class OrderMatchFunc extends KeyedProcessFunction[String, OrderEvent, String] {
    lazy val orderState = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("saved order", Types.of[OrderEvent])
    )

    override def processElement(value: OrderEvent,
                                ctx: KeyedProcessFunction[String, OrderEvent, String]#Context,
                                out: Collector[String]): Unit = {
      if (value.eventType.equals("create")) {
        // Why check for null? Because the `pay` event may arrive first
        if (orderState.value() == null) {
          // orderState is empty, so save the `create` event
          orderState.update(value)
        }
      } else {
        // Save the `pay` event, overwriting a saved `create`
        // and thereby marking the order as paid
        orderState.update(value)
      }

      ctx.timerService().registerEventTimeTimer(value.eventTime.toLong * 1000 + 5000L)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val savedOrder = orderState.value()

      // If the saved event is still `create`, no payment arrived in time
      if (savedOrder != null && savedOrder.eventType.equals("create")) {
        out.collect("Timed-out order id: " + savedOrder.orderId)
      }

      orderState.clear()
    }
  }
}
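
With this sample data, order 1 is never paid, so when its timer fires the job should print "Timed-out order id: 1"; order 2's saved create is overwritten by its pay event, so it produces no output.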

Real-Time Reconciliation: Joining Two Streams

The complete code:

scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class OrderEvent(orderId: String,
                      eventType: String,
                      eventTime: String)

case class PayEvent(orderId: String,
                    eventType: String,
                    eventTime: String)

object TwoStreamsJoin {
  val unmatchedOrders = new OutputTag[OrderEvent]("unmatchedOrders") {}
  val unmatchedPays = new OutputTag[PayEvent]("unmatchedPays") {}

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val orders = env
      .fromCollection(List(
        OrderEvent("1", "create", "1558430842"),
        OrderEvent("2", "create", "1558430843"),
        OrderEvent("1", "pay", "1558430844"),
        OrderEvent("2", "pay", "1558430845"),
        OrderEvent("3", "create", "1558430849"),
        OrderEvent("3", "pay", "1558430849")
      )).assignAscendingTimestamps(_.eventTime.toLong * 1000)
      .keyBy("orderId")

    val pays = env.fromCollection(List(
      PayEvent("1", "weixin", "1558430847"),
      PayEvent("2", "zhifubao", "1558430848"),
      PayEvent("4", "zhifubao", "1558430850")
    )).assignAscendingTimestamps(_.eventTime.toLong * 1000)
      .keyBy("orderId")

    val processed = orders
      .connect(pays)
      .process(new EnrichmentFunction)

    processed.getSideOutput[PayEvent](unmatchedPays).print()
    processed.getSideOutput[OrderEvent](unmatchedOrders).print()

    env.execute
  }

  class EnrichmentFunction extends CoProcessFunction[
      OrderEvent, PayEvent, (OrderEvent, PayEvent)] {
    lazy val orderState: ValueState[OrderEvent] = getRuntimeContext
      .getState(new ValueStateDescriptor[OrderEvent]("saved order",
        classOf[OrderEvent]))

    lazy val payState: ValueState[PayEvent] = getRuntimeContext
      .getState(new ValueStateDescriptor[PayEvent]("saved pay",
        classOf[PayEvent]))

    override def processElement1(
        order: OrderEvent,
        context: CoProcessFunction[OrderEvent,
          PayEvent, (OrderEvent, PayEvent)]#Context,
        out: Collector[(OrderEvent, PayEvent)]
    ): Unit = {
      val pay = payState.value()

      if (pay != null) {
        payState.clear()
        out.collect((order, pay))
      } else {
        orderState.update(order)
        // as soon as the watermark arrives,
        // we can stop waiting for the corresponding pay
        context.timerService
          .registerEventTimeTimer(order.eventTime.toLong * 1000)
      }
    }

    override def processElement2(
        pay: PayEvent,
        context: CoProcessFunction[OrderEvent,
          PayEvent, (OrderEvent, PayEvent)]#Context,
        out: Collector[(OrderEvent, PayEvent)]
    ): Unit = {
      val order = orderState.value()

      if (order != null) {
        orderState.clear()
        out.collect((order, pay))
      } else {
        payState.update(pay)
        context
          .timerService
          .registerEventTimeTimer(pay.eventTime.toLong * 1000)
      }
    }

    override def onTimer(
        timestamp: Long,
        ctx: CoProcessFunction[OrderEvent,
          PayEvent, (OrderEvent, PayEvent)]#OnTimerContext,
        out: Collector[(OrderEvent, PayEvent)]
    ): Unit = {
      // Whatever is still buffered when the timer fires is unmatched
      if (payState.value != null) {
        ctx.output(unmatchedPays, payState.value)
        payState.clear()
      }

      if (orderState.value != null) {
        ctx.output(unmatchedOrders, orderState.value)
        orderState.clear()
      }
    }
  }
}
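
Design note: EnrichmentFunction is a hand-rolled keyed inner join. Each side buffers its event in ValueState until the matching event arrives from the other stream; an event-time timer then evicts whatever is still unmatched to a side output once the watermark passes the event's own timestamp, so keyed state cannot grow without bound.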
Author: Yang4
Link: https://masteryang4.github.io/2020/08/09/flink%E7%B3%BB%E5%88%9712%E7%94%B5%E5%95%86%E7%94%A8%E6%88%B7%E8%A1%8C%E4%B8%BA%E5%88%86%E6%9E%90/
License: Unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit MasterYangBlog when reposting.