flink 如何实现对watermark 的checkpoint,防止数据复写

摘要:
finkslink后的数据被复写了???生产环境总会遇到各种各样的莫名其名的数据,一但考虑不周便是车毁人亡啊。线上sink流是es,es的文档id是自定义的id+windowSatarTime设windowsize=10min,watermark最大延迟时间是10s,.数据中的eventtime是乱序到达的,数据最大延迟时间是30minwatermark生成函数assignTimestampsAndWatermarks如果现在是10:15分,当前win的窗口是[10:10,10:20),意味着[09:40,09:50,10:00]的统计值已经生成。此时,程序发生异常,并有checkpoint+resart策略,那么重启后,watermark会继续从断点处消费?答案是不会,watermark会从0开始增长,window也会从新开始。

fink slink 后的数据被复写了???

生产环境总会遇到各种各样的莫名其名的数据,一但考虑不周便是车毁人亡啊。


线上sink 流是es , es 的文档id 是自定义的 id+windowSatarTime

设window size = 10min , watermark 最大延迟时间是 10s,. 数据中的event time 是乱序到达的,数据最大延迟时间是 30min

watermark 生成函数

assignTimestampsAndWatermarks(newAssignerWithPeriodicWatermarks[Goods] {
        val maxOutOfOrderness = 2L //最大无序数据到达的时间,用来生成水印2ms
        var currentMaxTimestamp: Long =_
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")

        override def getCurrentWatermark: Watermark ={
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp -maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long ={
          currentMaxTimestamp =Math.max(element.time, currentMaxTimestamp)
          element.time
        }
      })

如果现在是10:15 分,当前win的窗口是 [10:10,10:20),意味着[09:40,09:50,10:00] 的统计值已经生成 。

此时,程序发生异常,并有checkpoint + resart 策略,那么重启后,watermark 会继续从断点处消费?window 是否还是[10:10,10:20)?

答案是不会,watermark 会从0开始增长,window 也会从新开始。

重启后,如果不幸第一条数据的eventtime 是 09:45:02 , 那么此时 watermark 是 09:45:00 , window 是 [09:40:09:50), 一段时间后数据再次会聚合生条es 记录文档 [id+09:40], sink 时之前的es 数据会被覆盖

测试:

2020-10-21 23:57:01.001 -------watermark: -2input:Goods(id=1,count=10,time=10)               //输入: 1,10,10
()
2020-10-21 23:57:01.001 -------watermark: 8
.... 2020-10-21 23:57:04.004 -------watermark: 8 //输入: 0,0,0 触发异常,重启 2020-10-21 23:57:09.009 -------watermark: -2 // watermark 重新开始
.... 2020-10-21 23:57:17.017 -------watermark: -2input:Goods(id=1,count=10,time=10) () 2020-10-21 23:57:17.017 -------watermark: 8
...

解决:

这里的currentMaxTimestamp 本质可以看做是 Operator State , 那么可以通过实现CheckpointedFunction、ListCheckpointed接口来保存这个state

修改后的water mark 函数

.assignTimestampsAndWatermarks(newAssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
        val maxOutOfOrderness = 2L //最大无序数据到达的时间,用来生成水印2ms
        var currentMaxTimestamp: Long =_

        override def getCurrentWatermark: Watermark ={
          println("watermark", currentMaxTimestamp -maxOutOfOrderness)
          new Watermark(currentMaxTimestamp -maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long ={
          currentMaxTimestamp =Math.max(element.time, currentMaxTimestamp)
          element.time
        }

        override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] ={
          Collections.singletonList(currentMaxTimestamp)
        }

        override def restoreState(state: util.List[JavaLong]): Unit ={
          val stateMin =state.asScala.min
          if (stateMin > 0) currentMaxTimestamp =stateMin
        }
      })

测试:

2020-10-22 00:39:00.000 -------watermark: -2input:Goods(id=1,count=10,time=10)      //输入: 1,10,10
()
2020-10-22 00:39:00.000 -------watermark: 8...
2020-10-22 00:39:03.003 -------watermark: 8input:Goods(id=0,count=0,time=0)        //输入: 0,0,0 触发异常,重启
2020-10-22 00:39:08.008 -------watermark: 8  //从 checkpoints 中获取state
...
2020-10-22 00:39:23.023 -------watermark: 8input:Goods(id=1,count=20,time=20)   //输入: 1,20,20
()
2020-10-22 00:39:23.023 -------watermark: 18....

完整测试程序

flink 如何实现对watermark 的checkpoint,防止数据复写第1张flink 如何实现对watermark 的checkpoint,防止数据复写第2张
importjava.util.{Collections, Date}
importjava.util

importscala.collection.JavaConverters._
import java.lang.{Long =>JavaLong}
importjava.text.SimpleDateFormat
importjava.util.concurrent.TimeUnit

importorg.apache.flink.api.common.restartstrategy.RestartStrategies
importorg.apache.flink.api.common.time.Time
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.contrib.streaming.state.RocksDBStateBackend
importorg.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
importorg.apache.flink.streaming.api.checkpoint.ListCheckpointed
importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
importorg.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
importorg.apache.flink.streaming.api.watermark.Watermark

/*** CheckpointCount
 */object WatermarkCheckpoint {

  case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
    override def toString: String = s"Goods(id=$id,count=$count,time=$time)"}

  def main(args: Array[String]): Unit ={
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000 * 10)
    env.getCheckpointConfig.setCheckpointTimeout(1000 * 60) //checkpoint 超时时间
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 5) //两次 checkpoint 的最小间隔
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //checkpoint 模式
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) //checkpoint 并发数
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //cancel job 时持久化checkopint
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false) //当checkpoint 失败时不会导致任务失败终止
    //restart strategy
env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS))
    )
    //state backend
    val file_rocksdb = "file:///tmp/state/rocksdb"  //需要提前建立路径
    env.setStateBackend(new RocksDBStateBackend(file_rocksdb, true))
    env.setParallelism(1)

    env.socketTextStream("localhost", 9999)
      .filter(_.nonEmpty)
      .map(x =>{
        val arr = x.split(",")
        val g = Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) //id,count,time
        println(s"input:$g")
        g
      })

      //watermark 没有 checkpoint
      /*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
        val maxOutOfOrderness = 2L // 最大无序数据到达的时间,用来生成水印2ms
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }
      })*/

      //watermark  checkpoint
      .assignTimestampsAndWatermarks(newAssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
        val maxOutOfOrderness = 2L //最大无序数据到达的时间,用来生成水印2ms
        var currentMaxTimestamp: Long =_

        override def getCurrentWatermark: Watermark ={
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp -maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long ={
          currentMaxTimestamp =Math.max(element.time, currentMaxTimestamp)
          element.time
        }

        override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] ={
          Collections.singletonList(currentMaxTimestamp)
        }

        override def restoreState(state: util.List[JavaLong]): Unit ={
          val stateMin =state.asScala.min
          if (stateMin > 0) currentMaxTimestamp =stateMin
        }
      })

      .map(x =>{
        if (x.id == 0) throw new RuntimeException("id is 0")
      })
      .print()

    env.execute(this.getClass.getSimpleName)
  }
}
完整测试代码

免责声明:文章转载自《flink 如何实现对watermark 的checkpoint,防止数据复写》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇java调用ip138实现ip地址查询使用boost.python进行混合开发下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Element UI Table合并行

Vue使用Element-ui Table 合并行,官方只是一个非常简单的合并例子,通常业务都是相同的某个字段进行合并。 效果图 代码实现 1、Table <el-table :data="dataTable" border :header-cell-style="{background: '#FAFAFA', textAlign:'center'...

EJB3 阶段总结+一个EJB3案例 (2)

这篇博文接着上一篇博文的EJB案例。 在上一篇博文中,将程序的架构基本给描述出来了,EJB模块分为5层。 1)DB层,即数据库层     在则一部分,我使用的数据库为mysql。在EJB程序中,访问数据库是通过Jboss中配置好的数据源进行的,然后在数据库中建立相应的数据库,不用建立表,在程序中使用JPA后通过Jboss启动会自动在数据库中间表     具...

gprof的使用介绍

转于:http://blog.chinaunix.net/uid-25194149-id-3215487.html #不知道这是在哪里找的了,感谢各位~性能分析工具gprof介绍Ver:1.0目录1. GPROF介绍 42. 使用步骤 43. 使用举例 43.1 测试环境 43.2 测试代码 43.3 数据分析 53.3.1 flat profile模式...

element el-upload组件获取文件名

组件的连接:http://element-cn.eleme.io/#/zh-CN/component/upload 需求:点x按钮,获取文件名传到后端服务,把文件从服务器删除 分析: 仔细看文档,会发现默认传有两个参数file,fileList,都是object类型;file.name就可以拿到当前操作的文件名...

Linux下对date和timestamp的互转

需要确保时区是正确的 若不是CST就要 cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime   http://www.linuxidc.com/Linux/2010-09/28537.htm linux下对date和timestamp的互转 1. date 到 timestamp:$ date -d '2...

Jquery实现鼠标双击Table单元格变成文本框,输入内容并更新到数据库

JS鼠标双击事件 onDblClick  <td width="10%" title="双击修改" ondblclick="ShowElement(this,<%#Eval("id") %></td>  这里的本人用绑定的值是传的当前行对应的ID号, function ShowElement(element, prod...