云日志服务 LTS-使用Flume采集器上报日志到LTS:使用Flume采集数据库表数据并且上报至LTS

时间:2024-10-25 09:39:34

使用Flume采集数据库表数据并且上报至LTS

使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志

  1. https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。
  2. 添加MySQL驱动到FLUME_HOME/lib目录下:

    1. 下载MySQL驱动。
      wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
    2. 将驱动包解压并打为jar包。
      tar xzf mysql-connector-java-5.1.35.tar.gz
    3. 将jar包存放在FLUME_HOME/lib/路径。
      cp mysql-connector-java-5.1.35-bin.jar  FLUME_HOME/lib/

  3. 添加采集MySQL的conf文件。

    # a1表示agent的名称
    # source是a1的输入源
    # channels是缓冲区
    # sinks是a1输出目的地,本例子sinks使用了kafka
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    #source
    a1.sources.r1.type = org.keedio.flume.source.SQLSource
    # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称
    # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败
    a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # Hibernate Database connection properties
    # mysql账号,一般都是root
    a1.sources.r1.hibernate.connection.user = root
    # 填入你的mysql密码
    a1.sources.r1.hibernate.connection.password = xxxxxxxx
    a1.sources.r1.hibernate.connection.autocommit = true
    # mysql驱动
    a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    # 存放status文件
    a1.sources.r1.status.file.path = FLUME_HOME/bin
    a1.sources.r1.status.file.name = sqlSource.status
    # Custom query
    # 填写需要采集的数据表名{table_name},也可以使用下面的方法:
    a1.sources.r1.custom.query = select * from {table_name}
    
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000

  4. 启动Flume后,即可开始采集数据库中的表数据到LTS。
support.huaweicloud.com/bestpractice-lts/lts_04_1131.html