MAPREDUCE服务 MRS-PyFlink样例程序代码说明:通过Python API的方式提交Flink SQL作业到Yarn上代码样例
通过Python API的方式提交Flink SQL作业到Yarn上代码样例
下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。
完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。
import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql()
参数 |
说明 |
示例 |
---|---|---|
file_path |
“datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。
说明:
当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" |
file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql |
SQL示例:
create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
- 什么是数据湖探索服务_数据湖探索DLI用途与特点
- MapReduce服务_什么是Flink_如何使用Flink
- MapReduce服务_什么是Hue_如何使用Hue
- 什么是Spark_如何使用Spark_Spark的功能是什么
- MapReduce服务_什么是Yarn_如何使用Yarn
- MapReduce服务_如何使用MapReduce服务_MRS集群客户端安装与使用
- 数据治理中心_数据开发_数据开发示例_使用教程-华为云
- 什么是Flink OpenSource SQL_数据湖探索_Flink OpenSource SQL
- MapReduce服务_什么是Loader_如何使用Loader
- 数据治理中心_数据开发_数据开发能力_脚本和节点介绍-华为云