云服务器内容精选

  • 方案概述 云上用户经常会遇到多云或者跨Region采集日志的场景,典型场景有两种,分别是: 场景一:将互联网数据中心(Internet Data Center,以下简称IDC)或者第三方云厂商的日志采集到华为云LTS。 图1 第三方云厂商日志采集 场景二:将华为云RegionA的日志采集到华为云RegionB的LTS。 图2 跨Region日志采集 对于场景一和场景二,您需要先打通网络,再安装ICAgent,最后按照日志接入向导即可以将日志采集到LTS。 ICAgent:ICAgent是华为 云日志 服务的日志采集器,通过在主机上安装ICAgent,您可以将日志采集到LTS。 安装ICAgent前,请确保本地浏览器的时间、时区与主机的时间、时区一致。 网络互通: 场景一:自建IDC或者第三方云厂商与华为云之间网络互通的典型方式是云专线DC,如果没有专线,您可以尝试VPN/公网IP方式。 场景二:华为云不同Region之间网络互通典型的方式是云连接CC/云专线DC,您也可以使用VPN/公网IP方式。 跳板机 安装在自建IDC/第三方云厂商/跨华为云Region的ICAgent无法直接访问华为云管理面上报日志的网段,需要配置跳板机进行数据转发;当您在进行POC测试,或者日志流量并不大的情况下,可以使用跳板机的方案。对于大流量的日志场景,如果您希望在生产环境中去掉跳板机,请提交工单给华为云网络技术支持工程师帮您设计网络直通的方案。 典型的跳板机配置是2vCPUs | 4GB,每台跳板机可以支持约30MB/s的流量转发,您可以根据自身的日志流量配置合理数量的跳板机,多台跳板机配置ELB进行流量分发。 本文将详细介绍将阿里云主机日志采集到华为云LTS的操作步骤,客户自建IDC和华为云上跨Region采集日志的操作方式与采集阿里云主机日志的方式类似。 以下将阿里云-华北二-北京局点的日志采集到华为云华东-上海一局点的LTS服务,云主机的操作系统为Linux环境。
  • 将__time以指定格式打印 使用date_format函数将字段__time,从 timestamp类型的日期和时间表达式的形式转换为指定格式。 * | select time_format( from_unixtime(__time), 'yyyy-MM-dd HH:mm:ss','+08:00') 使用time_format函数将字段__time,从 timestamp类型的日期和时间表达式的形式转换为指定格式。 * | select date_format(from_unixtime(__time,'+08:00'),'%Y-%m-%d %H:%i:%s')
  • 将日志中的时间转换成指定格式 把日志中的时间字段从字符转化成指定格式。 使用date_parse函数将时间字段,从字符串转化成年-月-日 时:分:秒。 使用date_format函数截取年-月-日部分。 使用group by进行分组。 日志样例: time:2017-05-17 09:45:00 SQL语句样例: * | select date_format (date_parse(time,'%Y-%m-%d %H:%i:%S'), '%Y-%m-%d') as day, count(1) as uv group by day order by day asc
  • 使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为您虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入您的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
  • 使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
  • 使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接收到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
  • 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 新建可视化图表 SQL查询与分析。 登录云日志服务控制台,在左侧导航栏中,选择“日志管理”。 在日志列表中,单击日志组名称前对应的,选择目标日志流,进入日志详情页面。 选择“ 日志分析 ”,在SQL查询条件框中,选择对应时间并输入SQL语句,单击“查询”进行日志搜索。 当设置时间范围内日志量超过10亿行时会触发迭代查询,可以通过迭代查询分多次完成全部日志的查询,界面会显示“查询状态:结果精确”。 根据SQL查询返回的数据,依照您的业务需求选择不同图表类型,呈现查询结果。 关于更多SQL查询的说明,请参见SQL分析语法介绍。 新建图表。 单击“新建”,新建可视化图表。 或单击“保存”,将当前查询结果新建为可视化图表。 在创建图表页面中,配置相关参数。 如果开启“同时添加到仪表盘”按钮,新建图表可以直接添加到仪表盘中。 完成后,单击“确定”。 查看可视化图表。 单击“展开图表”,查看可视化图表。
  • 将图表添加到仪表盘 将图表添加到仪表盘有两种方式: 方式一: 鼠标悬浮可视化图表名称,单击选择“添加到仪表盘”。 在弹出的移动图表页面中,选择待存放的仪表盘。 完成后,单击“确定”。 方式二: 创建仪表盘。 在左侧导航栏中,选择“仪表盘”。 在仪表盘下方,选择仪表盘分组。 单击“添加仪表盘”,在创建仪表盘页面,配置相关参数。 关于仪表盘参数的说明,请参见使用仪表盘将日志可视化。 将图表添加到仪表盘。 在创建仪表盘页面,单击“开始添加图表”,进入添加可视化图表界面,选择目标日志新建的可视化图表。 完成后,单击“确定”。
  • 使用场景 例如,VPC流日志已接入LTS,采集到云日志服务中的日志示例如下: { "content": "1 5f67944957444bd6bb4fe3b367de8f3d 1d515d18-1b36-47dc-a983-bd6512aed4bd 192.168.0.154 192.168.3.25 38929 53 17 1 96 1548752136 1548752736 ACCEPT OK", "version": 1, "project_id": "5f67944957444bd6bb4fe3b367de8f3d", "interface_id": "1d515d18-1b36-47dc-a983-bd6512aed4bd", "srcaddr": "192.168.0.154", "dstaddr": "192.168.3.25", "srcport": 38929, "dstport": 53, "protocol": 17, "packets": 1, "bytes": 96, "start": 1548752136, "end": 1548752736, "action": "ACCEPT", "log_status": "ok" } 在对VPC流日志查询和分析过程中,为便于分析公网流量,您需要对原始日志做以下处理: 如果srcaddr和dstaddr字段不存在,则丢弃该日志。 如果是私网之间互通的流量,则丢弃该日志。 基于以上需求,您可以通过DSL日志加工,对采集到的VPC流日志进行日志加工,便于问题分析。
  • 方案概述 云上用户经常会遇到多云或者跨Region采集Kubernetes日志场景,典型场景有两种,分别是: 场景一:将互联网数据中心(Internet Data Center,以下简称IDC)或者第三方云厂商的日志采集到华为云LTS。 图1 第三方云厂商日志采集 场景二:将华为云RegionA的日志采集到华为云RegionB的LTS。 图2 跨Region日志采集 对于场景一和场景二,您需要先打通网络,再安装ICAgent,最后按照日志接入向导即可以将日志采集到LTS。 ICAgent:ICAgent是华为云日志服务的日志采集器,通过在主机上安装ICAgent,您可以将日志采集到LTS。 安装ICAgent前,请确保本地浏览器的时间、时区与主机的时间、时区一致。 网络互通: 场景一:自建IDC或者第三方云厂商与华为云之间网络互通的典型方式是云专线DC,如果没有专线,您可以尝试VPN/公网IP方式。 场景二:华为云不同Region之间网络互通典型的方式是云连接CC/云专线DC,您也可以使用VPN/公网IP方式。 跳板机 安装在自建IDC/第三方云厂商/跨华为云Region的ICAgent无法直接访问华为云管理面上报日志的网段,需要配置跳板机进行数据转发;当您在进行POC测试,或者日志流量并不大的情况下,可以使用跳板机的方案。对于大流量的日志场景,如果您希望在生产环境中去掉跳板机,请提交工单给华为云网络技术支持工程师帮您设计网络直通的方案。 典型的跳板机配置是2vCPUs | 4GB,每台跳板机可以支持约30MB/s的流量转发,您可以根据自身的日志流量配置合理数量的跳板机,多台跳板机配置ELB进行流量分发。 本文将详细介绍将阿里云主机日志采集到华为云云日志服务(LTS)的操作步骤,客户自建IDC和华为云上跨region采集日志的操作方式与采集阿里云主机日志的方式类似。 以下将阿里云-华北二北京局点的日志采集到华为云华东-上海一局点的LTS服务,云主机的操作系统为Linux环境。
  • 采集Web/移动端页面用户行为日志 关于Web/移动端页面用户行为日志可以分为两类: 页面与后台服务器交互日志:例如下单、登录、退出等。 页面无后台服务器交互日志:请求直接在前端处理,例如滚屏、关闭页面等。 使用匿名写入采集Web/移动端页面用户行为日志,详细请参见使用匿名写入采集日志。 使用匿名写入采集日志功能仅支持华北-北京四、华东-上海一、华南-广州区域的白名单用户使用,如有需要,请提交工单,其他区域暂不支持申请开通。
  • 采集服务器日志 服务器日志参考如下:以下示例日志仅供参考,请以实际日志为准。 Syslog日志 Aug 31 11:07:24 zhouqi-mac WeChat[9676]: setupHotkeyListenning event NSEvent: type=KeyDown loc=(0,703) time=115959.8 flags=0 win=0x0 winNum=7041 ctxt=0x0 chars="u" unmodchars="u" repeat=0 keyCode=32 应用程序Debug日志 __FILE__:build/release64/sls/shennong_worker/ShardDataIndexManager.cpp __LEVEL__:WARNING __LINE__:238 __THREAD__:31502 offset:816103453552 saved_cursor:1469780553885742676 seek count:62900 seek data redo log:pangu://localcluster/redo_data/41/example/2016_08_30/250_1472555483 user_cursor:1469780553885689973 Trace日志 [2013-07-13 10:28:12.772518] [DEBUG] [26064] __TRACE_ID__:661353951201 __item__:[Class:Function]_end__ request_id:1734117 user_id:124 context:..... 支持通过以下方法采集服务器日志: 将日志写到本地文件,通过创建采集任务写到指定日志流中。 Docker中产生的日志可以使用CCE接入LTS进行采集,详细请参考云容器引擎CCE应用日志接入LTS。 C#、Python、Java、PHP、C等日志可以使用SDK写入LTS,详细请参考SDK概述。