云服务器内容精选

  • 示例 将kafkaSink的数据输出到Kafka中 1 2 3 4 5 6 7 8 9 10 11 12 13 create table kafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.sink-partitioner' = 'round-robin', 'format.type' = 'csv' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 否 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据序列化格式,支持:'csv'、 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.sink-partitioner 否 记录分区的方式,支持:'fixed', 'round-robin'及'custom'。 connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' 。 update-mode 否 支持:'append'、'retract'及'upsert'三种写入模式。 connector.properties.* 否 配置kafka任意原生属性
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 创建 MRS 的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本,且勿开启kerberos认证。 ClickHouse结果表不支持删除表数据操作。 Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • 示例 从Kafka中读取数据,并将数据插入到数据库为flink、表名为order的ClickHouse数据库中,其具体步骤如下(clickhouse版本为MRS的21.3.4.25): 参考增强型跨源连接,在 DLI 上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink作业队列。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector.type' = 'clickhouse', 'connector.url' = 'jdbc:clickhouse://ClickhouseAddress:ClickhousePort/flink', 'connector.table' = 'order', 'connector.write.flush.max-rows' = '1' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110
  • 前提条件 确保已创建kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与Kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 示例(适用于Kafka集群未开启SASL_SSL场景) 该示例是从Kafka的一个topic中读取数据,并使用Kafka结果表将数据写入到kafka的另一个topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', "format" = "json" ); insert into kafkaSink select * from kafkaSource; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"} 连接Kafka集群,在Kafka的sink topic读取数据,参考如下: {"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table kafkaSink( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'format' = '' );
  • 示例 该示例是从Kafka数据源中读取数据,并写入到Elasticsearch结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据Elasticsearch和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置Elasticsearch和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Elasticsearch和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Elasticsearch集群的Kibana,并选择Dev Tools,输入下列语句并执行,以创建值为orders的index: PUT /orders { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } } } 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'ElasticsearchAddress:ElasticsearchPort', 'index' = 'orders' ); insert into elasticsearchSink select * from kafkaSource; 连接Kafka集群,向kafka中插入如下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 在Elasticsearch集群的Kibana中输入下述语句并查看相应结果: GET orders/_search { "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "orders", "_type" : "_doc", "_id" : "ae7wpH4B1dV9conjpXeB", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "orders", "_type" : "_doc", "_id" : "au7xpH4B1dV9conjn3er", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } } ] } }
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,固定为:elasticsearch-7。表示连接到 Elasticsearch 7.x 及更高版本集群。 hosts 是 无 String Elasticsearch所在集群的主机名,多个以';'间隔。 index 是 无 String 每条记录的 Elasticsearch 索引。可以是静态索引(例如'myIndex')或动态索引(例如'index-{log_ts|yyyy-MM-dd}')。 username 否 无 String Elasticsearch所在集群的账号。该账号参数需和密码“password”参数同时配置。 password 否 无 String Elasticsearch所在集群的密码。该密码参数需和“username”参数同时配置。 certificate 否 无 String Elasticsearch集群的证书在obs中的位置。 例如:obs://bucket/path/CloudSearchService.cer 仅在开启安全模式,且开启https,且未使用其他跨源认证的场景下下需要配置该参数。 document-id.key-delimiter 否 _ String 连接复合主键的拼接符,默认为_。 failure-handler 否 fail String 对Elasticsearch请求失败时的故障处理策略。有效的策略是: fail: 如果请求失败并因此导致作业失败,则抛出异常。 ignore: 忽略失败并丢弃请求。 retry-rejected:重新添加由于队列容量饱和而失败的请求。 自定义类名:用于使用ActionRequestFailureHandler子类进行故障处理。 sink.flush-on-checkpoint 否 true Boolean 是否在检查点刷新。 如果配置为false,在Elasticsearch进行Checkpoint时,connector将不等待确认所有pending请求已完成。因此,connector不会为请求提供at-least-once保证。 sink.bulk-flush.max-actions 否 1000 Interger 每个批量请求的最大缓冲操作数。可以设置'0'为禁用它。 sink.bulk-flush.max-size 否 2mb MemorySize 每个批量请求的缓冲操作的内存中的最大大小。必须是MB粒度。可以设置'0'为禁用它。 sink.bulk-flush.interval 否 1s Duration 刷新缓冲操作的间隔。可以设置'0'为禁用它。 请注意: 'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions' 都可以设置为'0'刷新间隔,从而允许对缓冲操作进行完整的异步处理。 sink.bulk-flush.backoff.strategy 否 DISABLED String 指定在任何刷新操作由于临时请求错误而失败时如何执行重试。有效的策略是: DISABLED:未执行重试,即在第一个请求错误后失败。 CONSTANT:等待重试之间的退避延迟。 EXPONENTIAL:最初等待退避延迟并在重试之间呈指数增加。 sink.bulk-flush.backoff.max-retries 否 8 Integer 最大退避重试次数。 sink.bulk-flush.backoff.delay 否 50ms Duration 每次退避尝试之间的延迟。 对于CONSTANT退避,这只是每次重试之间的延迟。 对于EXPONENTIAL退避,这是初始基本延迟。 connection.max-retry-timeout 否 无 Duration 重试之间的最大超时时间。 connection.path-prefix 否 无 String 要添加到每个REST通信的前缀字符串,例如, '/v1'。 format 否 json String Elasticsearch连接器支持指定格式。该格式必须生成有效的 json 文档。默认情况下使用内置'json'格式。 请参考Format页面以获取更多详细信息和格式参数。 pwd_auth_name 否 无 String Password类型的跨源认证名称。 仅在使用 CSS 类型的跨源认证时配置该参数。 es_auth_name和pwd_auth_name只能配置一个。 es_auth_name 否 无 String CS S类型的跨源认证的名称。 仅在使用CSS类型的跨源认证时配置该参数。 es_auth_name和pwd_auth_name只能配置一个。
  • 前提条件 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 请务必确保您的账户下已在 云搜索服务 里创建了集群。如何创建集群请参考《 云搜索 服务用户指南》中创建集群章节。 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 功能描述 DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于 日志分析 、站内搜索等场景。 云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。 云搜索服务的更多信息,请参见《云搜索服务用户指南》
  • 注意事项 当前只支持CSS集群7.X及以上版本,推荐使用7.6.2版本。 若未开启安全模式,无需使用任何跨源认证,即无需配置pwd_auth_name、es_auth_name、user_name、password、certificate,且语法中hosts字段值以http开头。 若开启安全模式,未开启https: 方法1:推荐使用password类型跨源认证,并配置pwd_auth_name为跨源认证的名称,且语法中hosts字段值以http开头。 方法2:不使用跨源认证,但需要配置用户名username、密码password,且语法中hosts字段值以http开头。 若开启安全模式,开启https: 方法1:推荐使用CSS类型跨源认证名称,并配置es_auth_name为跨源认证的名称。请注意该场景hosts字段值以https开头。 方法2:不使用跨源认证,但需要配置用户名username、密码password、证书位置certificate。请注意该场景hosts字段值以https开头。 CSS集群安全组入向规则必须开启ICMP。 数据类型的使用,请参考Format章节。 提交Flink作业前,建议勾选“保存作业日志”参数,在OBS桶选项中选择日志保存的位置,方便后续作业提交失败或运行异常时,查看日志并分析问题原因。 Elasticsearch结果表根据是否定义了主键确定是在upsert模式还是在append模式下工作。 如果定义了主键,Elasticsearch Sink将在upsert模式下工作,该模式可以消费包含UPDATE和DELETE的消息。 如果未定义主键,Elasticsearch Sink将以append模式工作,该模式只能消费INSERT消息。 在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。文档ID为最多512个字节不包含空格的字符串。Elasticsearch结果表通过使用“document-id.key-delimiter”参数指定的键分隔符按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。如果未指定主键,Elasticsearch将自动生成随机的文档ID。 Elasticsearch结果表同时支持静态索引和动态索引。 如果使用静态索引,则索引选项值应为纯字符串,例如myusers,所有记录都将被写入myusers索引。 如果使用动态索引,可以使用{field_name}引用记录中的字段值以动态生成目标索引。您还可以使用 {field_name|date_format_string}将TIMESTAMP、DATE和TIME类型的字段值转换为date_format_string指定的格式。date_format_string与Java的DateTimeFormatter兼容。例如,如果设置为myusers-{log_ts|yyyy-MM-dd},则log_ts字段值为2020-03-27 12:25:55的记录将被写入myusers-2020-03-27索引。
  • 语法格式 create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'elasticsearch-7', 'hosts' = '', 'index' = '' );