华为云用户手册

  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "mrs_hbase", region = "", cluster_address = "", table_name = "", table_columns = "", illegal_data_table = "", batch_insert_data_num = "", action = "" )
  • 示例 将数据输出到Kafka中。 示例一 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" ); 示例二 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM kafka_sink ( a1 string, a2 string, a3 string, a4 INT ) // 输出字段 WITH ( type="kafka", kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", kafka_topic = "testflink", // 写入的topic encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",", kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" );
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到 DLI 队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP 域名 映射,请参见《 数据湖探索 用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic。 encode 是 编码格式,当前支持“json”和“user_defined”。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的 MRS 集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。 region 是 数据所在的D CS 所在区域。 cluster_address 是 Redis实例连接地址。 password 否 Redis实例连接密码,当设置为免密访问时,省略该配置项。 value_type 是 该参数可配置为如下选项或选项的组合: 支持指定插入数据类型,包括:string, list, hash, set, zset; 支持设置key的过期时间,包括expire, pexpire, expireAt, pexpireAt; 支持删除key命令,包括del, hdel; 当需要使用多个命令时,用“;”分隔。 key_value 是 设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。
  • 注意事项 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。 字符":", ",", ";", "$", "{", "}"已被征用为特殊分隔符,暂时没有提供转义功能,禁止在key和value中作为普通字符使用,否则会影响解析,导致程序异常。
  • 示例 将流qualified_cars的数据输出到DCS服务的Redis类型的缓存实例中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) WITH ( type = "dcs_redis", cluster_address = "192.168.0.34:6379", password = "xxxxxxxx", value_type = "string; list; hash; set; zset", key_value = "${car_id}_str: ${car_owner}; name_list: ${car_owner}; ${car_id}_hash: {name:${car_owner}, age: ${car_age}}; name_set: ${car_owner}; math_zset: {${car_owner}:${average_speed}}" );
  • 功能描述 DLI将Flink作业的输出数据输出到分布式缓存服务(DCS)的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。 分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。 DCS的更多信息,请参见《分布式缓存服务用户指南》。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dcs_redis", region = "", cluster_address = "", password = "", value_type= "",key_value= "" );
  • 前提条件 请务必确保您的账户下已在分布式缓存服务(DCS)里创建了Redis类型的缓存实例。 如何创建Redis类型的缓存实例,请参考《分布式缓存服务用户指南》中“申请Redis缓存实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与DCS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 用户通过VPC对等访问DCS实例时,除了满足VPC对等网跨VPC访问的约束之外,还存在如下约束: 当创建DCS实例时使用了172.16.0.0/12~24网段时,DLI队列不能在192.168.1.0/24、192.168.2.0/24、192.168.3.0/24网段。 当创建DCS实例时使用了192.168.0.0/16~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。 当创建DCS实例时使用了10.0.0.0/8~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "es", region = "", cluster_address = "", es_index = "", es_type= "", es_fields= "", batch_insert_data_num= "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,es表示输出到 云搜索服务 中。 region 是 数据所在的 云搜索 服务所在区域。 cluster_address 是 云搜索服务集群的内网访问地址,例如:x.x.x.x:x,多个地址时以逗号分隔。 es_index 是 待插入数据的索引,支持参数化。对应 CSS 服务中的index。 具体请参考《云搜索服务产品介绍》。 es_type 是 待插入数据的文档类型,支持参数化。对应CSS服务中的type。 具体请参考《云搜索服务产品介绍》。 若使用的es版本为6.x,则该值不能以"_"开头。 若使用的es版本为7.x,如果提前预置CSS服务中的type,则es_type需为"_doc",否则可为符合CSS规范的值。 es_fields 是 待插入数据字段的key,具体形式如:"id,f1,f2,f3,f4",并且保证与sink中数据列一一对应;如果不使用key,而是采用随机的属性字段,则无需使用id关键字,具体形式如:"f1,f2,f3,f4,f5"。对应CSS服务中的filed。 具体请参考《云搜索服务产品介绍》。 batch_insert_data_num 是 表示一次性批量写入的数据量,值必须为正整数,单位为:条。上限为65536,默认值为10。 action 否 当值为add时,表示遇到相同id时,数据被强制覆盖,当值为upsert时,表示遇到相同id时,更新数据(选择upsert时,es_fields字段中必须指定id),默认值为add。 enable_output_null 否 使用该参数来配置是否输出空字段。当该参数为true表示输出空字段(值为null),若为false表示不输出空字段。默认为false。 max_record_num_cache 否 记录最大缓存数。 es_certificate_name 否 跨源认证信息名称。 创建跨源认证请参考跨源认证。 若es集群开启安全模式且开启https,则使用证书进行访问,创建的跨源认证类型需要为“CSS”。 若es集群开启安全模式,但关闭https,则使用证书和账号密码进行访问,创建的跨源认证类型需要为"Password"。
  • 功能描述 DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于 日志分析 、站内搜索等场景。 云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。 云搜索服务的更多信息,请参见《云搜索服务用户指南》。 创建CSS集群时如果开启了安全模式,后续将无法关闭。
  • 示例 将流qualified_cars的数据输出到云搜索服务的集群。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "es", region = "xxx", cluster_address = "192.168.0.212:9200", es_index = "car", es_type = "information", es_fields = "id,owner,age,speed,miles", batch_insert_data_num = "10" );
  • 功能描述 DLI将Flink作业的输出数据输出到 消息通知 服务( SMN )中。 消息通知服务(Simple Message Notification,简称SMN)为DLI提供可靠的、可扩展的、海量的消息处理服务,它大大简化系统耦合,能够根据用户的需求,向订阅终端主动推送消息。可用于连接云服务、向多个协议推送消息以及集成在产生或使用通知的任何其他应用程序等场景。 SMN的更多信息,请参见《消息通知服务用户指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "smn", region = "", topic_urn = "", urn_column = "", message_subject = "", message_column = "" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,smn表示输出到消息通知服务中。 region 是 SMN所在区域。 topic_urn 否 SMN服务的主题URN,用于静态主题URN配置。作为消息通知的目标主题,需要提前在SMN服务中创建。 与“urn_column”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 urn_column 否 主题URN内容的字段名,用于动态主题URN配置。 与“topic_urn”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 message_subject 是 发往SMN服务的消息标题,用户自定义。 message_column 是 输出流的字段名,其内容作为消息的内容,用户自定义。目前只支持默认的文本消息。
  • 示例 将流over_speed_warning的数据输出到消息通知服务SMN中。 1 2 3 4 5 6 7 8 9 10 11 //静态主题配置 CREATE SINK STREAM over_speed_warning ( over_speed_message STRING /* over speed message */ ) WITH ( type = "smn", region = "xxx", topic_Urn = "xxx", message_subject = "message title", message_column = "over_speed_message" ); 1 2 3 4 5 6 7 8 9 10 11 12 //动态主题配置 CREATE SINK STREAM over_speed_warning2 ( over_speed_message STRING, /* over speed message */ over_speed_urn STRING ) WITH ( type = "smn", region = "xxx", urn_column = "over_speed_urn", message_subject = "message title", message_column = "over_speed_message" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《数据湖探索用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dws表示输出到 数据仓库 服务中。 region 是 数据仓库服务所在区域。 ak 是 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 是 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 encode 是 编码方式。当前支持csv和orc两种方式。 field_delimiter 否 属性分隔符。当编码方式为csv时需要配置,建议尽量用不可见字符作为分隔符,如\u0006\u0002。 quote 否 单字节,建议使用不可见字符,如\u0007。 db_obs_server 否 已在数据库中创建的外部服务器,如obs_server。 如何创建外部服务器,具体操作步骤可参考《数据仓库服务数据库开发指南》中创建外部服务器章节。 如果编码方式为orc格式时需指定该参数。 obs_dir 是 中间文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址。格式为/ip:port/database,如 “192.168.1.21:8000/test1”。 table_name 是 数据表名,若表不存在,则自动创建。 max_record_num_per_file 是 每个文件最多存储多少条记录。当文件记录数少于最大值时,该文件会延迟一个转储周期输出。 dump_interval 是 转储周期,单位为秒。 delete_obs_temp_file 否 是否要删除obs上的临时文件,默认为“true”,若设置为“false”,则不会删除obs上的文件,需用户自己清理。 max_dump_file_num 否 执行一次转储操作时最多转储多少文件。 当本次转储操作发现文件数小于最大值,则会延迟一个转储周期输出。
  • 示例 CSV格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "csv", field_delimiter = "\u0006\u0006\u0002", quote = "\u0007", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" ); ORC格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "orc", db_obs_server = "obs_server", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" );
  • 功能描述 创建sink流将Flink作业数据通过OBS转储方式输出到数据仓库服务(DWS),即Flink作业数据先输出到OBS,然后再从OBS导入到DWS。如何导入OBS数据到DWS具体可参考《数据仓库服务数据库开发指南》中“从OBS并行导入数据到集群”章节。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 注意事项 通过OBS转储支持两种中间文件方式: ORC: ORC格式不支持Array数据类型,如果使用ORC格式,需先在DWS中创建外部服务器,具体可参考《数据仓库服务数据库开发指南》中“创建外部服务器”章节。 CSV: CSV格式默认记录分隔符为换行符,若属性内容中有换行符,建议配置quote,具体参见表1。 如果要写入的表不存在,则会自动创建表。由于DLI SQL类型不支持text,如果存在长文本,建议先在数据库中创建表。 encode使用orc格式时,创建DWS表时,如果SQL流字段属性定义为String类型,DWS表字段属性不能使用varchar类型,需使用特定的text类型;如果是SQL流字段属性定义为Integer类型,DWS表字段需要使用Integer类型。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dws", region = "", ak = "", sk = "", encode = "", field_delimiter = "", quote = "", db_obs_server = "", obs_dir = "", username = "", password = "", db_url = "", table_name = "", max_record_num_per_file = "", dump_interval = "" );
  • 前提条件 确保已创建OBS桶和文件夹。 如何创建OBS桶,具体请参见《 对象存储服务 控制台指南》中的“创建桶”章节。 如何新建文件夹,具体请参见《对象存储服务控制台指南》中的“新建文件夹”章节。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 示例 将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "xxxxxx", db_url = "postgresql://192.168.1.1:8000/test", table_name = "audi_cheaper_than_30w" ); insert into audi_cheaper_than_30w select "1","2","3",4;
共100000条