ProcessWindowFunction
一些业务场景,我们需要收集窗口内所有的数据进行计算,例如计算窗口数据的中位数,或者计算窗口数据中出现频率最高的值。这样的需求,使用ReduceFunction和AggregateFunction就无法实现了。这个时候就需要ProcessWindowFunction了。
先来看接口定义
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction { // Evaluates the window void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out) throws Exception; // Deletes any custom per-window state when the window is purged public void clear(Context ctx) throws Exception {} // The context holding window metadata public abstract class Context implements Serializable { // Returns the metadata of the window public abstract W window(); // Returns the current processing time public abstract long currentProcessingTime(); // Returns the current event-time watermark public abstract long currentWatermark(); // State accessor for per-window state public abstract KeyedStateStore windowState(); // State accessor for per-key global state public abstract KeyedStateStore globalState(); // Emits a record to the side output identified by the OutputTag. public abstract <X> void output(OutputTag<X> outputTag, X value); } }
process()方法接受的参数为:
window的key,
Iterable迭代器包含窗口的所有元素,
Collector用于输出结果流。
Context参数和别的process方法一样。而ProcessWindowFunction的Context对象还可以访问window的元数据(窗口开始和结束时间),当前处理时间和水位线,per-window state和per-key global state,side outputs。
- per-window state: 用于保存一些信息,这些信息可以被process()访问,只要process所处理的元素属于这个窗口。
- per-key global state: 同一个key,也就是在一条KeyedStream上,不同的window可以访问per-key global state保存的值。
实例一:
例子:计算5s滚动窗口中的最低和最高的温度。输出的元素包含了(流的Key, 最低温度, 最高温度, 窗口结束时间)。
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData .keyBy(_.id) .timeWindow(Time.seconds(5)) .process(new HighAndLowTempProcessFunction) case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long) class HighAndLowTempProcessFunction extends ProcessWindowFunction[SensorReading,MinMaxTemp, String, TimeWindow] { override def process(key: String, ctx: Context, vals: Iterable[SensorReading], out: Collector[MinMaxTemp]): Unit = { val temps = vals.map(_.temperature) val windowEnd = ctx.window.getEnd out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd)) } }
实例二:
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值 // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10} // API // IN: 输入元素类型 // OUT: 输出元素类型 // KEY: Key类型 // W: Window类型 public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction { ...... public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; ........ } // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品总价值(ProcessWindowFunction) kafkaStream // 将从Kafka获取的JSON数据解析成Java Bean .process(new KafkaProcessFunction()) // 提取时间戳生成水印 .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) // 按用户分组 .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) // 构造TimeWindow .timeWindow(Time.seconds(windowLengthSeconds)) // 窗口函数: 用ProcessWindowFunction计算这段时间内每个用户浏览的商品总价值 .process(new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception { int sum=0; for (UserActionLog element : elements) { sum += element.getProductPrice(); } String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品总价值: "+sum; out.collect(record); } }) .print(); // 结果 Key: user_1 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 60 Key: user_5 窗口开始时间: 2019-11-09 13:32:00 窗口结束时间: 2019-11-09 13:32:10 浏览的商品总价值: 30 Key: user_5 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 80 Key: user_3 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 40 Key: user_4 窗口开始时间: 2019-11-09 13:32:10 窗口结束时间: 2019-11-09 13:32:20 浏览的商品总价值: 70
实例三:
ProcessWindowFunction业务实践:每隔5秒统计每个基站的日志数量
1.创建日志数据对象
case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)
2.业务实现
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * 全量聚合函数 */ object TestProcessFunctionByWindow { // 每隔5秒统计每个基站的日志数量 def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // source var stream = env.socketTextStream("flink101", 8888) .map(line => { var arr = line.split(",") Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) // 设置并行度 stream.setParallelism(1) stream.map(log=> (log.sid, 1)) .keyBy(_._1) // .timeWindow(Time.seconds(5)) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new MyProcessWindowFunction) // 一个窗口结束的时候调用一次(在一个并行度中) .print() env.execute("TestReduceFunctionByWindow") } } class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] { // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出 override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = { // 聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据, Iterable的size就是日志的总行数 out.collect(key, elements.size) } }