The same requirement can also be implemented with flatMapWithState, a shortcut for a FlatMap with keyed ValueState.
scala
    val alerts: DataStream[(String, Double, Double)] = keyedSensorData
      .flatMapWithState[(String, Double, Double), Double] {
        case (in: SensorReading, None) =>
          // no previous temperature defined.
          // Just update the last temperature
          (List.empty, Some(in.temperature))
        case (r: SensorReading, lastTemp: Some[Double]) =>
          // compare temperature difference with threshold
          val tempDiff = (r.temperature - lastTemp.get).abs
          if (tempDiff > 1.7) {
            // threshold exceeded.
            // Emit an alert and update the last temperature
            (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
          } else {
            // threshold not exceeded. Just update the last temperature
            (List.empty, Some(r.temperature))
          }
      }
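To make the contract of the function passed to flatMapWithState easier to see, here is a minimal plain-Scala sketch with no Flink dependency. It assumes a hypothetical `SensorReading(id, timestamp, temperature)` case class and models keyed state as an ordinary `Map` from key to last temperature; the names `FlatMapWithStateSketch`, `step`, and `run` are made up for illustration and are not part of the Flink API.

```scala
object FlatMapWithStateSketch {
  // Hypothetical stand-in for the SensorReading type used in this post.
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  type Alert = (String, Double, Double)

  // Same shape as the function handed to flatMapWithState:
  // (input, previous state) => (emitted records, new state)
  def step(r: SensorReading, lastTemp: Option[Double]): (List[Alert], Option[Double]) =
    lastTemp match {
      case None =>
        // first reading for this key: emit nothing, remember the temperature
        (List.empty, Some(r.temperature))
      case Some(prev) =>
        val tempDiff = (r.temperature - prev).abs
        if (tempDiff > 1.7) (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
        else (List.empty, Some(r.temperature))
    }

  // Model of keyed state: one optional value per key, looked up by r.id.
  def run(readings: Seq[SensorReading]): List[Alert] = {
    val (_, alerts) =
      readings.foldLeft((Map.empty[String, Double], List.empty[Alert])) {
        case ((state, out), r) =>
          val (emitted, newState) = step(r, state.get(r.id))
          val updated = newState match {
            case Some(t) => state.updated(r.id, t)
            case None    => state - r.id // returning None clears the state
          }
          (updated, out ++ emitted)
      }
    alerts
  }
}
```

Each key only ever sees its own previous temperature, which is the behaviour keyed ValueState provides in the real operator.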
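Implementing operator list state with the ListCheckpointed interface
Operator state is maintained separately in each parallel instance of an operator. All events processed by the same parallel instance have access to the same state. Flink supports three kinds of operator state: list state, union list state, and broadcast state.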
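The three kinds differ mainly in how state is handed back to the subtasks when the operator is restored or rescaled. The following plain-Scala sketch models that difference under simplifying assumptions: the object and method names are hypothetical, and the round-robin split used for list state is only one possible even distribution, not necessarily the exact assignment Flink performs.

```scala
object OperatorStateRedistribution {
  // List state: the entries of all old subtasks are gathered and split
  // across the new subtasks (round-robin here, an assumption of this sketch).
  def listState[A](old: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] = {
    val all = old.flatten
    (0 until newParallelism).map { i =>
      all.zipWithIndex.collect { case (a, idx) if idx % newParallelism == i => a }
    }
  }

  // Union list state: every new subtask receives the complete list
  // and decides for itself which entries to keep.
  def unionListState[A](old: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] =
    Seq.fill(newParallelism)(old.flatten)

  // Broadcast state: all subtasks hold identical copies,
  // so each new subtask simply gets one copy.
  def broadcastState[K, V](copy: Map[K, V], newParallelism: Int): Seq[Map[K, V]] =
    Seq.fill(newParallelism)(copy)
}
```

For example, rescaling two subtasks holding `Seq(1, 2)` and `Seq(3)` to three subtasks gives each new subtask one entry with list state, and all three entries with union list state.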
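    import java.util
    import scala.collection.JavaConverters._
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.util.Collector

    // NOTE: the class declaration is missing from the source. The header below
    // is reconstructed from the method signatures; the name HighTempCounter
    // is an assumption.
    class HighTempCounter(val threshold: Double)
        extends RichFlatMapFunction[SensorReading, (Int, Long)]
        with ListCheckpointed[java.lang.Long] {

      // index of the subtask
      private lazy val subtaskIdx = getRuntimeContext
        .getIndexOfThisSubtask
      // local count variable
      private var highTempCnt = 0L

      override def flatMap(
          in: SensorReading,
          out: Collector[(Int, Long)]): Unit = {
        if (in.temperature > threshold) {
          // increment counter if threshold is exceeded
          highTempCnt += 1
          // emit update with subtask index and counter
          out.collect((subtaskIdx, highTempCnt))
        }
      }

      override def restoreState(
          state: util.List[java.lang.Long]): Unit = {
        highTempCnt = 0
        // restore state by adding all longs of the list
        for (cnt <- state.asScala) {
          highTempCnt += cnt
        }
      }

      override def snapshotState(
          chkpntId: Long,
          ts: Long): java.util.List[java.lang.Long] = {
        // snapshot state as list with a single count
        java.util.Collections.singletonList(highTempCnt)
      }
    }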
    override def snapshotState(
        chkpntId: Long,
        ts: Long): java.util.List[java.lang.Long] = {
      // split count into ten partial counts
      val div = highTempCnt / 10
      val mod = (highTempCnt % 10).toInt
      // return count as ten parts
      (List.fill(mod)(new java.lang.Long(div + 1)) ++
        List.fill(10 - mod)(new java.lang.Long(div))).asJava
    }
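The arithmetic in this variant is easy to check in isolation. The sketch below, with the hypothetical names `SplitCount`, `split`, and `restore`, reproduces the split in plain Scala and shows that summing any redistribution of the parts recovers the original count, which is what restoreState relies on.

```scala
object SplitCount {
  // Split `count` into `parts` partial counts that differ by at most one.
  // The first (count % parts) entries carry the extra unit.
  def split(count: Long, parts: Int): List[Long] = {
    val div = count / parts
    val mod = (count % parts).toInt
    List.fill(mod)(div + 1) ++ List.fill(parts - mod)(div)
  }

  // Mirrors restoreState: a subtask sums whatever partial counts it receives.
  def restore(partials: Seq[Long]): Long = partials.sum
}
```

Splitting 23 into ten parts yields three entries of 3 and seven entries of 2. Because the state is a list of ten entries instead of one, the count can be spread over several subtasks when the operator is scaled out.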
    val sensorData: DataStream[SensorReading] = ...
    val thresholds: DataStream[ThresholdUpdate] = ...
    val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
      .keyBy(_.id)

    // the descriptor of the broadcast state
    val broadcastStateDescriptor =
      new MapStateDescriptor[String, Double](
        "thresholds", classOf[String], classOf[Double])

    val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
      .broadcast(broadcastStateDescriptor)

    // connect keyed sensor stream and broadcasted rules stream
    val alerts: DataStream[(String, Double, Double)] = keyedSensorData
      .connect(broadcastThresholds)
      .process(new UpdatableTemperatureAlertFunction())
    // The class declaration is missing from the source; it is reconstructed
    // here from the call site above and the method signatures below.
    class UpdatableTemperatureAlertFunction()
        extends KeyedBroadcastProcessFunction[
          String, SensorReading, ThresholdUpdate, (String, Double, Double)] {

      // the descriptor of the broadcast state
      private lazy val thresholdStateDescriptor =
        new MapStateDescriptor[String, Double](
          "thresholds", classOf[String], classOf[Double])

      // the keyed state handle
      private var lastTempState: ValueState[Double] = _

      override def open(parameters: Configuration): Unit = {
        // create keyed state descriptor
        val lastTempDescriptor = new ValueStateDescriptor[Double](
          "lastTemp", classOf[Double])
        // obtain the keyed state handle
        lastTempState = getRuntimeContext
          .getState[Double](lastTempDescriptor)
      }

      override def processBroadcastElement(
          update: ThresholdUpdate,
          ctx: KeyedBroadcastProcessFunction[
            String, SensorReading, ThresholdUpdate, (String, Double, Double)]#Context,
          out: Collector[(String, Double, Double)]): Unit = {
        // get broadcasted state handle
        val thresholds = ctx
          .getBroadcastState(thresholdStateDescriptor)

        if (update.threshold != 0.0d) {
          // configure a new threshold for the sensor
          thresholds.put(update.id, update.threshold)
        } else {
          // remove threshold for the sensor
          thresholds.remove(update.id)
        }
      }

      override def processElement(
          reading: SensorReading,
          readOnlyCtx: KeyedBroadcastProcessFunction[
            String, SensorReading, ThresholdUpdate, (String, Double, Double)]#ReadOnlyContext,
          out: Collector[(String, Double, Double)]): Unit = {
        // get read-only broadcast state
        val thresholds = readOnlyCtx
          .getBroadcastState(thresholdStateDescriptor)
        // check if we have a threshold
        if (thresholds.contains(reading.id)) {
          // get threshold for sensor
          val sensorThreshold: Double = thresholds.get(reading.id)

          // fetch the last temperature from state
          val lastTemp = lastTempState.value()
          // check if we need to emit an alert
          val tempDiff = (reading.temperature - lastTemp).abs
          if (tempDiff > sensorThreshold) {
            // temperature changed by more than the threshold
            out.collect((reading.id, reading.temperature, tempDiff))
          }
        }

        // update lastTemp state
        this.lastTempState.update(reading.temperature)
      }
    }
Configuring checkpoints
The following enables checkpointing with a checkpoint taken every 10 seconds.
scala
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // set checkpointing interval to 10 seconds (10000 milliseconds)
    env.enableCheckpointing(10000L)
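Beyond the interval, a few further settings on the checkpoint configuration are commonly tuned. The fragment below is a sketch that assumes the Flink Scala DataStream API is on the classpath; the concrete values (5 seconds pause, 60 seconds timeout, one concurrent checkpoint) are illustrative assumptions and not recommendations taken from this post.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 10 seconds with exactly-once guarantees
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)

val cpConfig = env.getCheckpointConfig
// leave at least 5 seconds between the end of one checkpoint
// and the start of the next (illustrative value)
cpConfig.setMinPauseBetweenCheckpoints(5000L)
// abort a checkpoint that takes longer than 60 seconds (illustrative value)
cpConfig.setCheckpointTimeout(60000L)
// allow only one checkpoint in flight at a time
cpConfig.setMaxConcurrentCheckpoints(1)
```

A minimum pause keeps a slow checkpoint from being followed immediately by the next one, so the job always gets some time for regular processing.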
    val alerts: DataStream[(String, Double, Double)] = keyedSensorData
      .flatMap(new TemperatureAlertFunction(1.1))
      .uid("TempAlert")
Setting the maximum parallelism of an operator
An operator's maximum parallelism defines how many key groups its keyed state is split into.
scala
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // set the maximum parallelism for this application
    env.setMaxParallelism(512)

    val alerts: DataStream[(String, Double, Double)] = keyedSensorData
      .flatMap(new TemperatureAlertFunction(1.1))
      // set the maximum parallelism for this operator and
      // override the application-wide value
      .setMaxParallelism(1024)
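How key groups relate to parallel subtasks can be sketched in plain Scala. A key is hashed into one of `maxParallelism` key groups, and each subtask owns a contiguous range of key groups. The names `KeyGroupSketch`, `keyGroupFor`, and `subtaskFor` are hypothetical, and `MurmurHash3.stringHash` is only a stand-in for the hash Flink applies internally, so the concrete key group of a given key will differ from what Flink computes.

```scala
import scala.util.hashing.MurmurHash3

object KeyGroupSketch {
  // Map a key to one of `maxParallelism` key groups.
  // Stand-in hash: Flink uses its own murmur hash of key.hashCode.
  def keyGroupFor(key: String, maxParallelism: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key), maxParallelism)

  // Map a key group to the subtask that owns it.
  // Each subtask gets a contiguous range of key groups.
  def subtaskFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism
}
```

With a maximum parallelism of 512 and a parallelism of 4, key groups 0 to 127 belong to subtask 0 and key groups 128 to 255 to subtask 1. After rescaling to a parallelism of 8, key group 128 moves to subtask 2, while the key-to-key-group mapping stays unchanged. This is why the maximum parallelism also caps how far an operator with keyed state can be scaled out.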
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val checkpointPath: String = ???
    // configure path for checkpoints on the remote filesystem
    // env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))

    val backend = new RocksDBStateBackend(checkpointPath)
    // configure the state backend
    env.setStateBackend(backend)
    // the keyed state handle for the last temperature
    private var lastTempState: ValueState[Double] = _
    // the keyed state handle for the last registered timer
    private var lastTimerState: ValueState[Long] = _

    override def open(parameters: Configuration): Unit = {
      // register state for last temperature
      val lastTempDesc = new ValueStateDescriptor[Double](
        "lastTemp", classOf[Double])
      lastTempState = getRuntimeContext
        .getState[Double](lastTempDesc)
      // register state for last timer
      val lastTimerDesc = new ValueStateDescriptor[Long](
        "lastTimer", classOf[Long])
      lastTimerState = getRuntimeContext
        .getState(lastTimerDesc)
    }
      // compute timestamp of new clean up timer
      // as record timestamp + one hour
      val newTimer = ctx.timestamp() + (3600 * 1000)
      // get timestamp of current timer
      val curTimer = lastTimerState.value()
      // delete previous timer and register new timer
      ctx.timerService().deleteEventTimeTimer(curTimer)
      ctx.timerService().registerEventTimeTimer(newTimer)
      // update timer timestamp state
      lastTimerState.update(newTimer)

      // fetch the last temperature from state
      val lastTemp = lastTempState.value()
      // check if we need to emit an alert
      val tempDiff = (reading.temperature - lastTemp).abs
      if (tempDiff > threshold) {
        // temperature changed by more than the threshold
        out.collect((reading.id, reading.temperature, tempDiff))
      }

      // update lastTemp state
      this.lastTempState.update(reading.temperature)
    }