云服务器内容精选

  • 示例 示例一: 该示例将car_info数据,以buyday字段为分区字段,parquet为编码格式,转储数据到OBS。 1 2 3 4 5 6 7 8 9 10 11 12 13 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/buyday=xx/part-x-x。 数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理: 创建OBS分区表。 1 2 3 4 5 6 7 8 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (buyday string) stored as parquet location 'obs://obs-sink/car_infos'; 从关联OBS路径中恢复分区信息。 1 alter table car_infos recover partitions; 示例二 该示例将car_info数据,以buyday字段为分区字段,csv为编码格式,转储数据到HDFS。 1 2 3 4 5 6 7 8 9 10 11 12 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "hdfs://node-master1sYAx:9820/user/car_infos", encode = "csv", field_delimiter = "," ); 数据最终在HDFS中的存储目录结构为:/user/car_infos/buyday=xx/part-x-x。
  • HDFS代理用户配置 登录 MRS 管理页面。 选择MRS的HDFS Namenode配置,在“自定义”中添加配置参数。 图1 HDFS服务配置 其中,core-site值名称“hadoop.proxyuser.myname.hosts”和“hadoop.proxyuser.myname.groups”中的“myname”为传入的krb认证用户名称。 需要保证写入HDFS数据路径权限为777。 配置完成后,单击“保存配置”进行保存。
  • 注意事项 使用文件系统输出流的Flink作业必须开启checkpoint,保证作业的一致性。 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启或者手动重启,需要配置为“从checkpoint恢复”。 checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。 checkpoint支持如下两种模式: AtLeastOnce:事件至少被处理一次。 ExactlyOnce:事件仅被处理一次。 使用文件系统输出流写入数据到OBS时,应避免多个作业写同一个目录的情况。 OBS对象存储桶的默认行为为覆盖写,可能导致数据丢失。 OBS并行文件系统桶的默认行为追加写,可能导致数据混淆。 因为以上OBS桶类型行为的区别,为避免作业异常重启可能导致的数据异常问题,请根据您的业务需求选择OBS桶类型。
  • 功能描述 创建sink流将数据输出到分布式文件系统(HDFS)或者 对象存储服务 (OBS)等文件系统。数据生成后,可直接对生成的目录创建非 DLI 表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。 OBS的更多信息,请参见《对象存储服务用户指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) [PARTITIONED BY (attr_name (',' attr_name)*] WITH ( type = "filesystem", file.path = "obs://bucket/xx", encode = "parquet", ak = "", sk = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出流类型。“type”为“filesystem”,表示输出数据到文件系统。 file.path 是 输出目录,格式为: schema://file.path。 当前schame只支持obs和hdfs。 当schema为obs时,表示输出到对象存储服务OBS。 当schema为hdfs时,表示输出到HDFS。HDFS需要配置代理用户,具体请参考HDFS代理用户配置。 示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为MRS集群NameNode所在节点信息。 encode 是 输出数据编码格式,当前支持“parquet”格式和“csv”格式。 当schema为obs时,输出数据编码格式仅支持“parquet”格式。 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。 ak 否 输出到OBS时该参数必填。用于访问OBS认证的accessKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《 数据湖探索 用户指南》。 sk 否 输出到OBS时该参数必填。用于访问OBS认证的secretKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《 数据湖 探索用户指南》。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 field_delimiter 否 属性分隔符。 当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,如:“,”。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《数据湖探索用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 功能描述 DLI将Flink作业的输出数据输出到分布式缓存服务(D CS )的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。 分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。 DCS的更多信息,请参见《分布式缓存服务用户指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。 region 是 数据所在的DCS所在区域。 cluster_address 是 Redis实例连接地址。 password 否 Redis实例连接密码,当设置为免密访问时,省略该配置项。 value_type 是 该参数可配置为如下选项或选项的组合: 支持指定插入数据类型,包括:string, list, hash, set, zset; 支持设置key的过期时间,包括expire, pexpire, expireAt, pexpireAt; 支持删除key命令,包括del, hdel; 当需要使用多个命令时,用“;”分隔。 key_value 是 设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。
  • 注意事项 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。 字符":", ",", ";", "$", "{", "}"已被征用为特殊分隔符,暂时没有提供转义功能,禁止在key和value中作为普通字符使用,否则会影响解析,导致程序异常。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dcs_redis", region = "", cluster_address = "", password = "", value_type= "",key_value= "" );
  • 示例 将流qualified_cars的数据输出到DCS服务的Redis类型的缓存实例中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) WITH ( type = "dcs_redis", cluster_address = "192.168.0.34:6379", password = "xxxxxxxx", value_type = "string; list; hash; set; zset", key_value = "${car_id}_str: ${car_owner}; name_list: ${car_owner}; ${car_id}_hash: {name:${car_owner}, age: ${car_age}}; name_set: ${car_owner}; math_zset: {${car_owner}:${average_speed}}" );