华为云用户手册

  • 示例 create table sink1( attr1 string, attr2 int ) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = 'es', 'connector.document-type' = 'one', 'update-mode' = 'append', 'format.type' = 'json' );
  • 功能描述 DLI 将Flink作业的输出数据输出到 云搜索服务 CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于 日志分析 、站内搜索等场景。 云搜索 服务(Cloud Search Service,简称 CSS )为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。云搜索服务的更多信息,请参见《云搜索服务用户指南》。
  • 语法格式 create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = '', 'connector.document-type' = '', 'update-mode' = '', 'format.type' = 'json' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,对于elasticsearch需配置为elasticsearch connector.version 是 使用的elasticsearch的版本。 当前只能使用版本7,即该值只能为7 connector.hosts 是 Elasticsearch所在集群的主机名,多个以’;’间隔,注意请以http开头,如http://x.x.x.x:9200 connector.index 是 Elasticsearch的索引名 connector.document-type 是 Elasticsearch的type名称 当版本为7时,由于elasticsearch使用默认的_doc类型,因此该属性无效 update-mode 是 sink的写入类型,支持append和upsert connector.key-delimiter 否 连接复合主键的拼接符,默认为_ connector.key-null-literal 否 当key中含有null时,使用该字符代替 connector.failure-handler 否 elasticsearch请求失败时的策略,默认为fail fail:当请求失败且作业失败时抛出异常 ignore:忽略 retry-rejected:对于由于es节点的队列满时,会重新请求而不抛出失败。 custom:使用定制策略 connector.failure-handler-class 否 使用失败时的定制策略时所使用的自定义处理方式 connector.flush-on-checkpoint 否 checkpoint时是否会等待所有阻塞请求完成。 默认为true,表示会等待阻塞请求完成,如果配置为false,则表示不会等待阻塞请求完成。 connector.bulk-flush.max-actions 否 批量写入时的每次最大写入记录数 connector.bulk-flush.max-size 否 批量写入时的最大数据量,当前只支持MB,请带上单位 mb connector.bulk-flush.interval 否 批量写入时的刷新的时间间隔,单位为milliseconds,无需带上单位 format.type 是 当前只支持json connector.username 否 Elasticsearch所在集群的账号。该账号参数需和密码“connector.password”参数同时配置。 使用账号密码参数时,创建的云搜索服务集群必须开启安全模式并且关闭https。 connector.password 否 Elasticsearch所在集群的密码。该密码参数需和“connector.username”参数同时配置。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 若使用 MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《 数据湖探索 用户指南》中的“修改主机信息”章节描述。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 create table hbaseSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为hbase connector.version 是 该值只能为1.4.3 connector.table-name 是 hbase中的表名 connector.zookeeper.quorum 是 Zookeeper的地址 connector.zookeeper.znode.parent 否 Zookeeper中的根目录,默认是/hbase connector.write.buffer-flush.max-size 否 每次插入的数据的最大的缓存大小,默认为2mb ,仅支持mb connector.write.buffer-flush.max-rows 否 每次刷新数据的最大条数 connector.write.buffer-flush.interval 否 刷新时间,默认值为0s,如2s connector.rowkey 否 设置复合rowkey,即根据多个字段设置。 形如:rowkey1:3,rowkey2:3,… 其中3表示取该字段的前3个byte,该值不能大于该字段的字节大小,且该值不能小于1
  • 功能描述 DLI将作业的输出数据输出到HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式 云存储 系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。
  • 示例 将数据写入smn的相应主题中,其中smn发送的消息的主题为'test',内容为字段'attr1'的内容 create table smnSink ( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'smn', 'connector.region' = 'cn-north-1', 'connector.topic-urn' = 'xxxxxx', 'connector.message-subject' = 'test', 'connector.message-column' = 'attr1' );
  • 功能描述 DLI将Flink作业的输出数据输出到 消息通知 服务( SMN )中。 消息通知服务(Simple Message Notification,简称SMN)为DLI提供可靠的、可扩展的、海量的消息处理服务,它大大简化系统耦合,能够根据用户的需求,向订阅终端主动推送消息。可用于连接云服务、向多个协议推送消息以及集成在产生或使用通知的任何其他应用程序等场景。SMN的更多信息,请参见《消息通知服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 sink的类型,smn表示输出到消息通知服务中 connector.region 是 SMN所在区域 connector.topic-urn 否 SMN服务的主题URN,用于静态主题URN配置。作为消息通知的目标主题,需要提前在SMN服务中创建。 与“urn_column”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 connector.urn-column 否 主题URN内容的字段名,用于动态主题URN配置。 与“topic_urn”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 connector.message-subject 是 发送SMN服务的消息标题,用户自定义 connector.message-column 是 当前表的某个字段名,其内容作为消息的内容,用户自定义。目前只支持默认的文本消息
  • 语法格式 create table smnSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'smn', 'connector.region' = '', 'connector.topic-urn' = '', 'connector.message-subject' = '', 'connector.message-column' = '' );
  • 示例 配置“connector.table-name”参数时的table存储模式示例。 table模式采用hash类型存储数据,与基本hash类型将表的三个字段分别作为key、hash_key、hash_value不同,table模式下的key值可以通过“connector.table-name”和“connector.key-column”两个参数设置,将表中的所有字段名作为hash_key,字段值作为hash_value写入到hash中。 create table redisSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.table-name'='car_info', 'connector.key-column'='car_id' ); insert into redisSink (car_id,car_owner,car_brand,car_speed) VALUES ("A1234","OwnA","A1234",30); 以下示例演示“connector.data-type”为string, list, hash, set类型时的建表语句。 “connector.data-type”为string类型。 表为2列:第一列为key,第二列为value。 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'string' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为list类型。 表为2列:第一列为key,第二列为value。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'list' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为set类型。 表为2列:第一列为key,第二列为value。 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'set' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为hash类型。 表为3列:第一列为key,第二列为hash_key,第三列为hash_value。 create table redisSink( attr1 STRING, attr2 STRING, attr3 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'hash' ); insert into redisSink (attr1,attr2,attr3) VALUES ("car_info","car_id","A1234");
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于redis,需配置为'redis'。 connector.host 是 redis连接地址。 connector.port 是 redis连接端口。 connector.password 否 redis认证密码。 connector.deploy-mode 否 redis部署模式,支持standalone/cluster,默认standalone connector.table-name 否 table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。 说明: table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。 connector.key-column 否 table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid connector.write-schema 否 table存储模式下可配置,是否将当前schema写入到redis,默认为false connector.data-type 否 数据存储类型,用户自定义存储模式必配。支持:string, list, hash, set类型。其中string/list以及sets中schema字段数必须为2,hash字段数必须为3 connector.ignore-retraction 否 是否忽略retraction消息,默认为false
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'redis', 'connector.host' = '', 'connector.port' = '', 'connector.password' = '', 'connector.table-name' = '', 'connector.key-column' = '' );
  • 示例 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 使用upsert模式,写入数据到DWS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm ( ddate DATE, dmin TIMESTAMP(3), game_appkey VARCHAR, channel_id VARCHAR, pay_user_num_1m bigint, pay_amt_1m bigint, PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED ) WITH ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/dws_bigdata_db', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.username' = 'xxxx', 'connector.password' = 'xxxxx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' );
  • 前提条件 请务必确保您的账户下已在 数据仓库 服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 功能描述 DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.write.mode 否 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 connector.write.flush.max-rows 否 数据flush大小,超过该值将触发写入flush。默认为5000。 connector.write.flush.interval 否 数据flush周期,周期性触发写入flush。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。不填写则默认不根据时间刷新。 connector.write.max-retries 否 写入最大重试次数,默认为3。 connector.write.merge.filter-key 否 配置PRIMARY KEY,并且“connector.write.mode”配置为copy时,可以配置merge时的过滤列名。 connector.write.escape-string-value 否 是否对string类型值进行转义,默认为false。
  • 语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 示例 将流jdbcSink的数据输出到MySQL数据库中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table jdbcSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxxx' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table jdbcSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 访问数据库所需要的账号 connector.password 否 访问数据库所需要的密码 connector.write.flush.max-rows 否 写数据时,刷新数据的最大行数。默认值为5000 connector.write.flush.interval 否 刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。不填写则默认不根据时间刷新 connector.write.max-retries 否 写数据失败时的最大尝试次数。默认值为3 connector.write.exclude-update-columns 否 默认值为空(默认忽略primary key字段),表示更新主键值相同的数据时,忽略指定字段的更新
  • 示例 将流disSink的数据输出到DIS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table disSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' );
  • 功能描述 DLI将Flink作业的输出数据写入 数据接入服务 (DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.partition-key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table disSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format.type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于upsert kafka,需配置为'upsert-kafka' connector.version 否 Kafka版本,仅支持:'0.11' format.type 是 数据序列化格式,支持:'csv', 'json'及'avro'等 connector.topic 是 kafka topic名 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔 connector.sink-partitioner 否 记录分区方式,支持:'fixed', 'round-robin'及'custom' connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' connector.sink.ignore-retraction 否 是否忽略回撤消息,默认为false。回撤消息将以null值写入kafka update-mode 否 支持:'append', 'retract'及'upsert'三种写入模式 connector.properties.* 否 配置kafka任意原生属性
  • 示例 create table upsertKafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT, primary key (car_id) not enforced ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'format.type' = 'csv' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
共100000条