1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| DataSet<Tuple2<String, String>> test = ... test.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() { @Override public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, String>> out) throws Exception { Tuple2<String, String> tuple2 = new Tuple2<>(); String key = value.f0; String values = value.f1; String[] splitCmId = values.split(","); for (String cmId : splitCmId) { tuple2.setField(cmId, 0); tuple2.setField(key, 1); out.collect(tuple2); } } }). groupBy(0). reduce(new ReduceFunction<Tuple2<String, String>>() { @Override public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception { Tuple2<String, String> tuple2 = new Tuple2<>(); tuple2.setField(value1.f0, 0); String f1 = value1.f1 + "," + value2.f1; Set<String> set = new HashSet<>(Arrays.asList(f1.split(","))); tuple2.setField(String.join(",", set), 1); return tuple2; } });
|