CSS兼容Elasticsearch吗
云搜索服务(Cloud Search Service)是一个基于Elasticsearch且完全托管的在线分布式搜索服务,为用户提供结构化、非结构化文本的多条件检索、统计、报表。完全兼容开源Elasticsearch原生接口,无需编程即可对接多种数据源。
CSS 向量数据库 GaussDB向量数据库 基于华为云自研的向量搜索引擎,针对大规模的向量检索场景提供了业界性能领先的向量数据库服务。 基于华为云自研的向量搜索引擎,针对大规模的向量检索场景提供了业界性能领先的向量数据库服务。 购买 控制台 价格计算器 云原生高可用 全球首款在
软硬协同性能提升30%,兼容标准SQL 全托管NoSQL 表格存储服务 CloudTable 千万级TPS,毫秒级随机读写能力 兼容开源Elasticsearch的在线分布式搜索 云搜索服务 CSS 向量检索能力相比开源ES提升10+倍 图引擎服务 图引擎服务 GES 国内首个商用且拥有自主知识产权
数据仓库服务 GaussDB(DWS)兼容性 GaussDB(DWS)采用开放性的设计理念,不但支持标准数仓功能,还致力于融合大数据平台、与云原生服务互联互通。 无缝对接Hadoop GaussDB(DWS)可以无缝对接HDFS存储,通过外表机制,能够交互式查询分析Hadoop平
古大模型能力,构筑自己的模型推理服务;、 搭配使用 专属计算集群 DCC 专属分布式存储DSS 裸金属服务器 BMS AI开发平台 ModelArts 可支持云服务 资源服务 基础服务 数据库服务 安全服务 应用服务 大数据服务 专属计算集群服务 为租户提供物理隔离的云上专属计算
EVS 为计算服务提供持久性块存储的服务 云备份 CBR 为云服务器、云硬盘等提供备份服务 键值存储服务 KVS Serverless KV存储服务 数据工坊 DWR 开放的近数据处理服务 专属分布式存储服务 DSS 提供独享的物理存储资源 弹性文件服务 SFS 为计算实例提供完全托管的NAS文件存储
查看更多 即刻 免费试用 开启您的上云之旅 免费试用 您可能感兴趣的产品 您可能感兴趣的产品 MapReduce服务 MRS 企业级大数据集群云服务 云搜索服务 CSS 提供多条件检索与分析能力 云数据仓库 GaussDB(DWS) 极致性能、稳定、按需扩展的数据仓库
日志分析服务 LOG 日志分析服务 LOG 日志分析服务(Log Analysis Service, 简称LOG)一站式海量实时日志分析服务,提供日志实时采集、智能分析与可视化、转储等功能。提供端到端的快速、易用、丰富的日志分析平台 日志分析服务(Log Analysis Service
高性能 裸金属服务器配合InfiniBand技术,达到极高吞吐量和极低的时延,保证大数据离线计算超高的性能 高可靠 分布式存储跨可用区容灾技术,数据持久度高达12个9,优良数据可靠性 推荐产品 对象存储服务 OBS MapReduce服务 MRS 数据湖探索 DLI 云搜索服务 CSS
分布式消息服务 DMS 分布式消息服务 DMS 兼容业界主流的Kafka、RocketMQ、RabbitMQ的高可用消息队列服务,适用于应用解耦、突发流量处理以及与第三方应用的集成 兼容业界主流的Kafka、RocketMQ、RabbitMQ的高可用消息队列服务,适用于应用解耦、突发流量处理以及与第三方应用的集成
兼容性测试服务,包括脚本兼容测试、标准兼容测试和专家兼容测试三项服务内容,深度发现并定位APP 兼容性问题,帮助定位提升产品,提供详细测试报告。1.脚本兼容测试在海量手机上自动执行,从安装、启动、运行、功能、UI等多维度,深度发现并定位APP 兼容性问题,可定制测试脚本,覆盖所需
此镜像为Java环境,操作系统:Anolis(兼容 CentOS ),更新时间2024年8月。您可以一键快速搭建自己的Java环境服务器。一、镜像说明此镜像为Java环境,操作系统:Anolis(兼容 CentOS ),更新时间2024年8月。您可以 一键快速搭建自己的Java环境服务器。二、java环境信息外网地址:
,系统Anolis 兼容CentOS,该版本已做安全加固,系统已更新至最新。您可以 一键快速搭建自己的LAMP环境服务器。一、产品说明此镜像为LAMP环境镜像 ,系统Anolis 兼容CentOS,该版本已做安全加固,系统已更新至最新。您可以 一键快速搭建自己的LAMP环境服务器。二、镜像信息外网地址:
VPN,操作系统:Anolis(兼容CentOS),您可以一键快速搭建自己的 IPsec VPN 服务器。支持 IPsec/L2TP协议。一、镜像说明:此镜像为IPsec VPN,系统:Anolis(兼容CentOS),更新时间2024年8月。您可以一键快速搭建自己的IPsec VPN 服务器。支持I
,系统 Anolis 兼容 CentOS ,更新时间2024年8月。您可以 一键快速搭建自己的LNMP环境服务器。一、产品介绍此镜像为LNMP环境镜像 ,系统 Anolis 兼容 CentOS ,更新时间2024年8月。您可以 一键快速搭建自己的LNMP环境服务器。二、镜像信息外网地址:
、分辨率、系统版本机型上的兼容性问题,并提供详细的测试报告。兼容性测试在海量机型上通过自动化与人工复核的方式,针对安装、启动、运行、功能、性能、UI等多维度定位应用在不同品牌、分辨率、系统版本机型上的兼容性问题,并提供详细的测试报告。帮助企业避免由于兼容性问题而引起的客户投诉、客
,系统Anolis(兼容 CentOS ),更新时间 2024 年 8 月。您可以一键快速搭建自己的Nodejs服务器 。一、商品介绍此镜像为Nodejs镜像 ,系统Anolis(兼容 CentOS ),更新时间 2024 年 8 月。您可以一键快速搭建自己的Nodejs服务器 。二、服
提供面向OpenHarmony商用化的 硬件适配、性能调优、认证服务深资专家坐镇 , 经验丰富
发送国 "sendLogisticsCompany": "00173", // 发件地快递公司 "purposeCountry": "BRONX, NY, 10462, US, United States", // 目的国 "purposeLogisticsCompany": "00172"
华为云云搜索服务 CSS 华为云云搜索服务 CSS 云搜索服务(Cloud Search Service,简称CSS),是华为云ELK生态的一系列软件集合,为您全方位提供托管的ELK生态云服务,兼容Elasticsearch、Kibana、Cerebro等软件。 云搜索服务中El
Elasticsearch镜像下载 移动端下载镜像请点击展开详情 Elasticsearch镜像 是Elastic组件,开源的分布式、RESTful 风格的搜索和数据分析引擎。Elasticsearch用于云计算中,能够达到实时搜索,稳定
IP地址和端口。 2、对象存储服务(Object Storage Service,简称OBS)上的备份文件以及TaurusDB服务使用的弹性云服务器(Elastic Cloud Server,简称ECS),都对用户不可见,它们只对TaurusDB服务的后台管理系统可见。 3、查看
查询分析,数据格式兼容CSV、JSON、Parquet和ORC主流数据格式。 DLI用户可以通过可视化界面、Restful API、JDBC、ODBC、Beeline等多种接入方式对云上CloudTable、RDS和DWS等异构数据源进行查询分析,数据格式兼容CSV、JSON、Parquet和ORC主流数据格式。
令人难以置信的速度将.scss文件本地编译为css,并通过连接中间件自动编译。 Sass是一种预处理器脚本语言,可以解释或编译成层叠样式表(CSS)。Sass包含两种语法:较旧的语法使用缩进将代码块和换行符分隔为单独的规则;较新的语法SCSS使用像CSS这样的块格式。它使用大括号
Kibana镜像下载 移动端下载镜像请点击展开详情 Kibana镜像 Kibana是一个为 Elasticsearch 平台分析和可视化的开源平台,使用 Kibana 能够搜索、展示存储在 Elasticsearch 中的索引数据。使用它可以很方便用图表、表格、地图展示和分析数据。 更多详情请下载文件查看
集 【Elasticsearch镜像】Elastic组件,开源的分布式、RESTful 风格的搜索和数据分析引擎 【ChromeDriver镜像】Chrome浏览器引擎驱动 【Kibana镜像】Elastic组件,开源的分析和可视化平台,设计用于和Elasticsearch一起工作
集 【Elasticsearch镜像】Elastic组件,开源的分布式、RESTful 风格的搜索和数据分析引擎 【ChromeDriver镜像】Chrome浏览器引擎驱动 【Kibana镜像】Elastic组件,开源的分析和可视化平台,设计用于和Elasticsearch一起工作
集 【Elasticsearch镜像】Elastic组件,开源的分布式、RESTful 风格的搜索和数据分析引擎 【ChromeDriver镜像】Chrome浏览器引擎驱动 【Kibana镜像】Elastic组件,开源的分析和可视化平台,设计用于和Elasticsearch一起工作
CSS兼容Elasticsearch吗
使用华为云CSS服务的Logstash集群可以实现Elasticsearch集群间的数据迁移。
应用场景
华为云Logstash是一款全托管的数据接入处理服务,兼容开源Logstash的能力,支持用于Elasticsearch集群间数据迁移。
通过华为云Logstash可以实现华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch迁移至华为云Elasticsearch,该方案常用于以下场景:
- 跨版本迁移:利用Logstash的兼容性和灵活性,实现不同版本间的数据迁移,确保数据在新版本中的可用性和一致性。适用于Elasticsearch集群版本跨度较大的迁移场景,例如从6.X版本迁移至7.X版本。
- 集群合并:使用Logstash进行数据迁移,将多个Elasticsearch集群的数据整合到一个Elasticsearch集群中,实现多个Elasticsearch数据的统一管理和分析。
- 服务迁移上云:将自建的Elasticsearch服务迁移到云平台,以利用云服务的可扩展性、维护简便性和成本效益。
- 变更服务提供商:如果企业当前使用的是第三方Elasticsearch服务,但出于成本、性能或其他战略考虑,希望更换服务提供商至华为云。
方案架构
通过华为云Logstash实现Elasticsearch集群间数据迁移的迁移流程如图1所示。
- 输入(Input):华为云Logstash接收来自华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch的数据。
华为云Elasticsearch、自建Elasticsearch或第三方Elasticsearch数据迁移到华为云Elasticsearch的操作步骤相同,只是获取源集群的访问地址有差异,具体请参见获取Elasticsearch集群信息。
- 过滤(Filter):华为云Logstash对数据进行清洗和转换。
- 输出(Output):华为云Logstash将数据输出到目标设备,如华为云Elasticsearch。
- 全量数据迁移:使用Logstash进行全量数据迁移,适用于迁移初期或需要确保数据完整性的场景。
- 增量数据迁移:通过Logstash配置增量查询,可以只迁移有增量字段的索引数据。此方法适用于需要持续同步数据或对数据实时性有较高要求的场景。
方案优势
- 高版本兼容性:适用于不同版本的Elasticsearch集群迁移。
- 高效的数据处理能力:Logstash支持批量读写操作,可以大幅度提高数据迁移的效率。
- 并发同步技术:利用slice并发同步技术,可以提高数据迁移的速度和性能,尤其是在处理大规模数据时。
- 配置简单:华为云Logstash的配置相对简单直观,通过配置文件即可实现数据的输入、处理和输出。
- 强大的数据处理功能:Logstash内置了丰富的过滤器,可以在迁移过程中对数据进行清洗、转换和丰富。
- 灵活的迁移策略:根据业务需求,可以灵活选择全量迁移或增量迁移,优化存储使用和迁移时间。
性能影响
- 对于资源消耗较高的集群,建议通过调整size参数来减缓迁移速率,或者选择在业务流量低谷时段进行迁移操作,以减轻对集群资源的影响。
- 对于资源消耗较低的集群,在迁移时可以采用默认参数配置,建议同时监控源集群的性能负载,并根据实际情况适时调整size和slice参数,以优化迁移效率和资源使用。
约束限制
集群迁移过程中,源集群的索引数据不能增删改,否则会导致迁移后的源集群数据和目标集群数据内容不一致。
前提条件
- 源Elasticsearch集群和目标Elasticsearch集群处于可用状态。
- 集群间需要保证网络连通。
- 确认集群的索引已开启“_source”。
集群索引的“_source”默认是开启的。执行命令GET {index}/_search,当返回的索引信息里有“_source”信息时表示已开启。
操作步骤
- 获取Elasticsearch集群信息
- 准备迁移环境:创建 ECS 并准备必要的迁移工具和脚本。
- 创建Logstash集群:创建一个Logstash集群用于迁移数据。
- 验证集群间的网络连通性:验证Logstash和源Elasticsearch集群的连通性。
- 使用Logstash迁移集群
- 在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据。
- 在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据。
- 释放Logstash集群:当集群迁移完成后,请及时释放Logstash集群。
获取Elasticsearch集群信息
在迁移集群前,需要先获取必备的集群信息,用于配置迁移任务。
集群来源 |
要获取的信息 |
获取方式 |
|
---|---|---|---|
源集群 |
华为云Elasticsearch集群 |
|
|
自建Elasticsearch集群 |
|
联系服务管理员获取。 |
|
第三方Elasticsearch集群 |
|
联系服务管理员获取。 |
|
目标集群 |
华为云Elasticsearch集群 |
|
|
源集群的来源不同,获取信息的方式不同,此处仅介绍如何获取华为云Elasticsearch集群的信息。
准备迁移环境
创建ECS并准备必要的迁移工具和脚本。
- 创建弹性 云服务器ECS ,用于迁移源集群的元数据。
- 创建弹性 云服务器 ECS,ECS的操作系统选择CentOS,规格选择2U4G,且和CSS服务的集群在同一个 虚拟私有云 和安全组中。
- 测试ECS和源集群、目标集群的连通性。
在ECS执行命令curl http:// {ip}:{port}测试连通性,当返回200时,则表示已经连通。
IP是源集群和目标集群访问地址;port是端口号,默认是9200,请以集群实际端口号为准。
curl http://10.234.73.128:9200 # 输入实际的IP地址,此处仅以非安全集群举例。 { "name" : "es_cluster_migrate-ess-esn-1-1", "cluster_name" : "es_cluster_migrate", "cluster_uuid" : "1VbP7-39QNOx_R-llXKKtA", "version" : { "number" : "6.5.4", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "Tagline" : "You Know, for Search" }
- 准备工具和软件,判断ECS是否可以联网,选择不同的安装方式。
- 在线安装工具和软件。
- 执行yum install python2安装python2。
[root@ecs opt]# yum install python2
- 执行yum install python-pip安装pip。
[root@ecs opt]# yum install python-pip
- 执行pip install pyyaml安装yaml依赖。
- 执行pip install requests安装requests依赖。
- 执行yum install python2安装python2。
- 离线安装工具和软件。
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
图3 下载python2安装包
- 使用WinSCP工具上传Python安装包到opt目录下,安装python。
# 解压python压缩包 [root@ecs-52bc opt]# tar -xvf Python-2.7.18.tgz Python-2.7.18/Modules/zlib/crc32.c Python-2.7.18/Modules/zlib/gzlib.c Python-2.7.18/Modules/zlib/inffast.c Python-2.7.18/Modules/zlib/example.c Python-2.7.18/Modules/python.c Python-2.7.18/Modules/nismodule.c Python-2.7.18/Modules/Setup.config.in … # 解压完成进入目录 [root@ecs-52bc opt]# cd Python-2.7.18 # 检查文件配置安装路径 [root@ecs-52bc Python-2.7.18]# ./configure --prefix=/usr/local/python2 … checking for build directories... checking for --with-computed-gotos... no value specified checking whether gcc -pthread supports computed gotos... yes done checking for ensurepip... no configure: creating ./config.status config.status: creating Makefile.pre config.status: creating Modules/Setup.config config.status: creating Misc/python.pc config.status: creating Modules/ld_so_aix config.status: creating pyconfig.h creating Modules/Setup creating Modules/Setup.local creating Makefile # 编译python [root@ecs-52bc Python-2.7.18]# make # 安装python [root@ecs-52bc Python-2.7.18]# make install
- 安装完成检查,检查python安装结果。
# 检查python版本 [root@ecs-52bc Python-2.7.18]# python --version Python 2.7.5 # 检查pip版本 [root@ecs-52bc Python-2.7.18]# pip --version pip 7.1.2 from /usr/lib/python2.7/site-packages/pip-7.1.2-py2.7.egg (python 2.7) [root@ecs-52bc Python-2.7.18]#
- 下载python2安装包,下载地址https://www.python.org/downloads/release/python-2718/。选择源码下载安装。
- 准备Elasticsearch集群的索引迁移脚本。
- 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息。
es_cluster_new: # 源集群的名称 clustername: es_cluster_new # 源Elasticsearch集群的访问地址,加上“http://”。 src_ip: http://x.x.x.x:9200 # 访问源Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。 src_username: "" src_password: "" # 目标Elasticsearch集群的访问地址,加上“http://”。 dest_ip: http://x.x.x.x:9200 # 访问目标Elasticsearch集群的用户名和密码,如果为非安全集群则设置为""。 dest_username: "" dest_password: "" # “only_mapping”可以不定义,默认值为false,需要搭配“migrateMapping.py”使用,表示是否只处理这个文件中mapping地址的索引。当设置成true时,则只迁移源集群中和下面mapping的key一致的索引数据;当设置成false时,则迁移源集群中除“.kibana”和“.*”之外的所有索引数据。 # 迁移过程中会将索引名称与下面的mapping匹配,如果匹配一致,则使用mapping的value作为目标集群的索引名称;如果匹配不到,则使用源集群原始的索引名称。 only_mapping: false # 设置要迁移的索引,key为源集群的索引名字,value为目标集群的索引名字。 mapping: test_index_1: test_index_1 # “only_compare_index”可以不定义,默认值为false,需要搭配“checkIndices.py”使用,当设置为false会比较所有的索引和文档数量,当设置为true只比较索引数量。 only_compare_index: false
- 执行vi migrateTemplate.py命令,直接复制输入以下内容无需修改,执行wq保存为索引模板迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def put_template_to_target(url, template, cluster, template_name, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(template), auth=dest_auth, verify=False) if not os.path.exists("templateLogs"): os.makedirs("templateLogs") if create_resp.status_code != 200: print( "create template " + url + " failed with response: " + str( create_resp) + ", source template is " + template_name) print(create_resp.text) filename = "templateLogs/" + str(cluster) + "#" + template_name with open(filename + ".json", "w") as f: json.dump(template, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migration template!") config = loadConfig(argv) src_clusters = config.keys() print("process cluster name:") for name in src_clusters: print(name) print("cluster total number:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster name:" + name) source_url = value["src_ip"] + "/_template" response = requests.get(source_url, auth=source_auth, verify=False) if response.status_code != 200: print("*** get all template failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue all_template = response.json() migrate_itemplate = [] for template in all_template.keys(): if template.startswith(".") or template == "logstash": continue if "index_patterns" in all_template[template]: for t in all_template[template]["index_patterns"]: # if "kibana" in template: if t.startswith("."): continue migrate_itemplate.append(template) for template in migrate_itemplate: dest_index_url = value["dest_ip"] + "/_template/" + template result = put_template_to_target(dest_index_url, all_template[template], name, template, dest_auth) if result is True: print('[success] delete success, cluster: %-10s, template %-10s ' % (str(name), str(template))) else: print('[failure] delete failure, cluster: %-10s, template %-10s ' % (str(name), str(template))) if __name__ == '__main__': main(sys.argv)
- 执行vi migrateMapping.py命令,直接复制输入以下内容无需修改,执行wq保存为索引结构迁移脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # config = yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True def process_mapping(index_mapping, dest_index): # remove unnecessary keys del index_mapping["settings"]["index"]["provided_name"] del index_mapping["settings"]["index"]["uuid"] del index_mapping["settings"]["index"]["creation_date"] del index_mapping["settings"]["index"]["version"] if "lifecycle" in index_mapping["settings"]["index"]: del index_mapping["settings"]["index"]["lifecycle"] # check alias aliases = index_mapping["aliases"] for alias in list(aliases.keys()): if alias == dest_index: print( "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.") del index_mapping["aliases"][alias] # if index_mapping["settings"]["index"].has_key("lifecycle"): if "lifecycle" in index_mapping["settings"]["index"]: lifecycle = index_mapping["settings"]["index"]["lifecycle"] opendistro = {"opendistro": {"index_state_management": {"policy_id": lifecycle["name"], "rollover_alias": lifecycle["rollover_alias"]}}} index_mapping["settings"].update(opendistro) # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"] del index_mapping["settings"]["index"]["lifecycle"] # replace synonyms_path if "analysis" in index_mapping["settings"]["index"]: analysis = index_mapping["settings"]["index"]["analysis"] if "filter" in analysis: filter = analysis["filter"] if "my_synonym_filter" in filter: my_synonym_filter = filter["my_synonym_filter"] if "synonyms_path" in my_synonym_filter: index_mapping["settings"]["index"]["analysis"]["filter"]["my_synonym_filter"][ "synonyms_path"] = "/rds/datastore/elasticsearch/v7.10.2/package/elasticsearch-7.10.2/plugins/analysis-dynamic-synonym/config/synonyms.txt" return index_mapping def getAlias(source, source_auth): # get all indices response = requests.get(source + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return system_index, create_index def put_mapping_to_target(url, mapping, cluster, source_index, dest_auth=None): headers = {'Content-Type': 'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth, verify=False) if not os.path.exists("mappingLogs"): os.makedirs("mappingLogs") if create_resp.status_code != 200: print( "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + str(source_index)) print(create_resp.text) filename = "mappingLogs/" + str(cluster) + "#" + str(source_index) with open(filename + ".json", "w") as f: json.dump(mapping, f) return False else: return True def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) print("start to process cluster: " + name) # only deal with mapping list if 'only_mapping' in value and value["only_mapping"]: for source_index, dest_index in value["mapping"].iteritems(): print("start to process source index" + source_index + ", target index: " + dest_index) source_url = source + "/" + source_index response = requests.get(source_url, auth=source_auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) continue mapping = response.json() index_mapping = process_mapping(mapping[source_index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, source_index, dest_auth) if result is False: print("cluster name:" + name + ", " + source_index + ":failure") continue print("cluster name:" + name + ", " + source_index + ":success") else: # get all indices system_index, create_index = getAlias(source, source_auth) success_index = 0 for index in create_index: source_url = source + "/" + index index_response = requests.get(source_url, auth=source_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) continue mapping = index_response.json() dest_index = index if 'mapping' in value: if index in value["mapping"].keys(): dest_index = value["mapping"][index] index_mapping = process_mapping(mapping[index], dest_index) dest_url = dest + "/" + dest_index result = put_mapping_to_target(dest_url, index_mapping, name, index, dest_auth) if result is False: print("[failure]: migrate mapping cluster name: " + name + ", " + index) continue print("[success]: migrate mapping cluster name: " + name + ", " + index) success_index = success_index + 1 print("create index mapping success total: " + str(success_index)) if __name__ == '__main__': main(sys.argv)
- 执行vi checkIndices.py命令,直接复制输入以下内容无需修改,执行wq保存为索引数据对比脚本。
# -*- coding:UTF-8 -*- import sys import yaml import requests import re import json import os def printDividingLine(): print("<=============================================================>") def get_cluster_version(url, auth=None): response = requests.get(url, auth=auth) if response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) return False cluster = response.json() version = cluster["version"]["number"] return True # get all indices def get_indices(url, source_auth): response = requests.get(url + "/_alias", auth=source_auth) if response.status_code != 200: print("*** get all index failed. resp statusCode:" + str( response.status_code) + " response is " + response.text) exit() all_index = response.json() system_index = [] create_index = [] for index in list(all_index.keys()): if (index.startswith(".")): system_index.append(index) else: create_index.append(index) return create_index def get_mapping(url, _auth, index): source_url = url + "/" + index index_response = requests.get(source_url, auth=_auth) if index_response.status_code != 200: print("*** get ElasticSearch message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return "[failure] --- index is not exist in destination es. ---" mapping = index_response.json() return mapping def get_index_total(url, index, es_auth): stats_url = url + "/" + index + "/_stats" index_response = requests.get(stats_url, auth=es_auth, verify=False) if index_response.status_code != 200: print("*** get ElasticSearch stats message failed. resp statusCode:" + str( index_response.status_code) + " response is " + index_response.text) return 0 return index_response.json() def get_indices_stats(url, es_auth): endpoint = url + "/_cat/indices" indicesResult = requests.get(endpoint, es_auth) indicesList = indicesResult.split("\n") indexList = [] for indices in indicesList: indexList.append(indices.split()[2]) return indexList def loadConfig(argv): if argv is None or len(argv) != 2: config_yaml = "migrateConfig.yaml" else: config_yaml = argv[1] config_file = open(config_yaml) # python3 # return yaml.load(config_file, Loader=yaml.FullLoader) return yaml.load(config_file) def main(argv): requests.packages.urllib3.disable_warnings() print("begin to migrate index mapping!") config = loadConfig(argv) src_clusters = config.keys() print("begin to process cluster name :") for name in src_clusters: print(name) print("cluster count:" + str(src_clusters.__len__())) for name, value in config.items(): printDividingLine() source = value["src_ip"] source_user = value["src_username"] source_passwd = value["src_password"] source_auth = None if source_user != "": source_auth = (source_user, source_passwd) dest = value["dest_ip"] dest_user = value["dest_username"] dest_passwd = value["dest_password"] dest_auth = None if dest_user != "": dest_auth = (dest_user, dest_passwd) cluster_name = name if "clustername" in value: cluster_name = value["clustername"] print("start to process cluster :" + cluster_name) # get all indices all_source_index = get_indices(source, source_auth) all_dest_index = get_indices(dest, dest_auth) if "only_compare_index" in value and value["only_compare_index"]: print("[success] only compare mapping, not compare index count.") continue for index in all_source_index: index_total = get_index_total(value["src_ip"], index, source_auth) src_total = index_total["_all"]["primaries"]["docs"]["count"] src_size = int(index_total["_all"]["primaries"]["store"]["size_in_bytes"]) / 1024 / 1024 dest_index = get_index_total(value["dest_ip"], index, dest_auth) if dest_index is 0: print('[failure] not found, index: %-20s, source total: %-10s size %6sM' % (str(index), str(src_total), src_size)) continue dest_total = dest_index["_all"]["primaries"]["docs"]["count"] if src_total != dest_total: print('[failure] not consistent, ' 'index: %-20s, source total: %-10s size %6sM destination total: %-10s ' % (str(index), str(src_total), src_size, str(dest_total))) continue print('[success] compare index total equal : index : %-20s, total: %-20s ' % (str(index), str(dest_total))) if __name__ == '__main__': main(sys.argv)
- 执行“vi migrateConfig.yaml”配置文件,输入并基于实际信息修改以下内容,执行wq保存为Logstash迁移脚本。集群信息的获取方式请参见获取Elasticsearch集群信息。
创建Logstash集群
当ECS中的迁移环境准备完成后,在CSS服务创建一个Logstash集群用于迁移数据。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择“集群管理 > Logstash”。
- 单击右上角的“创建集群”,进入创建集群页面。
- 在创建集群页面,根据指导完成集群配置。
关键配置参数请参见表3,其他参数可以保持默认值。创建集群的详细说明请参见创建Logstash集群。
表3 Logstash集群的关键配置 参数
说明
计费模式
选择“按需计费”,按实际使用时长计费,计费周期为一小时,不足一小时按一小时计费。
集群类型
选择“Logstash”。
集群版本
选择“7.10.0”。
集群名称
自定义集群名称,可输入的字符范围为4~32个字符,只能包含数字、字母、中划线和下划线,且必须以字母开头。
此处以“Logstash-ES”为例。
虚拟 私有云
VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。
此处选择和目标Elasticsearch集群同一个虚拟私有云(VPC)。
子网
通过子网提供与其他网络隔离的、可以独享的网络资源,以提高网络安全。
选择创建集群需要的子网,可进入VPC服务查看VPC下已创建的子网名称和ID。
安全组
安全组是一个逻辑上的分组,为同一个VPC内具有相同安全保护需求并相互信任的弹性云服务器提供访问策略。
此处选择和目标Elasticsearch集群同一个安全组。
- 单击“下一步:高级配置”,此配置保持默认配置即可。
- 单击“下一步:确认配置”,确认完成后单击“立即创建”开始创建集群。
- 单击“返回集群列表”,系统将跳转到“集群管理”页面。您创建的集群将展现在集群列表中,且集群状态为“创建中”,创建成功后集群状态会变为“可用”。
验证集群间的网络连通性
在启动迁移任务前,需要先验证Logstash和源Elasticsearch集群的网络连通性。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击“连通性测试”。
- 在弹窗中输入源集群的IP地址和端口号,单击“测试”。
图4 连通性测试
当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由。
使用Logstash全量迁移集群数据
在集群迁移初期或需要确保数据完整性的场景,推荐使用Logstash全量迁移集群数据,该方法会一次迁移整个Elasticsearch集群的数据。
- 使用Putty登录准备迁移环境中创建的Linux虚拟机。
- 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
- 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择“集群管理 > Logstash”。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的全量迁移配置文件。
- 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”。
- 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-all”。
- 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。集群信息的获取方式请参见获取Elasticsearch集群信息。
input{ elasticsearch{ # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。 hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"] # 访问源集群的用户名和密码,非安全集群无需配置。 # user => "css_logstash" # password => "*****" # 配置待迁移的索引信息,多个索引以逗号隔开,可以使用通配符设置,例如“index*”。 index => "*_202102" docinfo => true slices => 3 size => 3000 # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持以下不变。 # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0 # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true } } # 移除一些logstash增加的字段。 filter { mutate { remove_field => ["@metadata", "@version"] } } output{ elasticsearch{ # 目标Elasticsearch集群的访问地址 hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] # 访问目标集群的用户名和密码,非安全集群无需配置。 # user => "css_logstash" # password => "*****" # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。 index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持以下不变。 #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0 # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。 #ssl_certificate_verification => false } }
表4 全量迁移配置项说明 配置项名称
说明
input
hosts
源集群的访问地址,如果集群有多个访问节点请分别填写,使用逗号隔开。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
需要全量迁移的源端索引信息,使用逗号隔开,可以使用通配符设置,例如“index*”。
docinfo
是否重新索引文档,必须为true。
slices
配置该参数可以使用切片滚动同时对查询的不同切片,提高整体吞吐量。建议在2-8内。
size
每次查询返回的最大命中数。
output
hosts
目标集群访问地址,如果集群有多个节点,请分别填写,使用逗号隔开。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
迁移到目标集群的索引名称,支持扩展修改,如“Logstash-%{+yyyy.MM.dd}”。
document_type
使目标端文档类型与源端保持一致。
document_id
索引中的文档ID,建议与源端保持一致,如需要自动生成,使用“#”注释掉即可。
- 编辑完成后,单击“下一页”配置Logstash配置文件运行参数。
此示例保持默认值即可,如需设置请参见创建Logstash配置文件。
- 配置完成后,单击“创建”。
- 启动全量迁移任务。
- 在配置文件列表,选择配置文件“es-es-all”,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。
开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。
- 单击“确定”,开始启动配置文件启动Logstash全量迁移任务。
可以在管道列表看到启动的配置文件。
- 数据迁移完毕检查数据一致性。
使用Logstash增量迁移集群数据
在需要持续同步数据或对数据实时性有较高要求的场景,推荐使用Logstash增量迁移集群数据,该方法通过Logstash配置增量查询,仅支持迁移有增量字段的索引数据。
- 使用Putty登录准备迁移环境中创建的Linux虚拟机。
- 在虚拟机中,执行命令python migrateTemplate.py迁移索引模板。
- 在虚拟机中,执行命令 python migrateMapping.py迁移索引结构。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择“集群管理 > Logstash”。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“配置中心”,进入配置中心页面。
- 在配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑Elasticsearch集群的增量迁移配置文件。
- 选择集群模板:展开系统模板,选择“elasticsearch”,单击操作列的“应用”。
- 设置配置文件名称:在“名称”处自定义配置文件名称,例如“es-es-inc”。
- 修改配置文件内容:在“配置文件内容”处填写Elasticsearch集群的迁移配置方案,配置文件示例如下。
不同的索引的增量迁移配置不同,必须基于索引分析给出增量配置文件迁移命令。集群信息的获取方式请参见获取Elasticsearch集群信息。
input{ elasticsearch{ # 源Elasticsearch的访问地址,不需要添加协议,添加HTTPS协议会导致报错。 hosts => ["xx.xx.xx.xx:9200"] # 访问源集群的用户名和密码,非安全集群无需配置。 user => "css_logstash" password => "******" # 配置增量迁移索引。 index => "*_202102" # 配置增量迁移查询语句。 query => '{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}' docinfo => true size => 1000 # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持以下不变。 # ca_file => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0 # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true } } filter { mutate { remove_field => ["@timestamp", "@version"] } } output{ elasticsearch{ # 目标集群的访问地址。 hosts => ["xx.xx.xx.xx:9200","xx.xx.xx.xx:9200"] # 访问目标集群的用户名和密码,非安全集群无需配置。 #user => "admin" #password => "******" # 配置目标集群的索引,以下配置为索引名称和源端保持一致,保持默认。 index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}" # 当目标集群是HTTPS访问方式时,则需额外配置以下信息。 # 配置集群HTTPS访问证书,CSS集群保持默认。 #cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs" # for 7.10.0 # 是否开启HTTPS通信,HTTPS访问集群则设置为true。 #ssl => true # 是否验证服务端Elasticsearch证书,设置为false表示不验证证书。 #ssl_certificate_verification => false } #stdout { codec => rubydebug { metadata => true }} }
表5 增量迁移配置项说明 配置
说明
hosts
分别填写源集群和目标集群的访问地址,如果集群有多个访问节点请分别填写。
user
访问集群的用户名,如果是非安全集群此项使用“#”注释掉。
password
访问集群的密码,如果是非安全集群此项使用“#”注释掉。
index
需要增加迁移的索引信息,一个配置文件只支持一个索引的增量迁移。
query
增量数据的识别标识,一般是Elasticsearch的DLS语句,需要提前分析。其中postsDate为业务中的时间字段。
{"query":{"bool":{"should":[{"range":{"postsDate":{"from":"2021-05-25 00:00:00"}}}]}}}
此处命令是迁移2021-05-25之后新增加的数据,在多次增量迁移过程中需要修改此处的日志值,如果日期是时间戳方式请转换为时间戳。此处命令需要提前验证有效性。
scroll
当源端数据量过大,为了防止Logstash内存溢出,可以使用scroll分批次获取数据。默认为"1m"。间隔时间不要太长,否则可能会丢失数据。
- 启动增量迁移任务。
- 在配置文件列表,选择配置文件“es-es-inc”,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,根据业务需要选择“是否保持常驻”。此示例不开启保持常驻。
开启“保持常驻”以后,将会在每个节点上面配置一个守护进程,当logstash服务出现故障的时候,会主动拉起并修复。“保持常驻”适用于需要长期运行的业,不适用于短期运行的业务,短期业务如果源端无数据,开启保持常驻会导致任务失败。
- 单击“确定”,开始启动配置文件启动Logstash增量迁移任务。
可以在管道列表看到启动的配置文件。
- 数据迁移完毕检查数据一致性。
释放Logstash集群
当集群迁移完成后,请及时释放Logstash集群,可以节约资源,避免产生不必要的费用。
- 登录云搜索服务管理控制台。
- 在左侧菜单栏选择“集群管理 > Logstash”。
- 在Logstash集群列表,选择创建的Logstash集群“Logstash-ES”,单击操作列的“更多 > 删除”,在弹出的确认提示框中,输入“DELETE”,单击“确定”完成集群删除。
CSS兼容Elasticsearch吗常见问题
更多常见问题 >>-
CSS是什么_云搜索服务_CSS功能
-
Elasticsearch是一个兼有搜索引擎和NoSQL数据库功能的开源系统,基于Lucene构建,可以用于全文搜索,结构化搜索以及近实时分析。
-
Elasticsearch是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。
-
Elastic组件,开源的分布式、RESTful 风格的搜索和数据分析引擎。是一个基于Lucene的搜索服务器,它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。
-
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎。
-
云搜索服务(Cloud Search Service,简称CSS)是一个基于Elasticsearch且完全托管的在线分布式搜索服务,为用户提供结构化、非结构化文本、以及基于AI向量的多条件检索、统计、报表。
更多相关专题
更多精彩内容
域名注册服务机构许可:黔D3-20230001 代理域名注册服务机构:新网、西数