MAPREDUCE服务 MRS-Python样例代码:代码样例
代码样例
下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。
完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。
import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql()
参数 |
说明 |
示例 |
---|---|---|
file_path |
“datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明:
当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" |
file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql |
SQL示例:
create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
- 低代码开发平台好用吗_低代码平台_Astro低代码-华为云
- 低代码开发平台_低代码平台Astro_低代码开发是什么-华为云
- 代码检查工具_代码检查平台_代码检查CodeArts Check-华为云
- 代码检查快速入门_ 代码检查操作流程_代码检查CodeArts Check-华为云
- 代码检查_代码检查如何设置规则集_代码检查CodeArts Check-华为云
- 低代码开发平台_华为云低代码_Astro Zero
- 如何进行软件代码检查_ 代码检查的特性_代码检查CodeArts Check-华为云
- Git在代码托管服务实践_代码托管服务_代码托管工具-华为云
- 软件开发生产线_玩转代码托管CodeArts Repo_代码管理
- 无需代码建网站_网站建站_无需代码建网站快速建站