PyFlink currently provides only the Table API. Its source files are listed below:
table
    __init__.py
    catalog.py
    descriptors.py
    environment_settings.py
    sinks.py
    sources.py
    sql_dialect.py
    table.py
    table_config.py
    table_environment.py
    table_schema.py
    types.py
    udf.py
    window.py
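As a result, you can only run SQL-style operations on the data, much like querying a Hive table; you cannot write custom triggers and evictors as you can in Java.
The commonly used windows are as follows.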
Tumble
# time-based tumbling window: 10 minutes, on processing time
.window(Tumble.over("10.minutes").on("proctime").alias("w"))
# count-based tumbling window: 10 rows
.window(Tumble.over("10.rows").on("proctime").alias("w"))
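To make the tumbling semantics concrete, here is a minimal pure-Python sketch (not PyFlink code) of how a timestamp maps to exactly one fixed-size, non-overlapping window. The function name `tumbling_window` and the sample events are hypothetical, and the sketch assumes windows are aligned to timestamp 0 with no offset.

```python
from collections import defaultdict

TEN_MINUTES_MS = 10 * 60 * 1000


def tumbling_window(timestamp_ms, size_ms):
    """Return the [start, end) bounds of the tumbling window containing timestamp_ms.

    Assumes windows are aligned to timestamp 0 (no offset).
    """
    start = timestamp_ms - (timestamp_ms % size_ms)
    return start, start + size_ms


# Hypothetical events as (timestamp in ms, payload).
events = [(0, "a"), (599_999, "b"), (600_000, "c")]

# Count events per window, which is what a grouped aggregation over "w" would do.
counts = defaultdict(int)
for ts, _payload in events:
    counts[tumbling_window(ts, TEN_MINUTES_MS)] += 1
```

Each event lands in exactly one window, so the first two events are counted together and the third starts a new window.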
Slide
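# time-based sliding window: 10 minutes long, sliding every 5 minutes
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").alias("w"))
# count-based sliding window: 10 rows long, sliding every 5 rows
.window(Slide.over("10.rows").every("5.rows").on("proctime").alias("w"))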
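Unlike tumbling windows, sliding windows overlap, so one event can belong to several windows. The following pure-Python sketch (not PyFlink code) lists every window that contains a given timestamp. The name `sliding_windows` is hypothetical, and window starts are assumed to be aligned to multiples of the slide interval.

```python
MINUTE_MS = 60 * 1000


def sliding_windows(timestamp_ms, size_ms, slide_ms):
    """Return every [start, end) window of length size_ms that contains timestamp_ms.

    Window starts are assumed to be multiples of slide_ms.
    """
    # The latest window start that is not after the timestamp.
    start = timestamp_ms - (timestamp_ms % slide_ms)
    windows = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


# An event at minute 7 with a 10-minute window sliding every 5 minutes.
result = sliding_windows(7 * MINUTE_MS, 10 * MINUTE_MS, 5 * MINUTE_MS)
```

With a size of 10 minutes and a slide of 5 minutes, the event at minute 7 falls into two windows, `[5, 15)` and `[0, 10)` in minutes.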
Session
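# session window with a 10-minute inactivity gap, on event time
.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
# session window with a 10-minute inactivity gap, on processing time
.window(Session.with_gap("10.minutes").on("proctime").alias("w"))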
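Session windows have no fixed size: a session stays open as long as events keep arriving within the gap. The pure-Python sketch below (not PyFlink code) groups timestamps into sessions. The name `session_windows` is hypothetical, and the choice that a distance exactly equal to the gap starts a new session is an assumption of this sketch, not a statement about Flink's boundary handling.

```python
MINUTE_MS = 60 * 1000


def session_windows(timestamps_ms, gap_ms):
    """Group timestamps into sessions separated by at least gap_ms of inactivity.

    Returns (start, end) pairs, where end = last event in the session + gap_ms.
    """
    sessions = []  # each entry is [first_event_ts, last_event_ts]
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][1] < gap_ms:
            # Still within the gap: extend the current session.
            sessions[-1][1] = ts
        else:
            # Gap exceeded (or first event): open a new session.
            sessions.append([ts, ts])
    return [(first, last + gap_ms) for first, last in sessions]


# Hypothetical event times at minutes 0, 3, 8, 30 and 35 with a 10-minute gap.
event_times = [m * MINUTE_MS for m in (0, 3, 8, 30, 35)]
result = session_windows(event_times, 10 * MINUTE_MS)
```

The first three events form one session and the last two form another, because more than 10 minutes pass between minute 8 and minute 30.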
Over
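# unbounded range over window, ordered by processing time
.over_window(Over.partition_by("a").order_by("proctime").preceding("unbounded_range").alias("w"))
# unbounded row-count over window, ordered by event time
.over_window(Over.partition_by("a").order_by("rowtime").preceding("unbounded_row").alias("w"))
# bounded over window covering the preceding 1 minute, ordered by processing time
.over_window(Over.partition_by("a").order_by("proctime").preceding("1.minutes").alias("w"))
# bounded over window covering the preceding 10 rows, ordered by event time
.over_window(Over.partition_by("a").order_by("rowtime").preceding("10.rows").alias("w"))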
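An over window differs from the group windows above in that it produces one result per input row, computed over the rows that precede it within the same partition. The pure-Python sketch below (not PyFlink code) shows a running sum per key. The name `over_sum` is hypothetical, and the sketch assumes that a bound of n preceding rows means the current row plus up to n earlier rows, mirroring SQL's `ROWS BETWEEN n PRECEDING AND CURRENT ROW`.

```python
from collections import defaultdict, deque


def over_sum(rows, preceding_rows=None):
    """Compute a per-key running sum, emitting one result per input row.

    rows: iterable of (key, value), already ordered by the time attribute.
    preceding_rows: None for an unbounded window, otherwise the number of
    earlier rows kept in addition to the current row (assumption of this sketch).
    """
    history = defaultdict(deque)  # per-partition rows currently in the window
    result = []
    for key, value in rows:
        window = history[key]
        window.append(value)
        if preceding_rows is not None and len(window) > preceding_rows + 1:
            window.popleft()  # drop the row that fell out of the bound
        result.append((key, value, sum(window)))
    return result


# Hypothetical rows partitioned by key "a" or "b".
rows = [("a", 1), ("b", 10), ("a", 2), ("a", 3)]
unbounded = over_sum(rows)
bounded = over_sum(rows, preceding_rows=1)
```

In the unbounded case the last `"a"` row sums all three `"a"` values; with one preceding row it sums only the last two.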
More details are available in the official documentation: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#group-windows
You can also look at the unit tests in the official GitHub repository: https://github.com/apache/flink/blob/42f49482a3217287abba5e8903cf50791acf287a/flink-python/pyflink/table/tests/test_window.py