WIN10+JDK1.8+IDEA2021+Maven3.6.3
CEP额外依赖为flink-cep
8 8 1.14.6 2.12 1.18.24
org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-runtime-web_${scala.binary.version} ${flink.version} org.apache.flink flink-cep_${scala.binary.version} ${flink.version}
监测 严格近邻的连续三次a的事件流
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CepPractice {public static void main(String[] args) throws Exception {//创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//添加数据源,确定水位线策略SingleOutputStreamOperator d = env.fromElements("c", "a", "a", "a", "a", "b", "a", "a").assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((element, recordTimestamp) -> 1L));//定义模式Pattern p = Pattern.begin("first").where(new SimpleCondition() {@Overridepublic boolean filter(String value) {return value.equals("a");}}).next("second").where(new SimpleCondition() {@Overridepublic boolean filter(String value) {return value.equals("a");}}).next("third").where(new SimpleCondition() {@Overridepublic boolean filter(String value) {return value.equals("a");}});//在流上匹配模型PatternStream patternStream = CEP.pattern(d, p);//使用select方法将匹配到的事件流取出patternStream.select((PatternSelectFunction) map -> {//Map的key是事件名称(上面的first、second和third)//Map的key对应的value是列表,储存匹配到的事件String first = map.get("first").toString();String second = map.get("second").toString();String third = map.get("third").toString();return first + "->" + second + "->" + third;}).print();//执行env.execute();}
}
打印结果
[a]->[a]->[a]
[a]->[a]->[a]
留意.times(3).consecutive()
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;public class CepPractice2 {public static void main(String[] args) throws Exception {//创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//添加数据源,确定水位线策略SingleOutputStreamOperator> d = env.fromElements(Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 3000L),Tuple2.of("a", 4000L), Tuple2.of("b", 5000L), Tuple2.of("a", 6000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner((element, recordTimestamp) -> element.f1));//定义模式Pattern, Tuple2> p = Pattern.>begin("=a").where(new SimpleCondition>() {@Overridepublic boolean filter(Tuple2 value) {return value.f0.equals("a");}}).times(3).consecutive(); //严格连续//在流上匹配模型PatternStream> patternStream = CEP.pattern(d, p);//使用select方法将匹配到的事件流取出patternStream.select((PatternSelectFunction, String>) map -> {//Map的key是事件名称(上面的first、second和third)//Map的key对应的value是列表,储存匹配到的事件List> ls = map.get("=a");String first = ls.get(0).f0;String second = ls.get(1).f0;String third = ls.get(2).f0;return String.join("=>", first, second, third);}).print();//执行env.execute();}
}
上一篇:多方协同!四川天府新区分层分类实施科学教育 多方协同!四川天府新区分层分类实施科学教育
下一篇:女乒选拔剧变,马琳:输外战扣分!陈梦王艺迪受益,王曼昱最亏? 女乒选拔剧变,马琳:输外战扣分!陈梦王艺迪受益,王曼昱最亏?