华为云用户手册

  • 注意事项 暂不支持通过python写UDF、UDTF、UDAF自定义函数。 如果使用IntelliJ IDEA工具对创建的自定义函数进行调试,则需要在IDEA上勾选:include dependencies with "Provided" scope,否则本地调试运行时会加载不到pom文件中的依赖包。 具体操作以IntelliJ IDEA版本2020.2为例,参考如下: 在IntelliJ IDEA界面,选择调试的配置文件,单击“Edit Configurations”。 在“Run/Debug Configurations”界面,勾选:include dependencies with "Provided" scope。 单击“OK”完成应用配置。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE TABLE [IF NOT EXISTS] TABLE_NAME( FIELDNAME1 FIELDTYPE1, FIELDNAME2 FIELDTYPE2) USING MONGO OPTIONS ( 'url'='IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION][AUTH_PROPERTIES]', 'database'='xx', 'collection'='xx', 'passwdauth' = 'xxx', 'encryption' = 'true' ); 文档数据库服务(Document Database Service,简称DDS)完全兼容MongoDB协议,因此语法中使用“using mongo options”。
  • 示例 1 2 3 4 5 6 create table 1_datasource_mongo.test_momgo(id string, name string, age int) using mongo options( 'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'database' = 'test', 'collection' = 'test', 'passwdauth' = 'xxx', 'encryption' = 'true');
  • 语法支持范围 基础类型: VARCHAR,STRING,BOOLEAN,TINYINT,SMALLINT,INTEGER/INT,BIGINT,REAL/FLOAT,DOUBLE,DECIMAL,DATE,TIME,TIMESTAMP Array:使用[]进行引用。例如: 1 insert into temp select CARDINALITY(ARRAY[1,2,3]) FROM OrderA;
  • 语法定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 INSERT INTO stream_name query; query: values | { select | selectWithoutFrom | query UNION [ ALL ] query } orderItem: expression [ ASC | DESC ] select: SELECT { * | projectItem [, projectItem ]* } FROM tableExpression [ JOIN tableExpression ] [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference tableReference: tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'
  • 示例 使用Kafka发送数据,通过JDBC结果表将Kafka数据再输出到MySQL数据库中。 参考增强型跨源连接,在 DLI 上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 登录MySQL,并使用下述命令在flink库下创建orders表。 CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, `order_time` VARCHAR(32) NULL, `pay_amount` DOUBLE UNSIGNED NOT NULL, `real_pay` DOUBLE UNSIGNED NULL, `pay_time` VARCHAR(32) NULL, `user_id` VARCHAR(32) NULL, `user_name` VARCHAR(32) NULL, `area_id` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE jdbcSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'sink.buffer-flush.max-rows' = '1' ); insert into jdbcSink select * from kafkaSource; 连接Kafka集群,向Kafka相应的topic中发送如下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 查看表中数据,在MySQL中执行sql查询语句。 select * from orders; 其结果参考如下(注意,以下数据为从MySQL中复制的结果,并不是MySQL中的数据样式)。 202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106 202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106
  • 数据类型映射 表1 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,这里应该是'jdbc'。 url 是 无 String 数据库的URL。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 sink.buffer-flush.max-rows 否 100 Integer 每次写入请求缓存的最大行数。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 sink.buffer-flush.interval 否 1s Duration 刷新缓存的间隔,在这段时间内以异步线程刷新数据。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 注意:"sink.buffer-flush.max-rows" 设置为 "0",并设置刷新缓存间隔,则以完整的异步处理方式刷新缓存。 格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 sink.max-retries 否 3 Integer 将记录写入数据库失败时的最大重试次数。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 如果JDBC结果表定义了主键,则连接器以upsert模式运行,否则,连接器以Append模式运行。 upsert模式:Flink会根据主键插入新行或更新现有行,Flink可以通过这种方式保证幂等性。为保证输出结果符合预期,建议为表定义主键。 Append模式:Flink 会将所有记录解释为INSERT消息,如果底层数据库发生主键或唯一约束违规,INSERT操作可能会失败。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table jdbcSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“obs”表示数据源为 对象存储服务 。 region 是 对象存储服务所在区域。 encode 否 数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 bucket 是 数据所在的OBS桶名。 object_name 是 数据所在OBS桶中的对象名。如果对象不在OBS根目录下,则需添加文件夹名,例如:test/test.csv。对象文件格式参考“encode”参数。 row_delimiter 是 行间的分隔符。 field_delimiter 否 属性分隔符。 当“encode”参数为csv时,该参数必选。用户可以自定义属性分隔符。 当“encode”参数为json时,该参数不需要填写。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前只适用于 CS V格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 version_id 否 版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。
  • 示例 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。 测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为input.csv格式文件。将input.csv上传到对应OBS桶目录下。例如,当前上传到:“dli-test-obs01”桶目录下。 1,2,3,4,1403149534 5,6,7,8,1403149535 创建表参考如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", bucket = "dli-test-obs01", region = "xxx", object_name = "input.csv", row_delimiter = "\n", field_delimiter = "," ); 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。 CREATE SOURCE STREAM obs_source ( str STRING ) WITH ( type = "obs", bucket = "obssource", region = "xxx", encode = "json", row_delimiter = "\n", object_name = "input.json" );
  • 功能描述 创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "obs", region = "", bucket = "", object_name = "", row_delimiter = "\n", field_delimiter = '', version_id = "" );
  • 示例 1 2 3 4 5 6 7 8 9 10 11 12 create table disCsvSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' );
  • 功能描述 创建source流从 数据接入服务 (DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 语法格式 create table disSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format-type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 connector.partition-count 否 读取从0分区开始计算的partition-count个通道范围内的数据。 该参数和partition-range参数不能同时配置。 当两个参数都没有配置的时候默认读取所有partition。 connector.partition-range 否 指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。 partition-range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。 connector.offset 否 用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。 connector.start-time 否 DIS数据读取从该起始时间的数据。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start-time也没配置offset的时候,读取最新数据。 connector. enable-checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。 connector. checkpoint-app-name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。 connector. checkpoint-interval 否 DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second 勿与offset或start-time同时设置。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。
  • 修订记录 发布日期 修订记录 2024-06-27 新增 使用Hive语法创建OBS表补充关于指定多字符的分隔符的说明。 使用Hive语法创建DLI表补充说明Hive DLI表不支持在建表时指定多字符的分隔符。 2024-05-28 新增 批作业SQL常用配置项说明新增配置参数:spark.sql.legacy.correlated.scalar.query.enabled 使用Hive语法创建OBS表新增关于设置多字符分隔符的使用示例。 使用Hive语法创建DLI表新增关于设置多字符分隔符的使用示例。 2024-02-01 新增 Spark开源命令支持说明。 2024-01-30 优化 使用DataSource语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用DataSource语法创建DLI表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建DLI表,补充CTAS创建分区表约束限制说明。 2023-11-17 优化 创建函数,删除关键字TEMPORARY。 2023-11-09 优化 使用DataSource语法创建OBS表,补充示例代码。 使用Hive语法创建OBS表,补充示例代码。 使用DataSource语法创建DLI表,补充示例代码。 使用Hive语法创建DLI表,补充示例代码。 2023-09-22 优化 内置函数,优化日期函数、字符串函数、数学函数、聚合函数、分析窗口函数、其他函数的内容。 2023-08-14 新增 修改列注释 2023-08-01 新增 跨源连接Oracle表。 2023-04-02 修改 使用DataSource语法创建OBS表,修改建表相关参数说明。 2023-02-23 修改 使用DataSource语法创建OBS表,修改PERMISSIVE参数说明。 2023-01-11 修改 DDL语法定义,补充对CREATE FUNCTION语句的功能描述。 2022-11-09 修改 创建DLI表关联HBase,补充约束该语法不支持安全集群。 2022-10-26 修改 时间函数,描述信息中补充FROM_UNIXTIME(numeric[, string])的单位。 窗口,补充time_attr参数说明。 2022-09-01 修改OBS输出流,修改关键字说明。
  • 常见问题 Q:若Flink作业日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。 Q:如果该DWS表在某schema下,则应该如何配置? A:如下示例是使用schema为dbuser2下的表area_info: --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbname', 'table-name' = 'dbuser2.area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'gaussdb'。 url 是 无 String jdbc连接地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.num 否 无 Integer 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。默认值为0,表示不限制。 scan.auto-commit 否 true Boolean 设置自动提交标志。 它决定每一个statement是否以事务的方式自动提交。 lookup.cache.max-rows 否 无 Integer 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。 默认表示不使用该配置。 lookup.cache.ttl 否 无 Duration 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 默认表示不使用该配置。 lookup.max-retries 否 3 Integer 维表配置,数据拉取最大重试次数。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
  • 前提条件 请务必确保您的账户下已在 数据仓库 服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 示例 从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下: create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下: insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作为数据源,DWS作为维表,数据输出到Kafka结果表中。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'table-name' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向kafka中source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,读取kafka中sink topic中数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 参数说明 表1 参数说明 参数 是否必选 说明 field_name 是 数据在数据流中的字段名。 图像分类中field_name类型需声明为ARRAY[TINYINT]。 文本分类中field_name类型需声明为String。 model_path 是 模型存放在OBS上的完整路径,包括模型结构和模型权值。 is_dl4j_model 是 是否是deeplearning4j的模型。 true代表是deeplearning4j,false代表是keras模型。 keras_model_config_path 是 模型结构存放在OBS上的完整路径。在keras中通过model.to_json()可得到模型结构。 keras_weights_path 是 模型权值存放在OBS上的完整路径。在keras中通过model.save_weights(filepath)可得到模型权值。 word2vec_path 是 word2vec模型存放在OBS上的完整路径。
  • 示例 图片分类预测我们采用Mnist数据集作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每张图片代表的数字。 1 2 3 4 5 6 CREATE SOURCE STREAM Mnist( image Array[TINYINT] ) SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_dl4j_model_path', false) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_path', true) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_config_path', 'keras_weights_path') FROM Mnist 文本分类预测我们采用一组新闻标题数据作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每个新闻标题所属的类别,比如经济,体育,娱乐等。 1 2 3 4 5 6 7 CREATE SOURCE STREAM News( title String ) SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_word2vec_model_path','your_dl4j_model_path', false) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_word2vec_model_path','your_keras_model_path', true) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_model_path', false) FROM New SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_model_path', true) FROM New
  • 语法格式 1 2 3 4 5 6 7 -- 图像分类, 返回预测图像分类的类别id DL_IMAGE_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) DL_IMAGE_MAX_PREDICTION_INDEX(field_name, keras_model_config_path, keras_weights_path) -- 适用于Keras模型 -- 文本分类,返回预测文本分类的类别id DL_TEXT_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) -- 采用默认word2vec模型 DL_TEXT_MAX_PREDICTION_INDEX(field_name, word2vec_path, model_path, is_dl4j_model) 模型及配置文件等需存储在用户的OBS中,路径格式为"obs://your_ak:your_sk@obs.your_obs_region.xxx.com:443/your_model_path"。例如你的模型存放在OBS上,桶名为dl_model,文件名为model.h5,则路径填写为"obs://your_ak:your_sk@obs.xxx.com:443/dl_model/model.h5"。
  • 功能描述 Spark为了提高性能会缓存Parquet的元数据信息。当更新了Parquet表时,缓存的元数据信息未更新,导致Spark SQL查询不到新插入的数据作业执行报错,报错信息参考如下: DLI.0002: FileNotFoundException: getFileStatus on error message 该场景下就需要使用REFRESH TABLE来解决该问题。REFRESH TABLE是用于重新整理某个分区的文件,重用之前的表元数据信息,能够检测到表的字段的增加或者减少,主要用于表中元数据未修改,表的数据修改的场景。
  • 注意事项 resource可以是queue、database、table、column、view,格式分别为: queue的格式为:queues.queue_name database的格式为:databases.db_name table的格式为:databases.db_name.tables.table_name column的格式为:databases.db_name.tables.table_name.columns.column_name view的格式为:databases.db_name.tables.view_name
  • 原生数据类型 Flink SQL支持原生数据类型,请参见表1。 表1 原生数据类型 数据类型 描述 存储空间 范围 VARCHAR 可变长度的字符 - - BOOLEAN 布尔类型 - TRUE/FALSE TINYINT 有符号整数 1字节 -128-127 SMALLINT 有符号整数 2字节 -32768-32767 INT 有符号整数 4字节 -2147483648~2147483647 INTEGER 有符号整数 4字节 -2147483648~2147483647 BIGINT 有符号整数 8字节 -9223372036854775808~9223372036854775807 REAL 单精度浮点型 4字节 - FLOAT 单精度浮点型 4字节 - DOUBLE 双精度浮点型 8字节 - DECIMAL 固定有效位数和小数位数的数据类型 - - DATE 日期类型,描述了特定的年月日,以yyyy-MM-dd格式表示,例如2014-05-29 - DATE类型不包含时间,所表示日期的范围为0000-01-01 to 9999-12-31 TIME 时间类型,以HH:mm:ss表示。 例如20:17:40 - - TIMESTAMP(3) 完整日期,包括日期和时间。 例如:1969-07-20 20:17:40 - - INTERVAL timeUnit [TO timeUnit] 时间间隔 例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY - -
共100000条