检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
DLI的数据可存储在哪些地方 DLI支持存储哪些格式的数据? DLI支持如下数据格式: Parquet CSV ORC Json Avro DLI服务的数据可以存储在哪些地方? OBS:SQL作业,Spark作业,Flink作业使用的数据均可以存储在OBS服务中,降低存储成本。
若配置了该参数,则'key.fields'也需要配置,否则kafka的记录中key会为空。 取值如下: csv json avro debezium-json canal-json maxwell-json avro-confluent raw 请参考Format页面以获取更多详细信息和格式参数。 key
变长二进制数据。需要带上前缀X,如:X'65683F',暂不支持指定长度的二进制字符串。 JSON 取值可以是a JSON object、a JSON array、a JSON number、a JSON string、true、false or null。 STRING 兼容impal
类型至 int96。 Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。 下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。 注意:复合数据类型暂只支持写不支持读(Array、Map 与 Row)。 表2 数据类型映射 Flink数据类型
null), 其中 something 是从 Flink 类型转换的 Avro 类型。 示例 从kafka中作为source的topic中读取json数据,并以confluent avro的形式写入作为sink的topic中。 根据kafka和ecs所在的虚拟私有云和子网创建相应的跨源
4291#section-2.5.5.2)来实现的。当创建一个IPv4时,会被映射到IPv6。当格式化时,如果数据是IPv4又会被重新映射为IPv4。 其他的地址则会按照RFC 5952所定义的规范格式来进行格式化。 示例: select IPADDRESS '10.0.0.1', IPADDRESS
sync:同步 请求示例 将db2.t2的数据导出至OBS,并以json格式存储。 { "data_path": "obs://home/data1/DLI/test", "data_type": "json", "database_name": "db2",
async:异步 sync:同步 请求示例 将SQL语句的查询结果导出到OBS中以json格式存储。 { "data_path": "obs://obs-bucket1/path", "data_type": "json", "compress": "gzip", "with_column_header":
format' = 'json', 'key.json.ignore-parse-errors' = 'true', 'key.fields' = 'user_id;item_id', 'value.format' = 'json', 'value.json.fail-on-missing-field'
data_path 是 String 导入或导出的文件路径。 data_type 是 String 导入或导出的数据类型(当前支持csv和json格式)。 database_name 是 String 导入或导出表所属的数据库名称。 table_name 是 String 导入或导出表的名称。
SQLContext(javaSparkContext); // // Read json file as DataFrame, read csv / parquet file, same as json file distribution // DataFrame dataFrame
'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'json', 'value.format' = 'json' ); insert into UPSERTKAFKASINK select * from orders;
X-Auth-Token 是 从IAM服务获取的用户Token。 Accept 是 默认值application/json。 Content-Type 是 指定类型为application/json。 charset 是 指定编码格式为utf8。 请求参数如表3所示。 表3 请求参数说明 参数 是否必选
= 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE printSink ( `topic` String, `partition` int,
'connector.document-type' = '', 'update-mode' = '', 'format.type' = 'json' ); 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,对于elastic
Y Y Y Y Y Y(8) \ CHAR N N N N N N N N Y VARBINARY N N N N N N N N N JSON N N N N N N N N Y DATE N N N N N N N N Y TIME N N N N N N N N Y TIME
表示永久不失效。 compaction.async.enabled 否 false Boolean 是否开启在线压缩。 true:开启 false:关闭 建议关闭在线压缩,提升性能。但是调度compaction.schedule.enabled仍然建议开启,之后可通过离线异步压缩,执行阶段性生成的压缩plan。
region' = 'xxxxx', 'connector.channel' = 'dis-input', 'format.type' = 'json' ); /** sink **/ CREATE TABLE cars_infos_out (cast_int_to_string string
com:443 Content-Type 消息体的类型(格式),默认取值为“application/json”,有其他取值时会在具体接口中专门说明。 是 application/json Content-Length 请求body长度,单位为Byte。 POST/PUT请求必填。 GET不能包含。
SimpleJsonBuild(); System.out.println(sjb.eval("json1", "json2", "json3", "json4")); } } 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline