MAPREDUCE服务 MRS-Flume业务模型配置说明:Interceptors

时间:2024-10-22 09:17:13

Interceptors

Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。官网参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

  1. 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。
  2. Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。
  3. 指定拦截器的顺序就是它们被调用的顺序。
  4. 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。
表5 Flume内建支持的拦截器类型

拦截器类型

简要描述

Timestamp Interceptor

该拦截器会在Event的Header中插入一个时间戳。

Host Interceptor

该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。

Remove Header Interceptor

该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。

UUID Interceptor

该拦截器会为每个Event的Header生成一个UUID字符串。

Search and Replace Interceptor

该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。

Regex Filtering Interceptor

该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。

Regex Extractor Interceptor

该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。

下面以Regex Filtering Interceptor 为例说明Interceptor使用(其余的可参考官网配置):
表6 Regex Filtering Interceptor配置参数说明

选项名称

默认值

描述

type

-

组件类型名称,必须写为regex_filter。

regex

-

用于匹配事件的正则表达式。

excludeEvents

false

默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。

配置示例(为了方便观察,此模型使用了netcat tcp作为Source源,logger作为Sink)。配置好如下参数后,在Linux的配置的主机节点上执行Linux命令“telnet 主机名或IP 44444”,并任意敲入符合正则和不符合正则的字符串。会在日志中观察到,只有匹配到的字符串被传输了。
#define the source、channel、sink
server.sources = r1

server.channels = c1
server.sinks = k1

#config the source
server.sources.r1.type = netcat
server.sources.r1.bind = ${主机IP}
server.sources.r1.port = 44444
server.sources.r1.interceptors= i1
server.sources.r1.interceptors.i1.type= regex_filter
server.sources.r1.interceptors.i1.regex= (flume)|(myflume)
server.sources.r1.interceptors.i1.excludeEvents= false
server.sources.r1.channels = c1

#config the channel
server.channels.c1.type = memory
server.channels.c1.capacity = 1000
server.channels.c1.transactionCapacity = 100
#config the sink
server.sinks.k1.type = logger
server.sinks.k1.channel = c1
support.huaweicloud.com/cmpntguide-mrs/mrs_01_1075.html