
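PyFlink supports Python UDFs, but they have limitations. For example, in the PyFlink version used here a groupBy aggregation cannot be written in Python, so the aggregate has to be implemented as a Java UDF instead.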

The method is defined as follows in the PyFlink source. Only `ScalarFunction`, `TableFunction` and `AggregateFunction` are accepted:

```python
def register_java_function(self, name, function_class_name):
    """
    Registers a java user defined function under a unique name. Replaces already existing
    user-defined functions under this name. The acceptable function type contains
    **ScalarFunction**, **TableFunction** and **AggregateFunction**.
    """
    ...  # rest of the docstring and the method body are omitted in this excerpt
```

We need an aggregate function, so the class extends `AggregateFunction`.

Create a new Java project (repository: https://git.chinawayltd.com/rdp/pyflink-adapter):

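```java
package com.g7.rdp.pyflink.adapter;

import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;

/** Concatenates the non-null strings of a group, separated by DELIMITER. */
public class ConcatAggregateFunction
        extends AggregateFunction<String, ConcatAggregateFunction.StrConcatAcc> {

    private static final long serialVersionUID = -2678065132752935739L;
    private static final String DELIMITER = ",";

    /**
     * Accumulator. Assumed definition: StrConcatAcc is not shown in the original post;
     * here it is a public static POJO holding a ListView of the collected values.
     */
    public static class StrConcatAcc {
        public ListView<String> list = new ListView<>();
    }

    // Called for every input row of the group; null values are skipped.
    public void accumulate(StrConcatAcc acc, String value) throws Exception {
        if (value != null) {
            acc.list.add(value);
        }
    }

    // Folds other partial accumulators into acc.
    public void merge(StrConcatAcc acc, Iterable<StrConcatAcc> its) throws Exception {
        for (StrConcatAcc otherAcc : its) {
            Iterable<String> accList = otherAcc.list.get();
            if (accList != null) {
                for (String value : accList) {
                    acc.list.add(value);
                }
            }
        }
    }

    @Override
    public StrConcatAcc createAccumulator() {
        return new StrConcatAcc();
    }

    // Joins the collected values; returns null for an empty group.
    @Override
    public String getValue(StrConcatAcc acc) {
        try {
            Iterable<String> accList = acc.list.get();
            if (accList == null || !accList.iterator().hasNext()) {
                return null;
            }
            StringBuilder builder = new StringBuilder();
            boolean isFirst = true;
            for (String value : accList) {
                if (!isFirst) {
                    builder.append(DELIMITER);
                }
                builder.append(value);
                isFirst = false;
            }
            return builder.toString();
        } catch (Exception e) {
            // ListView.get() declares a checked Exception, which getValue cannot rethrow.
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
```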

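Depending on where the function is used, Flink requires additional methods:

- DataStream bounded OVER aggregation: implement `retract`, which takes the accumulator (`ACC`) and the input value(s) (`T`, ...) and returns `void`.
- DataStream session-window grouping aggregation and DataSet grouping aggregation: implement `merge`, which takes `ACC` and a `java.lang.Iterable` of accumulators and returns `void`.
- DataSet grouping aggregation: implement `resetAccumulator`, which takes `ACC` and returns `void`.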
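The sketch below illustrates the contract of these three optional methods. It does not depend on Flink: the class `ConcatAggSketch` and its accumulator `Acc` are hypothetical, and a plain `java.util.List` replaces Flink's `ListView` so the code compiles and runs on its own. It only mirrors the method names and parameter shapes listed above. In a real UDF these methods would be `public` members of the `AggregateFunction` subclass.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Flink-free sketch of an aggregate's lifecycle methods (hypothetical class).
class ConcatAggSketch {
    static final String DELIMITER = ",";

    // Hypothetical accumulator standing in for StrConcatAcc (plain List, not ListView).
    static class Acc {
        final List<String> list = new ArrayList<>();
    }

    Acc createAccumulator() {
        return new Acc();
    }

    // accumulate(ACC, T): called once per input row.
    void accumulate(Acc acc, String value) {
        if (value != null) {
            acc.list.add(value);
        }
    }

    // retract(ACC, T): undoes one earlier accumulate() (bounded OVER windows).
    void retract(Acc acc, String value) {
        if (value != null) {
            acc.list.remove(value); // removes the first matching occurrence
        }
    }

    // merge(ACC, Iterable<ACC>): folds partial accumulators into acc.
    void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.list.addAll(other.list);
        }
    }

    // resetAccumulator(ACC): clears state so the accumulator can be reused.
    void resetAccumulator(Acc acc) {
        acc.list.clear();
    }

    String getValue(Acc acc) {
        return acc.list.isEmpty() ? null : String.join(DELIMITER, acc.list);
    }

    public static void main(String[] args) {
        ConcatAggSketch f = new ConcatAggSketch();
        Acc a = f.createAccumulator();
        f.accumulate(a, "x");
        f.accumulate(a, "y");
        Acc b = f.createAccumulator();
        f.accumulate(b, "z");
        f.merge(a, Collections.singletonList(b));
        System.out.println(f.getValue(a)); // x,y,z
        f.retract(a, "y");
        System.out.println(f.getValue(a)); // x,z
        f.resetAccumulator(a);
        System.out.println(f.getValue(a)); // null
    }
}
```

`retract` has to be the exact inverse of `accumulate`. Because `accumulate` skips nulls, `retract` skips them too.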

When development is finished, package the project as a jar and put it under `$pyflink_home/lib`.

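In the Python code, call `register_java_function('method', 'classpath')`. The first argument is the name you will use to call the function from Python. The second is the fully qualified name of the Java class, for example `com.g7.rdp.pyflink.adapter.ConcatAggregateFunction` for the class above.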
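The class is passed only as a string, so it must be resolvable by name on the JVM side. That is presumably why the jar has to be placed in `lib`. The sketch below shows name-based lookup. It assumes the function is instantiated reflectively through a public no-arg constructor. `LoadByName` and `instantiate` are hypothetical names, and `java.lang.StringBuilder` stands in for the UDF class so the snippet runs without Flink.

```java
// Sketch: instantiate a class from its fully qualified name (hypothetical helper).
class LoadByName {
    // Resolve the class via the thread's context class loader, then call its no-arg constructor.
    static Object instantiate(String className) throws ReflectiveOperationException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<?> clazz = Class.forName(className, true, loader);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // StringBuilder stands in for e.g. com.g7.rdp.pyflink.adapter.ConcatAggregateFunction.
        Object o = instantiate("java.lang.StringBuilder");
        System.out.println(o.getClass().getName()); // java.lang.StringBuilder
        try {
            // A class whose jar is not on the class path cannot be found.
            instantiate("com.example.DoesNotExist");
        } catch (ClassNotFoundException e) {
            System.out.println("not found: " + e.getMessage());
        }
    }
}
```

If the jar is missing from the class path, or the class lacks a public no-arg constructor, this kind of lookup fails with a `ReflectiveOperationException` subtype.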

https://segmentfault.com/a/1190000018109879

https://www.alibabacloud.com/help/zh/doc-detail/69553.htm