Pyflink1.10 已经支持python lambda udf的开发 也可以调用javaudf 和 ScalarFunction

python udf比较简单

Python UDF

如下: input_types 为输入 注意此处为复数时是数组 result_type 为输出 在这里进行一个简单的print.

1
2
3
4
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING(), name="pr")
def pr(str):
print(str)
return str

需要先进行注册后调用

1
2
3
4
5
6
st_env.register_function('pr',pr) 
st_env.from_path('source') \
.window(Session.with_gap('20.minutes').on('rowtime').alias('w'))\
.group_by('w,carnum')\
.select('carnum,cast(time.count ,int) as count, pr(carnum) as t')\
.insert_into('mySink')

Java UDF

java udf 必须继承 ScalarFunction 并且有无参构造器 且方法名必须为eval

1
2
3
4
5
6
7
8
public class HashCode extends ScalarFunction {

private int factor = 12;

public int eval(String s) {
return s.hashCode() * factor;
}
}

注册如下 调用如python udf

"my.java.function.HashCode")```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17


需要将文件编译打包后放入pyflink环境中 $pyflink_home/lib 下

### Scala UDF

scala udf 方法申明跟JAVA类似 方法注册跟Python类似

直接在python文件中写代码 定义一个class 继承ScalarFunction 方法名为eval

```scala
class Add(ScalarFunction):
def eval(self, i, j):
return i + j

add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
调用如 table_env.register_function("add", add)

其它见官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html

官方测试用例 https://github.com/apache/flink/blob/42f49482a3217287abba5e8903cf50791acf287a/flink-python/pyflink/table/tests/test_udtf.py