MapReduce Service (MRS) PyFlink Sample Code: Submitting a Flink Job That Reads From and Writes to Kafka to Yarn via the Python API

The main logic of pyflink-kafka.py is listed below for demonstration. Before submitting the job, make sure that "file_path" points to the SQL file to be executed; an absolute path is recommended.

For the complete code, see "pyflink-kafka.py" in "flink-examples/pyflink-example/pyflink-kafka".
import os
import logging
import sys

from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings


def read_sql(file_path):
    if not os.path.isfile(file_path):
        raise TypeError(file_path + " does not exist")
    all_the_text = open(file_path).read()
    return all_the_text


def exec_sql():
    # Modify the SQL file path before submitting.
    # file_path = "/opt/client/Flink/flink/insertData2kafka.sql"
    # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"
    # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql"
    file_path = "insertData2kafka.sql"
    sql = read_sql(file_path)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()

    sqlArr = sql.split(";")
    for sqlStr in sqlArr:
        sqlStr = sqlStr.strip()
        if sqlStr.lower().startswith("create"):
            print("---------create---------------")
            print(sqlStr)
            t_env.execute_sql(sqlStr)
        if sqlStr.lower().startswith("insert"):
            print("---------insert---------------")
            print(sqlStr)
            statement_set.add_insert_sql(sqlStr)
    statement_set.execute()


def read_write_kafka():
    # find kafka connector jars
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar"
    # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar"
    # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar"
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    env.add_jars(specific_jars)

    kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'}
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties=kafka_properties)
    print("---------read ---------------")
    ds = env.add_source(kafka_consumer)

    serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
        type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_sink_topic',
        serialization_schema=serialization_schema,
        producer_config=kafka_properties)
    print("--------write------------------")
    ds.add_sink(kafka_producer)
    env.execute("pyflink kafka test")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    print("------------------insert data to kafka----------------")
    exec_sql()
    print("------------------read_write_kafka----------------")
    read_write_kafka()
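
After "file_path" and "specific_jars" have been adjusted, the script can be submitted to Yarn through the Flink client CLI. The following command is illustrative only (it assumes the client is installed under /opt/client; option values depend on your cluster):

cd /opt/client/Flink/flink
./bin/flink run -m yarn-cluster -py pyflink-kafka.py

For yarn-application mode, additionally ship "insertData2kafka.sql" and the Kafka connector jar with the job (for example through the "yarn.ship-files" option) so that the relative "yarnship" paths shown in the commented-out lines above resolve inside the Yarn container.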
Table 1 Parameter description for submitting a regular job using Python

Parameter: bootstrap.servers
Description: Service IP address and port of the Kafka broker instances.
Example: 192.168.12.25:21005

Parameter: specific_jars
Description: Path of the "Client installation directory/Flink/flink/lib/flink-connector-kafka-*.jar" package. An absolute path is recommended.
Note: When the job is submitted in yarn-application mode, replace the path as follows (use the actual jar version number):
specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar"
Example: specific_jars = file:///Client installation directory/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar

Parameter: file_path
Description: Path of the "insertData2kafka.sql" file. An absolute path is recommended. Obtain the file from the secondary development sample code and upload it to the specified directory on the client.
Note: When the job is submitted in yarn-application mode, replace the path as follows:
file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"
Example: file_path = /Client installation directory/Flink/flink/insertData2kafka.sql
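
The mode-dependent choices in Table 1 can be made explicit in code. The sketch below is a hypothetical helper, not part of the shipped sample; "CLIENT_HOME" and the connector jar version are assumptions to be replaced with your actual client installation directory and jar version:

import os

CLIENT_HOME = "/opt/client"  # assumption: actual client installation directory

def resolve_paths(yarn_application_mode):
    """Return (specific_jars, file_path) following the choices in Table 1."""
    if yarn_application_mode:
        # Per Table 1, files shipped with the job are found under the
        # "yarnship" directory relative to the container working directory.
        base = os.getcwd() + "/../../../../yarnship"
        specific_jars = "file://" + base + "/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar"
        file_path = base + "/insertData2kafka.sql"
    else:
        specific_jars = "file://" + CLIENT_HOME + "/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar"
        file_path = CLIENT_HOME + "/Flink/flink/insertData2kafka.sql"
    return specific_jars, file_path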

SQL example:

CREATE TABLE kafka_sink_table (
  age int,
  name varchar(10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source_topic', -- Name of the Kafka topic to write to; must match the topic used in the Python file above
  'properties.bootstrap.servers' = 'Service IP address of the Kafka broker instance:Kafka port',
  'properties.group.id' = 'test_group',
  'format' = 'json'
);

CREATE TABLE datagen_source_table (
  age int,
  name varchar(10)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

INSERT INTO
  kafka_sink_table
SELECT
  *
FROM
  datagen_source_table;
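
Taken together, the SQL job and the Python job form one pipeline: the datagen connector generates one row per second, the INSERT statement writes the rows as JSON into test_source_topic, and read_write_kafka() in pyflink-kafka.py consumes test_source_topic and forwards each record to test_sink_topic. To spot-check the output, you can read the sink topic with the Kafka console consumer shipped with the Kafka client (an illustrative command; adjust the script path and broker address to your cluster):

kafka-console-consumer.sh --topic test_sink_topic --bootstrap-server 192.168.20.162:21005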