MAPREDUCE服务 MRS-Flume业务配置指南:常用Channel配置
常用Channel配置
- Memory Channel
Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:
表6 Memory Channel常用配置 参数
默认值
描述
type
-
memory channel的类型,必须设置为memory。
capacity
10000
缓存在channel中的最大Event数。
transactionCapacity
1000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
byteCapacity
JVM最大内存的80%
channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。
byteCapacityBufferPercentage
20
channel中字节容量百分比(%)。
- File Channel
File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:
表7 File Channel常用配置 参数
默认值
描述
type
-
file channel的类型,必须设置为file。
checkpointDir
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint
说明:此路径随自定义数据路径变更。
检查点存放路径。
dataDirs
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data
说明:此路径随自定义数据路径变更。
数据缓存路径,设置多个路径可提升性能,中间用逗号分开。
maxFileSize
2146435071
单个缓存文件的最大值,单位:bytes。
minimumRequiredSpace
524288000
缓冲区空闲空间最小值,单位:bytes。
capacity
1000000
缓存在channel中的最大Event数。
transactionCapacity
10000
每次存取的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
channelfullcount
10
channel full次数,达到该次数后发送告警。
useDualCheckpoints
false
是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。
backupCheckpointDir
-
备份检查点路径。
checkpointInterval
30000
检查点间隔时间,单位:秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。
use-log-replay-v1
false
是否启用旧的回复逻辑。
use-fast-replay
false
是否使用队列回复。
checkpointOnClose
true
channel关闭时是否创建检查点。
- Memory File Channel
Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:
表8 Memory File Channel常用配置 参数
默认值
描述
type
org.apache.flume.channel.MemoryFileChannel
memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”。
capacity
50000
Channel缓存容量:缓存在Channel中的最大Event数。
transactionCapacity
5000
事务缓存容量:一次事务能处理的最大Event数。
说明:- 此参数值需要大于source和sink的batchSize。
- 事务缓存容量必须小于或等于Channel缓存容量。
subqueueByteCapacity
20971520
每个subqueue最多保存多少byte的Event,单位:byte。
Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。
subqueue能保存多少event,由“subqueueCapacity”和“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”或“subqueueInterval”上限时,subqueue内的Event才会发往目的地。
说明:“subqueueByteCapacity”必须大于一个batchsize内的Event总容量。
subqueueInterval
2000
每个subqueue最多保存一段多长时间的Event,单位:毫秒。
keep-alive
3
当事务缓存或Channel缓存满时,Put、Take线程等待时间。
单位:秒。
dataDir
-
缓存本地文件存储目录。
byteCapacity
JVM最大内存的80%
Channel缓存容量。
单位:bytes。
compression-type
None
消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。
channelfullcount
10
channel full次数,达到该次数后发送告警。
Memory File Channel配置样例:
server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel server.channels.c1.dataDir = /opt/flume/mfdata server.channels.c1.subqueueByteCapacity = 20971520 server.channels.c1.subqueueInterval=2000 server.channels.c1.capacity = 500000 server.channels.c1.transactionCapacity = 40000
- Kafka Channel
Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
表9 Kafka channel 常用配置 Parameter
Default Value
Description
type
-
kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”。
kafka.bootstrap.servers
-
Kafka的bootstrap地址端口列表。
如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
kafka.topic
flume-channel
channel用来缓存数据的topic。
kafka.consumer.group.id
flume
从kafka中获取数据的组标识,此参数不能为空。
parseAsFlumeEvent
true
是否解析为Flume event。
migrateZookeeperOffsets
true
当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。
kafka.consumer.auto.offset.reset
latest
当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示若没有offset则发生异常。
kafka.producer.security.protocol
SASL_PLAINTEXT
Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
说明:若该参数没有显示,请单击弹窗左下角的"+"显示全部参数。
kafka.consumer.security.protocol
SASL_PLAINTEXT
同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
pollTimeout
500
consumer调用poll()函数能接受的最大超时时间,单位:毫秒。
ignoreLongMessage
false
是否丢弃超大消息。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。