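In a stream processing application the data arrives continuously, so we cannot wait until all of it has arrived before we start processing. We could of course process each message as it comes in, but sometimes we need aggregations, for example: how many users clicked on our page during the past minute? In that case we have to define a time window that collects the data of the last minute and run the computation over the data inside that window. A window, in other words, cuts unbounded data into finite "chunks" that can be processed.
A stream processing engine is a data processing engine designed for unbounded datasets, where an unbounded dataset is one that keeps growing and is essentially infinite. A window is a means of cutting that unbounded data into finite blocks for processing.
In Flink, the window is at the heart of processing unbounded streams. Windows split the stream into "buckets" of finite size, and we run our computations over these buckets.
Windows fall into two broad categories: time-based windows and count-based windows.
A time window has a start timestamp and an end timestamp (start inclusive, end exclusive). Together, these two timestamps determine the size of the window.
In code, Flink uses the TimeWindow class to represent time-based windows. This class provides methods for querying the start and end timestamps, as well as maxTimestamp(), which returns the largest timestamp that a given window allows.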
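To make the "start inclusive, end exclusive" rule concrete, here is a minimal sketch in plain Java. SimpleTimeWindow is a hypothetical stand-in written for this article, not Flink's TimeWindow class; it only assumes that the largest allowed timestamp is one millisecond before the end.

```java
// Hypothetical stand-in for a time window; this is NOT Flink's TimeWindow class.
final class SimpleTimeWindow {
    private final long start; // inclusive, epoch milliseconds
    private final long end;   // exclusive, epoch milliseconds

    SimpleTimeWindow(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("end must be greater than start");
        }
        this.start = start;
        this.end = end;
    }

    long getStart() { return start; }

    long getEnd() { return end; }

    // Because the end is exclusive, the last timestamp inside the window is end - 1.
    long maxTimestamp() { return end - 1; }

    // True if the timestamp falls into the half-open interval [start, end).
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    public static void main(String[] args) {
        SimpleTimeWindow w = new SimpleTimeWindow(5_000L, 10_000L);
        System.out.println("contains start: " + w.contains(w.getStart()));
        System.out.println("contains end:   " + w.contains(w.getEnd()));
        System.out.println("maxTimestamp:   " + w.maxTimestamp());
    }
}
```

An element stamped exactly at the end of one window therefore belongs to the next window, which is why adjacent windows never share an element.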
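Time windows are further divided into tumbling windows, sliding windows, and session windows.
A tumbling window has a fixed size; consecutive windows neither overlap nor leave gaps. For example, with a tumbling window of 5 minutes, a new window starts every 5 minutes, each one beginning where the previous one ended.
A tumbling window splits the stream into non-overlapping windows, so every event belongs to exactly one window.
tumbling-window: size = slide, e.g. every 10 s, compute over the data of the last 10 s.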
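Which tumbling window does an element land in? The sketch below shows one way to compute it, assuming windows are aligned to the epoch (timestamp 0) with no offset. TumblingSketch and its method names are hypothetical and only illustrate the arithmetic; they are not part of Flink.

```java
// Illustrative arithmetic only; class and method names are hypothetical.
final class TumblingSketch {

    // Start of the tumbling window that contains the timestamp,
    // assuming windows are aligned to epoch 0 (no offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Exclusive end of that same window.
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5 seconds in milliseconds
        long[] samples = {9_999L, 10_000L, 12_345L};
        for (long ts : samples) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

With this alignment every window boundary is a multiple of the window size, which matches window boundaries such as 14:52:05 and 14:52:10 for a 5-second window.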
Code example: the BigdataUtil helper class used in the experiments
package com.zenitera.bigdata.util;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

public class BigdataUtil {

    // Copy every element of an Iterable into a new List
    public static <T> List<T> toList(Iterable<T> it) {
        List<T> list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
        }
        return list;
    }

    // Format an epoch-millisecond timestamp as yyyy-MM-dd HH:mm:ss
    public static String toDateTime(long ts) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);
    }
}
Code example: Time - Tumbling Windows
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Time - Tumbling Windows
 */
public class Flink01_Window_Time_01 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            // Define a tumbling window with a length of 5 seconds
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<WaterSensor> elements,
                                    Collector<String> out) throws Exception {
                    List<WaterSensor> list = BigdataUtil.toList(elements);
                    String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                    String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                    out.collect("window: " + stt + " " + edt + ", key:" + key + " " + list);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
p1,3,10
w1,5,20
w1,5,20
w1,5,20
w1,5,20
-----------------------------
window: 2023-03-22 14:52:05 2023-03-22 14:52:10, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 14:52:20 2023-03-22 14:52:25, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
window: 2023-03-22 14:52:25 2023-03-22 14:52:30, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 14:52:55 2023-03-22 14:53:00, key:w1 [WaterSensor(id=w1, ts=5, vc=20)]
window: 2023-03-22 14:53:00 2023-03-22 14:53:05, key:w1 [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)]
*/
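Like a tumbling window, a sliding window has a fixed length. A second parameter, the slide (sliding step), controls how often a new sliding window is started.
If the slide is smaller than the window length, the windows overlap, and a single element may be assigned to several windows.
For example, with a window length of 10 minutes and a slide of 5 minutes, every 5 minutes we get a window that contains the data of the last 10 minutes.
sliding-window: size > slide, e.g. every 5 s, compute over the data of the last 10 s.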
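The overlap can be worked out by hand. The following sketch lists every sliding window a timestamp falls into, assuming windows are aligned to the epoch with no offset. SlidingSketch and assignWindows are hypothetical names used only for this illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative arithmetic only; class and method names are hypothetical.
final class SlidingSketch {

    // Returns every window [start, end) that contains the timestamp,
    // newest window first, assuming alignment to epoch 0.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that has begun at or before the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10-second windows that slide every 5 seconds
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

When the size is a multiple of the slide, each element belongs to exactly size / slide windows, here two.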
Code example: Time - Sliding Windows
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Time - Sliding Windows
 */
public class Flink01_Window_Time_02 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            // Define a sliding window: length 5 seconds, slide 2 seconds
            .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
            .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<WaterSensor> elements,
                                    Collector<String> out) throws Exception {
                    List<WaterSensor> list = BigdataUtil.toList(elements);
                    String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                    String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                    out.collect("window: " + stt + " " + edt + ", key:" + key + " " + list);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
-----------------------------
window: 2023-03-22 14:59:26 2023-03-22 14:59:31, key:a1 [WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 14:59:28 2023-03-22 14:59:33, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 14:59:30 2023-03-22 14:59:35, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 14:59:32 2023-03-22 14:59:37, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 14:59:38 2023-03-22 14:59:43, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
window: 2023-03-22 14:59:40 2023-03-22 14:59:45, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
window: 2023-03-22 14:59:42 2023-03-22 14:59:47, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
window: 2023-03-22 14:59:52 2023-03-22 14:59:57, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 14:59:54 2023-03-22 14:59:59, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 15:00:04 2023-03-22 15:00:09, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 15:00:06 2023-03-22 15:00:11, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 15:00:08 2023-03-22 15:00:13, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
*/
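The session window assigner groups elements by sessions of activity. Session windows do not overlap and, unlike tumbling and sliding windows, they have no fixed start and end time.
If a session window receives no data for a certain period, it closes automatically. That period without data is the session gap.
We can configure a static gap, or define the gap length through a gap extractor function. Once the gap has elapsed, the current session window closes and subsequent elements are assigned to a new session window.
How session windows are created:
Because session windows have no fixed start and end time, they are created and closed differently from tumbling and sliding windows. Internally, Flink creates a new session window for every arriving element; if windows lie closer to each other than the defined gap, they are merged. To be mergeable, a session window operator needs a merging trigger and a merging window function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction.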
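The create-then-merge idea can be sketched without Flink. In the example below every timestamp first gets its own window [ts, ts + gap), and windows are then merged into sessions. SessionSketch is a hypothetical helper, and it assumes that windows which overlap or merely touch count as one session; check the behaviour at the exact boundary before relying on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only; not Flink's session window implementation.
final class SessionSketch {

    // Builds sessions from element timestamps: each element starts as its own
    // window [ts, ts + gap), and windows that overlap or touch are merged.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && ts <= last[1]) {
                // The new element's window reaches into the current session: extend it
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // The gap has elapsed: open a new session
                merged.add(new long[] {ts, ts + gap});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1_000L, 2_000L, 10_000L};
        for (long[] s : sessions(timestamps, 3_000L)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Note that a session ends one gap after its last element, so the length of a session depends on the data rather than on a fixed window size.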
Code example: Time - Session Windows
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Time - Session Windows
 */
public class Flink01_Window_Time_03 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            // Define a session window with a gap of 3 seconds
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
            .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<WaterSensor> elements,
                                    Collector<String> out) throws Exception {
                    List<WaterSensor> list = BigdataUtil.toList(elements);
                    String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                    String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                    out.collect("window: " + stt + " " + edt + ", key:" + key + " " + list);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
-----------------------------
window: 2023-03-22 15:04:59 2023-03-22 15:05:04, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
window: 2023-03-22 15:05:07 2023-03-22 15:05:12, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
window: 2023-03-22 15:05:16 2023-03-22 15:05:22, key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
window: 2023-03-22 15:05:23 2023-03-22 15:05:26, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]
*/
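The default CountWindow is a tumbling window: you only need to specify the window size, and once the number of elements (counted per key on a keyed stream) reaches that size, the window fires.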
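The firing rule can be imitated with a small per-key buffer. The sketch below is plain Java and only models the behaviour described above; CountWindowSketch is a hypothetical class and says nothing about how Flink implements count windows internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of a tumbling count window; not Flink code.
final class CountWindowSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    // Adds a value for the key. Returns the window contents when the window
    // fires, or an empty list while the window is still filling up.
    List<Integer> add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return Collections.emptyList();
        }
        // Window is full: hand out its contents and start a fresh window for this key
        buffers.remove(key);
        return buffer;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        String[] keys = {"a1", "a1", "a1", "a1", "w1", "w1"};
        for (String key : keys) {
            List<Integer> fired = window.add(key, 1);
            if (!fired.isEmpty()) {
                System.out.println("key:" + key + " " + fired);
            }
        }
    }
}
```

Keys that never reach the window size, such as a key that only receives two elements, never produce output in this model.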
Code example: count-based tumbling window
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Count-based tumbling window
 */
public class Flink02_Window_Count_01 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            // Define a count-based tumbling window of 3 elements
            .countWindow(3)
            .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<WaterSensor> elements,
                                    Collector<String> out) throws Exception {
                    List<WaterSensor> list = BigdataUtil.toList(elements);
                    out.collect(" key:" + key + " " + list);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
p1,3,10
p1,3,10
w1,5,20
w1,5,20
---------------------
key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
*/
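The sliding and the tumbling count window use exactly the same function name; the sliding variant simply takes two arguments, window_size and sliding_size. In the code below sliding_size is set to 2, which means a computation is triggered every time two elements with the same key have arrived, and each computation covers at most 3 elements.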
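To see why the first result contains only two elements and later results contain three, the behaviour can be modelled for a single key in plain Java. SlidingCountSketch is a hypothetical class that only mirrors the rule stated above (fire every sliding_size elements, keep at most window_size elements); it is not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Illustrative model of a sliding count window for one key; not Flink code.
final class SlidingCountSketch {
    private final int size;   // maximum number of elements per computation
    private final int slide;  // number of new elements between computations
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private int sinceLastFire = 0;

    SlidingCountSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Adds a value. Returns the elements of the fired window, or an empty
    // list if fewer than `slide` elements have arrived since the last firing.
    List<Integer> add(int value) {
        buffer.addLast(value);
        if (buffer.size() > size) {
            buffer.removeFirst(); // keep only the newest `size` elements
        }
        sinceLastFire++;
        if (sinceLastFire < slide) {
            return Collections.emptyList();
        }
        sinceLastFire = 0;
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        SlidingCountSketch window = new SlidingCountSketch(3, 2);
        for (int value = 1; value <= 4; value++) {
            List<Integer> fired = window.add(value);
            if (!fired.isEmpty()) {
                System.out.println("after element " + value + ": " + fired);
            }
        }
    }
}
```

The first firing happens after two elements, before the window has had a chance to fill, so it is smaller than the configured window size.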
Code example: count-based sliding window
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Count-based sliding window
 */
public class Flink02_Window_Count_02 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            // Define a count-based sliding window: length 3 (maximum number of
            // elements in the window), slide 2
            .countWindow(3, 2)
            .process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<WaterSensor> elements,
                                    Collector<String> out) throws Exception {
                    List<WaterSensor> list = BigdataUtil.toList(elements);
                    out.collect(" key:" + key + " " + list);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
p1,3,10
w1,5,20
w1,5,20
w2,6,22
---------------------
key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)]
key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)]
key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)]
key:w1 [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)]
*/
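The global window assigner puts all elements with the same key into one single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise no computation is ever performed, because a global window has no natural end at which the collected elements could be processed.
So far we have specified the window assigner; next we need to specify how to compute, which is the job of the window function. Once a window closes, the window function processes the elements in that window.
A window function can be any one of ReduceFunction, AggregateFunction, or ProcessWindowFunction.
ReduceFunction and AggregateFunction are more efficient because Flink can aggregate the elements incrementally as they arrive. ProcessWindowFunction receives an iterator over all elements of the window, along with metadata about the window the elements belong to.
ProcessWindowFunction cannot be executed as efficiently because Flink has to buffer all elements of the window internally before it invokes the function.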
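The difference between incremental aggregation and full buffering is easy to see in a small plain-Java comparison. The sketch below computes an average both ways; AvgSketch, RunningAvg, and bufferedAvg are hypothetical names for this illustration and are not Flink interfaces.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative comparison only; not Flink's AggregateFunction or ProcessWindowFunction.
final class AvgSketch {

    // Incremental style: state is just two numbers, no matter how many
    // elements the window receives.
    static final class RunningAvg {
        long sum;
        long count;

        void add(int value) {
            sum += value;
            count++;
        }

        // Combines two partial results, as needed when windows are merged.
        RunningAvg merge(RunningAvg other) {
            sum += other.sum;
            count += other.count;
            return this;
        }

        double result() {
            return (double) sum / count;
        }
    }

    // Buffered style: every element must be kept until the window closes,
    // and the whole list is scanned at that point.
    static double bufferedAvg(List<Integer> elements) {
        long sum = 0;
        for (int value : elements) {
            sum += value;
        }
        return (double) sum / elements.size();
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(3, 4, 5);

        RunningAvg acc = new RunningAvg();
        for (int value : elements) {
            acc.add(value); // updated as each element arrives
        }

        System.out.println("incremental: " + acc.result());
        System.out.println("buffered:    " + bufferedAvg(elements));
    }
}
```

Both approaches give the same answer; the incremental one simply needs constant state per window, which is why combining an incremental function with a ProcessWindowFunction (as in the examples that follow) gives access to window metadata without buffering every element.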
Code example: ProcessWindowFunction
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * ProcessWindowFunction
 */
public class Flink03_Window_ProcessFunction {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .reduce(
                // Incrementally sum vc as elements arrive
                (ReduceFunction<WaterSensor>) (value1, value2) -> {
                    value1.setVc(value1.getVc() + value2.getVc());
                    return value1;
                },
                // Receives only the single reduced element, plus window metadata
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        WaterSensor result = elements.iterator().next();
                        String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                        String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                        out.collect(stt + " " + edt + " " + result);
                    }
                }
            )
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
2023-03-22 16:05:20 2023-03-22 16:05:25 WaterSensor(id=a1, ts=1, vc=6)
2023-03-22 16:05:25 2023-03-22 16:05:30 WaterSensor(id=a1, ts=1, vc=3)
2023-03-22 16:05:30 2023-03-22 16:05:35 WaterSensor(id=u1, ts=2, vc=12)
2023-03-22 16:05:40 2023-03-22 16:05:45 WaterSensor(id=p1, ts=3, vc=10)
2023-03-22 16:05:45 2023-03-22 16:05:50 WaterSensor(id=p1, ts=3, vc=20)*/
Code example: ReduceFunction
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * ReduceFunction
 */
public class Flink03_Window_ReduceFunction {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .reduce(
                // Incrementally sum vc as elements arrive
                (ReduceFunction<WaterSensor>) (value1, value2) -> {
                    value1.setVc(value1.getVc() + value2.getVc());
                    return value1;
                },
                // Attach the window start and end to the reduced result
                new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        WaterSensor result = elements.iterator().next();
                        String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                        String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                        out.collect(stt + " " + edt + " " + result);
                    }
                }
            )
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
2023-03-22 16:13:05 2023-03-22 16:13:10 WaterSensor(id=a1, ts=1, vc=3)
2023-03-22 16:13:10 2023-03-22 16:13:15 WaterSensor(id=a1, ts=1, vc=6)
2023-03-22 16:13:15 2023-03-22 16:13:20 WaterSensor(id=u1, ts=2, vc=4)
2023-03-22 16:13:20 2023-03-22 16:13:25 WaterSensor(id=u1, ts=2, vc=8)
2023-03-22 16:13:25 2023-03-22 16:13:30 WaterSensor(id=p1, ts=3, vc=30)*/
Code example: AggregateFunction
package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor;
import com.zenitera.bigdata.util.BigdataUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * AggregateFunction
 */
public class Flink03_Window_AggregateFunction {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env
            .socketTextStream("localhost", 6666)
            // Parse "id,ts,vc" lines into WaterSensor objects
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .aggregate(
                // Incrementally compute the average vc: input, accumulator, result
                new AggregateFunction<WaterSensor, Avg, Double>() {
                    @Override
                    public Avg createAccumulator() {
                        return new Avg();
                    }

                    @Override
                    public Avg add(WaterSensor value, Avg acc) {
                        acc.sum += value.getVc();
                        acc.count++;
                        return acc;
                    }

                    @Override
                    public Double getResult(Avg acc) {
                        return acc.sum * 1.0 / acc.count;
                    }

                    // Only invoked for merging windows (e.g. session windows);
                    // the original returned null here, which is unsafe in that case
                    @Override
                    public Avg merge(Avg a, Avg b) {
                        a.sum += b.sum;
                        a.count += b.count;
                        return a;
                    }
                },
                // Attach the key and window bounds to the aggregated result
                new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                    @Override
                    public void process(String key,
                                        Context ctx,
                                        Iterable<Double> elements,
                                        Collector<String> out) throws Exception {
                        Double result = elements.iterator().next();
                        String stt = BigdataUtil.toDateTime(ctx.window().getStart());
                        String edt = BigdataUtil.toDateTime(ctx.window().getEnd());
                        out.collect(key + " " + stt + " " + edt + " " + result);
                    }
                }
            )
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Accumulator holding the running sum and element count
    public static class Avg {
        public Integer sum = 0;
        public Long count = 0L;
    }
}
/*
D:\netcat-win32-1.12>nc64.exe -lp 6666
a1,1,3
a1,1,3
a1,1,3
u1,2,4
u1,2,4
u1,2,4
p1,3,10
p1,3,10
p1,3,10
---------------------
a1 2023-03-22 16:19:45 2023-03-22 16:19:50 3.0
a1 2023-03-22 16:19:50 2023-03-22 16:19:55 3.0
u1 2023-03-22 16:19:55 2023-03-22 16:20:00 4.0
u1 2023-03-22 16:20:00 2023-03-22 16:20:05 4.0
p1 2023-03-22 16:20:05 2023-03-22 16:20:10 10.0
p1 2023-03-22 16:20:10 2023-03-22 16:20:15 10.0*/