华为云用户手册

  • 功能描述 在 DLI 数据多版本功能开启后,备份数据默认保留7天,您可以通过配置系统参数“dli.multi.version.retention.days”调整保留周期。保留周期外的多版本数据后续在执行insert overwrite或者truncate语句时会自动进行清理。在添加列或者修改分区表时,也可以设置表属性“dli.multi.version.retention.days”调整保留周期。 开启和关闭多版本功能SQL语法请参考开启或关闭数据多版本。 DLI数据多版本功能当前仅支持通过Hive语法创建的OBS表,具体建表SQL语法可以参考使用Hive语法创建OBS表。
  • 功能描述 创建边缘作业source流,从EdgeHub中获取数据。用户数据写入EdgeHub中,Flink边缘作业从中读取数据,作为流计算的数据输入。 适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。 仅Flink 1.7版本适配边缘作业场景,且Flink 1.7 EOS。DLI后续版本不再提供边缘作业场景的语法参考。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。 topic 是 主题,需要消费数据的edgehub中的主题名称。 encode 是 数据编码格式,可选为“csv”和“json”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 field_delimiter 否 属性分隔符。当“encode”为“csv”时,用于指定csv字段分隔符,默认为“,"。 当“encode”为“json”时,不需要设置属性之间的分隔符。 json_config 否 当“encode”为“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为: "field1=data_json.field1;field2=data_json.field2;field3=$" 其中"field3=$"表示field3的内容为整个json串。
  • 示例 从edgehub主题abc中读取数据,数据编码格式为json。数据示例为:{"student":{"score":90,"name":"1bc2"}}。 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM student_scores( name string, score int) WITH ( type = "edgehub", topic = "abc", encode = "json", json_config = "score = student.score; name=student.name" );
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "edgehub", topic = "", encode = "", json_config = "", field_delimiter = '' );
  • 关键字 GROUPING SETS:为对GROUP BY的扩展,例如 SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b)); 将转换为以下一条查询: 1 2 SELECT a, b, sum(expression) FROM table GROUP BY a, b; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS(a,b); 将转换为以下两条查询: 1 2 3 SELECT a, NULL, sum(expression) FROM table GROUP BY a; UNION SELECT NULL, b, sum(expression) FROM table GROUP BY b; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b), a); 将转换为以下两条查询: 1 2 3 SELECT a, b, sum(expression) FROM table GROUP BY a, b; UNION SELECT a, NULL, sum(expression) FROM table GROUP BY a; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b), a, b, ()); 将转换为以下四条查询: 1 2 3 4 5 6 7 SELECT a, b, sum(expression) FROM table GROUP BY a, b; UNION SELECT a, NULL, sum(expression) FROM table GROUP BY a, NULL; UNION SELECT NULL, b, sum(expression) FROM table GROUP BY NULL, b; UNION SELECT NULL, NULL, sum(expression) FROM table;
  • 示例 将kafkaSink的数据输出到Kafka中 1 2 3 4 5 6 7 8 9 10 11 12 13 create table kafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.sink-partitioner' = 'round-robin', 'format.type' = 'csv' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 否 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据序列化格式,支持:'csv'、 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.sink-partitioner 否 记录分区的方式,支持:'fixed', 'round-robin'及'custom'。 connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' 。 update-mode 否 支持:'append'、'retract'及'upsert'三种写入模式。 connector.properties.* 否 配置kafka任意原生属性
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“opentsdb”表示输出到 MRS 的OpenTSDB。 region 是 MRS服务所在区域。 tsdb_link_address 是 MRS中OpenTSDB实例的服务地址,格式为http://ip:port或者https://ip:port。 说明: 配置项tsd.https.enabled为true时,需要使用https,注意https暂时不支持证书认证。 tsdb_metrics 是 数据点的metric,支持参数化。 tsdb_timestamps 是 数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 tsdb_values 是 数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 tsdb_tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 batch_insert_data_num 否 表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。
  • 示例 将流weather_out的数据输出到MRS服务的OpenTSDB中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", tsdb_link_address = "https://x.x.x.x:4242", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" );
  • 示例 将数据输出到MRS的HBase中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "mrs_hbase", region = "xxx", cluster_address = "192.16.0.88,192.87.3.88:2181", table_name = "car_pass_inspect_with_age_${car_age}", table_columns = "rowKey,info:owner,,car:speed,car:miles", illegal_data_table = "illegal_data", batch_insert_data_num = "20", action = "add", krb_auth = "KRB_AUTH_NAME" );
  • 前提条件 确保您的账户下已在 MapReduce服务 (MRS)里创建了您配置的集群。DLI支持与开启kerberos的hbase集群对接。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《 数据湖探索 用户指南》中创建队列章节。 确保DLI独享队列与MRS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 如何添加IP 域名 映射,请参见《数据湖探索用户指南》中修改主机信息章节。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"mrs_hbase"表示输出到MRS的HBase中。 region 是 MRS服务所在区域。 cluster_address 是 待插入数据表所属集群zookeeper地址,形如:ip1,ip2:port。 table_name 是 待插入数据的表名。 支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。 table_columns 是 待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。 illegal_data_table 否 如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为taskNo加下划线加时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。 batch_insert_data_num 否 表示一次性批量写入的数据条数,值必须为正整数,上限为1000,默认值为10。 action 否 表示数据是插入还是删除,可选值为add和delete,默认值为add。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数,填写对应的跨源认证名称。跨源认证创建详见创建跨源认证。 说明: 请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "mrs_hbase", region = "", cluster_address = "", table_name = "", table_columns = "", illegal_data_table = "", batch_insert_data_num = "", action = "" )
  • 示例 从Kafka源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接MySQL数据库实例,在flink数据库中创建相应的表,作为维表,表名为area_info,SQL语句如下: CREATE TABLE `flink`.`area_info` ( `area_id` VARCHAR(32) NOT NULL, `area_province_name` VARCHAR(32) NOT NULL, `area_city_name` VARCHAR(32) NOT NULL, `area_county_name` VARCHAR(32) NOT NULL, `area_street_name` VARCHAR(32) NOT NULL, `region_name` VARCHAR(32) NOT NULL, PRIMARY KEY (`area_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 连接MySQL数据库实例,向JDBC维表area_info中插入测试数据,其语句如下: insert into flink.area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka为数据源,JDBC作为维表,数据写入到Kafka结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'jdbc-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--其中url中的flink表示MySQL中area_info表所在的数据库名 'table-name' = 'area_info', 'username' = 'JDBCUserName', 'password' = 'JDBCPassWord' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,在Kafka的sink topic读取数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector 是 数据源类型,固定为:jdbc。 url 是 数据库的URL。 table-name 是 读取数据库中的数据所在的表名。 driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 username 否 数据库认证用户名,需要和'password'一起配置。 password 否 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在 scan.partition.upper-bound 否 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在 scan.partition.num 否 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。 pwd_auth_name 否 DLI侧创建的Password类型的跨源认证名称。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE table_id ( attr_name attr_type (',' attr_name attr_type)* ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
  • 数据类型映射 表2 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 示例 CS V格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "csv", field_delimiter = "\u0006\u0006\u0002", quote = "\u0007", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" ); ORC格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "orc", db_obs_server = "obs_server", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" );
  • 功能描述 创建sink流将Flink作业数据通过OBS转储方式输出到 数据仓库 服务(DWS),即Flink作业数据先输出到OBS,然后再从OBS导入到DWS。如何导入OBS数据到DWS具体可参考《数据仓库服务数据库开发指南》中“从OBS并行导入数据到集群”章节。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 注意事项 通过OBS转储支持两种中间文件方式: ORC: ORC格式不支持Array数据类型,如果使用ORC格式,需先在DWS中创建外部服务器,具体可参考《数据仓库服务数据库开发指南》中“创建外部服务器”章节。 CSV: CSV格式默认记录分隔符为换行符,若属性内容中有换行符,建议配置quote,具体参见表1。 如果要写入的表不存在,则会自动创建表。由于DLI SQL类型不支持text,如果存在长文本,建议先在数据库中创建表。 encode使用orc格式时,创建DWS表时,如果SQL流字段属性定义为String类型,DWS表字段属性不能使用varchar类型,需使用特定的text类型;如果是SQL流字段属性定义为Integer类型,DWS表字段需要使用Integer类型。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dws表示输出到数据仓库服务中。 region 是 数据仓库服务所在区域。 ak 是 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 是 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 encode 是 编码方式。当前支持csv和orc两种方式。 field_delimiter 否 属性分隔符。当编码方式为csv时需要配置,建议尽量用不可见字符作为分隔符,如\u0006\u0002。 quote 否 单字节,建议使用不可见字符,如\u0007。 db_obs_server 否 已在数据库中创建的外部服务器,如obs_server。 如何创建外部服务器,具体操作步骤可参考《数据仓库服务数据库开发指南》中创建外部服务器章节。 如果编码方式为orc格式时需指定该参数。 obs_dir 是 中间文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址。格式为/ip:port/database,如 “192.168.1.21:8000/test1”。 table_name 是 数据表名,若表不存在,则自动创建。 max_record_num_per_file 是 每个文件最多存储多少条记录。当文件记录数少于最大值时,该文件会延迟一个转储周期输出。 dump_interval 是 转储周期,单位为秒。 delete_obs_temp_file 否 是否要删除obs上的临时文件,默认为“true”,若设置为“false”,则不会删除obs上的文件,需用户自己清理。 max_dump_file_num 否 执行一次转储操作时最多转储多少文件。 当本次转储操作发现文件数小于最大值,则会延迟一个转储周期输出。
  • 前提条件 确保已创建OBS桶和文件夹。 如何创建OBS桶,具体请参见《 对象存储服务 控制台指南》中的“创建桶”章节。 如何新建文件夹,具体请参见《对象存储服务控制台指南》中的“新建文件夹”章节。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dws", region = "", ak = "", sk = "", encode = "", field_delimiter = "", quote = "", db_obs_server = "", obs_dir = "", username = "", password = "", db_url = "", table_name = "", max_record_num_per_file = "", dump_interval = "" );
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "smn", region = "", topic_urn = "", urn_column = "", message_subject = "", message_column = "" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,smn表示输出到 消息通知 服务中。 region 是 SMN 所在区域。 topic_urn 否 SMN服务的主题URN,用于静态主题URN配置。作为消息通知的目标主题,需要提前在SMN服务中创建。 与“urn_column”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 urn_column 否 主题URN内容的字段名,用于动态主题URN配置。 与“topic_urn”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 message_subject 是 发往SMN服务的消息标题,用户自定义。 message_column 是 输出流的字段名,其内容作为消息的内容,用户自定义。目前只支持默认的文本消息。
  • 示例 将流over_speed_warning的数据输出到消息通知服务SMN中。 1 2 3 4 5 6 7 8 9 10 11 //静态主题配置 CREATE SINK STREAM over_speed_warning ( over_speed_message STRING /* over speed message */ ) WITH ( type = "smn", region = "xxx", topic_Urn = "xxx", message_subject = "message title", message_column = "over_speed_message" ); 1 2 3 4 5 6 7 8 9 10 11 12 //动态主题配置 CREATE SINK STREAM over_speed_warning2 ( over_speed_message STRING, /* over speed message */ over_speed_urn STRING ) WITH ( type = "smn", region = "xxx", urn_column = "over_speed_urn", message_subject = "message title", message_column = "over_speed_message" );
  • 功能描述 DLI将Flink作业的输出数据输出到消息通知服务(SMN)中。 消息通知服务(Simple Message Notification,简称SMN)为DLI提供可靠的、可扩展的、海量的消息处理服务,它大大简化系统耦合,能够根据用户的需求,向订阅终端主动推送消息。可用于连接云服务、向多个协议推送消息以及集成在产生或使用通知的任何其他应用程序等场景。 SMN的更多信息,请参见《消息通知服务用户指南》。
共100000条