Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

Flink流处理操作符

Byron_NG 2019-01-22 10:54:00 阅读数:140 评论数:0 点赞数:0 收藏数:0

一、工程创建与准备

使用maven进行工程创建,且采用提供的flink-quickstart模版,便利很多。

$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.2

本实验的数据采用自拟电影评分数据(userId, movieId, rating, timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共10个档位,timestamp为10000-20000之间的随机数,且数据顺序采用timestamp的升序排列。(2.1-2.6节的数据是乱序)

由于该文只是为了熟悉操作符的用法,所以数据自拟更有针对性。

二、操作符

2.0 Baseline

以下是本次实验的baseline,源source为kafka提供,所以还需要建立一个将数据一个个放入kafka的class。

这里的流对象使用的是POJO类型,即MovieRate类,在之后的各种操作符的使用中也更加方便。

public class Baseline {
public static void main(String[] args) throws Exception {
// 1. Get an ExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test2");
properties.setProperty("auto.offset.reset", "earliest");
// 2. Get the source
FlinkKafkaConsumer011<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>(
"test2",
new MovieRateSchema(),
properties);
DataStream<MovieRate> rates = env
.addSource(myConsumer);
// 3. Set the sink
rates.print();
// 4. Execute
env.execute();
}
}
...
1> 50,181,1.5,13667
1> 53,83,1,11838
1> 87,112,3.5,11701
1> 66,199,5,12427
...

2.1 Filter

对读入的每个element执行bool操作,保留返回True的element。

这里,我们新建一个MovieFilter过滤器,对movieId大于100的电影过滤掉。

// Baseline
DataStream<MovieRate> filteredRate = rates
.filter(new MovieFilter());
// MovieFilter
public static class MovieFilter implements FilterFunction<MovieRate>{
@Override
public boolean filter(MovieRate movieRate) throws Exception {
if (movieRate.movieId > 100){
return false;
} else {
return true;
}
}
}

运行后,可以看到movieId大于100的日志已经被过滤:

...
1> 74,36,3.5,14522
1> 90,46,4.5,14166
1> 3,52,1.5,12222
1> 19,36,1.0,12055
...

2.2 Map

DataStream DataStream

对流中的每一个元素进行转换。

如下列表示将刚才处理后的每个element的评分️2

//Baseline
DataStream<MovieRate> filteredRate = rates
.filter(new MovieFilter())
.map(new RateMap());
// RateMap
public static class RateMap implements MapFunction<MovieRate, MovieRate>{
@Override
public MovieRate map(MovieRate movieRate) throws Exception {
movieRate.rate = 2 * movieRate.rate;
return movieRate;
}
}

这样的话可以看到评分已经变成的以10为满分的整数了:

...
1> 86,65,6.0,14262
1> 56,32,10.0,19835
1> 36,54,5.0,19076
1> 8,21,4.0,12728
...

另外,map可以让我们很容易的使用lambdas。例如,如果我只想返回每个element的userId的话,我可以这样写:

// Baseline
DataStream<Integer> filteredRate = rates
.filter(new MovieFilter())
.map(value -> value.userId);

2.3 FlatMap

DataStream DataStream

读入一个元素,返回转换后的0个、1个或者多个元素。

事实上,FlatMap可以干Map可以干的任何事情。比如下面的代码是跟上面同样的操作,返回每个element的userId:

// Baseline
DataStream<Integer> filteredRate = rates
.filter(new MovieFilter())
.map(new RateMap())
.flatMap(new FlatMapFunction<MovieRate, Integer>() {
@Override
public void flatMap(MovieRate movieRate, Collector<Integer> collector) throws Exception {
collector.collect(movieRate.userId);
}
});

记得有三个地方需要改,上面代码中所有Integer的地方。

2.4 KeyBy

DataStream KeyedStream

逻辑上将流分区为不相交的分区(partitions),每个分区包含相同key的element。在内部通过hash分区来实现。

这里可以使用一个lambda来表示我们需要分区的key,这里我们的key选取的是rate:

DataStream<MovieRate> filteredRate = rates
.filter(new MovieFilter())
.map(new RateMap())
.keyBy((MovieRate rate) -> rate.rate); // .keyBy(MovieRate::getRate)

所以,按道理来说,拥有不同rate的elements应该出现在不同的partitions中。但是,我指定来最大的partitions数量,所以下面可以看出,有不同rate的elements可能出现在不一样的partitions中,但是有相同rate的elements一定出现在相同的partitions中。

...
1> 86,65,6.0,14262
3> 56,32,10.0,19835
1> 36,54,5.0,19076
4> 8,21,4.0,12728
3> 31,89,8.0,16671
3> 66,54,7.0,19989
4> 42,15,4.0,10613
4> 8,18,4.0,14237
3> 13,68,7.0,18793
3> 11,74,10.0,12379
3> 40,12,8.0,11450
...

2.5 Reduce

KeyedStream DataStream

在一个KeyedStream上不断进行reduce操作。

下面的例子是将相同的user进行的评分相加:

// Baseline
DataStream<MovieRate> filteredRate = rates
.filter(new MovieFilter())
.map(new RateMap())
.keyBy(MovieRate::getUserId)
.reduce(new RateReduce());
// RateReduce
public static class RateReduce implements ReduceFunction<MovieRate>{
@Override
public MovieRate reduce(MovieRate movieRate, MovieRate t1) throws Exception {
movieRate.setRate(movieRate.getRate()+t1.getRate());
return movieRate;
}
}

可以看到,有一些打印出来的元素他们的rate已经超过了10:

...
4> 16,35,3.0,10545
1> 8,15,11.0,14503
4> 79,33,18.0,12774
4> 88,53,8.0,14387
4> 19,43,13.0,10626
...

2.6 Aggregations

KeyedStream DataStream

在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值最最小值的所有元素(对于max和maxBy相同)

这次我先把我们的流从POJO类型转换为Tuple4类型,然后再keyBy(userId)后进行sum()的操作。

DataStream<Tuple4<Integer,Integer,Double,Long>> tupleRate = rates
.map(new MapFunction<MovieRate, Tuple4<Integer, Integer, Double, Long>>() {
@Override
public Tuple4<Integer, Integer, Double, Long> map(MovieRate m) throws Exception {
return Tuple4.of(m.getUserId(),m.getMovieId(),m.getRate(),m.getTimeStamp());
}
})
.keyBy(0)
.sum(2);

2.7 Window

这里需要介绍的就很多了,这一节并不对Flink的window机制进行介绍,而还是主要介绍window的基本操作用法。

对keyedStream进行时间窗口处理,返回的是Windowedstream

KeyedStream WindowedStream

WindowedStream<MovieRate> tupleRate = rates
.keyBy((MovieRate rate)->rate.userId)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))

这里介绍三种windows:

一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。

  1. Time window

    Time Window 是根据时间对数据流进行分组的。Flink 提出了三种时间的概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间)

    • Event time

      以下例子为事件时间窗口为2毫秒,每个用户在两毫秒时间内对于电影所有评分的平均分。

      public static void main(String[] args) throws Exception {
      // 1. Get an ExecutionEnvironment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //设定事件时间处理模式
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      Properties properties = new Properties();
      properties.setProperty("zookeeper.connect", "localhost:2181");
      properties.setProperty("bootstrap.servers", "localhost:9092");
      properties.setProperty("group.id", "test");
      properties.setProperty("auto.offset.reset", "none");
      // 2. Get the source
      FlinkKafkaConsumerBase<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>(
      "test2",
      new MovieRateSchema(),
      properties)
      //分配时间戳
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MovieRate>() {
      @Override
      public long extractAscendingTimestamp(MovieRate movieRate) {
      return movieRate.timeStamp;
      }
      });
      DataStream<MovieRate> rates = env
      .addSource(myConsumer);
      DataStream<Tuple2<Integer,Double>> tupleRate = rates
      .keyBy((MovieRate rate)->rate.userId)
      // 设定事件时间窗口为2毫秒
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply(new WindowFunction<MovieRate, Tuple2<Integer, Double>, Integer, TimeWindow>() {
      @Override
      public void apply(Integer integer, TimeWindow timeWindow, Iterable<MovieRate> iterable, Collector<Tuple2<Integer, Double>> collector) throws Exception {
      double sum = 0;
      int count = 0;
      for (MovieRate t : iterable){
      sum += t.rate;
      count += 1;
      }
      collector.collect(Tuple2.of(integer, sum/count));
      }
      });
    • Processing time

      这就跟当前处理时的前来的事件有关了。相同的例子,只需改一句即可,最前面的环境设置也需要改动

      DataStream<Tuple2<Integer,Double>> tupleRate = rates
      .keyBy((MovieRate rate)->rate.userId)
      // 将Event改为Processing
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .apply(...);
  2. Count window

    这种window通过计数两进行分组,比如说每次有100个数据进行一个窗口封装。

  3. Session window

    在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。

更多关于windows的用法,请见另一篇博文:

2.8 Connect & Split & Select

2.8.1 connect

DataStream,DataStream ConnectedStream

“连接”两个数据流。这两个数据流可以是任意形式的,里面的元素不一定要格式一样。

我们可以这样想象,现在我们的数据流是某个用户对于某个电影在某个时间点的评分。就豆瓣而言,评分完成后还可以该电影写短评,这就又构成了一个数据流。把这两个流连接并进行匹配将会是该操作符的一个应用。

2.8.2 split & select

DataStream SplitStream DataStream

有连接就自然有分割,该操作符将一个数据流分割成若干个数据流。

下面的例子,同样的数据集,将userId大于50和小于50的分成old和young两个数据集,打印young数据集。select操作符用于选择Splitstream中特定的stream。

SplitStream<MovieRate> splitRate = rates
.split(new OutputSelector<MovieRate>() {
@Override
public Iterable<String> select(MovieRate movieRate) {
// 这个list储存每个splitstream的标签
List<String> output = new ArrayList<String>();
if (movieRate.userId < 50){
output.add("young");
} else{
output.add("old");
}
return output;
}
});
DataStream<MovieRate> young = splitRate.select("young");
young.print()

可见,输出中只包含了userId小于50的数据:

1> 30,67,2.5,10338
1> 15,172,1.0,10338
1> 37,154,0.5,10338
1> 37,172,1.0,10339
1> 31,66,1.0,10339
...

2.9 Iterate

DataStream IterativeStream DataStream

通过重定向操作符,建立一个回馈循环。在建立迭代算法的时候想必是很有用的。

使用上一节产生的young流,每次迭代将每个record的分数减半,直到分数不大于1为止,并返回不大于1时最后的结果。

IterativeStream<MovieRate> iteration = young.iterate();
// 每次迭代的操作
DataStream<MovieRate> iterationBody = iteration.map(new MapFunction<MovieRate, MovieRate>() {
@Override
public MovieRate map(MovieRate movieRate) throws Exception {
movieRate.rate = movieRate.rate / 2;
return movieRate;
}
});
// 迭代结束的标志
DataStream<MovieRate> feedback = iterationBody.filter(new FilterFunction<MovieRate>() {
@Override
public boolean filter(MovieRate movieRate) throws Exception {
return movieRate.rate > 1;
}
});
// 建立迭代
iteration.closeWith(feedback);
// 由于iterationBody包含所有迭代中间操作的数据,这里需要迭代之后的过滤
DataStream<MovieRate> output = iterationBody.filter(new FilterFunction<MovieRate>() {
@Override
public boolean filter(MovieRate movieRate) throws Exception {
return movieRate.rate <= 1;
}
});
output.print();

可以看到,输出的records分数都小于1:

...
1> 12,132,0.625,10500
1> 48,13,0.875,10500
1> 22,128,0.75,10500
1> 9,53,0.5,10501
1> 3,58,0.5,10501
...

2.10 Extract Timestamps

DataStream DataStream

提取时间戳,也可以说是分配时间戳。这个在2.7节的代码中也有提到,当要使用事件时间作为窗口依据时,在原数据中通常都会有一列代表着时间戳,这个时候需要在Datastream流中声明该列为时间戳列。嗯,就是这样。

stream.assignTimestamps(new TimeStampExtractor(){...});

2.11 Project

DataStream DataStream

在tuple流中选取指定的列作为新的流。

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Reference:

  1. Flink 原理与实现:Window 机制
  2. Flink官方网站
版权声明
本文为[Byron_NG]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/bjwu/p/10284657.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;