云服务器内容精选

  • Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序
  • Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序