云服务器内容精选
-
示例1 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到30min或128MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min' ); INSERT into sink_table SELECT * from orders;
-
示例2 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'parquet', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
-
参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 固定位filesystem。 path 是 无 String OBS路径。 format 是 无 String 文件格式。 支持csv、parquet格式。 sink.rolling-policy.file-size 否 128MB MemorySize 单个part文件最大大小,超过该数值会滚动产生新文件。 说明: RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后再转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。 sink.rolling-policy.rollover-interval 否 30 min Duration 单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。 说明: 该参数数字与单位之间必须要有空格。 支持的时间单位包括: d,h,min,s,ms等。 对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。 sink.rolling-policy.check-interval 否 1 min Duration 基于时间的滚动策略的检查间隔。 该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。 auto-compaction 否 false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。 compaction.file-size 否 `sink.rolling-policy.file-size`的大小 MemorySize 合并目标文件大小,默认值为滚动文件大小。 说明: 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。
-
功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者 对象存储服务 OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。更多信息参考文件系统 SQL 连接器 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
-
语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'auto-compaction' = 'true' );
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格