Flink的window机制
迪丽瓦拉
2025-06-01 17:22:07
0

Flink的window机制

1.窗口概述

​ 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个时间窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。所以窗口就算将无限数据切割成有限的“数据块”进行处理。

​ 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

​ 在Flink中, 窗口(window)是处理无界流的核心,窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算

2.窗口分类

窗口分为两大类:

  • 基于时间的窗口
    • 时间窗口以时间点到来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁
    • 窗口大小 = 结束时间 - 开始时间
  • 基于元素个数
    • 基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口
    • 只需指定窗口大小,就可以把数据分配到对应的窗口中

2-1.基于时间的窗口(时间驱动)

​ 时间窗口包含一个开始时间戳和结束时间戳(前闭后开), 这两个时间戳一起限制了窗口的尺寸。

​ 在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口。这个类提供了key查询开始时间戳和结束时间戳的方法,还提供了针对给定的窗口获取它允许的最大时间戳的方法maxTimestamp()

时间窗口有分为滚动窗口,滑动窗口,会话窗口。

2-1-1.滚动窗口(Tumbling Windows)

​ 滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙。例如指定一个长度为5分钟的滚动窗口,当前窗口开始计算,每5分钟启动一个新的窗口。
​ 滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据

代码示例:实验使用工具类BigdataUtil

package com.zenitera.bigdata.util;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;public class BigdataUtil {public static  List toList(Iterable it) {List list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}public static String toDateTime(long ts) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);}
}

代码示例:Time - Tumbling Windows

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.List;/*** Time - Tumbling Windows*/
public class Flink01_Window_Time_01 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为5的滚动窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction() {  //ProcessWindowFunction@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {List list = BigdataUtil.toList(elements);String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect("窗口: " + stt + " " + edt + ", key:" + key + "  " + list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
p1,3,10
w1,5,20
w1,5,20
w1,5,20
w1,5,20
-----------------------------
窗口: 2023-03-22 14:52:05 2023-03-22 14:52:10, key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 14:52:20 2023-03-22 14:52:25, key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
窗口: 2023-03-22 14:52:25 2023-03-22 14:52:30, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 14:52:55 2023-03-22 14:53:00, key:w1  [WaterSensor(id=w1, ts=5, vc=20)]
窗口: 2023-03-22 14:53:00 2023-03-22 14:53:05, key:w1  [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)]*/

2-1-2.滑动窗口(Sliding Windows)

​ 与滚动窗口一样, 滑动窗口也是有固定的长度。另外一个参数我们叫滑动步长,用来控制滑动窗口启动的频率。

​ 如果滑动步长小于窗口长度,滑动窗口会重叠, 这种情况下,一个元素可能会被分配到多个窗口中。

​ 例如滑动窗口长度10分钟,滑动步长5分钟, 则每5分钟会得到一个包含最近10分钟的数据。

sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s的数据

代码示例:Time - Sliding Windows

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.List;/*** Time - Sliding Windows*/
public class Flink01_Window_Time_02 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)//定义一个滑动窗口: 长度是5s, 滑动是2秒.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))).process(new ProcessWindowFunction() {  //ProcessWindowFunction@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {List list = BigdataUtil.toList(elements);String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect("窗口: " + stt + " " + edt + ", key:" + key + "  " + list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
-----------------------------
窗口: 2023-03-22 14:59:26 2023-03-22 14:59:31, key:a1  [WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 14:59:28 2023-03-22 14:59:33, key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 14:59:30 2023-03-22 14:59:35, key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 14:59:32 2023-03-22 14:59:37, key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 14:59:38 2023-03-22 14:59:43, key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
窗口: 2023-03-22 14:59:40 2023-03-22 14:59:45, key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
窗口: 2023-03-22 14:59:42 2023-03-22 14:59:47, key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
窗口: 2023-03-22 14:59:52 2023-03-22 14:59:57, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 14:59:54 2023-03-22 14:59:59, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 15:00:04 2023-03-22 15:00:09, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 15:00:06 2023-03-22 15:00:11, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 15:00:08 2023-03-22 15:00:13, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]*/

2-1-3.会话窗口(Session Windows)

​ 会话窗口分配器会根据活动的元素进行分组。会话窗口不会有重叠,与滚动窗口和滑动窗口相比,会话窗口也没有固定的开启和关闭时间。

​ 如果会话窗口有一段时间没有收到数据,会话窗口会自动关闭,这段没有收到数据的时间就是会话窗口的gap(间隔)。

​ 我们可以配置静态的gap,也可以通过一个gap extractor 函数来定义gap的长度。当时间超过了这个gap,当前的会话窗口就会关闭,后序的元素会被分配到一个新的会话窗口。

创建原理:
因为会话窗口没有固定的开启和关闭时间,所以会话窗口的创建和关闭与滚动,滑动窗口不同。在Flink内部,每到达一个新的元素都会创建一个新的会话窗口,如果这些窗口彼此相距比较定义的gap小,则会对他们进行合并。为了能够合并,会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction

代码示例:Time - Session Windows

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.List;/*** Time - Session Windows*/
public class Flink01_Window_Time_03 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义一个session窗口: gap是3s.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {List list = BigdataUtil.toList(elements);String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect("窗口: " + stt + " " + edt + ", key:" + key + "  " + list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
-----------------------------
窗口: 2023-03-22 15:04:59 2023-03-22 15:05:04, key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
窗口: 2023-03-22 15:05:07 2023-03-22 15:05:12, key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
窗口: 2023-03-22 15:05:16 2023-03-22 15:05:22, key:p1  [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
窗口: 2023-03-22 15:05:23 2023-03-22 15:05:26, key:p1  [WaterSensor(id=p1, ts=3, vc=10)]Process finished with exit code -1*/

2-2.基于元素个数的窗口(数据驱动)

  • 按照指定的数据条数生成一个Window,与时间无关

2-2-1.滚动窗口

​ 默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

代码示例:

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.List;/*** 基于元素个数 - 滚动窗口*/
public class Flink02_Window_Count_01 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口.countWindow(3).process(new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {List list = BigdataUtil.toList(elements);out.collect(" key:" + key + "  " + list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
p1,3,10
p1,3,10
w1,5,20
w1,5,20
---------------------key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]key:p1  [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]key:p1  [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]*/

2-2-2.滑动窗口

​ 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素

代码示例:

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.List;/*** 基于元素个数 - 滑动窗口*/
public class Flink02_Window_Count_02 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3(窗口内元素的最大个数), 滑动步长为2的的基于个数的滑动窗口.countWindow(3, 2).process(new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {List list = BigdataUtil.toList(elements);out.collect(" key:" + key + "  " + list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
w1,5,20
w1,5,20
w2,6,22
---------------------
key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]key:a1  [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]key:u1  [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]key:p1  [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]key:p1  [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]key:w1  [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)]*/

2-3.全局窗口(Global Windows)(自定义触发器)

​ 全局窗口分配器会分配相同key的所有元素进入同一个 Global window。这种窗口机制只有指定自定义的触发器时才有用。否则不会做任何计算,因为这种窗口没有能够处理聚集在一起元素的结束点。

3.窗口函数

​ 前面指定了窗口的分配器,接着我们需要来指定如何计算,这事由window function来负责。一旦窗口关闭,window function 去计算处理窗口中的每个元素。
​ window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种。
​ ReduceFunction,AggregateFunction更加高效,原因就是Flink可以对到来的元素进行增量聚合。ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器,以及这些元素所属窗口的一些元数据信息。
​ ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前,需要在内部缓存这个窗口上所有的元素。

3-1ProcessWindowFunction

代码示例:

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** ProcessWindowFunction*/
public class Flink03_Window_ProcessFunction {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce((ReduceFunction) (value1, value2) -> {value1.setVc(value1.getVc() + value2.getVc());return value1;},new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {WaterSensor result = elements.iterator().next();String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect(stt + " " + edt + " " + result);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
2023-03-22 16:05:20 2023-03-22 16:05:25 WaterSensor(id=a1, ts=1, vc=6)
2023-03-22 16:05:25 2023-03-22 16:05:30 WaterSensor(id=a1, ts=1, vc=3)
2023-03-22 16:05:30 2023-03-22 16:05:35 WaterSensor(id=u1, ts=2, vc=12)
2023-03-22 16:05:40 2023-03-22 16:05:45 WaterSensor(id=p1, ts=3, vc=10)
2023-03-22 16:05:45 2023-03-22 16:05:50 WaterSensor(id=p1, ts=3, vc=20)*/

3-2.ReduceFunction

代码示例:

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** ReduceFunction*/
public class Flink03_Window_ReduceFunction {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce((ReduceFunction) (value1, value2) -> {value1.setVc(value1.getVc() + value2.getVc());return value1;},new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {WaterSensor result = elements.iterator().next();String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect(stt + " " + edt + " " + result);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
2023-03-22 16:13:05 2023-03-22 16:13:10 WaterSensor(id=a1, ts=1, vc=3)
2023-03-22 16:13:10 2023-03-22 16:13:15 WaterSensor(id=a1, ts=1, vc=6)
2023-03-22 16:13:15 2023-03-22 16:13:20 WaterSensor(id=u1, ts=2, vc=4)
2023-03-22 16:13:20 2023-03-22 16:13:25 WaterSensor(id=u1, ts=2, vc=8)
2023-03-22 16:13:25 2023-03-22 16:13:30 WaterSensor(id=p1, ts=3, vc=30)*/

3-3.AggregateFunction

代码示例:

package com.zenitera.bigdata.window;import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class Flink03_Window_AggregateFunction {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.socketTextStream("localhost", 6666).map(line -> {String[] data = line.split(",");return new WaterSensor(String.valueOf(data[0]),Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.count++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.count;}@Overridepublic Avg merge(Avg a, Avg b) {return null;}},new ProcessWindowFunction() {@Overridepublic void process(String key,Context ctx,Iterable elements,Collector out) throws Exception {Double result = elements.iterator().next();String stt = BigdataUtil.toDateTime(ctx.window().getStart());String edt = BigdataUtil.toDateTime(ctx.window().getEnd());out.collect(key + " " + stt + " " + edt + " " + result);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}public static class Avg {public Integer sum = 0;public Long count = 0L;}
}/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
a1 2023-03-22 16:19:45 2023-03-22 16:19:50 3.0
a1 2023-03-22 16:19:50 2023-03-22 16:19:55 3.0
u1 2023-03-22 16:19:55 2023-03-22 16:20:00 4.0
u1 2023-03-22 16:20:00 2023-03-22 16:20:05 4.0
p1 2023-03-22 16:20:05 2023-03-22 16:20:10 10.0
p1 2023-03-22 16:20:10 2023-03-22 16:20:15 10.0*/

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...