MAPREDUCE服务 MRS-Flume业务模型配置说明:常用Sink配置
常用Sink配置
- HDFS Sink
HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:
表16 HDFS Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
hdfs
hdfs sink的类型,必须设置为hdfs。
hdfs.path
-
HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
hdfs.inUseSuffix
.tmp
正在写入的hdfs文件后缀。
hdfs.rollInterval
30
按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollSize
1024
按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
hdfs.rollCount
10
按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。
说明:参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。
hdfs.idleTimeout
0
自动关闭空闲文件超时时间,单位:秒。
hdfs.batchSize
1000
批次写入HDFS的Event个数。
hdfs.kerberosPrincipal
-
认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
hdfs.kerberosKeytab
-
认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。
hdfs.fileCloseByEndEvent
true
收到源文件的最后一个Event时是否关闭hdfs文件。
hdfs.batchCallTimeout
-
批次写入HDFS超时控制时间,单位:毫秒。
当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。
说明:“hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。
serializer.appendNewline
true
将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。
hdfs.filePrefix
over_%{basename}
数据写入hdfs后文件名的前缀。
hdfs.fileSuffix
-
数据写入hdfs后文件名的后缀。
hdfs.inUsePrefix
-
正在写入的hdfs文件前缀。
hdfs.fileType
DataStream
hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。
说明:“SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。
hdfs.codeC
-
文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。
hdfs.maxOpenFiles
5000
最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。
hdfs.writeFormat
Writable
文件写入格式,“Writable”或者“Text”。
hdfs.callTimeout
10000
写入HDFS超时控制时间,单位:毫秒。
hdfs.threadsPoolSize
-
每个HDFS sink用于HDFS io操作的线程数。
hdfs.rollTimerPoolSize
-
每个HDFS sink用于调度定时文件滚动的线程数。
hdfs.round
false
时间戳是否四舍五入。如果设置为true,则会影响所有基于时间的转义序列(%t除外)。
hdfs.roundUnit
second
时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。
hdfs.useLocalTimeStamp
true
是否启用本地时间戳,建议设置为“true”。
hdfs.closeTries
0
hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。
hdfs.retryInterval
180
尝试关闭hdfs文件的时间间隔,单位:秒。
说明:每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。
hdfs.failcount
10
数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。
- Avro Sink
Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:
表17 Avro Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
avro sink的类型,必须设置为avro。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
client.type
DEFAULT
客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:
- DEFAULT,返回AvroRPC类型的客户端实例。
- OTHER,返回NULL。
- THRIFT,返回Thrift RPC类型的客户端实例。
- DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
- DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。
ssl
false
是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。
truststore-type
JKS
Java信任库类型,“JKS”或“PK CS 12”。
说明:JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
keystore-type
JKS
ssl启用后密钥存储类型。
keystore
-
ssl启用后密钥存储文件路径,开启ssl后,该参数必填。
keystore-password
-
ssl启用后密钥存储密码,开启ssl后,该参数必填。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
compression-type
none
批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。
compression-level
6
批数据压缩级别(1-9),数值越高,压缩率越高。
exclude-protocols
SSLv3
排除的协议列表,用空格分开。默认排除SSLv3协议。
- HBase Sink
HBase Sink将数据写入到HBase中。常用配置如下表所示:
表18 HBase Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
hbase sink的类型,必须设置为hbase。
table
-
HBase表名称。
columnFamily
-
HBase列族。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
batchSize
1000
批次写入HBase的Event个数。
kerberosPrincipal
-
认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。
kerberosKeytab
-
认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。
coalesceIncrements
true
是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。
- Kafka Sink
Kafka Sink将数据写入到Kafka中。常用配置如下表所示:
表19 Kafka Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
-
kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。
kafka.bootstrap.servers
-
Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
monTime
0(不开启)
线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。
kafka.producer.acks
1
必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。
kafka.topic
-
数据写入的topic,必须填写。
allowTopicOverride
false
是否将Event Header中保存的topic替换kafka.topic中配置的topic。
flumeBatchSize
1000
批次写入Kafka的Event个数。
kafka.security.protocol
SASL_PLAINTEXT
Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。
ignoreLongMessage
false
是否丢弃超大消息的开关。
messageMaxLength
1000012
Flume写入Kafka的消息的最大长度。
defaultPartitionId
-
用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。
partitionIdHeader
-
设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。
Other Kafka Producer Properties
-
其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。
- Thrift Sink
Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:
表20 Thrift Sink常用配置 参数
默认值
描述
channel
-
与之相连的channel。
type
thrift
thrift sink的类型,必须设置为thrift。
hostname
-
绑定的主机名/IP。
port
-
监测端口,该端口需未被占用。
batch-size
1000
批次发送的Event个数。
connect-timeout
20000
第一次连接的超时时间,单位:毫秒。
request-timeout
20000
第一次请求后一次请求的最大超时时间,单位:毫秒。
kerberos
false
是否启用Kerberos认证。
client-keytab
-
客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。
client-principal
-
客户端使用的安全用户的Principal。
server-principal
-
服务端使用的安全用户的Principal。
compression-type
none
Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。
maxConnections
5
Flume发送数据时的最大连接池大小。
ssl
false
是否使用SSL加密。
truststore-type
JKS
Java信任库类型。
truststore
-
Java信任库文件。
truststore-password
-
Java信任库密码。
reset-connection-interval
0
一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
- MapReduce服务_什么是Flume_如何使用Flume
- MapReduce服务_什么是Loader_如何使用Loader
- ModelArts模型训练_模型训练简介_如何训练模型
- MapReduce服务_什么是ClickHouse_如何使用ClickHouse
- GaussDB(DWS)常用SQL_常用SQL命令_SQL语法
- 人工智能学习入门
- MapReduce服务_什么是存算分离_如何配置MRS集群存算分离
- 人工智能学习入门
- MapReduce工作原理_MapReduce是什么意思_MapReduce流程_MRS_华为云
- MapReduce服务_什么是MapReduce服务_什么是HBase