MapReduce服务 MRS-Flink Jar作业提交SQL样例程序(Java)

时间:2025-02-12 15:00:30

Flink Jar作业提交SQL样例程序(Java)

提交SQL的核心逻辑如下:程序读取SQL文件,按英文分号“;”拆分为多条语句后逐条处理,目前只支持提交以CREATE或INSERT开头的语句,其他语句不会被执行。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。

/**
 * Reads a SQL script from disk and submits its statements through the Flink Table API.
 *
 * <p>The script is split on the {@code ;} character and every fragment is trimmed.
 * Fragments starting with {@code create} (case-insensitive) are executed immediately;
 * fragments starting with {@code insert} are collected into one {@link StatementSet}
 * and submitted together at the end. Any other fragment is reported and skipped.
 *
 * <p>Because the split is purely textual, a {@code ;} inside a string literal or a
 * comment also ends a fragment, and a fragment that begins with a comment is not
 * recognised as CREATE/INSERT.
 */
public class FlinkSQLExecutor {

    /**
     * Entry point.
     *
     * @param args program arguments; {@code --sql <path>} selects the script file and
     *             defaults to {@code config/redisSink.sql}
     * @throws IOException if the script file cannot be read
     */
    public static void main(String[] args) throws IOException {
        System.out.println("--------------------  begin init ----------------------");
        final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql");
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
        StatementSet statementSet = tableEnv.createStatementSet();

        String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8");
        String[] sqlArr = sqlStr.split(";");

        // Number of INSERT statements queued in the statement set.
        int insertCount = 0;
        for (String rawSql : sqlArr) {
            String sql = rawSql.trim();
            if (sql.isEmpty()) {
                // Blank fragment, e.g. whitespace between or after terminators.
                continue;
            }

            // Lower-case once; only used for the prefix checks, the statement itself is untouched.
            String normalized = sql.toLowerCase(Locale.ROOT);
            if (normalized.startsWith("create")) {
                System.out.println("----------------------------------------------\nexecuteSql=\n" + sql);
                tableEnv.executeSql(sql);
            } else if (normalized.startsWith("insert")) {
                System.out.println("----------------------------------------------\ninsert=\n" + sql);
                statementSet.addInsertSql(sql);
                insertCount++;
            } else {
                // Make the skip visible instead of dropping the statement silently.
                System.out.println("----------------------------------------------\nskip unsupported sql=\n" + sql);
            }
        }

        if (insertCount == 0) {
            // Nothing was queued, so there is no job to submit.
            System.out.println("---------------------- no insert sql found, nothing to execute --------------------------");
            return;
        }

        System.out.println("---------------------- begin exec sql --------------------------");
        statementSet.execute();
    }
}

需将当前样例所需的依赖包(即编译之后lib文件夹下的jar包)复制到客户端的lib文件夹内。

以对接普通模式Kafka提交SQL为例:

CREATE TABLE kafka_sink (
    -- Kafka-backed table that the INSERT statement below writes to, serialized as JSON.
    -- Comments are kept inside the statements on purpose: the sample executor only runs
    -- fragments that begin with CREATE or INSERT after splitting on the statement terminator.
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    'connector' = 'kafka',
    'topic' = 'input2',
    -- Placeholder value: replace with the Kafka broker service IP and port of the cluster.
    'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
    'properties.group.id' = 'testGroup2',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE datagen_source (
    -- Generated test data with the same columns as kafka_sink, one row per second.
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

INSERT INTO kafka_sink
-- Both tables declare identical columns in identical order, so every column is copied as is.
SELECT *
FROM datagen_source;
support.huaweicloud.com/devg-lts-mrs/mrs_07_260072.html