/**
 * {
 *   1:[2,3,4],
 *   2:[3,4,5]
 * }
 * ↓
 * {
 *   2:[1],
 *   3:[1,2],
 *   4:[1,2],
 *   5:[2]
 * }
 */

使用 Flink 把上面的数据格式转换成下面的格式（即把 key → value 列表的映射倒排成 value → key 列表）：

  1. 打散（flatMap）：把每条记录的 value 列表拆开，并与原 key 互换位置，得到一条条 “新key:原key” 记录：
    2:1,
    3:1,
    4:1,
    3:2,
    4:2,
    5:2
  2. 再根据新的 key 分组（groupBy），并用 reduce 把同一组内的原 key 合并、去重。
// Inverts a key -> value-list mapping: for every value, collect the distinct keys that listed it.
// Input record: (key, comma-separated values), e.g. ("1", "2,3,4").
// Output record: (value, comma-separated keys), e.g. ("3", "1,2").
// NOTE(review): assumes f1 is a plain comma-separated list without the "[...]" brackets shown in
// the example comment — confirm against the real data source.
DataSet<Tuple2<String, String>> test = ...
// Keep the result: an unassigned DataSet with no sink is dropped and never produces output.
DataSet<Tuple2<String, String>> inverted = test
    // Step 1: explode (key, "v1,v2,...") into one (vi, key) record per value.
    .flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
      @Override
      public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, String>> out)
          throws Exception {
        String key = value.f0;
        for (String cmId : value.f1.split(",")) {
          String trimmed = cmId.trim();
          // Skip blanks so "" or "2,,3" does not emit a record under an empty key.
          if (!trimmed.isEmpty()) {
            // Emit a fresh tuple each time; mutating an already-emitted object is unsafe
            // when Flink object reuse is enabled.
            out.collect(new Tuple2<>(trimmed, key));
          }
        }
      }
    })
    // Step 2: group by the new key (the former value) and merge the original keys.
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, String>>() {
      @Override
      public Tuple2<String, String> reduce(
          Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
        // Either side may already be a partially merged list, and the same key can arrive more than
        // once, so de-duplicate. A TreeSet also gives a deterministic (lexicographic) order, unlike
        // HashSet, whose iteration order is unspecified.
        Set<String> merged = new java.util.TreeSet<>(Arrays.asList(value1.f1.split(",")));
        merged.addAll(Arrays.asList(value2.f1.split(",")));
        return new Tuple2<>(value1.f0, String.join(",", merged));
      }
    });