云服务器内容精选

  • 方案概述 ELK是Elasticsearch、Logstash和Kibana的简称,它们组合起来提供了业界最常用的 日志分析 和可视化工具。 Elasticsearch是一个基于Lucene的开源、分布式、RESTful搜索和分析引擎。 Logstash是一个开源的、服务器端的数据处理管道,能够同时从多个来源实时接收、转换并将数据发送到用户选择的“存储库”。通常用于日志的收集、过滤和转发。 Kibana是一个开源的分析和可视化平台,用于数据的可视化、仪表盘创建和搜索查询。通常与Elasticsearch一起使用。 华为 云日志服务LTS 在功能丰富度、成本、性能方面优于开源ELK方案,具体对比可以参考 云日志 服务LTS对比自建ELK Stack有什么优势?。本文提供最佳实践,使用自定义Python脚本和LTS采集器ICAgent,协助用户将日志从Elasticsearch(简称ES)迁移到LTS中。 当前华为云支持E CS 机器通过安装ICAgent来采集日志文件,因此可以基于该功能实现Elasticsearch日志导入云日志服务。 Elasticsearch数据先通过python脚本将数据落盘到ECS,然后通过LTS服务的日志接入功能,将落盘的日志文件采集到LTS服务。 图1 方案流程图
  • 背景信息 Syslog是网络上各种设备将日志收集到日志服务器的一种数据协议,它几乎被所有的网络设备支持,并且能够记录多种事件类型的日志消息,支持syslog的设备常见的有路由器、交换机、打印机等,甚至unix-like的服务器也可以支持产生syslog消息,用以记录用户的登录、防火墙事件、apache或者nginx access日志等。 Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息请参见RFC5424和RFC3164。 本文介绍通过Syslog协议将日志上传到日志服务的操作步骤。您需要购买ECS作为Syslog汇聚服务器,Linux服务器默认自带Rsyslog,目前华为云主机默认未配置接收远程Syslog写入,需要手动开启。
  • 方案概述 图1 方案流程图 你可以购买Linux云主机,配置为Syslog汇聚服务器,用于接收其他设备发送的日志数据;Syslog服务器默认接收日志大小为1024字节,超过会截断。 单台Syslog服务器处理日志能力为10MB/s,如果您的日志量较大,或者希望可靠性更高,可以购买多台ECS配置为Syslog服务器,并配置 ELB负载均衡 分发流量。 您需要在Syslog服务器上安装ICAgent,并配置日志采集规则,就可以将日志采集到LTS。
  • 使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
  • 使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
  • 使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
  • 方案介绍 日志流是通过日志组管理的,给日志组添加标签时,默认开启应用到日志流,这样日志流就自动添加标签。即可通过日志流统计不同部门在LTS的费用开销。 本实践以aa和bb部门为例子,首先在aa部门的日志组添加group=groupaa标签,bb部门的日志组添加group=groupbb标签,然后在费用明细中导出账单,通过Excel进行统计分析。 以下提到的价格仅为示例,实际计算请以价格计算器中的价格为准。
  • 步骤一:配置监控数据存储路径 Zabbix会将监控数据保存在其所在的机器上,您可以根据如下步骤设置监控数据的存储路径。 登录Zabbix所在服务器。 打开zabbix_server.conf文件。 vim /etc/zabbix/zabbix_server.conf 在zabbix_server.conf文件中,设置数据存储路径。 ExportDir=/tmp/ 重启Zabbix服务,使配置生效。 systemctl restart zabbix-server 配置生效后,Zabbix会在/tmp目录下生产文件(文件名后缀为.ndjson),用于保存监控数据。
  • 表格构建 不同表格构建方式对比参考如下: 表2 不同表格构建方式对比 构建方式 优点 缺点 从文本构建 直观、简单、方便。 如果内容较多,规则会相对冗长。不易于维护、扩展和复用。 从OBS资源构建 内容较多且不常修改时推荐使用,易于维护。 编写相对复杂。 从文本构建 e_table_map(tab_parse_csv("city,name,age\nshanghai,baixiao,10\ncity:nanjing,Maki,18"), "name",["city", "age"]) 从OBS资源构建 e_search_table_map(tab_parse_csv(res_obs_file("https://obs.xxx.myhuaweicloud.com","dsl-test-xx","data.csv")), "name",["city", "age"]) 其中data.csv是obs中的文件,值为: e_search_table_map(tab_parse_csv(res_obs_file("https://obs.xxx.myhuaweicloud.com","dsl-test-xx","data.csv")), "name",["city", "age"])
  • 字典构建 不同字典构建方式对比参考如下: 表1 不同字典构建方式对比 构建方式 优点 缺点 直接构建 直观、简单、方便。 如果内容较多,规则会相对冗长。且静态不灵活。 从任务配置资源构建 内容较多且经常修改时推荐使用,易于维护。 不易于扩展和跨任务复用,不支持自动刷新。 从表格构建 高级场景下使用,维护机制更灵活。 需要构建和维护对应的表格,过程相对繁琐。 从字典函数构建 基于逻辑动态构建字典,特定场景下适用。 较为高级,不易于维护。 从其他表达式构建 从日志事件的字段中动态提取映射关系,特定场景下适用。 较为高级,不易于维护。 直接构建 e_dict_map({"400": "error", "200": "ok", "*": "other"}, "status", "message") 从任务高级配置构建 e_dict_map(res_local("http_code_map"), "status", "message") 其中http_code_map是任务高级配置项,值为: 从表格构建 使用tab_to_dict从表格构建。而表格的构建参见本文后面的表格构建。 e_dict_map(tab_to_dict(tab_parse_csv("status_code,status_info\n400,error\n200,ok\n*,other"), "status_code", "status_info"), "status", "message") 从字典函数构建 e_dict_map(dct_make("400", "error", "200", "ok", "*", "other"), "status", "message") 从其他表达式构建 e_dict_map(json_parse(v("http_code_map")), "status", "message") 此处从源日志的http_code_map字段中获取映射关系。
  • 使用e_search_dict_map函数进行数据富化 本案例介绍使用e_search_dict_map函数完成数据富化的方法。 原始日志 [{ "http_host": "example.com", "http_status": 200, "request_method": "GET" }, { "http_host": "example.org", "http_status": 201, "request_method": "POST" }, { "http_host": "example.net", "http_status": 404, "request_method": "GET" }] 加工需求 根据日志中的http_status字段的值的不同,为每条日志添加不同的type信息。 为http_status为2XX的日志,添加type字段,并将字段值设置为正常。 为http_status为3XX的日志,添加type字段,并将字段值设置为重定向。 为http_status为4XX的日志,添加type字段,并将字段值设置为错误。 加工规则 e_search_dict_map({"http_status:2??": "正常","http_status:3??": "重定向","http_status:4??": "错误"}, "http_status", "type") 加工结果 { "http_status": "正常", "request_method": "GET", "http_host": "example.com" } { "http_status": "正常", "request_method": "POST", "http_host": "example.org" } { "http_status": "错误", "request_method": "GET", "http_host": "example.net" }
  • 背景信息 日志服务数据加工映射富化函数包括普通映射函数和搜索映射函数,两者区别如下所示: 普通映射函数使用文本完全匹配方式来映射。普通映射函数包括e_dict_map函数和e_table_map函数,两者区别在于e_dict_map函数接收的是dict类型的数据,e_table_map函数接收的是通过资源函数获取的table类型的数据。 例如:在nginx日志中,将特定的状态码转换为文本格式,可以使用普通映射函数e_dict_map。 状态码 文本 200 成功 300 跳转 400 请求错误 500 服务器错误 搜索映射函数的映射关键字是查询字符串,支持正则表达式匹配、完全匹配、模糊匹配等形式。搜索映射函数包括e_search_dict_map函数和e_search_table_map函数,两者区别在于e_search_dict_map函数接收的是dict类型的数据,而e_search_table_map函数接收的是通过资源函数获取的table类型的数据。 例如:在nginx日志中,将一定范围内的状态码转换为文本格式,可以使用搜索映射函数e_search_dict_map。 状态码 文本 2XX 成功 3XX 跳转 4XX 请求错误 5XX 服务器错误