云服务器内容精选
-
方案概述 ELK是Elasticsearch、Logstash和Kibana的简称,它们组合起来提供了业界最常用的 日志分析 和可视化工具。 Elasticsearch是一个基于Lucene的开源、分布式、RESTful搜索和分析引擎。 Logstash是一个开源的、服务器端的数据处理管道,能够同时从多个来源实时接收、转换并将数据发送到用户选择的“存储库”。通常用于日志的收集、过滤和转发。 Kibana是一个开源的分析和可视化平台,用于数据的可视化、仪表盘创建和搜索查询。通常与Elasticsearch一起使用。 华为 云日志服务LTS 在功能丰富度、成本、性能方面优于开源ELK方案,具体对比可以参考 云日志 服务LTS对比自建ELK Stack有什么优势?。本文提供最佳实践,使用自定义Python脚本和LTS采集器ICAgent,协助用户将日志从Elasticsearch(简称ES)迁移到LTS中。 当前华为云支持E CS 机器通过安装ICAgent来采集日志文件,因此可以基于该功能实现Elasticsearch日志导入云日志服务。 Elasticsearch数据先通过python脚本将数据落盘到ECS,然后通过LTS服务的日志接入功能,将落盘的日志文件采集到LTS服务。 图1 方案流程图
-
方案概述 图1 方案流程图 你可以购买Linux云主机,配置为Syslog汇聚服务器,用于接收其他设备发送的日志数据;Syslog服务器默认接收日志大小为1024字节,超过会截断。 单台Syslog服务器处理日志能力为10MB/s,如果您的日志量较大,或者希望可靠性更高,可以购买多台ECS配置为Syslog服务器,并配置 ELB负载均衡 分发流量。 您需要在Syslog服务器上安装ICAgent,并配置日志采集规则,就可以将日志采集到LTS。
-
背景信息 Syslog是网络上各种设备将日志收集到日志服务器的一种数据协议,它几乎被所有的网络设备支持,并且能够记录多种事件类型的日志消息,支持syslog的设备常见的有路由器、交换机、打印机等,甚至unix-like的服务器也可以支持产生syslog消息,用以记录用户的登录、防火墙事件、apache或者nginx access日志等。 Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息请参见RFC5424和RFC3164。 本文介绍通过Syslog协议将日志上传到日志服务的操作步骤。您需要购买ECS作为Syslog汇聚服务器,Linux服务器默认自带Rsyslog,目前华为云主机默认未配置接收远程Syslog写入,需要手动开启。
-
使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
-
使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
-
使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
-
通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
方案介绍 日志流是通过日志组管理的,给日志组添加标签时,默认开启应用到日志流,这样日志流就自动添加标签。即可通过日志流统计不同部门在LTS的费用开销。 本实践以aa和bb部门为例子,首先在aa部门的日志组添加group=groupaa标签,bb部门的日志组添加group=groupbb标签,然后在费用明细中导出账单,通过Excel进行统计分析。 以下提到的价格仅为示例,实际计算请以价格计算器中的价格为准。
-
步骤一:配置监控数据存储路径 Zabbix会将监控数据保存在其所在的机器上,您可以根据如下步骤设置监控数据的存储路径。 登录Zabbix所在服务器。 打开zabbix_server.conf文件。 vim /etc/zabbix/zabbix_server.conf 在zabbix_server.conf文件中,设置数据存储路径。 ExportDir=/tmp/ 重启Zabbix服务,使配置生效。 systemctl restart zabbix-server 配置生效后,Zabbix会在/tmp目录下生产文件(文件名后缀为.ndjson),用于保存监控数据。
-
字典构建 不同字典构建方式对比参考如下: 表1 不同字典构建方式对比 构建方式 优点 缺点 直接构建 直观、简单、方便。 如果内容较多,规则会相对冗长。且静态不灵活。 从任务配置资源构建 内容较多且经常修改时推荐使用,易于维护。 不易于扩展和跨任务复用,不支持自动刷新。 从表格构建 高级场景下使用,维护机制更灵活。 需要构建和维护对应的表格,过程相对繁琐。 从字典函数构建 基于逻辑动态构建字典,特定场景下适用。 较为高级,不易于维护。 从其他表达式构建 从日志事件的字段中动态提取映射关系,特定场景下适用。 较为高级,不易于维护。 直接构建 e_dict_map({"400": "error", "200": "ok", "*": "other"}, "status", "message") 从任务高级配置构建 e_dict_map(res_local("http_code_map"), "status", "message") 其中http_code_map是任务高级配置项,值为: 从表格构建 使用tab_to_dict从表格构建。而表格的构建参见本文后面的表格构建。 e_dict_map(tab_to_dict(tab_parse_csv("status_code,status_info\n400,error\n200,ok\n*,other"), "status_code", "status_info"), "status", "message") 从字典函数构建 e_dict_map(dct_make("400", "error", "200", "ok", "*", "other"), "status", "message") 从其他表达式构建 e_dict_map(json_parse(v("http_code_map")), "status", "message") 此处从源日志的http_code_map字段中获取映射关系。
-
表格构建 不同表格构建方式对比参考如下: 表2 不同表格构建方式对比 构建方式 优点 缺点 从文本构建 直观、简单、方便。 如果内容较多,规则会相对冗长。不易于维护、扩展和复用。 从OBS资源构建 内容较多且不常修改时推荐使用,易于维护。 编写相对复杂。 从文本构建 e_table_map(tab_parse_csv("city,name,age\nshanghai,baixiao,10\ncity:nanjing,Maki,18"), "name",["city", "age"]) 从OBS资源构建 e_search_table_map(tab_parse_csv(res_obs_file("https://obs.xxx.myhuaweicloud.com","dsl-test-xx","data.csv")), "name",["city", "age"]) 其中data.csv是obs中的文件,值为: e_search_table_map(tab_parse_csv(res_obs_file("https://obs.xxx.myhuaweicloud.com","dsl-test-xx","data.csv")), "name",["city", "age"])
-
使用e_search_dict_map函数进行数据富化 本案例介绍使用e_search_dict_map函数完成数据富化的方法。 原始日志 [{ "http_host": "example.com", "http_status": 200, "request_method": "GET" }, { "http_host": "example.org", "http_status": 201, "request_method": "POST" }, { "http_host": "example.net", "http_status": 404, "request_method": "GET" }] 加工需求 根据日志中的http_status字段的值的不同,为每条日志添加不同的type信息。 为http_status为2XX的日志,添加type字段,并将字段值设置为正常。 为http_status为3XX的日志,添加type字段,并将字段值设置为重定向。 为http_status为4XX的日志,添加type字段,并将字段值设置为错误。 加工规则 e_search_dict_map({"http_status:2??": "正常","http_status:3??": "重定向","http_status:4??": "错误"}, "http_status", "type") 加工结果 { "http_status": "正常", "request_method": "GET", "http_host": "example.com" } { "http_status": "正常", "request_method": "POST", "http_host": "example.org" } { "http_status": "错误", "request_method": "GET", "http_host": "example.net" }
-
背景信息 日志服务数据加工映射富化函数包括普通映射函数和搜索映射函数,两者区别如下所示: 普通映射函数使用文本完全匹配方式来映射。普通映射函数包括e_dict_map函数和e_table_map函数,两者区别在于e_dict_map函数接收的是dict类型的数据,e_table_map函数接收的是通过资源函数获取的table类型的数据。 例如:在nginx日志中,将特定的状态码转换为文本格式,可以使用普通映射函数e_dict_map。 状态码 文本 200 成功 300 跳转 400 请求错误 500 服务器错误 搜索映射函数的映射关键字是查询字符串,支持正则表达式匹配、完全匹配、模糊匹配等形式。搜索映射函数包括e_search_dict_map函数和e_search_table_map函数,两者区别在于e_search_dict_map函数接收的是dict类型的数据,而e_search_table_map函数接收的是通过资源函数获取的table类型的数据。 例如:在nginx日志中,将一定范围内的状态码转换为文本格式,可以使用搜索映射函数e_search_dict_map。 状态码 文本 2XX 成功 3XX 跳转 4XX 请求错误 5XX 服务器错误
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格