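/**
* Example: invert a key -> values mapping, so that every value becomes a key
* whose list holds all of the original keys that contained it.
*
* {
* 1:[2,3,4],
* 2:[3,4,5]
* }
* ↓
* {
* 2:[1],
* 3:[1,2],
* 4:[1,2],
* 5:[2]
* }
*/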

使用 Flink 将上面的数据格式转换为下面的格式，分为两步：

  1. 打散（flatMap）：把每条 key:[v1,v2,...] 拆成多条 v:key 记录

    2:1,
    3:1,
    4:1,
    3:2,
    4:2,
    5:2

    2. 再按 key 分组（groupBy），并用 reduce 合并同一 key 下的所有值（去重）
    // Input: (key, "v1,v2,...") records, e.g. ("1", "2,3,4") and ("2", "3,4,5").
    DataSet<Tuple2<String, String>> test = ...

    // Inverted result: (value, "key1,key2,...") with the keys deduplicated and sorted.
    // It is kept in a variable so a sink (print(), writeAsText(), ...) can be attached.
    DataSet<Tuple2<String, String>> inverted = test
        // Step 1: explode every (key, "v1,v2,...") record into one (v_i, key) record per value.
        .flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, String>> out) throws Exception {
                // A single output tuple is reused across collect() calls, as in the original;
                // Flink's object-reuse rules allow modifying an object after it has been emitted.
                Tuple2<String, String> tuple2 = new Tuple2<>();
                String key = value.f0;
                for (String cmId : value.f1.split(",")) {
                    String id = cmId.trim();
                    // Skip blank tokens so "" or "1,,2" does not emit a record keyed by "".
                    if (id.isEmpty()) {
                        continue;
                    }
                    tuple2.f0 = id;
                    tuple2.f1 = key;
                    out.collect(tuple2);
                }
            }
        })
        // Step 2: group the (value, key) records by value and merge their keys.
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                // The same key may arrive more than once (duplicate ids in one input list, or a
                // key repeated across records), so deduplicate. A sorted set makes the joined
                // string canonical, independent of the order in which partial results are merged.
                Set<String> merged = new java.util.TreeSet<>(Arrays.asList(value1.f1.split(",")));
                merged.addAll(Arrays.asList(value2.f1.split(",")));
                return Tuple2.of(value1.f0, String.join(",", merged));
            }
        });