华为云用户手册

  • create命令扩展属性 针对HDFS与SFTP服务器或RDB进行数据交换场景, MRS 在开源sqoop-shell工具的基础上对create命令属性进行扩展,以达到在创建作业时指定行、列分隔符及转换步骤的目的。 表2 create命令扩展属性 属性 说明 fields-terminated-by 默认的列分割符。 lines-terminated-by 默认的行分割符。 input-fields-terminated-by 输入步骤的列分割符,当不指定时,默认等于fields-terminated-by的值。 input-lines-terminated-by 输入步骤的行分割符,当不指定时,默认等于lines-terminated-by的值。 output-fields-terminated-by 输出步骤的列分割符,当不指定时,默认等于fields-terminated-by的值。 output-lines-terminated-by 输出步骤的行分割符,当不指定时,默认等于lines-terminated-by的值。 trans 指定转换步骤,值为转换步骤文件所在的路径。当指定文件的相对路径时,默认为“sqoop2-shell”脚本所在路径下的文件。当配置了该属性,其他扩展属性都被忽略。
  • sqoop1对接MRS服务 下载开源Sqoop,http://www.apache.org/dyn/closer.lua/sqoo:p/1.4.7。 将下载好的sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz 包放入MRS集群master节点的/opt/sqoop目录下并解压。 tar zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz 进入解压完成的目录,修改配置。 cd /opt/sqoop/sqoop-1.4.7.bin__hadoop-2.6.0/conf cp sqoop-env-template.sh sqoop-env.sh vi sqoop-env.sh 添加配置: export HADOOP_COMMON_HOME=/opt/client/HDFS/hadoop export HADOOP_MAPRED_HOME=/opt/client/HDFS/hadoop export HIVE_HOME=/opt/Bigdata/MRS_1.9.X/install/ FusionInsight -Hive-3.1.0/hive(请按照实际路径填写) export HIVE_CONF_DIR=/opt/client/Hive/config export HCAT_HOME=/opt/client/Hive/HCatalog 添加系统变量,将“SQOOP_HOME”添加到PATH中。 vi /etc/profile 添加以下信息: export SQOOP_HOME=/opt/sqoop/sqoop-1.4.7.bin__hadoop-2.6.0 export PATH=$PATH:$SQOOP_HOME/bin 执行以下命令复制jline-2.12.jar文件到lib文件下。 cp /opt/share/jline-2.12/jline-2.12.jar /opt/sqoop/sqoop-1.4.7.bin__hadoop-2.6.0/lib 执行以下命令,在文件中添加下列配置。 vim $JAVA_HOME/jre/lib/security/java.policy permission javax.management.MBeanTrustPermission "register"; 执行以下命令,实现sqoop1对接MRS服务。 source /etc/profile
  • 已安装Flume客户端 在客户端flume-check.properties文件中配置client.per-check.shell,指向plugin.sh的绝对路径。 例如Flume客户端安装路径为“/opt/FlumeClient”,则flume-check.properties文件所在目录为/opt/FlumeClient/fusioninsight-flume-1.9.0/conf, 配置如下: client.per-check.shell=/opt/FlumeClient/fusioninsight-flume-1.9.0/plugins.s/plugin.sh plugins = com.huawei.flume.services.FlumePreTransmitService flume.check.default.interval = 15 配置plugin.conf,定义具体调用的脚本、相关参数。 例如Flume客户端安装路径为“/opt/FlumeClient”,则plugin.conf配置文件所在目录为/opt/FlumeClient/fusioninsight-flume-1.9.0/conf, 配置如下: RUN_PLUGIN="PLUGIN_LIST_1" LOG _TO_HDFS_PATH="/yxs" LOG_TO_HDFS_ENCODE_PATH="${LOG_TO_HDFS_PATH}/Flume_Encoded/" PLUGIN_LINK_DIR="/tmp/yxs1" PLUGIN_MV_TARGET_DIR="/tmp/yxs2" PLUGIN_SUFFIX="COMPLETED" PLUGIN_LIST_1="mv_complete.sh --linkdir ${PLUGIN_LINK_DIR} --mvtargetdir ${PLUGIN_MV_TARGET_DIR} --suffix ${PLUGIN_SUFFIX}" 在客户端安装路径bin目录执行以下命令,重启Flume客户端,例如“/opt/FlumeClient/fusioninsight-flume-1.9.0/bin”。 ./flume-manage.sh restart
  • 操作步骤 创建表时指定inputFormat和outputFormat: CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type [COMMENT col_comment], ...)] [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 查询之前指定配置项: set hive.textinput.record.delimiter='!@!'; Hive会以‘!@!’为行分隔符查询数据。
  • 操作场景 通常情况下,Hive以文本文件存储的表会以回车作为其行分隔符,即在查询过程中,以回车符作为一行表数据的结束符。但某些数据文件并不是以回车分隔的规则文本格式,而是以某些特殊符号分割其规则文本。 MRS Hive支持指定不同的字符或字符组合作为Hive文本数据的行分隔符,即在创建表的时候,指定inputformat为SpecifiedDelimiterInputFormat,然后在每次查询前,都设置如下参数来指定分隔符,就可以以指定的分隔符查询表数据。 set hive.textinput.record.delimiter=''; 当前版本的Hue组件,不支持导入文件到Hive表时设置多个分割符。 本章节适用于MRS 3.x及后续版本。
  • 安全认证 Flink整个系统存在三种认证方式: 使用kerberos认证:Flink yarn client与Yarn Resource Manager、JobManager与Zookeeper、JobManager与HDFS、TaskManager与HDFS、Kafka与TaskManager、TaskManager和Zookeeper。 使用security cookie进行认证:Flink yarn client与Job Manager、JobManager与TaskManager、TaskManager与TaskManager。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 Flink的JobManager与YARN的AM是在同一个进程下。 如果用户集群开启Kerberos认证需要使用kerberos认证。 针对MRS 3.x之前版本,Flink不支持使用security cookie方式进行认证。 表1 安全认证方式 安全认证方式 说明 配置方法 Kerberos认证 当前只支持keytab认证方式。 从FusionInsight Manager下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。 在“flink-conf.yaml”上配置: keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab 说明: “/home/flinkuser/keytab/abc222.keytab”表示的是用户目录。 principal名。 security.kerberos.login.principal: abc222 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下: zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient Security Cookie 认证 - 参考签发Flink证书样例章节生成“generate_keystore.sh”脚本并放置在Flink客户端的“bin”目录下,调用“generate_keystore.sh”脚本,生成“Security Cookie”、“flink.keystore”文件和“flink.truststore”文件。 执行sh generate_keystore.sh,输入用户自定义密码。密码不允许包含#。 说明: 执行脚本后,在Flink客户端的“conf”目录下生成“flink.keystore”和“flink.truststore”文件,并且在客户端配置文件“flink-conf.yaml”中将以下配置项进行了默认赋值。 将配置项“security.ssl.keystore”设置为“flink.keystore”文件所在绝对路径。 将配置项“security.ssl.truststore”设置为“flink.truststore”文件所在的绝对路径。 将配置项“security.cookie”设置为“generate_keystore.sh”脚本自动生成的一串随机规则密码。 默认“flink-conf.yaml”中“security.ssl.encrypt.enabled: false”,“generate_keystore.sh”脚本将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值设置为调用“generate_keystore.sh”脚本时输入的密码。配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。 MRS 3.x及之后版本,如果需要使用密文时,设置“flink-conf.yaml”中“security.ssl.encrypt.enabled: true”,“generate_keystore.sh”脚本不会配置“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值,需要使用Manager明文加密API进行获取,执行curl -k -i -u user name:password -X POST -HContent-type:application/json -d '{"plainText":"password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt' 其中user name:password分别为当前系统登录用户名和密码;"plainText"的password为调用“generate_keystore.sh”脚本时的密码;x.x.x.x为集群Manager的浮动IP。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。 查看是否打开“Security Cookie”开关,即查看配置“flink-conf.yaml”文件中的“security.enable: true”,查看“security cookie”是否已配置成功,例如: security.cookie: ae70acc9-9795-4c48-ad35-8b5adc8071744f605d1d-2726-432e-88ae-dd39bfec40a9 说明: 用户需要获取SSL证书,放置到Flink客户端中。具体操作可参考签发Flink证书样例。 使用MRS客户端预制“generate_keystore.sh”脚本获取SSL证书有效期为5年。参考签发Flink证书样例获取的SSL证书有效期为10年。 若要关闭默认的SSL认证方式,需在“flink-conf.yaml”文件中配置“security.ssl.enabled”的值为“false”,并且注释如下参数:security.ssl.key-password、security.ssl.keystore-password、security.ssl.keystore、security.ssl.truststore-password、security.ssl.trustore。 YARN内部认证方式 该方式是YARN内部的认证方式,不需要用户配置。 - 当前一个Flink集群只支持一个用户,一个用户可以创建多个Flink集群。
  • 操作场景 此功能在MRS 3.x之前版本适用于Hive,Spark。在MRS3.x及后续版本适用于Hive,Spark2x。 开启此功能后,仅有Hive管理员可以创建库和在default库中建表,其他用户需通过Hive管理员授权才可使用库。 开启本功能之后,会限制普通用户新建库和在default库新建表。请充分考虑实际应用场景,再决定是否作出调整。 因为对执行用户做了限制,使用非管理员用户执行建库、表脚本迁移、重建元数据操作时需要特别注意,防止错误。
  • 操作场景 此功能在MRS 3.x之前版本适用于Hive,Spark。在MRS3.x及后续版本适用于Hive,Spark2x。 开启此功能后,允许有目录读权限和执行权限的用户和用户组创建外部表,而不必检查用户是否为该目录的属主,并且禁止外表的location目录在当前默认warehouse目录下。同时在外表授权时,禁止更改其location目录对应的权限。 开启本功能之后,外表功能变化大。请充分考虑实际应用场景,再决定是否作出调整。
  • 批量写入Hudi表 引入Hudi包生成测试数据,参考使用Spark Shell创建Hudi表章节的2到4。 写入Hudi表,写入命令中加入参数:option("hoodie.datasource.write.operation", "bulk_insert"),指定写入方式为bulk_insert,如下所示: df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.datasource.write.recordkey.field", "uuid"). option("hoodie.datasource.write.partitionpath.field", ""). option("hoodie.datasource.write.operation", "bulk_insert"). option("hoodie.table.name", tableName). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", ""). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor"). option("hoodie.datasource.hive_sync.table", tableName). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.bulkinsert.shuffle.parallelism", 4). mode(Overwrite). save(basePath) 示例中各参数介绍请参考表1。 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。
  • 操作场景 Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。 INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,若数据源中存在更新数据,则在 数据湖 中会出现重复数据。 BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。 UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。
  • 使用Yarn客户端 安装客户端。 MRS 3.x之前版本请参考安装客户端章节。 MRS 3.x及之后版本请参考安装客户端章节。 以客户端安装用户,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 直接执行Yarn命令。例如: yarn application -list
  • Flink认证和加密 Flink集群中,各部件支持认证。 Flink集群内部各部件和外部部件之间,支持和外部部件如YARN、HDFS、ZooKeeprer进行Kerberors认证。 Flink集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持security cookie认证。 Flink集群中,各部件支持SSL加密传输;集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持SSL加密传输。 详情可参考配置Flink认证和加密。
  • ACL控制 在HA模式下,支持ACL控制。 Flink在HA模式下,支持用ZooKeeper来管理集群和发现服务。ZooKeeper支持SASL ACL控制,即只有通过SASL(kerberos)认证的用户,才有往ZK上操作文件的权限。如果要在Flink上使用SASL ACL控制,需要在Flink配置文件中设置如下配置: high-availability.zookeeper.client.acl: creator zookeeper.sasl.disable: false 具体配置项介绍请参考HA。
  • Web安全 Flink Web安全加固,支持白名单过滤,Flink Web只能通过YARN代理访问,支持安全头域增强。在Flink集群中,各部件的监测端口支持范围可配置。 编码规范: 说明:Web Service客户端和服务器间使用相同的编码方式,是为了防止出现乱码现象,也是实施输入校验的基础。 安全加固:web server响应消息统一采用UTF-8字符编码。 支持IP白名单过滤: 说明:防止非法用户登录,需在web server侧添加IP Filter过滤源IP非法的请求。 安全加固:支持IP Filter实现Web白名单配置,配置项是“jobmanager.web.allow-access-address”,默认情况下只支持YARN用户接入。 安装客户端之后需要将客户端节点IP追加到jobmanager.web.allow-access-address配置项中。 禁止将文件绝对路径发送到客户端: 说明:文件绝对路径发送到客户端会暴露服务端的目录结构信息,有助于攻击者遍历了解系统,为攻击者攻击提供帮助。 安全加固:Flink配置文件中所有配置项中如果包含以/开头的,则删掉第一级目录。 同源策略: 适用于MRS 3.x及之后版本。 说明:如果两个URL的协议,主机和端口均相同,则它们同源;如果不同源,默认不能相互访问;除非被访问者在其服务端显示指定访问者的来源。 安全加固:响应头“Access-Control-Allow-Origin”头域默认配置为YARN集群ResourceManager的IP地址,如果源不是来自YARN的,则不能互相访问。 防范敏感信息泄露: 适用于MRS 3.x及之后版本。 说明:带有敏感数据的Web页面都应该禁止缓存,以防止敏感信息泄漏或通过代理服务器上网的用户数据互窜现象。 安全加固:添加“Cache-control”、“Pragma”、“Expires”安全头域,默认值为:“Cache-Control:no-store”,“Pragma :no-cache”,“Expires : 0”。实现了安全加固,Flink和web server交互的内容将不会被缓存。 防止劫持: 适用于MRS 3.x及之后版本。 说明:由于点击劫持(ClickJacking)和框架盗链都利用到框架技术,所以需要采用安全措施。 安全加固:添加“X-Frame-Options”安全头域,给浏览器提供允许一个页面可否在“iframe”、“frame”或“object”网站中的展现页面的指示,如果默认配置为“X-Frame-Options: DENY”,则确保任何页面都不能被嵌入到别的“iframe”、“frame”或“object”网站中,从而避免了点击劫持 (clickjacking) 的攻击。 对Web Service接口调用记录日志: 适用于MRS 3.x及之后版本。 说明:对“Flink webmonitor restful”接口调用进行日志记录。 安全加固:“access log”支持配置:“jobmanager.web.accesslog.enable”,默认为“true”。且日志保存在单独的“webaccess.log”文件中。 跨站请求( CS RF)伪造防范: 适用于MRS 3.x及之后版本。 说明:在B/S应用中,对于涉及服务器端数据改动(如增加、修改、删除)的操作必须进行跨站请求伪造的防范。跨站请求伪造是一种挟制终端用户在当前已登录的Web应用程序上执行非本意的操作的攻击方法。 安全加固:现有请求修改的接口有2个post,1个delete,其余均是get请求,非get请求的接口均已删除。 异常处理: 适用于MRS 3.x及之后版本。 说明:应用程序出现异常时,捕获异常,过滤返回给客户端的信息,并在日志中记录详细的错误信息。 安全加固:默认的错误提示页面,进行信息过滤,并在日志中记录详细的错误信息。新加四个配置项,默认配置为FusionInsight提供的跳转URL,错误提示页面跳转到固定配置的URL中,防止暴露不必要的信息。 表1 四个配置项参数介绍 参数 描述 默认值 是否必选配置 jobmanager.web.403-redirect-url web403页面,访问若遇到403错误,则会重定向到配置的页面。 - 是 jobmanager.web.404-redirect-url web404页面,访问若遇到404错误,则会重定向到配置的页面。 - 是 jobmanager.web.415-redirect-url web415页面,访问若遇到415错误,则会重定向到配置的页面。 - 是 jobmanager.web.500-redirect-url web500页面,访问若遇到500错误,则会重定向到配置的页面。 - 是 HTML5安全: 适用于MRS 3.x及之后版本。 说明:HTML5是下一代的Web开发规范,为开发者提供了许多新的功能并扩展了标签。这些新的标签及功能增加了攻击面,存在被攻击的风险(例如跨域资源共享、客户端存储、WebWorker、WebRTC、WebSocket等)。 安全加固:添加“Access-Control-Allow-Origin”配置,如运用到跨域资源共享功能,可对HTTP响应头的“Access-Control-Allow-Origin”属性进行控制。 Flink不涉及如客户端存储、WebWorker、WebRTC、WebSocket等安全风险。
  • 回答 Spark SQL可以将表cache到内存中,并且使用压缩存储来尽量减少内存压力。通过将表cache,查询可以直接从内存中读取数据,从而减少读取磁盘带来的内存开销。 但需要注意的是,被cache的表会占用executor的内存。尽管在Spark SQL采用压缩存储的方式来尽量减少内存开销、缓解GC压力,但当缓存的表较大或者缓存表数量较多时,将不可避免的影响executor的稳定性。 此时的最佳实践是,当不需要将表cache来实现查询加速时,应及时将表进行uncache以释放内存。可以执行命令uncache table table_name来uncache表。 被cache的表也可以在Spark Driver UI的Storage标签里查看。
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使该三个部件的性能都合理化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能更好,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Spark Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Spark Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Spark Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Spark Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
  • Hive SQL扩展语法说明 Hive SQL支持Hive-3.1.0版本中的所有特性,详情请参见https://cwiki.apache.org/confluence/display/hive/languagemanual。 系统提供的扩展Hive语句如表1所示。 表1 扩展Hive语句 扩展语法 语法说明 语法示例 示例说明 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ...... [TBLPROPERTIES ("groupId"=" group1 ","locatorId"="locator1")] ...; 创建一个hive表,并指定表数据文件分布的locator信息。详细说明请参见使用HDFS Colocation存储Hive表。 CREATE TABLE tab1 (id INT, name STRING) row format delimited fields terminated by '\t' stored as RCFILE TBLPROPERTIES("groupId"=" group1 ","locatorId"="locator1"); 创建表tab1,并指定tab1的表数据分布在locator1节点上。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ... [TBLPROPERTIES ('column.encode.columns'='col_name1,col_name2'| 'column.encode.indices'='col_id1,col_id2','column.encode.classname'='encode_classname')]...; 创建一个hive表,并指定表的加密列和加密算法。详细说明请参见使用Hive列加密功能。 create table encode_test(id INT, name STRING, phone STRING, address STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('column.encode.indices'='2,3', 'column.encode.classname'='org.apache.hadoop.hive.serde2. SMS 4Rewriter') STORED AS TEXTFILE; 创建表encode_test,并指定插入数据时对第2、3列加密,加密算法类为org.apache.hadoop.hive.serde2.SMS4Rewriter。 REMOVE TABLE hbase_tablename [WHERE where_condition]; 删除hive on hbase表中符合条件的数据。详细说明请参见删除Hive on HBase表中的单行记录。 remove table hbase_table1 where id = 1; 删除表中符合条件“id =1”的数据。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建hive表,并设定表可以指定自定义行分隔符。详细说明请参见自定义行分隔符。 create table blu(time string, num string, msg string) row format delimited fields terminated by ',' stored as inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建表blu,指定inputformat为SpecifiedDelimiterInputFormat,以便查询时可以指定表的查询行分隔符。 父主题: Hive常见SQL语法参考
  • 配置描述 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入表1中参数名称。 表1 参数说明 参数 描述 默认值 yarn.resourcemanager.am.max-attempts ApplicationMaster重试次数,增加重试次数,可以防止资源不足导致的ApplicationMaster启动失败问题。适用于所有ApplicationMaster的全局设置。每个ApplicationMaster都可以使用API设置一个单独的最大尝试次数,但这个次数不能大于全局的最大次数。如果大于,ResourceManager将会覆写这个单独的最大尝试次数。以允许至少一次重试。取值范围大于等于1。 5
  • Interceptors Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。官网参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。 Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。 指定拦截器的顺序就是它们被调用的顺序。 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。 表5 Flume内建支持的拦截器类型 拦截器类型 简要描述 Timestamp Interceptor 该拦截器会在Event的Header中插入一个时间戳。 Host Interceptor 该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。 Remove Header Interceptor 该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。 UUID Interceptor 该拦截器会为每个Event的Header生成一个UUID字符串。 Search and Replace Interceptor 该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。 Regex Filtering Interceptor 该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。 Regex Extractor Interceptor 该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。 下面以Regex Filtering Interceptor 为例说明Interceptor使用(其余的可参考官网配置): 表6 Regex Filtering Interceptor配置参数说明 选项名称 默认值 描述 type - 组件类型名称,必须写为regex_filter。 regex - 用于匹配事件的正则表达式。 excludeEvents false 默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。 配置示例(为了方便观察,此模型使用了netcat tcp作为Source源,logger作为Sink)。配置好如下参数后,在Linux的配置的主机节点上执行Linux命令“telnet 主机名或IP 44444”,并任意敲入符合正则和不符合正则的字符串。会在日志中观察到,只有匹配到的字符串被传输了。 #define the source、channel、sink server.sources = r1 server.channels = c1 server.sinks = k1 #config the source server.sources.r1.type = netcat server.sources.r1.bind = ${主机IP} server.sources.r1.port = 44444 server.sources.r1.interceptors= i1 server.sources.r1.interceptors.i1.type= regex_filter server.sources.r1.interceptors.i1.regex= (flume)|(myflume) server.sources.r1.interceptors.i1.excludeEvents= false server.sources.r1.channels = c1 #config the channel server.channels.c1.type = memory server.channels.c1.capacity = 1000 server.channels.c1.transactionCapacity = 100 #config the sink server.sinks.k1.type = logger server.sinks.k1.channel = c1
  • 业务模型配置指导 本章节适用于MRS 3.x及之后版本。 本任务旨在提供Flume常用模块的性能差异,用于指导用户进行合理的Flume业务配置,避免出现前端Source和后端Sink性能不匹配进而导致整体业务性能不达标的场景。 本任务只针对于单通道的场景进行比较说明。 Flume业务配置及模块选择过程中,一般要求Sink的极限吞吐量需要大于Source的极限吞吐量,否则在极限负载的场景下,Source往Channel的写入速度大于Sink从Channel取出的速度,从而导致Channel频繁被写满,进而影响性能表现。 Avro Source和Avro Sink一般都是成对出现,用于多个Flume Agent间进行数据中转,因此一般场景下Avro Source和Avro Sink都不会成为性能瓶颈。
  • Channel Selector Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。 Replicating:表示Source的数据同步发送给所有Channel。 Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。 Replicating配置样例: client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = replicating client.sources.kafkasource.selector.optional = channel2 表1 Replicating配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为replicating Selector.optional - 可选Channel,可以配置为列表 Multiplexing配置样例: client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = multiplexing client.sources.kafkasource.selector.header = myheader client.sources.kafkasource.selector.mapping.topic1 = channel1 client.sources.kafkasource.selector.mapping.topic2 = channel2 client.sources.kafkasource.selector.default = channel1 表2 Multiplexing配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为multiplexing Selector.header Flume.selector.header - Selector.default - - Selector.mapping.* - - Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。 这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。
  • 模块间性能 根据模块间性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。
  • Hive权限模型 使用Hive组件,必须对Hive数据库和表(含外表和视图)拥有相应的权限。在MRS中,完整的Hive权限模型由Hive元数据权限与HDFS文件权限组成。使用数据库或表时所需要的各种权限都是Hive权限模型中的一种。 Hive元数据权限。 与传统关系型数据库类似,MRS的Hive数据库包含“建表”和“查询”权限,Hive表和列包含“查询”、“插入”和“删除”权限。Hive中还包含拥有者权限“OWNERSHIP”和“Hive管理员权限”。 Hive数据文件权限,即HDFS文件权限。 Hive的数据库、表对应的文件保存在HDFS中。默认创建的数据库或表保存在HDFS目录“/user/hive/warehouse”。系统自动以数据库名称和数据库中表的名称创建子目录。访问数据库或者表,需要在HDFS中拥有对应文件的权限,包含“读”、“写”和“执行”权限。 用户对Hive数据库或表执行不同操作时,需要关联不同的元数据权限与HDFS文件权限。例如,对Hive数据表执行查询操作,需要关联元数据权限“查询”,以及HDFS文件权限“读”和“写”。 使用Manager界面图形化的角色管理功能来管理Hive数据库和表的权限,只需要设置元数据权限,系统会自动关联HDFS文件权限,减少界面操作,提高效率。
  • 操作步骤 优化GC。 调整老年代和新生代的比值。在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“-XX:NewRatio”。如“ -XX:NewRatio=2”,则表示老年代与新生代的比值为2:1,新生代占整个堆空间的1/3,老年代占2/3。 开发Flink应用程序时,优化DataStream的数据分区或分组操作。 当分区导致数据倾斜时,需要考虑优化分区。 避免非并行度操作,有些对DataStream的操作会导致无法并行,例如WindowAll。 keyBy尽量不要使用String。
  • 配置描述 查看Yarn服务配置参数 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入表1中参数名称。 表1 参数描述 参数 描述 默认值 yarn.acl.enable Yarn权限控制启用开关。 true yarn.webapp.filter-entity-list-by-user 严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。当要开启该功能时,同时需要设置参数“yarn.acl.enable”为true。 说明: 此参数适用于MRS 3.x及后续版本集群。 true 查看 MapReduce服务 配置参数 参考修改集群服务配置参数进入MapReduce服务参数“全部配置”界面,在搜索框中输入表2中参数名称。 表2 参数描述 参数 描述 默认值 mapreduce.cluster.acls.enabled MR JobHistoryServer权限控制启用开关。该参数为客户端参数,当JobHistoryServer服务端开启权限控制之后该参数生效。 true yarn.webapp.filter-entity-list-by-user MR JobHistoryServer严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。该参数为JobHistoryServer的服务端参数,表示JHS开启了权限控制,但是否要对某一个特定的Application进行控制,是由客户端参数:“mapreduce.cluster.acls.enabled”决定。 说明: 此参数适用于MRS 3.x及后续版本集群。 true 以上配置会影响restful API和shell命令结果,即以上配置开启后,restful API调用和shell命令运行所返回的内容只包含调用用户有权查看的信息。 当“yarn.acl.enable”或“mapreduce.cluster.acls.enabled”设置为“false”时,即关闭Yarn或MapReduce的权限校验功能。此时任何用户都可以在Yarn或MapReduce上提交任务和查看任务信息,存在安全风险,请谨慎使用。
  • 操作描述 有Minor合并、Major合并和Custom合并三种类型。 Minor合并: 在Minor合并中,用户可指定合并数据加载的数量。如果设置了参数“carbon.enable.auto.load.merge”,每次数据加载都可触发Minor合并。如果任意segment均可合并,那么合并将于数据加载时并行进行。 Minor合并有两个级别。 Level 1:合并未合并的segment。 Level 2:合并已合并的segment,以形成更大的segment。 Major合并: 在Major合并中,许多segment可以合并为一个大的segment。用户将指定合并尺寸,将对未达到该尺寸的segment进行合并。Major合并通常在非高峰时段进行。 Custom合并: 在Custom合并中,用户可以指定几个segment的id合并为一个大的segment。所有指定的segment的id必须存在并且有效,否则合并将会失败。Custom合并通常在非高峰时段进行。 具体的命令操作,请参考ALTER TABLE COMPACTION。 表1 合并参数 参数 默认值 应用类型 描述 carbon.enable.auto.load.merge false Minor 数据加载时启用合并。 “true”:数据加载时自动触发segment合并。 “false”:数据加载时不触发segment合并。 carbon.compaction.level.threshold 4,3 Minor 对于Minor合并,该属性参数决定合并segment的数量。 例如,如果该参数设置为“2,3”,在Level 1,每2个segment触发一次Minor合并。在Level2,每3个Level 1合并的segment将被再次合并为新的segment。 合并策略根据实际的数据大小和可用资源决定。 有效值为0-100。 carbon.major.compaction.size 1024mb Major 通过配置该参数可配置Major合并。低于该阈值的segment之和将被合并。 例如,如果该阈值是1024MB,且有5个大小依次为300MB,400MB,500MB,200MB,100MB的segment用于Major合并,那么只有相加的总数小于阈值的segment会被合并,也就是300+400+200+100 = 1000MB的segment会被合并,而500MB的segment将会被跳过。 carbon.numberof.preserve.segments 0 Minor/Major 如果用户希望从被合并的segment中保留一定数量的segment,可通过该属性参数进行设置。 例如,“carbon.numberof.preserve.segments”=“2”,那么最新的2个segment将不会包含在合并中。 默认不保留任何segment。 carbon.allowed.compaction.days 0 Minor/Major 合并将合并在指定的配置天数中加载的segment。 例如,如果配置为“2”,那么只有在2天的时间框架中被加载的segment可以被合并。在2天以外被加载的segment将不被合并。 默认为禁用。 carbon.number.of.cores.while.compacting 2 Minor/Major 在合并过程中写入数据时所用的核数。配置的核数越大合并性能越好。如果CPU资源充足可以增加此值。 carbon.merge.index.in.segment true SEGMENT_INDEX 如果设置为true,则一个segment中所有Carbon索引文件(.carbonindex)将合并为单个Carbon索引合并文件(.carbonindexmerge)。 这增强了首次查询性能。
  • 操作场景 频繁的数据获取导致在存储目录中产生许多零碎的CarbonData文件。由于数据排序只在每次加载时进行,所以,索引也只在每次加载时执行。这意味着,对于每次加载都会产生一个索引,随着数据加载数量的增加,索引的数量也随之增加。由于每个索引只在一次加载时工作,索引的性能被降低。CarbonData提供加载压缩。压缩过程通过合并排序各segment中的数据,将多个segment合并为一个大的segment。
  • 参数说明 表1 参数说明 配置参数 说明 默认值 supervisor.slots.ports supervisor上能够运行workers的端口列表。每个worker占用一个端口,且每个端口只运行一个worker。通过这项配置可以设置每台机器上运行的worker数量。端口的取值范围是1024到65535,不同端口使用逗号分隔。 6700,6701,6702,6703 WORKER_GC_OPTS supervisor启动worker时使用的jvm选项。需要根据业务中对内存等的使用来进行设置,例如是简单业务处理,建议1G,既“-Xmx1G”;如果有窗口缓存,根据窗口大小计算:每条记录大小*周期*2。 -Xms1G -Xmx1G -XX:+UseG1GC -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump default.schedule.mode 默认调度器的调度模式。目前支持两个值,具体值与含义如下: “AVERAGE”:使用按空闲Slot数目为优先级的调度机制 “RATE”:使用按空闲Slot比率为优先级的调度机制 AVERAGE nimbus.thrift.threads 设置主用Nimbus对外提供服务时的最大连接线程数。当Storm集群规模较大,Supervisor实例数量较多时,需要增加线程数。 512
  • 上传UDF 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“UDF管理”进入UDF管理页面。 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。 填写UDF名称以及描述信息后,单击“确定”。 “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。 (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业。
  • UDF java代码及SQL样例 UDF java使用样例 package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); } } UDF SQL使用样例 CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
共100000条