云服务器内容精选

  • 基本概念 本部分介绍日志采集中涉及的基本概念的描述及其作用。 日志采集组件(Logstash):用于日志采集、日志传输。 组件控制器(isap-agent):用于管理日志采集组件(Logstash)等。 日志采集器节点:用于采集日志到云脑,以及 安全云脑 日志转出。 一台E CS ,安装了安全云脑组件控制器,组件控制器中安装了日志采集组件。单个租户只需要配置安装一台日志采集器节点。 图2 日志采集器节点架构图 采集器:定制化的Logstash。采集器节点则是定制化的Logstash+组件控制器(isap-agent)。 连接器:Logstash配置的基础概念,主要包括input、output两部分,分别对应源连接器、目的连接器,用于定义采集器Logstash接受数据方式和规范。其中,安全云脑管道pipe连接器可以对接安全云脑,实现租户数据上报安全云脑,安全云脑数据转储到租户的能力。 解析器:Logstash配置的基础概念,主要为Logstash的filter部分,安全云脑解析器是对其filter部分的无码化封装和定制,用户只需在页面上配置解析器规则即可生成原生的filter配置脚本,从而轻松实现将原始日志转化为目标格式。 采集通道:采集通道等价于Logstash的pipeline,在Logstash可以配置多个pipeline,每个pipeline包括input、filter、output部分,每个pipeline为单独的作业,互不影响。在安全云脑租户采集上,可将相同的pipeline部署在多个节点上,并且配置相同的pipeline视为一个采集通道。
  • 开启匿名写入 登录 云日志 服务控制台。 单击日志组名称对应的按钮。 单击“创建日志流”,在创建日志流页面,参考表1填写日志流相关信息,开启“匿名写入”,适用于端侧SDK上报日志,打开匿名写入则表示该日志流打开匿名写入权限,不会经过有效鉴权,可能产生脏数据。 图1 创建日志流 表1 日志流参数说明 参数 说明 日志组名称 默认显示目标日志组名称。 日志流名称 日志流名称只支持输入英文、数字、中文、中划线、下划线及小数点,且不能以小数点、下划线开头或以小数点结尾。长度为1-64个字符。 企业项目 选择业务需要的企业项目,默认为default。也可单击“查看企业项目”,在企业项目管理页面查看全部企业项目。 日志存储 若关闭日志存储,则无法开启日志存储时间。 开启“日志存储”:日志将会被存入搜索引擎,能使用日志全量功能。 关闭“日志存储”:日志不会存储到LTS,可节约索引流量和存储费用,只能使用日志生成指标、转储功能,不能使用日志搜索分析、告警、消费加工等其他功能。 日志存储时间(天) 日志流的存储时间,即日志上报到LTS后日志存储的时间。 日志数据默认存储30天,可以在1~365天之间设置。 打开日志流的“日志存储时间”:日志的存储时间使用日志流设置的日志存储时间。 关闭日志流的“日志存储时间”:日志的存储时间使用日志组设置的日志存储时间。 智能冷存储 开启日志存储时间后,根据业务需要设置智能冷存储功能。详细请参考智能冷存储。 说明: 开启智能冷存储需要日志存储时间大于7天。 标准存储层数据保存时间 开启智能冷存储后,需要设置标准存储层数据的保存时间。 标准存储数据至少需保存7天才能转换为智能冷存储层数据。当日志保存时间大于标准存储层数据保存时间,且小于日志存储时间时,日志数据将自动转换为智能冷存储层数据存储。 标签 按照业务需求对不同的日志流添加对应的标签,单击“添加标签”,分别填写标签键key和标签值value。 说明: 如需添加多个标签可重复该步骤,最多支持添加20个标签。 如需删除标签可单击标签操作列的“删除”。 标签键长度不能超过128个字符;标签值长度不能超过255个字符。 标签键名称不可重复。 如果配置转储时使用了该标签,删除标签后,请同步修改转储配置信息。 匿名写入 匿名写入默认关闭,适用于安卓/IOS/小程序/浏览器端上报日志,打开匿名写入则表示该日志流打开匿名写入权限,不会经过有效鉴权,可能产生脏数据。关于SDK使用请参考SDK接入。 备注 自定义填写备注信息,字符长度0-1024个字符。 单击“确定”。
  • 通过HTTP GET请求上传日志 开通匿名写入后,您可以通过以下方法上传日志到日志流中。 HTTP GET请求,开通白名单后,技术支持工程师提供{endpoint}地址才能此功能。 /v3/{project_id}/lts/groups/{log_group_id}/streams/{log_stream_id}/struct/logs?key1=val1&key2=val2 GET接口总数据量不能超过16KB,超过可能会上报失败。 不支持压缩。 配置参数说明 表2 配置参数 参数 是否必选 参数类型 描述 project_id 是 String 项目ID,获取方式请参见:获取账号ID、项目ID、日志组ID、日志流ID。 缺省值:None 最小长度:32 最大长度:32 log_group_id 是 String 日志组ID,获取方式请参见:获取账号ID、项目ID、日志组ID、日志流ID。 缺省值:None 最小长度:36 最大长度:36 log_stream_id 是 String 日志流ID,获取方式请参见:获取账号ID、项目ID、日志组ID、日志流ID。 缺省值:None 最小长度:36 最大长度:36 key1=val1&key2=val2 是 String 您要上传到日志服务的键值对(Key-Value),支持设置多个键值对,由&隔开。请确保长度小于16KB。 对于上报日志中的特殊字符,请您使用url编码,否则可能出现日志上报乱码、失败。 日志长度小于16KB,指的是url编码后的长度。假如您的日志大小只有13KB,但是经过编码后的大小为17KB,也是超过长度限制。 响应参数 状态码为 200 时: 表3 响应Body参数 参数 参数类型 描述 errorCode String 错误码。 枚举值: SVCSTG.ALS.200.200 errorMessage String 调用失败响应信息描述。 枚举值: Report success. result String 响应结果。 请求示例 curl -k "https://lts-access.cn-north-4.myhuaweicloud.com:8102/v3/xxxxxxProject_id/lts/groups/xxxxxxLog_group_id/streams/xxxxxxxLog_stream_id/struct/logs?key1=val1&key2=val2" 响应示例 状态码: 200 日志上报成功。 { "errorCode": "SVCSTG.ALS.200.200", "errorMessage": "Report success.", "result": null } 状态码 表4 状态码 状态码 描述 200 请求响应成功。 400 BadRequest。非法请求。建议根据error_msg直接修改该请求,不要重试该请求。 401 在客户端提供认证信息后,返回该状态码,表明服务端指出客户端所提供的认证信息不正确或非法。 403 Forbidden。请求被拒绝访问。返回该状态码,表明请求能够到达服务端,且服务端能够理解用户请求,但是拒绝做更多的事情,因为该请求被设置为拒绝访问,建议直接修改该请求,不要重试该请求。 500 系统内部错误。 503 ServiceUnavailable。被请求的服务无效,服务不可用。
  • 方案概述 图1 方案流程图 你可以购买Linux云主机,配置为Syslog汇聚服务器,用于接收其他设备发送的日志数据;Syslog服务器默认接收日志大小为1024字节,超过会截断。 单台Syslog服务器处理日志能力为10MB/s,如果您的日志量较大,或者希望可靠性更高,可以购买多台ECS配置为Syslog服务器,并配置 ELB负载均衡 分发流量。 您需要在Syslog服务器上安装ICAgent,并配置日志采集规则,就可以将日志采集到LTS。
  • 背景信息 Syslog是网络上各种设备将日志收集到日志服务器的一种数据协议,它几乎被所有的网络设备支持,并且能够记录多种事件类型的日志消息,支持syslog的设备常见的有路由器、交换机、打印机等,甚至unix-like的服务器也可以支持产生syslog消息,用以记录用户的登录、防火墙事件、apache或者nginx access日志等。 Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息请参见RFC5424和RFC3164。 本文介绍通过Syslog协议将日志上传到日志服务的操作步骤。您需要购买ECS作为Syslog汇聚服务器,Linux服务器默认自带Rsyslog,目前华为云主机默认未配置接收远程Syslog写入,需要手动开启。
  • 采集终端用户日志 端侧日志全面采集接入LTS,例如Web浏览器、IOS、安卓、百度小程序、微信小程序、钉钉小程序、快应用等多类端侧日志,集成LTS提供的多种移动端SDK,实现了缓存发送、异常重试、批量发送等稳定功能,用户快速集成即可全面采集移动端日志到LTS。 采集方案: 移动端:可以使用移动端iOS SDK,Android或MAN(移动数据分析)接入。 ARM设备:ARM平台可以使用Native C交叉编译。 商家平台设备:X86平台设备可以用SDK、ARM平台可以使用Native C交叉编译。 实施方法: 日志写到本地文件,通过创建日志采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
  • 采集服务器日志 采集方案,例如: Syslog日志 Aug 31 11:07:24 zhouqi-mac WeChat[9676]: setupHotkeyListenning event NSEvent: type=KeyDown loc=(0,703) time=115959.8 flags=0 win=0x0 winNum=7041 ctxt=0x0 chars="u" unmodchars="u" repeat=0 keyCode=32 应用程序Debug日志 __FILE__:build/release64/sls/shennong_worker/ShardDataIndexManager.cpp __LEVEL__:WARNING __LINE__:238 __THREAD__:31502 offset:816103453552 saved_cursor:1469780553885742676 seek count:62900 seek data redo log:pangu://localcluster/redo_data/41/example/2016_08_30/250_1472555483 user_cursor:1469780553885689973 Trace日志 [2013-07-13 10:28:12.772518] [DEBUG] [26064] __TRACE_ID__:661353951201 __item__:[Class:Function]_end__ request_id:1734117 user_id:124 context:..... 采集方法: 日志写到本地文件,通过创建采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
  • 采集服务端数据 支付宝和微信公众账号编程是典型的Web端模式,一般会有四种类型的日志,可以使用SDK上报日志的方式,详细请参考云日志服务支付宝小程序SDK、云日志服务微信小程序SDK。 Nginx和Apache访问日志 Nginx和Apache访问日志用以监控、实时统计。 10.1.168.193 - - [01/Mar/2024:16:12:07 +0800] "GET /Send?AccessKeyId=8225105404 HTTP/1.1" 200 5 "-" "Mozilla/5.0 (X11; Linux i686 on x86_64; rv:10.0.2) Gecko/20100101 Firefox/10.0.2" Nginx和Apache错误日志 2024/04/18 18:59:01 [error] 26671#0: *20949999 connect() to unix:/tmp/fastcgi.socket failed (111: Connection refused) while connecting to upstream, client: 10.101.1.1, server: , request: "POST /logstores/test_log HTTP/1.1", upstream: "fastcgi://unix:/tmp/fastcgi.socket:", host: "example.com" 应用层日志 应用层日志可以把事件产生的时间、地点、结果、参数等记录详细,扩展类字段一般放在最后。 {"labels":{"cccdasd":51,"platform":"wx"},"logs":[{"contents":[{"aaa":"13123","__client_time__":1728565099070}]}]} 应用层错误日志:错误码、原因等。 {"errorCode":"LTS.403","errorMessage":" request header auth is empty"} 实施方法: 日志写到本地文件,通过创建日志采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
  • 数据采集难点 在数据化运营的过程中,第一步是如何将散落的日志数据集中收集起来,其中会遇到如下挑战: 多渠道:例如广告商、地推(传单)等。 多终端:网页版、公众账号、手机、浏览器(Web、移动端页面)等。 异构网:VPC、用户自建IDC,华为云ECS等。 多开发语言:核心系统Java、前端Nginx服务器、后台支付系统C++。 设备:商家有不同平台(X86、ARM)设备。 我们需要把散落在外部、内部的日志收集起来,统一进行管理。在过去这块需要大量的和不同种类的工作,现在可以通过 云日志服务LTS 采集功能完成统一接入。
  • 统一管理日志 在云日志服务LTS控制台创建日志组,详细操作请参考管理日志组。 在云日志服务LTS控制台为不同数据源产生的日志创建日志流,详细操作请参考管理日志流,例如: wechat-server(存储微信服务器访问日志) wechat-app(存储微信服务器应用日志) wechat-error(错误日志) alipay-server alipay-app deliver-app(送货员App状态) deliver-error(错误日志) web-click(H5页面点击) server-access(服务端Access-Log) server-app(应用) coupon(应用优惠券日志) pay(支付日志) order(订单日志) 如需要对原始数据进行加工,可以通过创建日志加工任务,详细请参考DSL加工。 DSL加工的功能在邀测中,支持华北-北京四、华东-上海一、华南-广州局点,仅针对用户内测使用,后续将全网开放,敬请期待!
  • 采集用户推广日志 为获取新用户,一般有两种方式: 网站注册时直接投放优惠券。 其他渠道扫描二维码,投放优惠券。 传单二维码 扫描网页二维码登录 实施方法: 定义如下注册服务器地址,生成二维码(传单、网页)供用户注册扫描。用户扫描该页面进行注册时,就可以得知用户是通过特定来源进入的,并记录日志。 http://example.com/login?source=10012&ref=kd4b 当服务端接受请求时,服务器输出如下日志: 2024-06-20 19:00:00 e41234ab342ef034,102345,5k4d,467890 收集方式: 应用程序输出日志到硬盘,通过ICAgent采集日志,详细请参考云主机ECS文本日志接入LTS。 应用程序通过SDK写入。详细请参考SDK概述。
  • 背景信息 “我要点外卖”是一个平台型电商网站,涉及用户、餐厅、配送员等。用户可以在网页、App、微信、支付宝等进行下单点菜。商家拿到订单后开始加工,并自动通知周围的快递员。快递员将外卖送到用户手中。 在运营的过程中,发现了如下的问题: 获取用户难,投放一笔不小的广告费到营销渠道(网页、微信推送),收获了一些用户,但无法评判各渠道的效果。 用户经常抱怨送货慢,但慢在什么环节,接单、配送还是加工,如何进行优化? 用户运营,经常搞一些优惠活动,但无法获得效果。 调度问题,如何帮助商家在高峰时提前备货?如何调度更多的快递员到指定区域? 客服服务,用户反馈下单失败,用户背后的操作是什么?系统是否有错误?
  • 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1