Data Lake Insight DLI - Elasticsearch Result Table: Example

Time: 2025-02-12 15:00:46

Example

This example reads data from a Kafka source and writes it to an Elasticsearch result table. The procedure is as follows:

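  1. Referring to Enhanced Datasource Connections, create an enhanced datasource connection on DLI for each of the VPCs and subnets where Elasticsearch and Kafka reside, and bind the connections to the Flink elastic resource pool you plan to use.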
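  2. Configure the security groups of Elasticsearch and Kafka by adding inbound rules that allow traffic from the network segment of the Flink queue. Then, referring to Testing Address Connectivity, test the queue's connectivity to the Elasticsearch address and to the Kafka address. If both are reachable, the datasource connections are bound successfully; otherwise, they are not.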
  3. Log in to Kibana of the Elasticsearch cluster, choose Dev Tools, and run the following statement to create an index named orders:
    PUT /orders
    {
      "settings": {
        "number_of_shards": 1
      },
      "mappings": {
        "properties": {
          "order_id":      { "type": "text" },
          "order_channel": { "type": "text" },
          "order_time":    { "type": "text" },
          "pay_amount":    { "type": "double" },
          "real_pay":      { "type": "double" },
          "pay_time":      { "type": "text" },
          "user_id":       { "type": "text" },
          "user_name":     { "type": "text" },
          "area_id":       { "type": "text" }
        }
      }
    }
  4. Referring to Creating a Flink OpenSource SQL Job, create a Flink OpenSource SQL job, enter the following job script, and submit the job.
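    Note: When creating the job, under "Running Parameters" on the job editing page, set "Flink Version" to "1.12", select "Save Job Log", and specify the OBS bucket for storing job logs so that you can view the logs later. Change the following placeholder parameters in the script below to match your environment: KafkaTopic, KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort, GroupId, and ElasticsearchAddress:ElasticsearchPort.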
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    CREATE TABLE elasticsearchSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'ElasticsearchAddress:ElasticsearchPort',
      'index' = 'orders'
    );

    insert into elasticsearchSink select * from kafkaSource;
  5. Connect to the Kafka cluster and send the following test data to Kafka:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. In Kibana of the Elasticsearch cluster, run the following statement and check the result:
    GET orders/_search
    The query returns a result similar to the following. The document _id values are generated automatically and will differ in your environment:
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "orders",
            "_type" : "_doc",
            "_id" : "ae7wpH4B1dV9conjpXeB",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241000000001",
              "order_channel" : "webShop",
              "order_time" : "2021-03-24 10:00:00",
              "pay_amount" : 100.0,
              "real_pay" : 100.0,
              "pay_time" : "2021-03-24 10:02:03",
              "user_id" : "0001",
              "user_name" : "Alice",
              "area_id" : "330106"
            }
          },
          {
            "_index" : "orders",
            "_type" : "_doc",
            "_id" : "au7xpH4B1dV9conjn3er",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241606060001",
              "order_channel" : "appShop",
              "order_time" : "2021-03-24 16:06:06",
              "pay_amount" : 200.0,
              "real_pay" : 180.0,
              "pay_time" : "2021-03-24 16:10:06",
              "user_id" : "0001",
              "user_name" : "Alice",
              "area_id" : "330106"
            }
          }
        ]
      }
    }
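For step 2 above, the DLI connectivity test is the authoritative check, because it runs from the queue's own network. As a supplementary sketch, assuming you have a host that can reach the same VPCs and subnets and is allowed by the security group rules, a plain TCP connect in Python shows whether the Elasticsearch and Kafka ports accept connections. The function name `can_connect` is hypothetical, and so are any ports you pass to it; use the ports your clusters actually expose.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout` seconds."""
    try:
        # create_connection resolves the host name and tries each returned address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts, and DNS resolution failures are all OSError subclasses
        return False
```

For example, `can_connect("ElasticsearchAddress", 9200)` (hypothetical address and port) returns True when the port accepts TCP connections. A True result only shows that the port is open; it says nothing about whether authentication or the Kafka/Elasticsearch protocol handshake will succeed.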
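The orders index from step 3 can also be created through the Elasticsearch REST API instead of Kibana. The sketch below builds the same settings and mappings body from a field-to-type table and sends it as `PUT /orders` using only the Python standard library. It assumes a cluster reachable over plain HTTP without authentication; a cluster with security mode enabled would also need HTTPS and credentials, which this sketch omits. `ORDER_FIELDS`, `build_index_body`, and `create_index` are hypothetical names.

```python
import json
import urllib.request

# Field name -> Elasticsearch field type, as in the mapping from step 3
ORDER_FIELDS = {
    "order_id": "text",
    "order_channel": "text",
    "order_time": "text",
    "pay_amount": "double",
    "real_pay": "double",
    "pay_time": "text",
    "user_id": "text",
    "user_name": "text",
    "area_id": "text",
}


def build_index_body(fields, shards=1):
    """Return the create-index request body: shard settings plus one mapping entry per field."""
    return {
        "settings": {"number_of_shards": shards},
        "mappings": {"properties": {name: {"type": ftype} for name, ftype in fields.items()}},
    }


def create_index(base_url, index, body, timeout=10.0):
    """Send PUT <base_url>/<index> with a JSON body and return the parsed JSON response.

    urlopen raises urllib.error.HTTPError on 4xx/5xx, e.g. when the index already exists.
    """
    request = urllib.request.Request(
        f"{base_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Usage, with the document's placeholder address: `create_index("http://ElasticsearchAddress:ElasticsearchPort", "orders", build_index_body(ORDER_FIELDS))`. On success, Elasticsearch responds with `"acknowledged": true`.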
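In the script from step 4, `insert into elasticsearchSink select * from kafkaSource` maps columns by position, so both CREATE TABLE statements must declare the same columns in the same order. One way to keep them in sync, shown below as an illustration rather than a DLI feature, is to render both statements from a single column list. The helper names are hypothetical, and the option values you pass in would be the same placeholders as in the script.

```python
# Column definitions shared by both tables, copied from the script in step 4
COLUMNS = [
    ("order_id", "STRING"),
    ("order_channel", "STRING"),
    ("order_time", "STRING"),
    ("pay_amount", "DOUBLE"),
    ("real_pay", "DOUBLE"),
    ("pay_time", "STRING"),
    ("user_id", "STRING"),
    ("user_name", "STRING"),
    ("area_id", "STRING"),
]


def sql_literal(value):
    """Quote a value as a Flink SQL string literal, doubling any embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def create_table_sql(name, columns, options):
    """Render CREATE TABLE <name> (...) WITH (...) from column and option lists."""
    cols = ",\n".join(f"  {col} {col_type}" for col, col_type in columns)
    opts = ",\n".join(f"  {sql_literal(k)} = {sql_literal(v)}" for k, v in options.items())
    return f"CREATE TABLE {name} (\n{cols}\n) WITH (\n{opts}\n);"


def build_job_script(kafka_options, es_options):
    """Build the source DDL, sink DDL, and INSERT statement from the same COLUMNS list."""
    source = create_table_sql("kafkaSource", COLUMNS, {"connector": "kafka", **kafka_options})
    sink = create_table_sql("elasticsearchSink", COLUMNS, {"connector": "elasticsearch-7", **es_options})
    return "\n\n".join([source, sink, "INSERT INTO elasticsearchSink SELECT * FROM kafkaSource;"])
```

Adding or removing a field then means editing `COLUMNS` once (and the Elasticsearch mapping from step 3), instead of editing two DDL statements by hand.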
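For step 5, each Kafka message should be one complete JSON object, and console producers such as Kafka's `kafka-console-producer.sh` send each input line as a separate message. The sketch below serializes the two test orders from step 5 as single-line JSON so they can be piped into such a producer. `TEST_ORDERS` and `to_kafka_lines` are hypothetical names; the record contents are copied from step 5, including the amounts as strings.

```python
import json

# The two test orders from step 5; amounts stay strings, exactly as in the original data
TEST_ORDERS = [
    {"order_id": "202103241000000001", "order_channel": "webShop",
     "order_time": "2021-03-24 10:00:00", "pay_amount": "100.00", "real_pay": "100.00",
     "pay_time": "2021-03-24 10:02:03", "user_id": "0001", "user_name": "Alice",
     "area_id": "330106"},
    {"order_id": "202103241606060001", "order_channel": "appShop",
     "order_time": "2021-03-24 16:06:06", "pay_amount": "200.00", "real_pay": "180.00",
     "pay_time": "2021-03-24 16:10:06", "user_id": "0001", "user_name": "Alice",
     "area_id": "330106"},
]


def to_kafka_lines(records):
    """Serialize each record as compact single-line JSON: one line per Kafka message."""
    return [json.dumps(record, separators=(",", ":")) for record in records]


if __name__ == "__main__":
    # Print one message per line so the output can be piped into a console producer
    print("\n".join(to_kafka_lines(TEST_ORDERS)))
```

For example, if the sketch were saved as a hypothetical `make_orders.py`: `python make_orders.py | kafka-console-producer.sh --bootstrap-server KafkaAddress1:KafkaPort --topic KafkaTopic`. Older Kafka releases use `--broker-list` instead of `--bootstrap-server`.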
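To check the result of step 6 programmatically instead of by eye, the sketch below compares the records sent in step 5 with the `_source` documents in a `_search` response. Because the Flink tables declare `pay_amount` and `real_pay` as double, the strings sent to Kafka ("100.00") come back as numbers (100.0), so those two fields are compared numerically. `check_results` is a hypothetical helper, and it assumes the response has already been parsed into a Python dict (for example with `json.loads`).

```python
NUMERIC_FIELDS = ("pay_amount", "real_pay")


def check_results(search_response, expected_records):
    """Compare expected records with the hits of a _search response.

    Returns a list of human-readable problems; an empty list means every record arrived intact.
    """
    docs = {hit["_source"]["order_id"]: hit["_source"] for hit in search_response["hits"]["hits"]}
    problems = []
    for record in expected_records:
        doc = docs.get(record["order_id"])
        if doc is None:
            problems.append(f"{record['order_id']}: missing")
            continue
        for field, value in record.items():
            if field in NUMERIC_FIELDS:
                # Sent as strings ("100.00"), stored as doubles (100.0): compare as numbers
                if float(value) != float(doc[field]):
                    problems.append(f"{record['order_id']}: {field} is {doc[field]}, expected {value}")
            elif doc.get(field) != value:
                # All other fields are strings and should arrive unchanged
                problems.append(f"{record['order_id']}: {field} differs")
    return problems
```

By default `_search` returns at most 10 hits, so for a larger test set you would add a `size` parameter to the query before comparing.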
support.huaweicloud.com/sqlref-flink-dli/dli_08_0395.html