云服务器内容精选

  • 示例 CS V编码格式:数据输出到DIS通道,使用csv编码,并且以逗号为分隔符,多个分区用car_owner做为key进行分发。数据输出示例:"ZJA710XC", "lilei", "BMW", 700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "xxx", channel = "dlioutput", encode = "csv", field_delimiter = "," ); JSON编码格式:数据输出到DIS通道,使用json编码,多个分区用car_owner,car_brand 做为key进行分发,“enableOutputNull”为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。数据示例:"car_id ":"ZJA710XC", "car_owner ":"lilei", "car_brand ":"BMW", "car_price ":700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", channel = "dlioutput", region = "xxx", partition_key = "car_owner,car_brand", encode = "json", enable_output_null = "false" );
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_key = "", encode= "", field_delimiter= "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dis表示输出到 数据接入服务 。 region 是 数据所在的DIS所在区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 DIS通道。 partition_key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。 encode 是 数据编码格式,可选为“csv”、“json”和“user_defined”。 说明: 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需使用“enable_output_null”来配置是否输出空字段,具体见示例。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 是 属性分隔符。 当编码格式为csv时,需要设置属性分隔符,用户可以自定义,如:“,”。 当编码格式为json时,则不需要设置属性之间的分隔符。 json_config 否 当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2”。 enable_output_null 否 当编码格式为json时,需使用该参数来配置是否输出空字段。 当该参数为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。默认值为“true”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。
  • 功能描述 DLI 将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • HDFS代理用户配置 登录 MRS 管理页面。 选择MRS的HDFS Namenode配置,在“自定义”中添加配置参数。 图1 HDFS服务配置 其中,core-site值名称“hadoop.proxyuser.myname.hosts”和“hadoop.proxyuser.myname.groups”中的“myname”为传入的krb认证用户名称。 需要保证写入HDFS数据路径权限为777。 配置完成后,单击“保存配置”进行保存。
  • 示例 示例一: 该示例将car_info数据,以buyday字段为分区字段,parquet为编码格式,转储数据到OBS。 1 2 3 4 5 6 7 8 9 10 11 12 13 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/buyday=xx/part-x-x。 数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理: 创建OBS分区表。 1 2 3 4 5 6 7 8 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (buyday string) stored as parquet location 'obs://obs-sink/car_infos'; 从关联OBS路径中恢复分区信息。 1 alter table car_infos recover partitions; 示例二 该示例将car_info数据,以buyday字段为分区字段,csv为编码格式,转储数据到HDFS。 1 2 3 4 5 6 7 8 9 10 11 12 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "hdfs://node-master1sYAx:9820/user/car_infos", encode = "csv", field_delimiter = "," ); 数据最终在HDFS中的存储目录结构为:/user/car_infos/buyday=xx/part-x-x。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) [PARTITIONED BY (attr_name (',' attr_name)*] WITH ( type = "filesystem", file.path = "obs://bucket/xx", encode = "parquet", ak = "", sk = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出流类型。“type”为“filesystem”,表示输出数据到文件系统。 file.path 是 输出目录,格式为: schema://file.path。 当前schame只支持obs和hdfs。 当schema为obs时,表示输出到 对象存储服务 OBS。 当schema为hdfs时,表示输出到HDFS。HDFS需要配置代理用户,具体请参考HDFS代理用户配置。 示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为MRS集群NameNode所在节点信息。 encode 是 输出数据编码格式,当前支持“parquet”格式和“csv”格式。 当schema为obs时,输出数据编码格式仅支持“parquet”格式。 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。 ak 否 输出到OBS时该参数必填。用于访问OBS认证的accessKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《 数据湖探索 用户指南》。 sk 否 输出到OBS时该参数必填。用于访问OBS认证的secretKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《 数据湖 探索用户指南》。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 field_delimiter 否 属性分隔符。 当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,如:“,”。
  • 注意事项 使用文件系统输出流的Flink作业必须开启checkpoint,保证作业的一致性。 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启或者手动重启,需要配置为“从checkpoint恢复”。 checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。 checkpoint支持如下两种模式: AtLeastOnce:事件至少被处理一次。 ExactlyOnce:事件仅被处理一次。 使用文件系统输出流写入数据到OBS时,应避免多个作业写同一个目录的情况。 OBS对象存储桶的默认行为为覆盖写,可能导致数据丢失。 OBS并行文件系统桶的默认行为追加写,可能导致数据混淆。 因为以上OBS桶类型行为的区别,为避免作业异常重启可能导致的数据异常问题,请根据您的业务需求选择OBS桶类型。
  • 功能描述 创建sink流将数据输出到分布式文件系统(HDFS)或者对象存储服务(OBS)等文件系统。数据生成后,可直接对生成的目录创建非DLI表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《数据湖探索用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );