FLINK基础(107): DS算子与窗口(18)窗口 (3) window functions(二)ProcessWindowFunction

摘要:
此时,需要ProcessWindowFunction。首先查看接口定义publicabstractclassProcessWindowFunction<IN,OUT,KEY,WeextendsWindow˃extendsAbstractRichFunction{//通过异常评估windowvoidprocess;//在窗口被清除时删除自定义窗口状态通过异常{}//上下文窗口元数据公共抽象类上下文实现可串行化{//返回窗口公共抽象Wwindow()的元数据;//返回当前处理时间公共抽象longcurrentProcessingTime() ;// 返回当前事件-timewatermarkpublicabstractlongcurrentWatermark();//用于per-windowstatepublicabstractKeyedStateStorewindowState()的状态访问器;//状态访问器forper-keyglobalstatepublicabstractKeyedStateStoreglobalState();//发射记录到由OutputTag标识的输出。publicabstract<X>voidoutput;}}process()方法接受的参数是:窗口的键,Iterable迭代器包含窗口的所有元素,收集器用于输出结果流。Context参数与其他流程方法相同。ProcessWindowFunction的Context对象还可以访问窗口的元数据、当前处理时间和水位、每个windowsstate、每个keyglobalstate和sideoutput。Per-keyglobalstate:对于同一个键,即在KeyedStream上,不同的窗口可以访问由Per-keyglobalstate保存的值。示例1:示例:计算5s滚动窗口中的最低和最高温度。

ProcessWindowFunction

  一些业务场景,我们需要收集窗口内所有的数据进行计算,例如计算窗口数据的中位数,或者计算窗口数据中出现频率最高的值。这样的需求,使用ReduceFunction和AggregateFunction就无法实现了。这个时候就需要ProcessWindowFunction了。

先来看接口定义

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  extends AbstractRichFunction {

  // Evaluates the window
  void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out)
    throws Exception;

  // Deletes any custom per-window state when the window is purged
  public void clear(Context ctx) throws Exception {}

  // The context holding window metadata
  public abstract class Context implements Serializable {
    // Returns the metadata of the window
    public abstract W window();

    // Returns the current processing time
    public abstract long currentProcessingTime();

    // Returns the current event-time watermark
    public abstract long currentWatermark();

    // State accessor for per-window state
    public abstract KeyedStateStore windowState();

    // State accessor for per-key global state
    public abstract KeyedStateStore globalState();

    // Emits a record to the side output identified by the OutputTag.
    public abstract <X> void output(OutputTag<X> outputTag, X value);
  }
}

process()方法接受的参数为:

  window的key,

  Iterable迭代器包含窗口的所有元素,

  Collector用于输出结果流。

  Context参数和别的process方法一样。而ProcessWindowFunction的Context对象还可以访问window的元数据(窗口开始和结束时间),当前处理时间和水位线,per-window state和per-key global state,side outputs。

  • per-window state: 用于保存一些信息,这些信息可以被process()访问,只要process所处理的元素属于这个窗口。
  • per-key global state: 同一个key,也就是在一条KeyedStream上,不同的window可以访问per-key global state保存的值。

实例一:

例子:计算5s滚动窗口中的最低和最高的温度。输出的元素包含了(流的Key, 最低温度, 最高温度, 窗口结束时间)。

val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
  .keyBy(_.id)
  .timeWindow(Time.seconds(5))
  .process(new HighAndLowTempProcessFunction)

case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)

class HighAndLowTempProcessFunction  extends ProcessWindowFunction[SensorReading,MinMaxTemp, String, TimeWindow] {
  override def process(key: String,
                       ctx: Context,
                       vals: Iterable[SensorReading],
                       out: Collector[MinMaxTemp]): Unit = {
    val temps = vals.map(_.temperature)
    val windowEnd = ctx.window.getEnd

    out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
  }
}

实例二:

// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API
// IN:  输入元素类型
// OUT: 输出元素类型
// KEY: Key类型
// W: Window类型
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    ......
    
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    
    ........
}

// 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品总价值(ProcessWindowFunction)
kafkaStream
    // 将从Kafka获取的JSON数据解析成Java Bean
    .process(new KafkaProcessFunction())
    // 提取时间戳生成水印
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    // 按用户分组
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    // 构造TimeWindow
    .timeWindow(Time.seconds(windowLengthSeconds))
    // 窗口函数: 用ProcessWindowFunction计算这段时间内每个用户浏览的商品总价值
    .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {

            int sum=0;
            for (UserActionLog element : elements) {
                sum += element.getProductPrice();
            }

            String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
            String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

            String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品总价值: "+sum;
            out.collect(record);

        }
    })
    .print();

// 结果
Key: user_1 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 60
Key: user_5 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 30
Key: user_5 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 80
Key: user_3 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 40
Key: user_4 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 70

实例三:

ProcessWindowFunction业务实践:每隔5秒统计每个基站的日志数量

1.创建日志数据对象

case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)

2.业务实现

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
 
/**
  * 全量聚合函数
  */
object TestProcessFunctionByWindow {
   
  // 每隔5秒统计每个基站的日志数量
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    var stream = env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })
 
    // 设置并行度
    stream.setParallelism(1)
    stream.map(log=> (log.sid, 1))
      .keyBy(_._1)
//        .timeWindow(Time.seconds(5))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)  // 一个窗口结束的时候调用一次(在一个并行度中)
        .print()
 
 
    env.execute("TestReduceFunctionByWindow")
  }
}
 
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
 
  // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
    // 聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据, Iterable的size就是日志的总行数
    out.collect(key, elements.size)
  }
}

免责声明:文章转载自《FLINK基础(107): DS算子与窗口(18)窗口 (3) window functions(二)ProcessWindowFunction》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇ImagesLazyLoad 图片延迟加载效果android studio 导入外部库文件,以及将项目中module变成library引用依赖下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Oracle 11c下载 及连接到OracleDB的简单程序

Oracle官网总是不太贴心。还是网友贴心。 https://pan.baidu.com/s/1ZCFLUi4Ti_WUYOFR3gB2dA 是11g版本下载包,下载下来解压就能用了。 安装完毕后,驱动包在【oralcehome】product11.2.0dbhome_1jdbclib下。  访问Oracle的JDBC程序,建表请见 https://www...

第三方授权认证(一)实现第三方授权登录、分享以及获取用户资料

转载请注明出处:http://blog.csdn.net/yangyu20121224/article/details/9057257             由于公司项目的需要,要实现在项目中使用第三方授权登录以及分享文字和图片等这样的效果,几经波折,查阅了一番资料,做了一个Demo。实现起来的效果还是不错的,不敢独享,决定写一个总结的教程,供大家互相交...

[Swift]Scanner字符串扫描类

★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★➤微信公众号:山青咏芝(shanqingyongzhi)➤博客园地址:山青咏芝(https://www.cnblogs.com/strengthen/)➤GitHub地址:https://github.com/strengthen/LeetCode➤原文地址:https://w...

Spring batch学习 (1)

          Spring Batch 批处理框架 埃森哲和Spring Source研发                          主要解决批处理数据的问题,包含并行处理,事务处理机制等。具有健壮性 可扩展,和自带的监控功能,并且支持断点和重发。让程序员更加注重于业务实现。           Spring Batch 结构如下      ...

IDEA使用switch传入String编译不通过

今天在使用IDEA的时候,用到switch分支语句,传入String参数的时候一直报错,下面是源码报错截图: 看错误提示并没有提到switch支持String类型,不过ava1.7之后就支持String类型才对呀,于是想到了会不会是JDK问题,但是JDK用的是1.8呀!网上搜索才发现,会不会是编译环境版本过低的原因呢?于是查看对IDEA的ProjectS...

C# EPL USB 指令打印

private void btnPrinter_Click(object sender, EventArgs e) { #region ESC 热敏图像点阵像素点读取打印 //Bitmap bitmap = new Bitmap(@"D:450X100.bmp");...