Dataset Description

Taobao dataset

We have prepared a Taobao user-behavior dataset stored as a CSV file. It contains all behaviors (clicks, purchases, add-to-cart events, and favorites) of a random sample of one million Taobao users on a single day. Each row of the dataset represents one user behavior and consists of the user ID, item ID, item category ID, behavior type, and timestamp, separated by commas. The columns are described in detail below:

Field        Type     Description
userId       Long     De-identified user ID
itemId       Long     De-identified item ID
categoryId   Int      De-identified ID of the category the item belongs to
behavior     String   Type of user behavior, one of: 'pv', 'buy', 'cart', 'fav'
timestamp    Long     Timestamp of when the behavior occurred, in seconds
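For illustration only, a single line of the CSV could look like the following (the concrete values are made up; what matters is the column order and the comma separation):

543462,1715,1464116,pv,1511658000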
Apache server log dataset

Here we use a log file from an Apache web server as an example. Each line of the log records the visitor's IP, the userId, the access time, the HTTP method, and the URL that was accessed. The fields are described in detail below:

Field      Type     Description
ip         String   IP address of the visitor
userId     Long     userId of the visitor
eventTime  Long     Time of the access
method     String   HTTP method: GET/POST/PUT/DELETE
url        String   URL that was accessed
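Purely as an illustration (this exact line is not taken from the dataset), a log entry compatible with the field indices used by the parsing code later in this section (split on spaces: index 0 = ip, 2 = userId, 3 = time, 5 = method, 6 = url) might look like:

83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/kibana-search.png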
Real-Time Hot Items Statistics

The first module to implement is real-time hot items statistics, which we will build on top of the UserBehavior dataset.

Basic requirements

Every 5 minutes, output the top N items with the most clicks within the last hour
Clicks are measured by the number of page views ("pv")

Solution approach

1. From all user-behavior records, filter out the page-view ("pv") behaviors for counting
2. Build a sliding window with a length of 1 hour and a slide of 5 minutes
3. Perform the window computation with an incremental aggregate function combined with a full-window function
4. Key the resulting DataStream by the window end time with keyBy()
5. Buffer the elements of the KeyedStream in ListState; once the watermark passes the window end time, sort them and emit the result

Data preparation

Copy the data file UserBehavior.csv into the resource directory src/main/resources.
Main program
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

// Input type parsed from the CSV file
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// Output type of the full-window function
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        // Parse one CSV line into a UserBehavior record
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
      })
      // Keep only page-view events
      .filter(_.behavior == "pv")
      // Timestamps are in seconds; assignAscendingTimestamps assumes they are already in ascending order
      .assignAscendingTimestamps(_.timestamp * 1000)
      .keyBy(_.itemId)
      // Sliding window: 1 hour long, sliding every 5 minutes
      .timeWindow(Time.minutes(60), Time.minutes(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Regroup by window end so the items of each window can be ranked
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(3))

    stream.print()
    env.execute("Hot Items Job")
  }
}
In real business scenarios the data is usually out of order, so in practice assignAscendingTimestamps is generally not used; instead, a BoundedOutOfOrdernessTimestampExtractor is used.
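As a minimal sketch of that replacement (the 1-second out-of-orderness bound is an assumed value chosen for illustration, and it requires importing org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor), the assignAscendingTimestamps call above could be swapped for:

// Sketch: bounded-out-of-orderness watermarks instead of ascending timestamps.
// The 1-second bound is an assumption for illustration, not a value from the text.
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[UserBehavior](Time.seconds(1)) {
    override def extractTimestamp(element: UserBehavior): Long = element.timestamp * 1000
  }
)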
Implementing the incremental aggregate function
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
Implementing the full-window function

Its job is simply to wrap the result of the incremental aggregation with the window information and the key.

The code is as follows:
// Wraps the pre-aggregated count with the key (itemId) and the window end time.
// Note: keyBy(_.itemId) produces a Long key, so the key type parameter must be Long here.
class WindowResultFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long,
                       context: Context,
                       elements: Iterable[Long],
                       out: Collector[ItemViewCount]): Unit = {
    out.collect(ItemViewCount(key, context.window.getEnd, elements.iterator.next()))
  }
}
We now have a data stream with the click count of every item in every window.

Computing the top N hottest items
// Emits the top N items of each window once the watermark has passed the window end.
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Buffers every ItemViewCount belonging to the current window end
  lazy val itemState = getRuntimeContext.getListState(
    new ListStateDescriptor[ItemViewCount]("items", Types.of[ItemViewCount])
  )

  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    itemState.add(value)
    // Fire 1 ms after the window end, i.e. once the watermark has passed the window end
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(ts: Long,
                       ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val allItems: ListBuffer[ItemViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for (item <- itemState.get) {
      allItems += item
    }
    // The state is no longer needed once the window result has been collected
    itemState.clear()

    // Sort by count in descending order and keep the top N
    val sortedItems = allItems.sortBy(-_.count).take(topSize)

    val result = new StringBuilder
    result.append("====================================\n")
    result.append("Time: ").append(new Timestamp(ts - 1)).append("\n")
    for (i <- sortedItems.indices) {
      val currentItem = sortedItems(i)
      result.append("No")
        .append(i + 1)
        .append(":")
        .append(" itemId=")
        .append(currentItem.itemId)
        .append(" views=")
        .append(currentItem.count)
        .append("\n")
    }
    result.append("====================================\n\n")
    // Slow the output down a little so the results are easier to read
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
Switching to Kafka as the Data Source

In a real production environment, the data stream usually comes from Kafka. To bring the code closer to production practice, all we need to do is replace the source with Kafka:

Note: the Kafka version used here must be 2.2!

Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Write the code:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val stream = env
  .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
Of course, depending on actual needs, we could also point the sink at Kafka, Elasticsearch, Redis, or some other storage; we will not implement each of them here.
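As one example, a minimal sketch of writing the formatted result strings back to Kafka with the same connector could look like the following; the topic name "hotitems-output" is an assumption made for illustration, and the constructor used is the simple broker-list/topic/schema variant of FlinkKafkaProducer:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Sketch only: send every Top-N result string to a Kafka topic.
// "hotitems-output" is an assumed topic name, not taken from the original text.
stream.addSink(new FlinkKafkaProducer[String](
  "localhost:9092",     // broker list
  "hotitems-output",    // target topic (assumption)
  new SimpleStringSchema()
))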
Kafka producer program

Add the dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
Write the code:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerUtil {
  def main(args: Array[String]): Unit = {
    writeToKafka("hotitems")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Read the CSV file line by line and send every line as one Kafka record
    val bufferedSource = io.Source.fromFile("absolute path of UserBehavior.csv")
    for (line <- bufferedSource.getLines) {
      val record = new ProducerRecord[String, String](topic, line)
      producer.send(record)
    }
    producer.close()
  }
}
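Usage note: with a Kafka broker running on localhost:9092, run KafkaProducerUtil first so that the contents of UserBehavior.csv are written into the hotitems topic, then start the hot-items job that uses the Kafka source shown above.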
Real-Time Traffic Statistics

Basic requirements

Compute real-time access traffic from the web server's logs
Count the visits per IP per minute, take the 5 most-visited addresses, and refresh the result every 5 seconds

Solution approach

Convert the time field in the Apache server log into a timestamp and use it as the event time (see the parsing sketch after this list)
Build a sliding window with a window length of 1 minute and a slide of 5 seconds
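A minimal sketch of the time conversion; the sample string "17/05/2015:10:05:03" is an assumed example that matches the dd/MM/yyyy:HH:mm:ss pattern used in the complete code below:

import java.text.SimpleDateFormat

// Parse the log's time field into epoch milliseconds so it can serve as event time.
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp: Long = simpleDateFormat.parse("17/05/2015:10:05:03").getTime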
Data preparation

Copy the Apache server log file apache.log into the resource directory src/main/resources; we will read the data from there.

Implementation

The module we are implementing now is "real-time traffic statistics". For an e-commerce platform, the entry traffic from user logins and the traffic of individual pages are important data worth analyzing, and this data can be extracted fairly simply from the web server's logs. Here we implement the most basic "page view" statistics: read every line of the server log and count how many times users accessed each URL within a given period of time.

Concretely: every 5 seconds, output the top N URLs with the most visits within the last 10 minutes. As you can see, this requirement is very similar to the earlier "real-time hot items statistics", so we can reuse much of that code.

The complete code is as follows:
package com.ysss.project

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object ApacheLogAnalysis {

  case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

  case class UrlViewCount(url: String, windowEnd: Long, count: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .readTextFile("absolute path of apache.log")
      .map(line => {
        val linearray = line.split(" ")
        // Parse the log time into epoch milliseconds and use it as event time
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
      })
      // The log is out of order, so use a bounded-out-of-orderness extractor (1-second bound)
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.milliseconds(1000)) {
          override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime
        }
      )
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))
      .print()

    env.execute("Traffic Analysis Job")
  }

  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  class WindowResultFunction extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Long],
                         out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, context.window.getEnd, elements.iterator.next()))
    }
  }

  class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {

    lazy val urlState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount]("urlState-state", Types.of[UrlViewCount])
    )

    override def processElement(input: UrlViewCount,
                                context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      urlState.add(input)
      // Fire once the watermark has passed the window end
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (urlView <- urlState.get) {
        allUrlViews += urlView
      }
      urlState.clear()

      // Sort by count in descending order and keep the top N
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

      val result = new StringBuilder
      result
        .append("====================================\n")
        .append("Time: ")
        .append(new Timestamp(timestamp - 1))
        .append("\n")
      for (i <- sortedUrlViews.indices) {
        val currentUrlView: UrlViewCount = sortedUrlViews(i)
        result
          .append("No")
          .append(i + 1)
          .append(": ")
          .append(" URL=")
          .append(currentUrlView.url)
          .append(" views=")
          .append(currentUrlView.count)
          .append("\n")
      }
      result.append("====================================\n\n")
      // Slow the output down a little so the results are easier to read
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}
UV Statistics with a Bloom Filter

Instead of keeping every userId in memory to count unique visitors (UV), the job below keeps only a running count per window in Redis and uses a bloom-filter-style bitmap (also stored in Redis) to decide whether a userId has already been seen in that window.

Dependency:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.1</version>
</dependency>

The complete code is as follows:
package com.ysss

import com.ysss.UserBehavior.UserAction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloomFilter {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .readTextFile("absolute path of UserBehavior.csv")
      .map(line => {
        val arr = line.split(",")
        UserAction(arr(0), arr(1), arr(2), arr(3), arr(4).toLong * 1000)
      })
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior == "pv")
      // Route everything to one logical key so the whole hour is deduplicated together
      .map(r => ("dummyKey", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.minutes(60), Time.minutes(5))
      // Fire and purge on every element so no elements accumulate in the window state
      .trigger(new MyTrigger123)
      .process(new MyProcess)

    stream.print()
    env.execute()
  }

  // Results are written to Redis rather than collected downstream
  class MyProcess extends ProcessWindowFunction[(String, String), (Long, Long), String, TimeWindow] {
    lazy val jedis = new Jedis("localhost", 6379)
    lazy val bloom = new Bloom(1 << 29)

    override def process(key: String,
                         context: Context,
                         vals: Iterable[(String, String)],
                         out: Collector[(Long, Long)]): Unit = {
      // One Redis bitmap per window, keyed by the window end time
      val storeKey = context.window.getEnd.toString
      var count = 0L
      if (jedis.hget("UvCountHashTable", storeKey) != null) {
        count = jedis.hget("UvCountHashTable", storeKey).toLong
      }

      // Because the trigger fires and purges per element, vals contains exactly one element
      val userId = vals.last._2
      val offset = bloom.hash(userId, 61)
      val isExist = jedis.getbit(storeKey, offset)
      if (!isExist) {
        // userId not seen before in this window: set the bit and increment the UV count
        jedis.setbit(storeKey, offset, true)
        jedis.hset("UvCountHashTable", storeKey, (count + 1).toString)
      }
    }
  }

  class MyTrigger123 extends Trigger[(String, String), TimeWindow] {
    override def onElement(element: (String, String), timestamp: Long,
                           window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      // Process every element immediately and purge it from the window
      TriggerResult.FIRE_AND_PURGE
    }

    override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      if (ctx.getCurrentWatermark >= window.getEnd) {
        // The window is complete: print the final UV count stored in Redis
        val jedis = new Jedis("localhost", 6379)
        val key = window.getEnd.toString
        println(key, jedis.hget("UvCountHashTable", key))
      }
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult =
      TriggerResult.CONTINUE

    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
  }

  // A very simple bloom-filter-style hash over a bitmap of `size` bits
  class Bloom(size: Long) extends Serializable {
    private val cap = size

    def hash(value: String, seed: Int): Long = {
      var result = 0
      for (i <- 0 until value.length) {
        result = result * seed + value.charAt(i)
      }
      (cap - 1) & result
    }
  }
}
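The UserAction case class is imported from com.ysss.UserBehavior, which is not shown in this post. Judging only from how it is used above (five constructor arguments and the fields userId, behavior, and ts), a compatible definition would be roughly the following sketch; every field name other than those three is an assumption:

// Assumed shape of the imported case class; only userId, behavior and ts are
// actually referenced by UvWithBloomFilter, the remaining names are guesses.
case class UserAction(userId: String, itemId: String, categoryId: String, behavior: String, ts: Long)

Also note the bitmap size: 1 << 29 bits is 2^29 bits = 2^26 bytes, i.e. 64 MB of Redis memory per window bitmap.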
APP Marketing Statistics by Channel

The complete code is as follows:
package com.ysss

import java.util.{Calendar, UUID}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object AppMarketingByChannel {

  case class MarketingUserBehavior(userId: String, behavior: String, channel: String, ts: Long)

  // Simulated source that emits random marketing events
  class SimulatedEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {
    var running = true
    val channelSet = Seq("AppStore", "XiaomiStore")
    val behaviorTypes = Seq("BROWSE", "CLICK")
    val rand = new Random

    override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
      while (running) {
        val userId = UUID.randomUUID().toString
        val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
        val channel = channelSet(rand.nextInt(channelSet.size))
        val ts = Calendar.getInstance().getTimeInMillis
        ctx.collect(MarketingUserBehavior(userId, behaviorType, channel, ts))
        Thread.sleep(10)
      }
    }

    override def cancel(): Unit = running = false
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior != "UNINSTALL")
      // Key by (channel, behavior) and count with a sliding window
      .map(r => ((r.channel, r.behavior), 1L))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .process(new MarketingCountByChannel)

    stream.print()
    env.execute()
  }

  // Emits (channel, count, window end) for every (channel, behavior) key and window
  class MarketingCountByChannel
    extends ProcessWindowFunction[((String, String), Long), (String, Long, Long), (String, String), TimeWindow] {
    override def process(key: (String, String),
                         context: Context,
                         elements: Iterable[((String, String), Long)],
                         out: Collector[(String, Long, Long)]): Unit = {
      out.collect((key._1, elements.size, context.window.getEnd))
    }
  }
}
APP Marketing Statistics Without Channel Breakdown

The complete code is as follows:
package com.ysss

import com.ysss.AppMarketingByChannel.SimulatedEventSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketingStatistics {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.ts)
      .filter(_.behavior != "UNINSTALL")
      .map(r => ("dummyKey", 1L))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .process(new MarketingCountTotal)

    stream.print()
    env.execute()
  }

  class MarketingCountTotal
    extends ProcessWindowFunction[(String, Long), (String, Long, Long), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long, Long)]): Unit = {
      out.collect((key, elements.size, context.window.getEnd))
    }
  }
}
Detecting Malicious Logins

Here we detect two consecutive login failures within 5 seconds for the same user, using a KeyedProcessFunction with an event-time timer instead of the CEP library.

package com.ysss

import com.ysss.FlinkCepExample.LoginEvent
import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object LoginFailWithoutCEP {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .fromElements(
        LoginEvent("1", "0.0.0.0", "fail", "1"),
        LoginEvent("1", "0.0.0.0", "success", "2"),
        LoginEvent("1", "0.0.0.0", "fail", "3"),
        LoginEvent("1", "0.0.0.0", "fail", "4")
      )
      .assignAscendingTimestamps(_.ts.toLong * 1000)
      .keyBy(_.userId)
      .process(new MatchFunction)

    stream.print()
    env.execute()
  }

  class MatchFunction extends KeyedProcessFunction[String, LoginEvent, String] {

    // All failed logins of the current user since the last success
    lazy val loginState = getRuntimeContext.getListState(
      new ListStateDescriptor[LoginEvent]("login-fail", Types.of[LoginEvent])
    )
    // Timestamp of the registered timer, kept so the timer can be deleted on success
    lazy val timestamp = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("ts", Types.of[Long])
    )

    override def processElement(value: LoginEvent,
                                ctx: KeyedProcessFunction[String, LoginEvent, String]#Context,
                                out: Collector[String]): Unit = {
      if (value.loginStatus == "fail") {
        loginState.add(value)
        // Register the 5-second timer only for the first failure of the sequence
        if (timestamp.value() == 0) {
          timestamp.update(value.ts.toLong * 1000 + 5000L)
          ctx.timerService().registerEventTimeTimer(value.ts.toLong * 1000 + 5000L)
        }
      }
      if (value.loginStatus == "success") {
        // A successful login resets the failure window
        loginState.clear()
        ctx.timerService().deleteEventTimeTimer(timestamp.value())
      }
    }

    override def onTimer(ts: Long,
                         ctx: KeyedProcessFunction[String, LoginEvent, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val allLogins = ListBuffer[LoginEvent]()
      import scala.collection.JavaConversions._
      for (login <- loginState.get) {
        allLogins += login
      }
      loginState.clear()

      if (allLogins.length > 1) {
        out.collect("Two or more consecutive login failures within 5 seconds")
      }
    }
  }
}
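The LoginEvent case class is imported from com.ysss.FlinkCepExample, which is not shown in this post. Based on the four string arguments and the fields referenced above (userId, loginStatus, ts), a compatible definition would be roughly the following sketch; the name of the second field is an assumption:

// Assumed definition, matching LoginEvent("1", "0.0.0.0", "fail", "1") and the
// userId / loginStatus / ts fields used by LoginFailWithoutCEP.
case class LoginEvent(userId: String, ip: String, loginStatus: String, ts: String)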
Real-Time Order Payment Monitoring

Basic requirements

After a user places an order, an expiration time should be set for the order, both to increase the user's willingness to pay and to reduce system risk
If the user has not paid within 15 minutes of placing the order, output a monitoring message

Solution approach

Use the CEP library to perform pattern matching on the event stream, and set a time interval for the match
Implementation with Flink CEP

On an e-commerce platform, the step that ultimately creates revenue and profit is users ordering and buying; more precisely, the moment a user actually completes the payment. Placing an order signals demand for an item, but in reality not every order is paid right away, and the longer a user delays, the lower their willingness to pay becomes. So, to give users a sense of urgency and raise the payment conversion rate, and also to guard against security risks in the payment process, e-commerce sites usually monitor the order status and set an expiration time (for example 15 minutes): if the order is still unpaid after that period, it is cancelled.

We will implement this feature with the CEP library. We first key the event stream by the order number orderId, and then define an event pattern in which the events "create" and "pay" must strictly follow one another within the allowed interval (the demo below uses a short 5-second bound so that the small sample dataset produces timeouts):
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
  .where(_.eventType == "create")
  .next("next")
  .where(_.eventType == "pay")
  .within(Time.seconds(5))
This way, when .select (or .flatSelect) is called, we can obtain both the matched events and the events that timed out without a match. Continue by creating an OrderTimeout.scala file under src/main/scala and defining a new singleton object in it. Define the case class OrderEvent, which is the input order event stream, as well as OrderResult, the order status result used for display. Since there is no ready-made data, we again use a few hand-written sample records for the demonstration. The complete code is as follows:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.util.Collector

import scala.collection.Map

case class OrderEvent(orderId: String, eventType: String, eventTime: String)

object OrderTimeout {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orderEventStream = env.fromCollection(List(
      OrderEvent("1", "create", "1558430842"),
      OrderEvent("2", "create", "1558430843"),
      OrderEvent("2", "pay", "1558430844"),
      OrderEvent("3", "pay", "1558430942"),
      OrderEvent("4", "pay", "1558430943")
    )).assignAscendingTimestamps(_.eventTime.toLong * 1000)

    // Pattern: "create" strictly followed by "pay" within 5 seconds
    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
      .where(_.eventType.equals("create"))
      .next("next")
      .where(_.eventType.equals("pay"))
      .within(Time.seconds(5))

    // Side output for orders that time out without being paid
    val orderTimeoutOutput = OutputTag[OrderEvent]("orderTimeout")

    val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)

    // Called for partial matches that exceed the `within` interval
    val timeoutFunction = (map: Map[String, Iterable[OrderEvent]],
                           timestamp: Long,
                           out: Collector[OrderEvent]) => {
      print(timestamp)
      val orderStart = map.get("begin").get.head
      out.collect(orderStart)
    }

    // Called for complete matches; they are ignored here
    val selectFunction = (map: Map[String, Iterable[OrderEvent]],
                          out: Collector[OrderEvent]) => {}

    val timeoutOrder = patternStream
      .flatSelect(orderTimeoutOutput)(timeoutFunction)(selectFunction)

    // Print only the timed-out orders from the side output
    timeoutOrder.getSideOutput(orderTimeoutOutput).print()

    env.execute
  }
}
Implementing the Order Timeout Requirement with a Process Function

package com.ysss.project

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object OrderTimeoutWIthoutCep {

  case class OrderEvent(orderId: String, eventType: String, eventTime: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .fromElements(
        OrderEvent("1", "create", "2"),
        OrderEvent("2", "create", "3"),
        OrderEvent("2", "pay", "4")
      )
      .assignAscendingTimestamps(_.eventTime.toLong * 1000L)
      .keyBy(_.orderId)
      .process(new OrderMatchFunc)

    stream.print()
    env.execute()
  }

  class OrderMatchFunc extends KeyedProcessFunction[String, OrderEvent, String] {

    // Latest relevant event seen for the current order
    lazy val orderState = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("saved order", Types.of[OrderEvent])
    )

    override def processElement(value: OrderEvent,
                                ctx: KeyedProcessFunction[String, OrderEvent, String]#Context,
                                out: Collector[String]): Unit = {
      if (value.eventType.equals("create")) {
        // Only save the "create" event if no later event has arrived yet
        if (orderState.value() == null) {
          orderState.update(value)
        }
      } else {
        orderState.update(value)
      }
      // Check the order status 5 seconds after this event
      ctx.timerService().registerEventTimeTimer(value.eventTime.toLong * 1000 + 5000L)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val savedOrder = orderState.value()
      // If the latest saved event is still "create", the order was never paid in time
      if (savedOrder != null && savedOrder.eventType.equals("create")) {
        out.collect("Timed-out order ID: " + savedOrder.orderId)
      }
      orderState.clear()
    }
  }
}
Real-Time Reconciliation: Joining Two Streams

The complete code is as follows:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class OrderEvent(orderId: String, eventType: String, eventTime: String)
case class PayEvent(orderId: String, eventType: String, eventTime: String)

object TwoStreamsJoin {

  // Side outputs for events that never find their counterpart
  val unmatchedOrders = new OutputTag[OrderEvent]("unmatchedOrders") {}
  val unmatchedPays = new OutputTag[PayEvent]("unmatchedPays") {}

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val orders = env
      .fromCollection(List(
        OrderEvent("1", "create", "1558430842"),
        OrderEvent("2", "create", "1558430843"),
        OrderEvent("1", "pay", "1558430844"),
        OrderEvent("2", "pay", "1558430845"),
        OrderEvent("3", "create", "1558430849"),
        OrderEvent("3", "pay", "1558430849")
      ))
      .assignAscendingTimestamps(_.eventTime.toLong * 1000)
      .keyBy("orderId")

    val pays = env
      .fromCollection(List(
        PayEvent("1", "weixin", "1558430847"),
        PayEvent("2", "zhifubao", "1558430848"),
        PayEvent("4", "zhifubao", "1558430850")
      ))
      .assignAscendingTimestamps(_.eventTime.toLong * 1000)
      .keyBy("orderId")

    val processed = orders
      .connect(pays)
      .process(new EnrichmentFunction)

    // Print only the unmatched events from the side outputs
    processed.getSideOutput[PayEvent](unmatchedPays).print()
    processed.getSideOutput[OrderEvent](unmatchedOrders).print()

    env.execute
  }

  class EnrichmentFunction extends CoProcessFunction[OrderEvent, PayEvent, (OrderEvent, PayEvent)] {

    lazy val orderState: ValueState[OrderEvent] = getRuntimeContext
      .getState(new ValueStateDescriptor[OrderEvent]("saved order", classOf[OrderEvent]))
    lazy val payState: ValueState[PayEvent] = getRuntimeContext
      .getState(new ValueStateDescriptor[PayEvent]("saved pay", classOf[PayEvent]))

    override def processElement1(order: OrderEvent,
                                 context: CoProcessFunction[OrderEvent, PayEvent, (OrderEvent, PayEvent)]#Context,
                                 out: Collector[(OrderEvent, PayEvent)]): Unit = {
      val pay = payState.value()
      if (pay != null) {
        // The matching pay event arrived first: join and clear its state
        payState.clear()
        out.collect((order, pay))
      } else {
        // Wait for the pay event; check again once the watermark reaches this event's time
        orderState.update(order)
        context.timerService.registerEventTimeTimer(order.eventTime.toLong * 1000)
      }
    }

    override def processElement2(pay: PayEvent,
                                 context: CoProcessFunction[OrderEvent, PayEvent, (OrderEvent, PayEvent)]#Context,
                                 out: Collector[(OrderEvent, PayEvent)]): Unit = {
      val order = orderState.value()
      if (order != null) {
        // The matching order event arrived first: join and clear its state
        orderState.clear()
        out.collect((order, pay))
      } else {
        payState.update(pay)
        context.timerService.registerEventTimeTimer(pay.eventTime.toLong * 1000)
      }
    }

    override def onTimer(timestamp: Long,
                         ctx: CoProcessFunction[OrderEvent, PayEvent, (OrderEvent, PayEvent)]#OnTimerContext,
                         out: Collector[(OrderEvent, PayEvent)]): Unit = {
      // Anything still buffered when the timer fires had no counterpart: emit it to a side output
      if (payState.value != null) {
        ctx.output(unmatchedPays, payState.value)
        payState.clear()
      }
      if (orderState.value != null) {
        ctx.output(unmatchedOrders, orderState.value)
        orderState.clear()
      }
    }
  }
}