云服务器内容精选

  • 数据来源 支持上报到LTS的日志数据来源如下表1所示。 表1 数据来源 类别 来源 接入方式 应用 程序输出 CCE接入 访问日志 WAF接入 语言 Java 云日志 服务Java SDK(标准插件) Log4J Appender 云日志服务Java SDK(log4j2插件) LogBack Appender 云日志服务LogBack SDK Go 云日志服务Go SDK OS Linux 安装ICAgent(区域内主机) 安装ICAgent(区域外主机) Windows Docker文件 Docker输出 数据库 MySQL GaussDB for MySQL接入LTS 云数据库RDS for MySQL接入LTS SQL Server 云数据库RDS for SQLServer接入LTS PostgreSQL 云数据库RDS for PostgreSQL接入LTS 云数据库 GaussDB 数据仓库 服务GaussDB(DWS)接入LTS 云数据库 GeminiDB 云数据库GeminiDB接入LTS 云数据库GeminiDB Mongo接入LTS 云数据库GeminiDB Cassandra接入LTS 移动端 iOS、Android、百度小程序、微信小程序、钉钉小程序、支付宝小程序 使用SDK接入LTS 标准协议 HTTP轮询 使用KAFKA协议上报日志到LTS Syslog 使用KAFKA协议上报日志到LTS Kafka 使用KAFKA协议上报日志到LTS 第三方 Logstash 使用KAFKA协议上报日志到LTS Flume 使用KAFKA协议上报日志到LTS Beats 使用KAFKA协议上报日志到LTS 云日志服务云产品 E CS 、CCE等 华为云产品 日志 使用云服务接入LTS
  • 使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
  • 通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
  • 使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
  • 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 报错说明 当参数错误或不匹配时,会有相应的报错提示。 表4 报错说明 报错信息 报错原因 TopicAuthorizationException projectId(项目ID)、accessKey(AK)、accessSecret(SK)参数错误或者不匹配。 UnknownTopicOrPartitionException logGroupId(日志组ID)、logStreamId(日志流ID)参数错误或者不匹配。 InvalidRecordException 仅当配置headers,上报规范化日志时,会出现此类报错: 日志格式错误或者日志中的projectId(项目ID)、logGroupId(日志组ID)、logStreamId(日志流ID)与外部设置参数不一致。
  • 配置方式 使用Kafka协议上报日志时,需要使用到的通用参数如下。 表1 通用参数 参数名称 描述 类型 projectId 用户账号的项目ID(project id) String logGroupId LTS的日志组ID String logStreamId LTS的日志流ID String regionName 云日志服务的区域 String accessKey 用户账号的AK String accessSecret 用户账号的SK String 使用Kafka协议上报日志时,需要配置以下参数。 表2 配置参数 参数名称 说明 连接类型 当前支持SASL_PLAINTEXT hosts Kafka的IP和PORT地址,格式为lts-kafka.${regionName}.${external_global_domain_name}:9095或lts-access.${regionName}.${external_global_domain_name}:9095 其中IP根据局点进行配置,PORT固定为9095。详细请参考参数获取方式,例如北京四局点对应hosts为 lts-kafka.cn-north-4.myhuaweicloud.com:9095。 topic Kafka的topic名称,格式为 ${日志组ID}_${日志流ID},即LTS的日志组ID和日志流ID通过下划线连接,作为topic的名称。 username Kafka访问用户名,配置为用户账号的项目ID。 password Kafka访问密码,格式为${accessKey}#${accessSecret},即用户账号的AK和SK通过#连接,作为Kafka的访问密码。 headers 设置自定义label字段时,需要配置headers。 headers中添加header,key值为LTS_ LOG _TYPE,value值为FORMAT,用户需要上报符合要求的规范化日志。 ${message}日志格式 仅当headers中添加了key为LTS_LOG_TYPE,value为FORMAT的header时,日志需要符合该格式规范。 表3 日志参数 参数名称 是否必选 参数类型 描述 tenant_project_id 是 String 用户账号的项目ID。 tenant_group_id 是 String LTS的日志组ID。 tenant_stream_id 是 String LTS的日志流ID。 log_time_ns 是 Long 日志数据采集时间,UTC时间(纳秒)。 说明: 采集时间需在日志存储时间范围之内,否则上报日志会被删除。比如日志组的日志存储时间是7天,则此参数不应早于当前时间的7天前。 contents 是 Array of String 日志内容。 labels 是 Object 用户自定义label。 说明: 请不要将字段名称设置为内置保留字段,否则可能会造成字段名称重复、查询不精确等问题。
  • 日志示例 { "tenant_project_id": "${projectId}", "tenant_group_id": "${logGroupId}", "tenant_stream_id": "${logStreamId}", "log_time_ns": "XXXXXXXXXXXXXXXXXXX", "contents": [ "This is a log 1", "This is a log 2" ], "labels": { "type": "kafka" } }
  • 参数获取方式 表5 区域表 区 域名 称 RegionName 华北-北京四 lts-kafka.cn-north-4.myhuaweicloud.com 华东-上海二 lts-kafka.cn-east-2.myhuaweicloud.com 华南-广州 lts-access.cn-south-1.myhuaweicloud.com 亚太-新加坡 lts-kafka.ap-southeast-3.myhuaweicloud.com
  • 使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
  • 使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
  • 使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1