华为云用户手册

  • 代码样例 具体代码参见com.huawei.bigdata.flink.examples.TestCreateTenants。 public class TestCreateTenants { public static void main(String[] args) { ParameterTool paraTool = ParameterTool.fromArgs(args); final String hostName = paraTool.get("hostName"); // 修改hosts文件,使用主机名 final String keytab = paraTool.get("keytab文件路径"); // user.keytab路径 final String krb5 = paraTool.get("krb5文件路径"); // krb5.conf路径 final String principal = paraTool.get("认证用户名"); // 认证用户 System.setProperty("java.security.krb5.conf", krb5); String url = "https://"+hostName+":28943/flink/v1/tenants"; String jsonstr = "{" + "\n\t \"tenantId\":\"92\"," + "\n\t \"tenantName\":\"test92\"," + "\n\t \"remark\":\"test tenant remark1\"," + "\n\t \"updateUser\":\"test_updateUser1\"," + "\n\t \"createUser\":\"test_createUser1\"" + "\n}"; try { LoginClient.getInstance().setConfigure(url, principal, keytab, ""); LoginClient.getInstance().login(); System.out.println(HttpClientUtil.doPost(url, jsonstr, "utf-8", true)); } catch (Exception e) { System.out.println(e); } }}
  • 代码样例 下面代码片段在com.huawei.bigdata.hbase.examples包的“TestMain”类的init方法中。 private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator; //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; conf.addResource(new Path(userdir + "core-site.xml"), false); conf.addResource(new Path(userdir + "hdfs-site.xml"), false); conf.addResource(new Path(userdir + "hbase-site.xml"), false); }
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 场景说明 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: DataStream应用程序可以在Windows环境和Linux环境中运行。 实时统计总计网购时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 DataStream样例工程的数据存储在文本中。 将log1.txt和log2.txt放置在指定路径下,例如"/opt/log1.txt"和"/opt/log2.txt"。 数据文件若存放在本地文件系统,需在所有部署Yarn NodeManager的节点指定目录放置,并设置运行用户访问权限。 若将数据文件放置于HDFS,需指定程序中读取文件路径HDFS路径,例如"hdfs://hacluster/path/to/file"。
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单地互通,而需要在通信之前进行相互认证,以确保通信的安全性。用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。 当前Flink系统支持认证和加密传输,要使用认证和加密传输,用户需要安装Flink客户端并配置安全认证,本章节以“/opt/hadoopclient”为客户端安装目录为例,介绍安装客户端及配置安全认证。客户端安装目录请根据实际修改。
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据。 数据包含两个属性:分别是Int和String类型。 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如: nettyconnector.ssl.enabled: true nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 安全认证配置: Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink。 SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见配置Flink应用安全认证。 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。
  • Flink样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • 组件介绍 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持集群自带的OpenJDK,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的: X86客户端: Oracle JDK:支持1.8版本; IBM JDK:支持1.8.0.7.20和1.8.0.6.15版本。 ARM客户端: OpenJDK:支持1.8.0_272版本(集群自带JDK,可通过集群客户端安装目录中“JDK”文件夹下获取)。 毕昇JDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情可参考https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 毕昇JDK详细信息可参考https://www.hikunpeng.com/zh/developer/devkit/compiler/jdk。 安装和配置IDEA 用于开发Flink应用程序的工具。版本要求:2019.1或其他兼容版本。 安装Scala Scala开发环境的基本配置。版本要求:2.11.7。 安装Scala插件 Scala开发环境的基本配置。版本要求:1.5.4。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 Python3 用于运行Flink Python作业。版本要求:Python3.7至Python3.10版本。
  • 准备运行环境 进行应用开发时,需要同时准备代码的运行调测的环境,用于验证应用程序运行正常。 如果使用Linux环境调测程序,需在准备安装集群客户端的Linux节点并获取相关配置文件。 在节点中安装客户端,例如客户端安装目录为“/opt/hadoopclient”。客户端安装可参考配置Flink应用安全认证。 客户端机器的时间与集群的时间要保持一致,时间差小于5分钟。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。 确保Flink客户端的“flink-conf.yaml”配置文件中的认证相关配置项已经配置正确,请参考配置Flink应用安全认证章节的步骤5。 安全模式下需要将客户端安装节点的业务IP地址以及Manager的浮动IP地址追加到“flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。 登录 FusionInsight Manager页面,下载集群客户端软件包至主管理节点并解压,然后以root用户登录主管理节点,进入集群客户端解压路径下,复制“FusionInsight_Cluster_1_Services_ClientConfig\Flink\config”路径下的所有配置文件至客户端节点,放置到与准备放置编译出的jar包同目录的“conf”目录下,用于后续调测,例如“/opt/hadoopclient/conf”。 例如客户端软件包为“FusionInsight_Cluster_1_Services_Client.tar”,下载路径为主管理节点的“/tmp/FusionInsight-Client”: cd /tmp/FusionInsight-Client tar -xvf FusionInsight_Cluster_1_Services_Client.tar tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar cd FusionInsight_Cluster_1_Services_ClientConfig scp Flink/config/* root@客户端节点IP地址:/opt/hadoopclient/conf 准备MRS应用开发用户时获取的keytab文件也放置于该目录下,主要配置文件说明如表2所示。 表2 配置文件 文件名称 作用 core-site.xml 配置Flink详细参数。 hdfs-site.xml 配置HDFS详细参数。 yarn-site.xml 配置Yarn详细参数。 flink-conf.yaml Flink客户端配置文件。 user.keytab 对于Kerberos安全认证提供用户信息。 krb5.conf Kerberos Server配置信息。 检查客户端节点网络连接。 在安装客户端过程中,系统会自动配置客户端节点“hosts”文件,建议检查“/etc/hosts”文件内是否包含集群内节点的主机名信息,如未包含,需要手动复制解压目录下的“hosts”文件中的内容到客户端所在节点的hosts文件中,确保本地机器能与集群各主机在网络上互通。 (可选)若运行Python作业,需额外配置如下:(适用于MRS 3.3.0及以后版本) 使用root用户登录flink客户端安装节点,使用如下命令确认环境已成功安装Python 3.7及以后版本。 python3 -V 进入python3安装路径,安装路径如“/srv/pyflink-example”,执行以下命令安装virtualenv。 cd /srv/pyflink-example virtualenv venv --python=python3.x source venv/bin/activate 执行以下命令将客户端安装目录下的“Flink/flink/opt/python/apache-flink-*.tar.gz”文件复制到“/srv/pyflink-example”。 cp 客户端安装目录/Flink/flink/opt/python/apache-flink-*.tar.gz /srv/pyflink-example 执行以下命令安装依赖包,显示如下表示安装成功。 python -m pip install apache-flink-libraries-*.tar.gz python -m pip install apache-flink-版本号.tar.gz ...Successfully built apache-flink Installing collected packages: apache-flink Attempting uninstall: apache-flink Found existing installation: apache-flink x.xx.x Uninstalling apache- flink-x.xx.x: Successfully uninstalled apache-flink-x.xx.xSuccessfully installed apache-flink-x.xx.x
  • MRS组件应用开发流程说明 通常MRS组件应用开发流程如下所示,各组件应用的开发编译操作可参考组件开发指南对应章节。 图1 MRS组件应用开发流程 表1 MRS组件应用开发流程说明 阶段 说明 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用IntelliJ IDEA工具,同时本地需完成JDK、Maven等初始配置。 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通。 配置并导入样例工程 MRS提供了不同组件场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置安全认证 连接开启了Kerberos认证的MRS集群时,应用程序中需配置具有相关资源访问权限的用户进行安全认证。 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。
  • 样例工程运行依赖包参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程运行依赖包 样例工程 依赖包 依赖包获取地址 DataStream程序 异步Checkpoint机制程序 flink-dist_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 使用Flink Jar提交SQL作业程序 FlinkServer REST API程序 flink-dist_*.jar flink-table_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序 kafka-clients-*.jar flink-connector-kafka_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序 flink-connector-netty_*.jar flink-dist_*.jar flink-connector-netty_*.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Stream SQL Join程序 kafka-clients-*.jar flink-connector-kafka_*.jar flink-dist_*.jar flink-table_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Flink读写HBase程序 flink-connector-hbase*.jar flink-dist_*.jar flink-table_*.jar hbase-clients-*.jar flink-connector-hbase_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 hbase-clients-*.jar由HBase组件发布提供,可在HBase组件客户端或者服务端安装路径下的lib目录下获取。 Flink读写Hudi程序 hbase-unsafe-*.jar 可在二次开发样例代码编译后产生的lib文件夹下获取。
  • 安全认证基本概念 本文以HDFS组件应用的安全认证为例介绍安全认证相关的常见基本概念,可以帮助用户减少学习Kerberos框架所花费的时间,有助于更好的理解Kerberos业务。 TGT 票据授权票据(Ticket-Granting Ticket),由Kerberos服务生成,提供给应用程序与Kerberos服务器建立认证安全会话,该票据的默认有效期为24小时,24小时后该票据自动过期。 TGT申请方式(以HDFS为例): 通过HDFS提供的接口获取。 /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } 通过MRS集群客户端以kinit方式获取。 登录MRS集群客户端所在节点,进入客户端安装目录。 cd {客户端安装目录} 执行以下命令配置环境变量。 source bigdata_env 执行以下命令进行用户认证。 kinit MRS集群业务用户 ST 服务票据(Server Ticket),由Kerberos服务生成,提供给应用程序与应用服务建立安全会话,该票据一次性有效。 ST的生成在MRS中,基于hadoop-rpc通信,由rpc底层自动向Kerberos服务端提交请求,由Kerberos服务端生成。
  • Kerberos认证说明 开启了Kerberos认证的安全模式集群,进行应用开发时需要进行安全认证。使用Kerberos的系统在设计上采用“客户端/服务器”结构与AES等加密技术,并且能够进行相互认证(即客户端和服务器端均可对对方进行身份认证)。可以用于防止窃听、防止replay攻击、保护数据完整性等场合,是一种应用对称密钥体制进行密钥管理的系统。 图1 Kerberos原理架构 表1 Kerberos模块说明 模块 说明 Application Client 应用客户端,通常是需要提交任务(或者作业)的应用程序。 Application Server 应用服务端,通常是应用客户端需要访问的应用程序。 Kerberos 提供安全认证的服务。 KerberosAdmin 提供认证用户管理的进程。 KerberosServer 提供认证票据分发的进程。 应用客户端(Application Client)可以是集群内某个服务,也可以是客户二次开发的一个应用程序,应用程序可以向应用服务提交任务或者作业。 应用程序在提交任务或者作业前,需要向Kerberos服务申请TGT(Ticket-Granting Ticket),用于建立和Kerberos服务器的安全会话。 Kerberos服务在收到TGT请求后,会解析其中的参数来生成对应的TGT,使用客户端指定的用户名的密钥进行加密响应消息。 应用客户端收到TGT响应消息后,解析获取TGT,此时,再由应用客户端(通常是rpc底层)向Kerberos服务获取应用服务端的ST(Server Ticket)。 Kerberos服务在收到ST请求后,校验其中的TGT合法后,生成对应的应用服务的ST,再使用应用服务密钥将响应消息进行加密处理。 应用客户端收到ST响应消息后,将ST打包到发给应用服务的消息里面传输给对应的应用服务端(Application Server)。 应用服务端收到请求后,使用本端应用服务对应的密钥解析其中的ST,并校验成功后,本次请求合法通过。
  • Kerberos认证代码示例 package com.huawei.bigdata.hdfs.examples;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.security.UserGroupInformation;public class KerberosTest { private static String PATH_TO_HDFS_SITE_XML = KerberosTest.class.getClassLoader().getResource("hdfs-site.xml") .getPath(); private static String PATH_TO_CORE_SITE_XML = KerberosTest.class.getClassLoader().getResource("core-site.xml") .getPath(); private static String PATH_TO_KEYTAB = KerberosTest.class.getClassLoader().getResource("user.keytab").getPath(); private static String PATH_TO_KRB5_CONF = KerberosTest.class.getClassLoader().getResource("krb5.conf").getPath(); private static String PRNCIPAL_NAME = "develop"; private FileSystem fs; private Configuration conf; /** * initialize Configuration */ private void initConf() { conf = new Configuration(); // add configuration files conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } /** * initialize FileSystem, and get ST from Kerberos * @throws IOException */ private void initFileSystem() throws IOException { fs = FileSystem.get(conf); } /** * An example to access the HDFS * @throws IOException */ private void doSth() throws IOException { Path path = new Path("/tmp"); FileStatus fStatus = fs.getFileStatus(path); System.out.println("Status of " + path + " is " + fStatus); //other thing } public static void main(String[] args) throws Exception { KerberosTest test = new KerberosTest(); test.initConf(); test.login(); test.initFileSystem(); test.doSth(); }} Kerberos认证时需要配置Kerberos认证所需要的文件参数,主要包含keytab文件路径、Kerberos认证的用户名称、Kerberos认证所需要的客户端配置“krb5.conf”文件。 login()方法为调用hadoop的接口执行Kerberos认证,生成TGT票据。 doSth()方法调用hadoop的接口访问文件系统,此时底层RPC会自动携带TGT去Kerberos认证,生成ST票据。 以上代码可在安全模式下的HDFS二次开发样例工程中创建KerberosTest.java,运行并查看调测结果,具体操作过程请参考HDFS开发指南(安全模式)。
  • MRS各组件样例工程汇总 MRS样例代码库提供了各组件的基本功能样例工程供用户使用,当前版本各组件提供的样例工程汇总参见表1。 表1 MRS组件样例工程汇总 组件 样例工程位置 描述 ClickHouse clickhouse-examples 指导用户基于Java语言,实现MRS集群中的ClickHouse的数据表创建、删除以及数据的插入、查询等操作。 本工程中包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据及删除数据表等操作示例。 ClickHouseJDBC-Transaction-JavaExample ClickHouse事务开发代码样例,仅MRS 3.3.0及之后版本支持。 Doris doris-examples/doris-jdbc-example Doris数据读写操作的应用开发示例,仅MRS 3.3.0及之后版本支持。 通过调用Doris接口可实现创建用户表、向表中插入数据、查询表数据、删除表等功能 Flink 开启Kerberos认证集群的样例工程目录“flink-examples/flink-examples-security”。 未开启Kerberos认证集群的样例工程目录为“flink-examples/flink-examples-normal”。 FlinkCheckpointJavaExample Flink异步Checkpoint机制的Java/Scala示例程序。 本工程中,程序使用自定义算子持续产生数据,产生的数据为一个四元组(Long,String,String,Integer)。数据经统计后,将统计结果打印到终端输出。每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 FlinkCheckpointScalaExample FlinkHBaseJavaExample Flink API作业读写HBase数据的Java示例程序。 MRS 3.2.0及之后版本支持。 FlinkKafkaJavaExample Flink向Kafka生产并消费数据的Java/Scala示例程序。 在本工程中,假定某个Flink业务每秒就会收到1个消息记录,启动Producer应用向Kafka发送数据,然后启动Consumer应用从Kafka接收数据,对数据内容进行处理后并打印输出。 FlinkKafkaScalaExample FlinkPipelineJavaExample Flink Job Pipeline的Java/Scala示例程序。 本样例中一个发布者Job自己每秒钟产生10000条数据,另外两个Job作为订阅者,分别订阅一份数据。订阅者收到数据之后将其转化格式,并抽样打印输出。 FlinkPipelineScalaExample FlinkSqlJavaExample 使用客户端通过jar作业提交SQL作业的应用开发示例。 FlinkStreamJavaExample Flink构造DataStream的Java/Scala示例程序。 本工程示例为基于业务要求分析用户日志数据,读取文本数据后生成相应的DataStream,然后筛选指定条件的数据,并获取结果。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Flink SQL Join示例程序。 本工程示例调用flink-connector-kafka模块的接口,生产并消费数据。生成Table1和Table2,并使用Flink SQL对Table1和Table2进行联合查询,打印输出结果。 FlinkRESTAPIJavaExample 本工程示例调用FlinkServer的RestAPI创建租户。 flink-examples/flink-sql 本工程示例使用Flink Jar提交SQL作业。 flink-examples/pyflink-example pyflink-kafka 本工程示例使用Python提交普通作业,提供Python读写Kafka作业的样例。 pyflink-sql 本工程示例使用Python提交SQL作业,提供Python提交SQL作业的样例。 HBase hbase-examples hbase-example HBase数据读写操作及全局二级索引的应用开发示例。通过调用HBase接口可实现以下功能: 创建用户表、导入用户数据、增加用户信息、查询用户信息及为用户表创建二级索引等功能。 MRS 3.3.0及之后版本,可实现创建/删除全局二级索引、修改全局二级索引状态、以及基于全局二级索引查询等功能。 hbase-rest-example HBase Rest接口应用开发示例。 使用Rest接口实现查询HBase集群信息、获取表、操作NameSpace、操作表等功能。 hbase-thrift-example 访问HBase ThriftServer应用开发示例。 访问ThriftServer操作表、向表中写数据、从表中读数据。 hbase-zk-example HBase访问ZooKeeper应用开发示例。 在同一个客户端进程内同时访问MRS ZooKeeper和第三方的ZooKeeper,其中HBase客户端访问MRS ZooKeeper,客户应用访问第三方ZooKeeper。 HDFS 开启Kerberos认证集群的样例工程目录“hdfs-example-security”。 未开启Kerberos认证集群的样例工程目录为“hdfs-example-normal”。 HDFS文件操作的Java示例程序。 本工程主要给出了创建HDFS文件夹、写文件、追加文件内容、读文件和删除文件/文件夹等相关接口操作示例。 hdfs-c-example HDFS C语言开发代码样例。 本示例提供了基于C语言的HDFS文件系统连接、文件操作如创建文件、读写文件、追加文件、删除文件等。 HetuEngine 开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-security”。 未开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-normal”。 通过不同方式连接HetuEngine的Java、Python示例程序。 通过HSFabric、HSBroker等连接方式,使用用户名和密码连接到HetuEngine,或通过KeyTab文件认证方式连接HetuEngine,组装对应的SQL发送到HetuEngine执行,完成对Hive数据源的增删改查操作。 Hive hive-examples hive-jdbc-example Hive JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Hive,在Hive中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能,还可实现在同一个客户端进程内同时访问FusionInsight ZooKeeper和第三方的ZooKeeper。 hive-jdbc-example-multizk hcatalog-example Hive HCatalog处理数据Java示例程序。 使用HCatalog接口实现通过Hive命令行方式对MRS Hive元数据进行数据定义和查询操作。 python-examples 使用Python连接Hive执行SQL样例。 可实现使用Python对接Hive并提交数据分析任务。 python3-examples 使用Python3连接Hive执行SQL样例。 可实现使用Python3对接Hive并提交数据分析任务。 IoTDB iotdb-examples iotdb-flink-example 通过Flink访问IoTDB数据的示例程序,包括FlinkIoTDBSink和FlinkIoTDBSource。 FlinkIoTDBSink可实现通过Flink job将时序数据写入到IoTDB中。FlinkIoTDBSource则通过Flink job将时序数据从IoTDB读取出来并且打印。 iotdb-jdbc-example IoTDB JDBC处理数据Java示例程序。 本示例演示了如何使用JDBC接口连接IoTDB,并执行IoTDB SQL语句。 iotdb-kafka-example 通过Kafka访问IoTDB数据的示例程序。 本示例演示了如何先将时序数据发送到Kafka,再使用多线程将数据写入到IoTDB中。 iotdb-session-example IoTDB Session处理数据Java示例程序。 本示例演示了如何使用Session方式连接IoTDB,并执行IoTDB SQL语句。 iotdb-udf-exmaple 该样例程序介绍了如何实现一个简单的IoTDB自定义函数(UDF)。 Kafka kafka-examples Kafka流式数据的处理Java示例程序。 本工程基于Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,然后将统计结果以Key-Value的形式输出。 Manager manager-examples FusionInsight Manager API接口调用示例。 本工程调用Manager API接口实现集群用户的创建、修改及删除等操作。 MapReduce 开启Kerberos认证集群的样例工程目录“mapreduce-example-security”。 未开启Kerberos认证集群的样例工程目录为“mapreduce-example-normal”。 MapReduce任务提交Java示例程序。 本工程提供了一个MapReduce统计数据的应用开发示例,实现数据分析、处理,并输出满足用户需要的数据信息。 另外以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 Oozie 开启Kerberos认证集群的样例工程目录“oozie-examples/ooziesecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“oozie-examples/oozienormal-examples”。 OozieMapReduceExample Oozie提交MapReduce任务示例程序。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,对网站的日志文件进行离线分析。 OozieSparkHBaseExample 使用Oozie调度Spark访问HBase的示例程序。 OozieSparkHiveExample 使用Oozie调度Spark访问Hive的示例程序。 Spark 开启Kerberos认证集群的样例工程目录“spark-examples/sparksecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“spark-examples/sparknormal-examples”。 SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 SparkHbasetoHbasePythonExample SparkHbasetoHbaseScalaExample SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 SparkHivetoHbasePythonExample SparkHivetoHbaseScalaExample SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 SparkPythonExample SparkScalaExample SparkRExample SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 SparkLauncherScalaExample SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 SparkOnHbasePythonExample SparkOnHbaseScalaExample SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 SparkOnHudiPythonExample SparkOnHudiScalaExample SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 本示例不支持未开启Kerberos认证的集群。 SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkSQLPythonExample SparkSQLScalaExample SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 SparkStreamingKafka010ScalaExample SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 SparkStreamingtoHbasePythonExample010 SparkStreamingtoHbaseScalaExample010 SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 SparkStructuredStreamingPythonExample SparkStructuredStreamingScalaExample SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JD BCS erver的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 SparkThriftServerScalaExample StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 SpringBoot(MRS 3.3.0及之后版本支持) clickhouse-examples clickhouse-rest-client-example SpringBoot连接ClickHouse服务应用开发示例。 本示例中,包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据等操作示例。 doris-examples doris-rest-client-example Doris数据读写操作的SpringBoot应用开发示例。 提供SpringBoot连接Doris的样例程序。 flink-examples flink-dws-read-example GaussDB (DWS) SpringBoot方式连接Flink服务的应用开发示例。 flink-dws-sink-example hbase-examples SpringBoot连接Phoenix应用开发示例。 提供SpringBoot连接HBase与Phoenix的样例程序。 hive-examples hive-rest-client-example SpringBoot连接Hive应用开发示例。 本工程使用SpringBoot方式连接Hive,在Hive中执行创建表、加载数据、查询数据、删除表等操作。 kafka-examples SpringBoot连接Kafka实现Topic生产消费的应用开发示例。
  • 获取MRS样例工程 MRS样例工程下载地址为https://github.com/huaweicloud/huaweicloud-mrs-example。 切换分支为与MRS集群相匹配的版本分支,例如“mrs-3.2.0.1”,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 图1 MRS样例工程代码下载 MRS LTS版本对应样例工程下载地址: MRS 3.3.0-LTS版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0。 MRS 3.2.0-LTS.1版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1。 MRS 3.1.2-LTS.3版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.2。 MRS普通版本对应样例工程下载地址: MRS 3.0.2版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.0.2。 MRS 3.1.0版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0。 MRS 3.1.5版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5。 MRS 2.1.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。 MRS 1.9.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9。 MRS 1.8.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.8。 MRS 1.8之前版本:http://mapreduceservice.obs-website.cn-north-1.myhuaweicloud.com/。
  • MRS 3.1.2-LTS.3 表2 MRS 3.1.2-LTS.3版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.12.0 1.12.0-hw-ei-310003 Hive 3.1.0 3.1.0-hw-ei-310003 Tez 0.9.2 0.9.1.0101-hw-ei-12 Spark 2.4.5 2.4.5-hw-ei-310003 CarbonData 2.0.1 - Hadoop 3.1.1 3.1.1-hw-ei-310003 HBase 2.2.3 2.2.3-hw-ei-310003 ZooKeeper 3.5.6 3.5.6-hw-ei-310003 Hue 4.7.0 - Oozie 5.1.0 5.1.0-hw-ei-310003 Flume 1.9.0 - Kafka 2.4.0 2.4.0-hw-ei-310003 Ranger 2.0.0 - ClickHouse 21.3.4.25 0.3.0 scala 2.12 -
  • MRS 3.2.0-LTS.1 表1 MRS 3.2.0-LTS.1版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.15.0 1.15.0-h0.cbu.mrs.320.r33 Hive 3.1.0 3.1.0-h0.cbu.mrs.320.r33 Tez 0.9.2 0.9.2-h0.cbu.mrs.320.r33 Spark2x 3.1.1 3.1.1-h0.cbu.mrs.320.r33 Hadoop 3.3.1 3.3.1-h0.cbu.mrs.320.r33 HBase 2.2.3 2.2.3-h0.cbu.mrs.320.r33 ZooKeeper 3.6.3 3.6.3-h0.cbu.mrs.320.r33 Hue 4.7.0 - IoTDB 0.14.0 0.14.0-h0.cbu.mrs.320.r33 Oozie 5.1.0 5.1.0-h0.cbu.mrs.320.r33 Flume 1.9.0 1.9.0-h0.cbu.mrs.320.r33 Kafka 2.11-2.4.0 2.4.0-h0.cbu.mrs.320.r33 Ranger 2.0.0 2.0.0-h0.cbu.mrs.320.r33 Phoenix 5.0.0 5.0.0-HBase-2.0-h0.cbu.mrs.320.r33 ClickHouse 22.3.2.2 0.3.1-h0.cbu.mrs.320.r33 Loader 1.99.3 1.99.3-h0.cbu.mrs.320.r33 DBService 2.7.0 - HetuEngine 1.2.0 1.2.0-h0.cbu.mrs.320.r33 CDL 1.0.0 1.0.0-h0.cbu.mrs.320.r33 Guardian 0.1.0 1.0.6-h0.cbu.mrs.321.r28
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 追加文件内容 * * @throws java.io.IOException */private void append() throws IOException { final String content = "I append this content."; FSDataOutputStream out = null; try { out = fSystem.append(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG .info("success to append."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); }}
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG.error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH);}/** * create file path * * @param filePath * @return * @throws java.io.IOException */private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true;}
  • 场景说明 在同一个客户端进程内同时访问FusionInsight ZooKeeper和第三方的ZooKeeper时,为了避免访问连接ZooKeeper认证冲突,提供了样例代码使HBase客户端访问FusionInsight ZooKeeper和客户应用访问第三方ZooKeeper。 以下为“src/main/resources”目录下提供的与认证相关的配置文件。 zoo.cfg # The configuration in jaas.conf used to connect fi zookeeper.zookeeper.sasl.clientconfig=Client_new[1]# Principal of fi zookeeper server side.zookeeper.server.principal=zookeeper/hadoop.hadoop.com[2]# Set true if the fi cluster is security mode.# The other two parameters doesn't work if the value is false.zookeeper.sasl.client=true[3] [1] zookeeper.sasl.clientconfig:指定使用jaas.conf文件中的对应配置访问FusionInsight ZooKeeper; [2] zookeeper.server.principal:指定ZooKeeper服务端使用principal; [3] zookeeper.sasl.client:如果MRS集群是安全模式,该值设置为“true”,否则设置为“false”,设置为“false”的情况下,“zookeeper.sasl.clientconfig”和“zookeeper.server.principal”参数不生效。 jaas.conf Client_new { [4] com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="D:\\work\\sample_project\\src\\hbase-examples\\hbase-zk-example\\target\\classes\\conf\\user.keytab" [5] principal="hbaseuser1" useTicketCache=false storeKey=true debug=true;};Client { [6] org.apache.zookeeper.server.auth.DigestLoginModule required username="bob" password="xxxxxx"; [7]}; [4] Client_new:zoo.cfg中指定的读取配置,当该名称修改时,需要同步修改zoo.cfg中对应配置。 [5] keyTab :指明工程使用的“user.keytab”在运行样例的主机上的保存路径,使用绝对路径便于更好定位文件位置。在Windows环境和Linux环境下配置时需注意区分不同操作系统路径书写方式,即“\\”与“\”差异。 [6] Client:第三方ZooKeeper使用该配置进行访问连接,具体连接认证配置由第三方ZooKeeper版本决定。 [7] password:密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
  • Flink应用程序开发流程 Flink开发流程参考如下步骤: 图1 Flink应用程序开发流程 表1 Flink应用开发的流程说明 阶段 说明 参考章节 了解基本概念 在开始开发应用前,需要了解Flink的基本概念。 Flink基本概念 准备开发和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 导入并配置Flink样例工程 准备安全认证 如果您使用的是安全集群,需要进行安全认证。 配置Flink应用安全认证 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 开发Flink应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并调测Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用调测结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 组件操作指南中的“Flink性能调优”
  • 操作步骤 在Windows环境下的Intellij IDEA开发环境中,单击IDEA右侧Maven导入依赖。 图1 导入依赖 (可选)如果对接的集群开启了ZooKeeper的SSL认证通信功能,则需要添加JVM配置参数,如下所示: -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true 在“JDBCExampleZK.java”文件下单击右键,在弹出菜单单击“Run 'JDBCExampleZK.main()' ”。 图2 运行程序 在IDEA的console中可以看到输出结果。 图3 输出结果
  • Hive介绍 Hive是一个开源的,建立在Hadoop上的 数据仓库 框架,提供类似SQL的HQL语言操作结构化数据,其基本原理是将HQL语言自动转换成MapReduce任务或Spark任务,从而完成对Hadoop集群中存储的海量数据进行查询和分析。 Hive主要特点如下: 通过HQL语言非常容易的完成数据提取、转换和加载(ETL)。 通过HQL完成海量结构化数据分析。 灵活的数据存储格式,支持JSON、 CS V、TEXTFILE、RCFILE、ORCFILE、SEQUENCEFILE等存储格式,并支持自定义扩展。 多种客户端连接方式,支持JDBC接口。 Hive主要应用于海量数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 为保证Hive服务的高可用性、用户数据的安全及访问服务的可控制,在开源社区的Hive-3.1.0版本基础上,Hive新增如下特性: 数据文件加密机制。 开源社区的Hive特性,请参见https://cwiki.apache.org/confluence/display/hive/designdocs。
  • 通过HSBroker的用户名密码认证实现查询HetuEngine SQL任务 本章节适用于MRS 3.3.0及以后版本。 通过HSBroker方式连接到HetuEngine,组装对应的SQL发送到HetuEngine执行,完成对Hive数据源的增删改查操作。 import jaydebeapi driver = "io.XXX.jdbc.XXXDriver" # need to change the value based on the cluster informationurl = "jdbc:XXX://192.168.37.61:29861,192.168.37.62:29861/hive/default?serviceDiscoveryMode=hsbroker"user = "YourUserName"tenant = "YourTenant"jdbc_location = "Your file path of the jdbc jar" sql = "show catalogs" if __name__ == '__main__': conn = jaydebeapi.connect(driver, url, {"user": user, "SSL": "false", "tenant": tenant}, [jdbc_location]) curs = conn.cursor() curs.execute(sql) result = curs.fetchall() print(result) curs.close() conn.close() 上述代码中各参数说明如下表所示: 表1 参数及参数说明 参数名称 参数说明 url jdbc:XXX://HSBroker1_IP:HSBroker1_Port,HSBroker2_IP:HSBroker2_Port,HSBroker3_IP:HSBroker3_Port/catalog/schema?serviceDiscoveryMode=hsbroker 说明: XXX:驱动名,请以实际样例代码中的内容为准。 catalog、schema分别是JDBC客户端要连接的catalog和schema名称。 HSBroker_IP:HSBroker_Port是HSBroker的URL,多个URL以逗号隔开。例如:“192.168.81.37:2181,192.168.195.232:2181,192.168.169.84:2181” user 访问HetuServer的用户名,即在集群中创建的“人机”用户的用户名。 tenant 指定访问HetuEngine计算实例的租户资源队列。 jdbc_location 导入并配置HetuEngine Python3样例工程中获取的hetu-jdbc-XXX.jar包的完整路径。 Windows系统路径示例:"D:\\hetu-examples-python3\\hetu-jdbc-XXX.jar" Linux系统路径示例:"/opt/hetu-examples-python3/hetu-jdbc-XXX.jar" 父主题: HetuEngine样例程序(Python3)
  • 常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Hive的相关操作。 HQL语言 Hive Query Language,类SQL语句。 HCatalog HCatalog是建立在Hive元数据之上的一个表信息管理层,吸收了Hive的DDL命令。为MapReduce提供读写接口,提供Hive命令行接口来进行数据定义和元数据查询。基于MRS的HCatalog功能,Hive、MapReduce开发人员能够共享元数据信息,避免中间转换和调整,能够提升数据处理的效率。 WebHCat WebHCat运行用户通过Rest API来执行Hive DDL,提交MapReduce任务,查询MapReduce任务执行结果等操作。
共100000条