云服务器内容精选
-
采集终端用户日志 端侧日志全面采集接入LTS,例如Web浏览器、IOS、安卓、百度小程序、微信小程序、钉钉小程序、快应用等多类端侧日志,集成LTS提供的多种移动端SDK,实现了缓存发送、异常重试、批量发送等稳定功能,用户快速集成即可全面采集移动端日志到LTS。 采集方案: 移动端:可以使用移动端iOS SDK,Android或MAN(移动数据分析)接入。 ARM设备:ARM平台可以使用Native C交叉编译。 商家平台设备:X86平台设备可以用SDK、ARM平台可以使用Native C交叉编译。 实施方法: 日志写到本地文件,通过创建日志采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
-
采集服务器日志 采集方案,例如: Syslog日志 Aug 31 11:07:24 zhouqi-mac WeChat[9676]: setupHotkeyListenning event NSEvent: type=KeyDown loc=(0,703) time=115959.8 flags=0 win=0x0 winNum=7041 ctxt=0x0 chars="u" unmodchars="u" repeat=0 keyCode=32 应用程序Debug日志 __FILE__:build/release64/sls/shennong_worker/ShardDataIndexManager.cpp __LEVEL__:WARNING __LINE__:238 __THREAD__:31502 offset:816103453552 saved_cursor:1469780553885742676 seek count:62900 seek data redo log:pangu://localcluster/redo_data/41/example/2016_08_30/250_1472555483 user_cursor:1469780553885689973 Trace日志 [2013-07-13 10:28:12.772518] [DEBUG] [26064] __TRACE_ID__:661353951201 __item__:[Class:Function]_end__ request_id:1734117 user_id:124 context:..... 采集方法: 日志写到本地文件,通过创建采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
-
采集服务端数据 支付宝和微信公众账号编程是典型的Web端模式,一般会有四种类型的日志,可以使用SDK上报日志的方式,详细请参考 云日志 服务支付宝小程序SDK、云日志服务微信小程序SDK。 Nginx和Apache访问日志 Nginx和Apache访问日志用以监控、实时统计。 10.1.168.193 - - [01/Mar/2024:16:12:07 +0800] "GET /Send?AccessKeyId=8225105404 HTTP/1.1" 200 5 "-" "Mozilla/5.0 (X11; Linux i686 on x86_64; rv:10.0.2) Gecko/20100101 Firefox/10.0.2" Nginx和Apache错误日志 2024/04/18 18:59:01 [error] 26671#0: *20949999 connect() to unix:/tmp/fastcgi.socket failed (111: Connection refused) while connecting to upstream, client: 10.101.1.1, server: , request: "POST /logstores/test_log HTTP/1.1", upstream: "fastcgi://unix:/tmp/fastcgi.socket:", host: "example.com" 应用层日志 应用层日志可以把事件产生的时间、地点、结果、参数等记录详细,扩展类字段一般放在最后。 {"labels":{"cccdasd":51,"platform":"wx"},"logs":[{"contents":[{"aaa":"13123","__client_time__":1728565099070}]}]} 应用层错误日志:错误码、原因等。 {"errorCode":"LTS.403","errorMessage":" request header auth is empty"} 实施方法: 日志写到本地文件,通过创建日志采集任务写到指定日志流中。 Docker中产生的日志可以使用容器服务集成日志服务进行采集。 C#、Python、Java、PHP、C等可以使用SDK写入。
-
背景信息 “我要点外卖”是一个平台型电商网站,涉及用户、餐厅、配送员等。用户可以在网页、App、微信、支付宝等进行下单点菜。商家拿到订单后开始加工,并自动通知周围的快递员。快递员将外卖送到用户手中。 在运营的过程中,发现了如下的问题: 获取用户难,投放一笔不小的广告费到营销渠道(网页、微信推送),收获了一些用户,但无法评判各渠道的效果。 用户经常抱怨送货慢,但慢在什么环节,接单、配送还是加工,如何进行优化? 用户运营,经常搞一些优惠活动,但无法获得效果。 调度问题,如何帮助商家在高峰时提前备货?如何调度更多的快递员到指定区域? 客服服务,用户反馈下单失败,用户背后的操作是什么?系统是否有错误?
-
统一管理日志 在云日志服务LTS控制台创建日志组,详细操作请参考管理日志组。 在云日志服务LTS控制台为不同数据源产生的日志创建日志流,详细操作请参考管理日志流,例如: wechat-server(存储微信服务器访问日志) wechat-app(存储微信服务器应用日志) wechat-error(错误日志) alipay-server alipay-app deliver-app(送货员App状态) deliver-error(错误日志) web-click(H5页面点击) server-access(服务端Access-Log) server-app(应用) coupon(应用优惠券日志) pay(支付日志) order(订单日志) 如需要对原始数据进行加工,可以通过创建日志加工任务,详细请参考DSL加工。 DSL加工的功能在邀测中,支持华北-北京四、华东-上海一、华南-广州局点,仅针对用户内测使用,后续将全网开放,敬请期待!
-
采集用户推广日志 为获取新用户,一般有两种方式: 网站注册时直接投放优惠券。 其他渠道扫描二维码,投放优惠券。 传单二维码 扫描网页二维码登录 实施方法: 定义如下注册服务器地址,生成二维码(传单、网页)供用户注册扫描。用户扫描该页面进行注册时,就可以得知用户是通过特定来源进入的,并记录日志。 http://example.com/login?source=10012&ref=kd4b 当服务端接受请求时,服务器输出如下日志: 2024-06-20 19:00:00 e41234ab342ef034,102345,5k4d,467890 收集方式: 应用程序输出日志到硬盘,通过ICAgent采集日志,详细请参考云主机ECS文本日志接入LTS。 应用程序通过SDK写入。详细请参考SDK概述。
-
使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
-
使用Flume采集syslog协议传输的日志上报到LTS Syslog协议是一种用于在IP网络中传输日志消息的协议,通过Flume将syslog协议传输的日志采集并上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 接收UDP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogudp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1 接收TCP日志,参考如下示例添加采集Syslog协议的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type=syslogtcp #host_port为syslog服务器的端口 a1.sources.r1.port = {host_port} #host_ip为syslog服务器的ip地址 a1.sources.r1.host = {host_ip} a1.sources.r1.channels = c1 a1.channels.c1.type = memory #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.sinks.k1.channel = c1
-
使用默认拦截器处理日志 使用Flume采集器时,拦截器是简单的插件式组件,设置在Source和Channel之间。Source接收到的事件Event,在写入Channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个Source接收到的事件。 时间戳拦截器 该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接收到的只有message。时间戳拦截器的配置, 参数默认值描述type,类型名称timestamp,也可以使用类名的全路径preserveExisting为false。如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值。source连接到时间戳拦截器的配置: a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false 正则过滤拦截器 在日志采集的时候,可能有一些数据是不需要的,添加过滤拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述type,类型名称REGEX_FILTER。excludeEvents为false时默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。source连接到正则过滤拦截器的配置: a1.sources.r1.interceptors = regex a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=(today)|(Monday) a1.sources.r1.interceptors.regex.excludeEvents=false 这样配置的拦截器就只会接收日志消息中带有today或者Monday的日志。 搜索并替换拦截器 拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。配置如下: # 拦截器别名 a1.sources.r1.interceptors = search-replace # 拦截器类型,必须是search_replace a1.sources.r1.interceptors.search-replace.type = search_replace #删除事件正文中的字符,根据正则匹配event内容 a1.sources.r1.interceptors.search-replace.searchPattern = today # 替换匹配到的event内容 a1.sources.r1.interceptors.search-replace.replaceString = yesterday # 设置字符集,默认是utf8 a1.sources.r1.interceptors.search-replace.charset = utf8
-
通过Flume采集TCP/UDP协议传输的日志上报到LTS 通过Flume采集TCP/UDP协议传输的日志上报到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 采集TCP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 采集UDP端口日志,参考如下示例添加采集端口的conf文件。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = {host_port} a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
通过Flume采集SNMP协议上报的设备管理数据并发送到LTS 通过Flume采集SNMP协议上报的设备管理数据并发送到LTS。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 监听SNMP协议通信端口号161。参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 161 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 监听SNMP协议陷阱(Trap)通信的端口号162,参考如下示例添加SNMP协议接受日志的conf。 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 162 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
使用Flume采集文本日志上报到LTS 支持使用Flume采集文本日志内容上报至LTS,参考如下示例添加采集文本日志的conf文件。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 #Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/test.txt a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
使用Flume采集数据库表数据并且上报至LTS 使用Flume采集数据库表数据并且上报至LTS,实现对表数据变动监控。以下示例中的参数介绍请参考使用KAFKA协议上报日志。 在https://github.com/keedio/flume-ng-sql-source页面下载flume-ng-sql-source插件,转换为jar包并取名为flume-ng-sql-source.jar,打包前注意将pom文件中的flume-ng-core 版本与flume安装版本保持一致,并且将jar包放在安装Flume包路径的lib目录下面,例如FLUME_HOME/lib目录下(例子中的FLUME_HOME为Flume安装路径,仅供参考,请以实际安装路径为准)。 添加MySQL驱动到FLUME_HOME/lib目录下: 下载MySQL驱动。 wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz 将驱动包解压并打为jar包。 tar xzf mysql-connector-java-5.1.35.tar.gz 将jar包存放在FLUME_HOME/lib/路径。 cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/ 添加采集MySQL的conf文件。 # a1表示agent的名称 # source是a1的输入源 # channels是缓冲区 # sinks是a1输出目的地,本例子sinks使用了kafka a1.channels = c1 a1.sources = r1 a1.sinks = k1 #source a1.sources.r1.type = org.keedio.flume.source.SQLSource # 连接mysql的一系列操作,{mysql_host}改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看,{database_name}改为数据库名称 # url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败 a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false # Hibernate Database connection properties # mysql账号,一般都是root a1.sources.r1.hibernate.connection.user = root # 填入你的mysql密码 a1.sources.r1.hibernate.connection.password = xxxxxxxx a1.sources.r1.hibernate.connection.autocommit = true # mysql驱动 a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver # 存放status文件 a1.sources.r1.status.file.path = FLUME_HOME/bin a1.sources.r1.status.file.name = sqlSource.status # Custom query # 填写需要采集的数据表名{table_name},也可以使用下面的方法: a1.sources.r1.custom.query = select * from {table_name} #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId} a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port} a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.compression.type = gzip a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}"; a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 启动Flume后,即可开始采集数据库中的表数据到LTS。
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格