云服务器内容精选

  • Flume开源增强特性 提升传输速度。可以配置将指定的行数作为一个Event,而不仅是一行,提高了代码的执行效率以及减少写入磁盘的次数。 传输超大二进制文件。Flume根据当前内存情况,自动调整传输超大二进制文件的内存占用情况,不会导致Out of Memory(OOM)的出现。 支持定制传输前后准备工作。Flume支持定制脚本,指定在传输前或者传输后执行指定的脚本,用于执行准备工作。 管理客户端告警。Flume通过MonitorServer接收Flume客户端告警,并上报Manager告警管理中心。
  • 操作步骤 根据前提条件,创建一个满足要求的弹性云服务器。 登录集群详情页面,选择“组件管理”。 若集群详情页面没有“组件管理”页签,请先完成 IAM 用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。 单击“下载客户端”。 在“客户端类型”选择“完整客户端”。 在“下载路径”选择“远端主机”。 将“主机IP”设置为E CS 的IP地址,设置“主机端口”为“22”,并将“保存路径”设置为“/tmp”。 如果使用SSH登录ECS的默认端口“22”被修改,请将“主机端口”设置为新端口。 “保存路径”最多可以包含256个字符。 “登录用户”设置为“root”。 如果使用其他用户,请确保该用户对保存目录拥有读取、写入和执行权限。 在“登录方式”选择“密码”或“SSH私钥”。 密码:输入创建集群时设置的root用户密码。 SSH私钥:选择并上传创建集群时使用的密钥文件。 单击“确定”开始生成客户端文件。 若界面显示以下提示信息表示客户端包已经成功保存。 下载客户端文件到远端主机成功。 若界面显示以下提示信息,请检查用户名密码及远端主机的安全组配置,确保用户名密码正确,及远端主机的安全组已增加SSH(22)端口的入方向规则。然后从3执行重新下载客户端。 连接到服务器失败,请检查网络连接或参数设置。 图1 下载客户端 选择“Flume”服务,单击“实例”,查看任意一个Flume实例和两个MonitorServer实例的“业务IP”。 使用VNC方式,登录弹性云服务器。参见远程登录(VNC方式)。 所有镜像均支持Cloud-init特性。Cloud-init预配置的用户名“root”,密码为创建集群时设置的密码。首次登录建议修改。 在弹性云服务器,切换到root用户,并将安装包复制到目录“/opt”。 sudo su - root cp /tmp/ MRS _Flume_Client.tar /opt 在“/opt”目录执行以下命令,解压压缩包获取校验文件与客户端配置包。 tar -xvf MRS_Flume_Client.tar 执行以下命令,校验文件包。 sha256sum -c MRS_Flume_ClientConfig.tar.sha256 界面显示如下信息,表明文件包校验成功: MRS_Flume_ClientConfig.tar: OK 执行以下命令,解压“MRS_Flume_ClientConfig.tar”。 tar -xvf MRS_Flume_ClientConfig.tar 执行以下命令,安装客户端运行环境到新的目录,例如“/opt/Flumeenv”。安装时自动生成目录。 sh /opt/MRS_Flume_ClientConfig/install.sh /opt/Flumeenv 查看安装输出信息,如有以下结果表示客户端运行环境安装成功: Components client installation is complete. 执行以下命令,配置环境变量。 source /opt/Flumeenv/bigdata_env 执行以下命令,解压Flume客户端。 cd /opt/MRS_Flume_ClientConfig/Flume tar -xvf FusionInsight -Flume-1.6.0.tar.gz 执行以下命令,查看当前用户密码是否过期。 chage -l root “Password expires”时间早于当前则表示过期。此时需要修改密码,或执行chage -M -1 root设置密码为未过期状态。 执行以下命令,安装Flume客户端到新目录,例如“/opt/FlumeClient”。安装时自动生成目录。 sh /opt/MRS_Flume_ClientConfig/Flume/install.sh -d /opt/FlumeClient -f MonitorServer实例的业务IP地址 -c Flume配置文件路径 -l /var/log/ -e Flume的业务IP地址 -n Flume客户端名称 各参数说明如下: “-d”:表示Flume客户端安装路径。 “-f”:可选参数,表示两个MonitorServer角色的业务IP地址,中间用英文逗号分隔,若不设置则Flume客户端将不向MonitorServer发送告警信息,同时在MRS Manager界面上看不到该客户端的相关信息。 “-c”:可选参数,表示Flume客户端在安装后默认加载的配置文件“properties.properties”。如不添加参数,默认使用客户端安装目录的“fusioninsight-flume-1.6.0/conf/properties.properties”。客户端中配置文件为空白模板,根据业务需要修改后Flume客户端将自动加载。 “-l”:可选参数,表示日志目录,默认值为“/var/log/Bigdata”。 “-e”:可选参数,表示Flume实例的业务IP地址,主要用于接收客户端上报的监控指标信息。 “-n”:可选参数,表示自定义的Flume客户端的名称。 IBM的JDK不支持“-Xloggc”,需要修改“flume/conf/flume-env.sh”,将“-Xloggc”修改为“-Xverbosegclog”,若JDK为32位,“-Xmx”不能大于3.25GB。 “flume/conf/flume-env.sh”中,“-Xmx”默认为4GB。若客户端机器内存过小,可调整为512M甚至1GB。 例如执行:sh install.sh -d /opt/FlumeClient 系统显示以下结果表示客户端运行环境安装成功: install flume client successfully.
  • Flume模块介绍 Flume客户端/服务端由一个或多个Agent组成,而每个Agent是由Source、Channel、Sink三个模块组成,数据先进入Source然后传递到Channel,最后由Sink发送到下一个Agent或目的地(客户端外部)。各模块说明见表1。 表1 模块说明 名称 说明 Source Source负责接收数据或产生数据,并将数据批量放到一个或多个Channel。Source有两种类型:数据驱动和轮询。 典型的Source样例如下: 和系统集成并接收数据的Sources:Syslog、Netcat。 自动生成事件数据的Sources:Exec、SEQ。 用于Agent和Agent之间通信的IPC Sources:Avro。 Source必须至少和一个Channel关联。 Channel Channel位于Source和Sink之间,用于缓存Source传递的数据,当Sink成功将数据发送到下一跳的Channel或最终数据处理端,缓存数据将自动从Channel移除。 不同类型的Channel提供的持久化水平也是不一样的: Memory Channel:非持久化 File Channel:基于预写式日志(Write-Ahead Logging,简称WAL)的持久化实现 JDBC Channel:基于嵌入Database的持久化实现 Channel支持事务特性,可保证简易的顺序操作,同时可以配合任意数量的Source和Sink共同工作。 Sink Sink负责将数据传输到下一跳或最终目的,成功完成后将数据从Channel移除。 典型的Sink样例如下: 存储数据到最终目的终端Sink,比如:HDFS、Kafka 自动消耗的Sinks,比如:Null Sink 用于Agent和Agent之间通信的IPC sink:Avro Sink必须关联到一个Channel。 每个Flume的Agent可以配置多个Source、Channel、Sink模块,即一个Source将数据发送给多个Channel,再由多个Sink发送到下一个Agent或目的地。 Flume支持多个Flume配置级联,即上一个Agent的Sink将数据再发送给另一个Agent的Source。
  • 补充说明 Flume可靠性保障措施。 Source与Channel、Channel与Sink之间支持事务机制。 Sink Processor支持配置failover、load_balance机制。 例如load_balance示例如下: server.sinkgroups=g1server.sinkgroups.g1.sinks=k1 k2server.sinkgroups.g1.processor.type=load_balanceserver.sinkgroups.g1.processor.backoff=trueserver.sinkgroups.g1.processor.selector=random Flume多客户端聚合级联时的注意事项。 级联时需要走Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。 Flume客户端可以包含多个独立的数据流,即在一个配置文件properties.properties中配置多个Source、Channel、Sink。这些组件可以链接以形成多个流。 例如在一个配置中配置两个数据流,示例如下: server.sources = source1 source2server.sinks = sink1 sink2server.channels = channel1 channel2#dataflow1 server.sources.source1.channels = channel1server.sinks.sink1.channel = channel1#dataflow2server.sources.source2.channels = channel2server.sinks.sink2.channel = channel2
  • 使用Flume客户端加密工具 安装Flume客户端后,配置文件的部分参数可能需要填写加密的字符,Flume客户端中提供了加密工具。 安装Flume客户端。 登录安装Flume客户端的节点,并切换到客户端安装目录。例如“/opt/FlumeClient”。 切换到以下目录 cd fusioninsight-flume-Flume组件版本号/bin 执行以下命令,加密原始信息: ./genPwFile.sh 输入两次待加密信息。 执行以下命令,查看加密后的信息: cat password.property 如果加密参数是用于Flume Server,那么需要到相应的Flume Server所在节点执行加密。需要使用omm用户执行加密脚本进行加密。 针对MRS 3.x之前版本加密路径为“/opt/Bigdata/MRS_XXX/install/FusionInsight-Flume-Flume组件版本号/flume/bin/genPwFile.sh”。 针对MRS 3.x及之后版本加密路径为“/opt/Bigdata/FusionInsight_Porter_XXX/install/FusionInsight-Flume-Flume组件版本号/flume/bin/genPwFile.sh”。其中XXX为产品的版本号。 父主题: Flume企业级能力增强
  • 如何开发Flume第三方插件 该操作指导用户进行第三方插件二次开发。 本章节适用于MRS 3.x及之后版本。 将自主研发的代码打成jar包。 安装Flume服务端或者客户端,如安装目录为“/opt/flumeclient”。 建立插件目录布局。 进入“Flume客户端安装目录/fusionInsight-flume-*/plugins.d”路径下,使用以下命令建立目录,可根据实际业务进行命名,无固定名称: cd /opt/flumeclient/fusioninsight-flume-1.9.0/plugins.d mkdir thirdPlugin cd thirdPlugin mkdir lib libext native 显示结果如下: 将第三方jar包放入“Flume客户端安装目录/fusionInsight-flume-*/plugins.d/thirdPlugin/lib”路径下,若该jar包依赖其他jar包,则将所依赖的jar包放入“Flume客户端安装目录/fusionInsight-flume-*/plugins.d/thirdPlugin/libext”文件夹中,“Flume客户端安装目录/fusionInsight-flume-*/plugins.d/thirdPlugin/native”放置本地库文件。 配置“Flume客户端安装目录/fusionInsight-flume-*/conf/properties.properties”文件。 具体properties.properties参数配置方法,参考配置Flume非加密传输数据采集任务和配置Flume加密传输数据采集任务对应典型场景中properties.properties文件参数列表的说明。 父主题: Flume常见问题
  • 注意事项 Flume可靠性保障措施有哪些? Source&Channel、Channel&Sink之间的事务机制。 Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下,详细参考http://flume.apache.org/releases/1.9.0.html。 server.sinkgroups=g1server.sinkgroups.g1.sinks=k1 k2server.sinkgroups.g1.processor.type=load_balanceserver.sinkgroups.g1.processor.backoff=trueserver.sinkgroups.g1.processor.selector=random Flume多agent聚合级联时的注意事项? 级联时需要使用Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。
  • 常用Sink配置 HDFS Sink HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示: 表10 HDFS Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type hdfs hdfs sink的类型,必须设置为hdfs。 hdfs.path - HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 hdfs.inUseSuffix .tmp 正在写入的hdfs文件后缀。 hdfs.rollInterval 30 按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollSize 1024 按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 hdfs.rollCount 10 按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。 说明: 参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。 hdfs.idleTimeout 0 自动关闭空闲文件超时时间,单位:秒。 hdfs.batchSize 1000 批次写入HDFS的Event个数。 hdfs.kerberosPrincipal - 认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。 hdfs.kerberosKeytab - 认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。 hdfs.fileCloseByEndEvent true 收到源文件的最后一个Event时是否关闭hdfs文件。 hdfs.batchCallTimeout - 批次写入HDFS超时控制时间,单位:毫秒。 当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。 说明: “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。 serializer.appendNewline true 将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。 hdfs.filePrefix over_%{basename} 数据写入hdfs后文件名的前缀。 hdfs.fileSuffix - 数据写入hdfs后文件名的后缀。 hdfs.inUsePrefix - 正在写入的hdfs文件前缀。 hdfs.fileType DataStream hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。 说明: “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。 hdfs.codeC - 文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。 hdfs.maxOpenFiles 5000 最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。 hdfs.writeFormat Writable 文件写入格式,“Writable”或者“Text”。 hdfs.callTimeout 10000 写入HDFS超时控制时间,单位:毫秒。 hdfs.threadsPoolSize - 每个HDFS sink用于HDFS io操作的线程数。 hdfs.rollTimerPoolSize - 每个HDFS sink用于调度定时文件滚动的线程数。 hdfs.round false 时间戳是否四舍五入。若设置为true,则会影响所有基于时间的转义序列(%t除外)。 hdfs.roundUnit second 时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。 hdfs.useLocalTimeStamp true 是否启用本地时间戳,建议设置为“true”。 hdfs.closeTries 0 hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。 hdfs.retryInterval 180 尝试关闭hdfs文件的时间间隔,单位:秒。 说明: 每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。 hdfs.failcount 10 数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。 Avro Sink Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示: 表11 Avro Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - avro sink的类型,必须设置为avro。 hostname - 绑定的主机名/IP。 port - 监测端口,该端口需未被占用。 batch-size 1000 批次发送的Event个数。 client.type DEFAULT 客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括: DEFAULT,返回AvroRPC类型的客户端实例。 OTHER,返回NULL。 THRIFT,返回Thrift RPC类型的客户端实例。 DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。 DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。 ssl false 是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。 truststore-type JKS Java信任库类型,“JKS”或“PKCS12”。 说明: JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 keystore-type JKS ssl启用后密钥存储类型。 keystore - ssl启用后密钥存储文件路径,开启ssl后,该参数必填。 keystore-password - ssl启用后密钥存储密码,开启ssl后,该参数必填。 connect-timeout 20000 第一次连接的超时时间,单位:毫秒。 request-timeout 20000 第一次请求后一次请求的最大超时时间,单位:毫秒。 reset-connection-interval 0 一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。 compression-type none 批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。 compression-level 6 批数据压缩级别(1-9),数值越高,压缩率越高。 exclude-protocols SSLv3 排除的协议列表,用空格分开。默认排除SSLv3协议。 HBase Sink HBase Sink将数据写入到HBase中。常用配置如下表所示: 表12 HBase Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - hbase sink的类型,必须设置为hbase。 table - HBase表名称。 columnFamily - HBase列族。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 batchSize 1000 批次写入HBase的Event个数。 kerberosPrincipal - 认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。 kerberosKeytab - 认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。 coalesceIncrements true 是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。 Kafka Sink Kafka Sink将数据写入到Kafka中。常用配置如下表所示: 表13 Kafka Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type - kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。 kafka.bootstrap.servers - Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 monTime 0(不开启) 线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。 kafka.producer.acks 1 必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。 kafka.topic - 数据写入的topic,必须填写。 flumeBatchSize 1000 批次写入Kafka的Event个数。 kafka.security.protocol SASL_PLAINTEXT Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 ignoreLongMessage false 是否丢弃超大消息的开关。 messageMaxLength 1000012 Flume写入Kafka的消息的最大长度。 defaultPartitionId - 用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。 partitionIdHeader - 设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。 Other Kafka Producer Properties - 其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。 Thrift Sink Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示: 表14 Thrift Sink常用配置 参数 默认值 描述 channel - 与之相连的channel。 type thrift thrift sink的类型,必须设置为thrift。 hostname - 绑定的主机名/IP。 port - 监测端口,该端口需未被占用。 batch-size 1000 批次发送的Event个数。 connect-timeout 20000 第一次连接的超时时间,单位:毫秒。 request-timeout 20000 第一次请求后一次请求的最大超时时间,单位:毫秒。 kerberos false 是否启用Kerberos认证。 client-keytab - 客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。 client-principal - 客户端使用的安全用户的Principal。 server-principal - 服务端使用的安全用户的Principal。 compression-type none Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。 maxConnections 5 Flume发送数据时的最大连接池大小。 ssl false 是否使用SSL加密。 truststore-type JKS Java信任库类型。 truststore - Java信任库文件。 truststore-password - Java信任库密码。 reset-connection-interval 0 一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。
  • 常用Channel配置 Memory Channel Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示: 表6 Memory Channel常用配置 参数 默认值 描述 type - memory channel的类型,必须设置为memory。 capacity 10000 缓存在channel中的最大Event数。 transactionCapacity 1000 每次存取的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 channelfullcount 10 channel full次数,达到该次数后发送告警。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。 byteCapacity JVM最大内存的80% channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。 byteCapacityBufferPercentage 20 channel中字节容量百分比(%)。 File Channel File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示: 表7 File Channel常用配置 参数 默认值 描述 type - file channel的类型,必须设置为file。 checkpointDir ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint 说明: 此路径随自定义数据路径变更。 检查点存放路径。 dataDirs ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data 说明: 此路径随自定义数据路径变更。 数据缓存路径,设置多个路径可提升性能,中间用逗号分开。 maxFileSize 2146435071 单个缓存文件的最大值,单位:bytes。 minimumRequiredSpace 524288000 缓冲区空闲空间最小值,单位:bytes。 capacity 1000000 缓存在channel中的最大Event数。 transactionCapacity 10000 每次存取的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 channelfullcount 10 channel full次数,达到该次数后发送告警。 useDualCheckpoints false 是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。 backupCheckpointDir - 备份检查点路径。 checkpointInterval 30000 检查点间隔时间,单位:秒。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。 use-log-replay-v1 false 是否启用旧的回复逻辑。 use-fast-replay false 是否使用队列回复。 checkpointOnClose true channel关闭时是否创建检查点。 Memory File Channel Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示: 表8 Memory File Channel常用配置 参数 默认值 描述 type org.apache.flume.channel.MemoryFileChannel memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”。 capacity 50000 Channel缓存容量:缓存在Channel中的最大Event数。 transactionCapacity 5000 事务缓存容量:一次事务能处理的最大Event数。 说明: 此参数值需要大于source和sink的batchSize。 事务缓存容量必须小于或等于Channel缓存容量。 subqueueByteCapacity 20971520 每个subqueue最多保存多少byte的Event,单位:byte。 Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。 subqueue能保存多少event,由“subqueueCapacity”和“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”或“subqueueInterval”上限时,subqueue内的Event才会发往目的地。 说明: “subqueueByteCapacity”必须大于一个batchsize内的Event总容量。 subqueueInterval 2000 每个subqueue最多保存一段多长时间的Event,单位:毫秒。 keep-alive 3 当事务缓存或Channel缓存满时,Put、Take线程等待时间。 单位:秒。 dataDir - 缓存本地文件存储目录。 byteCapacity JVM最大内存的80% Channel缓存容量。 单位:bytes。 compression-type None 消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。 channelfullcount 10 channel full次数,达到该次数后发送告警。 Memory File Channel配置样例: server.channels.c1.type = org.apache.flume.channel.MemoryFileChannelserver.channels.c1.dataDir = /opt/flume/mfdataserver.channels.c1.subqueueByteCapacity = 20971520server.channels.c1.subqueueInterval=2000server.channels.c1.capacity = 500000server.channels.c1.transactionCapacity = 40000 Kafka Channel Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。 表9 Kafka channel 常用配置 Parameter Default Value Description type - kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”。 kafka.bootstrap.servers - Kafka的bootstrap地址端口列表。 如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 kafka.topic flume-channel channel用来缓存数据的topic。 kafka.consumer.group.id flume 从kafka中获取数据的组标识,此参数不能为空。 parseAsFlumeEvent true 是否解析为Flume event。 migrateZookeeperOffsets true 当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。 kafka.consumer.auto.offset.reset latest 当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示若没有offset则发生异常。 kafka.producer.security.protocol SASL_PLAINTEXT Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 说明: 若该参数没有显示,请单击弹窗左下角的"+"显示全部参数。 kafka.consumer.security.protocol SASL_PLAINTEXT 同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。 pollTimeout 500 consumer调用poll()函数能接受的最大超时时间,单位:毫秒。 ignoreLongMessage false 是否丢弃超大消息。 messageMaxLength 1000012 Flume写入Kafka的消息的最大长度。
  • 操作场景 该任务指导用户使用Flume服务端从本地采集静态日志保存到HDFS上“/flume/test”目录下。 本章节适用于MRS 3.x及之后版本。 本配置默认集群网络环境是安全的,数据传输过程不需要启用SSL认证。如需使用加密方式,请参考配置Flume加密传输数据采集任务。该配置为只用一个Flume场景,例如:Spooldir Source+Memory Channel+HDFS Sink。
  • 配置DIS Source 表1 DIS Source配置项说明 配置项 是否必填 说明 默认值 channels 是 Flume channel的名称。 请根据实际情况配置 type 是 Source的类型。 com.cloud.dis.adapter.flume.source.DISSource streams 是 指定在DIS服务上创建的通道名称。 与DIS控制台“购买接入通道”时配置的“通道名称”取值一致。 ak 是 用户的Access Key。 获取方式请参见检查认证信息。 请根据实际情况配置 sk 是 用户的Secret Key。 获取方式请参见检查认证信息。 请根据实际情况配置 region 是 将数据上传到指定Region的DIS服务。 请根据实际情况配置 projectId 是 用户所属区域的项目ID。 获取方式请参见检查认证信息。 请根据实际情况配置 endpoint 是 DIS对应Region的数据接口地址。 请根据实际情况配置 group.id 是 DIS App名称,用于标识一个消费组,由英文字符、数字、-、_组成。 请根据实际情况配置