MapReduce Service (MRS) - PyFlink Sample Program Code Description: Sample Code for Submitting a Flink Job That Reads From and Writes to Kafka to Yarn Through the Python API
The main logic of pyflink-kafka.py is listed below as a demonstration. Before submitting the job, make sure that "file_path" points to the SQL file to be executed; a full path is recommended.
```python
import os
import logging
import sys

from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings


def read_sql(file_path):
    if not os.path.isfile(file_path):
        raise TypeError(file_path + " does not exist")
    all_the_text = open(file_path).read()
    return all_the_text


def exec_sql():
    # Modify the SQL file path before submission
    # file_path = "/opt/client/Flink/flink/insertData2kafka.sql"
    # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"
    # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql"
    file_path = "insertData2kafka.sql"
    sql = read_sql(file_path)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()

    sqlArr = sql.split(";")
    for sqlStr in sqlArr:
        sqlStr = sqlStr.strip()
        if sqlStr.lower().startswith("create"):
            print("---------create---------------")
            print(sqlStr)
            t_env.execute_sql(sqlStr)
        if sqlStr.lower().startswith("insert"):
            print("---------insert---------------")
            print(sqlStr)
            statement_set.add_insert_sql(sqlStr)
    statement_set.execute()


def read_write_kafka():
    # find kafka connector jars
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar"
    # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar"
    # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar"
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    env.add_jars(specific_jars)

    kafka_properties = {'bootstrap.servers': '192.168.20.162:21005',
                        'group.id': 'test_group'}
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties=kafka_properties)

    print("---------read ---------------")
    ds = env.add_source(kafka_consumer)

    serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
        type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_sink_topic',
        serialization_schema=serialization_schema,
        producer_config=kafka_properties)

    print("--------write------------------")
    ds.add_sink(kafka_producer)
    env.execute("pyflink kafka test")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    print("------------------insert data to kafka----------------")
    exec_sql()
    print("------------------read_write_kafka----------------")
    read_write_kafka()
```
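One detail worth noting: read_sql above opens the SQL file without ever closing it. A minimal hardened variant, shown here only as an illustration and not part of the shipped sample, closes the handle with a context manager and raises a more conventional exception type:

```python
import os


def read_sql(file_path):
    # Fail fast with a descriptive error if the SQL file is missing
    if not os.path.isfile(file_path):
        raise FileNotFoundError(file_path + " does not exist")
    # 'with' guarantees the file handle is closed; UTF-8 encoding is assumed
    with open(file_path, encoding="utf-8") as f:
        return f.read()
```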
| Parameter | Description | Example |
|---|---|---|
| bootstrap.servers | Service IP addresses and ports of the Kafka broker instances. | 192.168.12.25:21005 |
| specific_jars | Path of the "client installation directory/Flink/flink/lib/flink-connector-kafka-*.jar" package; a full path is recommended. Note: When the job is submitted in yarn-application mode, replace the path as follows (use the actual JAR version number): specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" | specific_jars = file:///client installation directory/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar |
| file_path | Path of the "insertData2kafka.sql" file; a full path is recommended. Obtain the file from the secondary development sample code and upload it to the specified directory on the client. Note: When the job is submitted in yarn-application mode, replace the path as follows: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" | file_path = /client installation directory/Flink/flink/insertData2kafka.sql |
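To make the mode-specific paths in the table concrete, the following sketch picks the yarnship layout when the shipped directory exists and otherwise falls back to the client installation paths. This is a hypothetical helper, not part of the sample: the directory probe, the client_dir default, and the JAR version are all assumptions to adjust for your environment.

```python
import os


def resolve_paths(client_dir="/opt/client"):
    # Hypothetical helper: in yarn-application mode, shipped files sit in
    # a "yarnship" directory relative to the working directory.
    yarnship = os.path.normpath(os.path.join(os.getcwd(), "../../../../yarnship"))
    if os.path.isdir(yarnship):
        # yarn-application mode: use the files shipped with the job
        file_path = os.path.join(yarnship, "insertData2kafka.sql")
        specific_jars = "file://" + os.path.join(
            yarnship, "flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar")
    else:
        # client mode: use full paths under the client installation directory
        file_path = client_dir + "/Flink/flink/insertData2kafka.sql"
        specific_jars = ("file://" + client_dir +
                         "/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar")
    return file_path, specific_jars
```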
The following is an example of the insertData2kafka.sql file. It defines a datagen source that generates one row per second and inserts the rows into the Kafka topic that the Python job reads from:

```sql
create table kafka_sink_table (
  age int,
  name varchar(10)
) with (
  'connector' = 'kafka',
  'topic' = 'test_source_topic',  -- Name of the Kafka topic to write to; must match the topic in the preceding Python file
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instances:Kafka port',
  'properties.group.id' = 'test_group',
  'format' = 'json'
);
create TABLE datagen_source_table (
  age int,
  name varchar(10)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
INSERT INTO kafka_sink_table
SELECT *
FROM datagen_source_table;
```
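After the job runs, one way to confirm that records arrive in test_sink_topic is to read a few of them back. This sketch assumes the third-party kafka-python package is installed and reuses the broker address from the sample; it is not part of the sample code:

```python
# Hypothetical verification script using the third-party kafka-python client.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_sink_topic',
    bootstrap_servers='192.168.20.162:21005',
    auto_offset_reset='earliest',   # read from the beginning of the topic
    consumer_timeout_ms=10000)      # stop iterating after 10s with no messages

for message in consumer:
    # Each record should be a JSON object such as {"age": 25, "name": "..."}
    print(message.value.decode('utf-8'))
```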