华为云用户手册

  • Storm原理 基本概念 表1 概念介绍 概念 说明 Tuple Storm核心数据结构,是消息传递的基本单元,不可变Key-Value对,这些Tuple会以一种分布式的方式进行创建和处理。 Stream Storm的关键抽象,是一个无边界的连续Tuple序列。 Topology 在Storm平台上运行的一个实时应用程序,由各个组件(Component)组成的一个DAG(Directed Acyclic Graph)。一个Topology可以并发地运行在多台机器上,每台机器上可以运行该DAG中的一部分。Topology与Hadoop中的MapReduce Job类似,不同的是,它是一个长驻程序,一旦开始就不会停止,除非人工中止。 Spout Topology中产生源数据的组件,是Tuple的来源,通常可以从外部数据源(如消息队列、数据库、文件系统、TCP连接等)读取数据,然后转换为Topology内部的数据结构Tuple,由下一级组件处理。 Bolt Topology中接受数据并执行具体处理逻辑(如过滤,统计、转换、合并、结果持久化等)的组件。 Worker 是Topology运行态的物理进程。每个Worker是一个JVM进程,每个Topology可以由多个Worker并行执行,每个Worker运行Topology中的一个逻辑子集。 Task Worker中每一个Spout/Bolt的线程称为一个Task。 Stream groupings Storm中的Tuple分发策略,即后一级Bolt以什么分发方式来接收数据。当前支持的策略有:Shuffle Grouping, Fields Grouping, All Grouping, Global Grouping, Non Grouping, Directed Grouping。 图3描述了一个由Spout、Bolt组成的DAG,即Topology。图中每个矩形框代表Spout或者Bolt,矩形框内的节点表示各个并发的Task,Task之间的“边”代表数据流——Stream。 图3 Topology示意图 可靠性 Storm提供三种级别的数据可靠性: 至多一次:处理的数据可能会丢失,但不会被重复处理。此情况下,系统吞吐量最大。 至少一次:保证数据传输可靠,但可能会被重复处理。此情况下,对在超时时间内没有获得成功处理响应的数据,会在Spout处进行重发,供后续Bolt再次处理,会对性能稍有影响。 精确一次:数据成功传递,不丢失,不冗余处理。此情况下,性能最差。 可靠性不同级别的选择,需要根据业务对可靠性的要求来选择、设计。例如对于一些对数据丢失不敏感的业务,可以在业务中不考虑数据丢失处理从而提高系统性能;而对于一些严格要求数据可靠性的业务,则需要使用精确一次的可靠性方案,以确保数据被处理且仅被处理一次。 容错 Storm是一个容错系统,提供较高可用性。表2从Storm的不同部件失效的情况角度解释其容错能力: 表2 容错能力 失效场景 说明 Nimbus失效 Nimbus是无状态且快速失效的。当主Nimbus失效时,备Nimbus会接管,并对外提供服务。 Supervisor失效 Supervisor是工作节点的后台守护进程,是一种快速失效机制,且是无状态的,并不影响正在该节点上运行的Worker,但是会无法接收新的Worker分配。当Supervisor失效时, OMS 会侦测到,并及时重启该进程。 Worker失效 该Worker所在节点上的Supervisor会在此节点上重新启动该Worker。如果多次重启失败,则Nimbus会将该任务重新分配到其他节点。 节点失效 该节点上的所有分配的任务会超时,而Nimbus会将这些Worker重新分配到其他节点。
  • Storm开源特性 分布式实时计算框架 开源Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,任务是并发进行数据处理。 高容错 如果在消息处理过程中有节点、进程等出现异常,提供重新部署该处理单元的能力。 可靠的消息保证 支持At-Least Once、At-Most Once、Exactly Once的数据处理模式。 安全机制 提供基于Kerberos的认证以及可插拔的授权机制,提供支持SSL的Storm UI以及Log Viewer界面,同时支持与大数据平台其他组件(如ZooKeeper,HDFS等)进行安全集成。 灵活的拓扑定义及部署 使用Flux框架定义及部署业务拓扑,在业务DAG发生变化时,只需对YAML DSL(domain-specific language)定义进行修改,无需重新编译及打包业务代码。 与外部组件集成 支持与多种外部组件集成,包括:Kafka、HDFS、HBase、Redis或JDBC/RDBMS等服务,便于实现涉及多种数据源的业务。
  • MemArtsCC结构 MemArtsCC由C CS ideCar和CCWorker两个角色组成。 在存算架构下,Spark、Hive等计算分析应用的数据存储在 对象存储服务 (OBS)中。在MemArtsCC集群上一个服务实例称为Worker,对于OBS上的对象数据,Worker缓存其中部分或全部分片到本地的持久化存储(SSD/HDD)中。上层应用通过MemArtsCC SDK读取某个对象时,基于分片索引到特定的Worker上读取分片数据,如果命中缓存则Worker返回对应分片,如果未命中则直接从OBS中读取数据,同时Worker端会异步的加载未命中的分片到本地存储中,供后续使用。 图1 MemArtsCC结构 表1 MemArtsCC结构图说明 名称 说明 MemArtsCC SDK 提供OBSA(OBSA,Hadoop客户端插件) FS客户端使用的可访问OBS服务器对象的SDK。 CCSideCar MemArtsCC的管理面服务,提供MemArtsCC服务监控采集、配置下发、服务启停等能力。 CCWorker MemArtsCC的数据面服务,支持MemArtsCC的缓存数据读写、存储、淘汰等能力。
  • Hive原理 Hive作为一个基于HDFS和MapReduce架构的 数据仓库 ,其主要能力是通过对HQL(Hive Query Language)编译和解析,生成并执行相应的MapReduce任务或者HDFS操作。Hive与HQL相关信息,请参考HQL 语言手册。 图3为Hive的结构简图。 Metastore:对表,列和Partition等的元数据进行读写及更新操作,其下层为关系型数据库。 Driver:管理HQL执行的生命周期并贯穿Hive任务整个执行期间。 Compiler:编译HQL并将其转化为一系列相互依赖的Map/Reduce任务。 Optimizer:优化器,分为逻辑优化器和物理优化器,分别对HQL生成的执行计划和MapReduce任务进行优化。 Executor:按照任务的依赖关系分别执行Map/Reduce任务。 ThriftServer:提供thrift接口,作为JDBC的服务端,并将Hive和其他应用程序集成起来。 Clients:包含WebUI和JDBC接口,为用户访问提供接口。 图3 Hive结构
  • Hive结构 Hive为单实例的服务进程,提供服务的原理是将HQL编译解析成相应的MapReduce或者HDFS任务,图1为Hive的结构概图。 图1 Hive结构 表1 模块说明 名称 说明 HiveServer 一个集群内可部署多个HiveServer,负荷分担。对外提供Hive数据库服务,将用户提交的HQL语句进行编译,解析成对应的Yarn任务或者HDFS操作,从而完成数据的提取、转换、分析。 MetaStore 一个集群内可部署多个MetaStore,负荷分担。提供Hive的元数据服务,负责Hive表的结构和属性信息读、写、维护和修改。 提供Thrift接口,供HiveServer、Spark、WebHCat等MetaStore客户端来访问,操作元数据。 WebHCat 一个集群内可部署多个WebHCat,负荷分担。提供Rest接口,通过Rest执行Hive命令,提交MapReduce任务。 Hive客户端 包括人机交互命令行Beeline、提供给JDBC应用的JDBC驱动、提供给Python应用的Python驱动、提供给MapReduce的HCatalog相关JAR包。 ZooKeeper集群 ZooKeeper作为临时节点记录各HiveServer实例的IP地址列表,客户端驱动连接ZooKeeper获取该列表,并根据路由机制选取对应的HiveServer实例。 HDFS/HBase集群 Hive表数据存储在HDFS集群中。 MapReduce/Yarn集群 提供分布式计算服务:Hive的大部分数据操作依赖MapReduce/Yarn集群,HiveServer的主要功能是将HQL语句转换成分布式计算任务,从而完成对海量数据的处理。 HCatalog建立在Hive Metastore之上,具有Hive的DDL能力。从另外一种意义上说,HCatalog还是Hadoop的表和存储管理层,它使用户能够通过使用不同的数据处理工具(比如MapReduce),更轻松地在网格上读写HDFS上的数据,HCatalog还能为这些数据处理工具提供读写接口,并使用Hive的命令行接口发布数据定义和元数据探索命令。此外,经过封装这些命令,WebHCat Server还对外提供了RESTful接口,如图2所示。 图2 WebHCat的逻辑架构图
  • Kafka开源特性 可靠性 提供At-Least Once,At-Most Once,Exactly Once消息可靠传递。消息被处理的状态是在Consumer端维护,需要结合应用层实现Exactly Once。 高吞吐 同时为发布和订阅提供高吞吐量。 持久化 将消息持久化到磁盘,因此可用于批量消费以及实时应用程序。通过将数据持久化到硬盘以及replication的方式防止数据丢失。 分布式 分布式系统,易于向外扩展。每个集群支持部署多个Producer、Broker和Consumer,从而形成分布式的集群,无需停机即可扩展系统。
  • Kafka UI Kafka UI提供Kafka Web服务,通过界面展示Kafka集群中Broker、Topic、Partition、Consumer等功能模块的基本信息,同时提供Kafka服务常用命令的界面操作入口。该功能作为Kafka Manager替代,提供符合安全规范的Kafka Web服务。 通过Kafka UI可以进行以下操作: 支持界面检查集群状态(主题,消费者,偏移量,分区,副本,节点) 支持界面执行集群内分区重新分配 支持界面选择配置创建主题 支持界面删除主题(Kafka服务设置了参数“delete.topic.enable = true”) 支持为已有主题增加分区 支持更新现有主题的配置 可以为分区级别和主题级别度量标准启用JMX查询
  • Kafka结构 生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。 图1 Kafka结构
  • Kafka原理 消息可靠性 Kafka Broker收到消息后,会持久化到磁盘,同时,Topic的每个Partition有自己的Replica(备份),每个Replica分布在不同的Broker节点上,以保证当某一节点失效时,可以自动故障转移到可用消息节点。 高吞吐量 Kafka通过以下方式提供系统高吞吐量: 数据磁盘持久化:消息不在内存中缓存,直接写入到磁盘,充分利用磁盘的顺序读写性能。 Zero-copy:减少IO操作步骤。 数据批量发送:提高网络利用率。 Topic划分为多个Partition,提高并发度,可以由多个Producer、Consumer数目之间的关系并发来读、写消息。Producer根据用户指定的算法,将消息发送到指定的Partition。 消息订阅-通知机制 消费者对感兴趣的主题进行订阅,并采取pull的方式消费数据,使得消费者可以根据其消费能力自主地控制消息拉取速度,同时,可以根据自身情况自主选择消费模式,例如批量、重复消费,从尾端开始消费等;另外,需要消费者自己负责维护其自身消息的消费记录。 可扩展性 当在Kafka集群中可通过增加Broker节点以提供更大容量时。新增的Broker会向ZooKeeper注册,而Producer及Consumer会及时从ZooKeeper感知到这些变化,并做出调整。
  • Hue结构 Hue是建立在Django Python(开放源代码的Web应用框架)的Web框架上的Web应用程序,采用了MTV(模型M-模板T-视图V)的软件设计模式。 Hue由“Supervisor Process”和“WebServer”构成,“Supervisor Process”是Hue的核心进程,负责应用进程管理。“Supervisor Process”和“WebServer”通过“THRIFT/REST”接口与WebServer上的应用进行交互,如图1所示。 图1 Hue架构示意图 图1中各部分的功能说明如表1所示。 表1 结构图说明 名称 描述 Supervisor Process Supervisor负责WebServer上APP的进程管理:启动、停止、监控等。 Hue WebServer 通过Django Python的Web框架提供如下功能。 部署APPs。 提供图形化用户界面。 与数据库连接,存储APP的持久化数据。
  • Flink关键特性 流式处理 高吞吐、高性能、低时延的实时流处理引擎,能够提供毫秒级时延处理能力。 丰富的状态管理 流处理应用需要在一定时间内存储所接收到的事件或中间结果,以供后续某个时间点访问并进行后续处理。Flink提供了丰富的状态管理相关的特性,包括: 多种基础状态类型:Flink提供了多种不同数据结构的状态支持,如ValueState、ListState、MapState等。用户可以基于业务模型选择最高效、合适状态类型。 丰富的State Backend:State Backend负责管理应用程序的状态,并根据需要进行Checkpoint。Flink提供了不同State Backend,State可以存储在内存上或RocksDB等上,并支持异步以及增量的Checkpoint机制。 精确一次语义:Flink的Checkpoint和故障恢复能力保证了任务在故障发生前后的应用状态一致性,为某些特定的存储支持了事务型输出的功能,即使在发生故障的情况下,也能够保证精确一次的输出。 丰富的时间语义 时间是流处理应用的重要组成部分,对于实时流处理应用来说,基于时间语义的窗口聚合、检测、匹配等运算是很常见的。Flink提供了丰富的时间语义。 Event-time:使用事件本身自带的时间戳进行计算,使乱序到达或延迟到达的事件处理变得更加简单。 Watermark:Flink引入Watermark概念,用以衡量事件时间的发展。Watermark也为平衡处理时延和数据完整性提供了灵活的保障。当处理带有Watermark的事件流时,在计算完成之后仍然有相关数据到达时,Flink提供了多种处理选项,如将数据重定向(side output)或更新之前完成的计算结果。 Processing-time和Ingestion-time。 高度灵活的流式窗口:Flink能够支持时间窗口、计数窗口、会话窗口,以及数据驱动的自定义窗口,可以通过灵活的触发条件定制,实现复杂的流式计算模式。 容错机制 分布式系统,单个Task或节点的崩溃或故障,往往会导致整个任务的失败。Flink提供了任务级别的容错机制,保证任务在异常发生时不会丢失用户数据,并且能够自动恢复。 Checkpoint:Flink基于Checkpoint实现容错,用户可以自定义对整个任务的Checkpoint策略,当任务出现失败时,可以将任务恢复到最近一次Checkpoint的状态,从数据源重发快照之后的数据。 Savepoint:一个Savepoint就是应用状态的一致性快照,Savepoint与Checkpoint机制相似,但Savepoint需要手动触发,Savepoint保证了任务在升级或迁移时,不丢失当前流应用的状态信息,便于任何时间点的任务暂停和恢复。 Flink SQL Table API和SQL借助了Apache Calcite来进行查询的解析,校验以及优化,可以与DataStream和DataSet API无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。简化数据分析、ETL等应用的定义。下面代码示例展示了如何使用Flink SQL语句定义一个会话点击量的计数应用。 SELECT userId, COUNT(*) FROM clicks GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId 有关Flink SQL的更多信息,请参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html。 CEP in SQL Flink允许用户在SQL中表示CEP(Complex Event Processing)查询结果以用于模式匹配,并在Flink上对事件流进行评估。 CEP SQL通过MATCH_RECOGNIZE的SQL语法实现。MATCH_RECOGNIZE子句自Oracle Database 12c起由Oracle SQL支持,用于在SQL中表示事件模式匹配。CEP SQL使用举例如下: SELECT T.aid, T.bid, T.cid FROM MyTable MATCH_RECOGNIZE ( PARTITION BY userid ORDER BY proctime MEASURES A.id AS aid, B.id AS bid, C.id AS cid PATTERN (A B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T
  • Flink原理 Stream & Transformation & Operator 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。 Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。 当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。 图3为一个由Flink程序映射为Streaming Dataflow的示意图。 图3 Flink DataStream示例 图3中“FlinkKafkaConsumer”是一个Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一个Sink Operator。 Pipeline Dataflow 在Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask。 Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。 紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度,如图4所示。 图4 Operator 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行,如图5所示。 图5 Operator chain 图5中上半部分表示的是将Source和Map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,KeyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。 图5中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。
  • Flink结构 Flink服务包含了两个重要的角色:FlinkResource和FlinkServer。 FlinkResource:提供客户端配置管理,是必须安装的角色。包括供客户端下载使用的原始lib包和配置文件,以及FlinkServer提交作业所依赖的原始lib包。无实体进程,作业运行过程不依赖FlinkResource。 FlinkServer:基于Web的作业管理二次开发平台,可直接在界面开发与管理FlinkSQL作业。具有运维管理界面化、作业开发SQL标准化等特点。 Flink结构如图2所示。 图2 Flink结构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些TaskManager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 如果您想了解更多关于Flink架构的信息,请参考链接:https://ci.apache.org/projects/flink/flink-docs-master/docs/concepts/flink-architecture/。
  • Flink简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性: DataStream Checkpoint 窗口 Job Pipeline 配置表
  • CDL简介 CDL(全称Change Data Loader)是一个基于Kafka Connect框架的实时数据集成服务。 CDL服务能够从各种OLTP数据库中捕获数据库的Data Change事件,并推送到kafka,再由sink connector推送到大数据生态系统中。 CDL目前支持的数据源有MySQL、PostgreSQL、Hudi、Kafka、ThirdParty-Kafka,目标端支持写入Kafka、Hudi、DWS以及ClickHouse。 更多关于CDL组件操作指导,请参考使用CDL。
  • Impala Catalog Service 负责Impala的元数据管理,进程名为catalogd,将元数据的变化发送到所有的Impalad进程。当创建表、加载数据或者其他的一些从Hive发起的操作后,Impala查询之前需要在Impalad上执行REFRESH或者INVALIDATE METADATA刷新Catalog上缓存的元数据信息。如果元数据变化是通过Impala执行的,则不需要执行刷新。
  • Impala与其他组件的关系 Impala与HDFS间的关系 Impala默认利用HDFS作为其文件存储系统。Impala通过解析和计算处理结构化的数据,Hadoop HDFS则为Impala提供了高可靠性的底层存储支持。使用Impala将无需移动HDFS中的数据并且提供更快的访问。 Impala与Hive间的关系 Impala使用Hive的元数据、ODBC驱动程序和SQL语法。与Hive不同,Impala不基于MapReduce算法,它实现了一个基于守护进程的分布式架构,它负责在同一台机器上运行的查询执行的所有方面。因此,它减少了使用MapReduce的延迟,这使Impala比Hive快。 Impala与Kudu间的关系 Kudu与Impala紧密集成,替代Impala+HDFS+Parquet组合。允许使用Impala的SQL语法从Kudu tablets插入、查询、更新和删除数据。此外,还可以用JDBC或ODBC,Impala作为代理连接Kudu进行数据操作。 Impala与HBase间的关系 Impala表默认使用存储在HDFS上的数据文件,便于全表扫描的批量加载和查询。但是,HBase可以提供对OLTP样式组织的数据的便捷高效查询。
  • Impala Daemon Impala daemon的进程名为Impalad,是Impala的核心进程。 Impalad关键功能如下: 运行在所有的数据节点上。 读写数据文件。 接收来自于Impala-shell命令、Hue、JDBC或者ODBC等客户端的查询请求。 可以并行执行来自集群中其他节点的查询请求,将中间结果返回给调度节点。 可以调用节点将结果返回给客户端。 Impalad进程通过持续的和StateStore通信来确认自己所在的节点是否健康和是否可以接受新的任务请求。
  • Impala Impala直接对存储在HDFS、HBase或对象存储服务(OBS)中的Hadoop数据提供快速、交互式SQL查询。除了使用相同的统一存储平台之外,Impala还使用于Apache Hive相同的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue中的Impala查询UI)。这为实时或面向批处理的查询提供了一个熟悉且统一的平台。作为查询大数据的工具的补充,Impala不会替代基于MapReduce构建的批处理框架,例如Hive。基于MapReduce构建的Hive和其他框架最适合长时间运行的批处理作业。 Impala主要特点如下: 支持Hive查询语言(HQL)中大多数的SQL-92功能,包括SELECT,JOIN和聚合函数。 HDFS,HBase和对象存储服务(OBS)存储,包括: HDFS文件格式:基于分隔符的Text file,Parquet,Avro,SequenceFile和RCFile。 压缩编解码器:Snappy,GZIP,Deflate,BZIP。 常见的数据访问接口包括: JDBC驱动程序。 ODBC驱动程序。 Hue beeswax和Impala查询UI。 Impala-shell命令行接口。 支持Kerberos身份认证。 Impala主要应用于实时查询数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景。 有关Impala的详细信息,请参见https://impala.apache.org/impala-docs.html。 更多关于Impala组件操作指导,请参考使用Impala。 Impala由Impalad、StateStore、Catalog 3个角色组成。
  • Flume原理 Agent之间的可靠性 Agent之间数据交换流程如图4所示。 图4 Agent数据传输流程 Flume采用基于Transactions的方式保证数据传输的可靠性,当数据从一个Agent流向另外一个Agent时,两个Transactions已经开始生效。发送Agent的Sink首先从Channel取出一条消息,并且将该消息发送给另外一个Agent。如果接收消息的Agent成功地接收并处理消息,那么发送Agent将会提交Transactions,标识一次数据传输成功可靠地完成。 当接收Agent接收到发送Agent发送的消息时,开始一个新的Transactions,当该数据被成功处理(写入Channel中),那么接收Agent提交该Transactions,并向发送Agent发送成功响应。 如果在某次提交(commit)之前,数据传输出现了失败,将会再次开始上一次Transactions,并将上次发送失败的数据重新传输。因为commit操作已经将Transactions写入了磁盘,那么在进程故障退出并恢复业务之后,仍然可以继续上次的Transactions。
  • Loader基本原理 Loader是在开源Sqoop组件的基础上进行了一些扩展,实现 MRS 与关系型数据库、文件系统之间交换“数据”、“文件”,同时也可以将数据从关系型数据库或者文件服务器导入到HDFS/HBase中,或者反过来从HDFS/HBase导出到关系型数据库或者文件服务器中。 Loader模型主要由Loader Client和Loader Server组成,如图1所示。 图1 Loader模型 上图中各部分的功能说明如表1所示。 表1 Loader模型组成 名称 描述 Loader Client Loader的客户端,包括WebUI和CLI版本两种交互界面。 Loader Server Loader的服务端,主要功能包括:处理客户端操作请求、管理连接器和元数据、提交MapReduce作业和监控MapReduce作业状态等。 REST API 实现RESTful(HTTP + JSON)接口,处理来自客户端的操作请求。 Job Scheduler 简单的作业调度模块,支持周期性的执行Loader作业。 Transform Engine 数据转换处理引擎,支持字段合并、字符串剪切、字符串反序等。 Execution Engine Loader作业执行引擎,支持以MapReduce方式执行Loader作业。 Submission Engine Loader作业提交引擎,支持将作业提交给MapReduce执行。 Job Manager 管理Loader作业,包括创建作业、查询作业、更新作业、删除作业、激活作业、去激活作业、启动作业、停止作业。 Metadata Repository 元数据仓库,存储和管理Loader的连接器、转换步骤、作业等数据。 HA Manager 管理Loader Server进程的主备状态,Loader Server包含2个节点,以主备方式部署。 Loader通过MapReduce作业实现并行的导入或者导出作业任务,不同类型的导入导出作业可能只包含Map阶段或者同时Map和Reduce阶段。 Loader同时利用MapReduce实现容错,在作业任务执行失败时,可以重新调度。 数据导入到HBase 在MapReduce作业的Map阶段中从外部数据源抽取数据。 在MapReduce作业的Reduce阶段中,按Region的个数启动同样个数的Reduce Task,Reduce Task从Map接收数据,然后按Region生成HFile,存放在HDFS临时目录中。 在MapReduce作业的提交阶段,将HFile从临时目录迁移到HBase目录中。 数据导入HDFS 在MapReduce作业的Map阶段中从外部数据源抽取数据,并将数据输出到HDFS临时目录下(以“输出目录-ldtmp”命名)。 在MapReduce作业的提交阶段,将文件从临时目录迁移到输出目录中。 数据导出到关系型数据库 在MapReduce作业的Map阶段,从HDFS或者HBase中抽取数据,然后将数据通过JDBC接口插入到临时表(Staging Table)中。 在MapReduce作业的提交阶段,将数据从临时表迁移到正式表中。 数据导出到文件系统 在MapReduce作业的Map阶段,从HDFS或者HBase中抽取数据,然后将数据写入到文件服务器临时目录中。 在MapReduce作业的提交阶段,将文件从临时目录迁移到正式目录。 Loader的架构和详细原理介绍,请参见:https://sqoop.apache.org/docs/1.99.3/index.html。 更多关于Loader组件操作指导,请参考使用Loader。 父主题: Loader
  • ZooKeeper结构 ZooKeeper集群中的节点分为三种角色:Leader、Follower和Observer,其结构和相互关系如图1所示。通常来说,需要在集群中配置奇数个(2N+1)ZooKeeper服务,至少(N+1)个投票才能成功的执行写操作。 图1 ZooKeeper结构 图1中各部分的功能说明如表1所示。 表1 结构图说明 名称 描述 Leader 在ZooKeeper集群中只有一个节点作为集群的Leader,由各Follower通过ZooKeeper Atomic Broadcast(ZAB)协议选举产生,主要负责接收和协调所有写请求,并把写入的信息同步到Follower和Observer。 Follower Follower的功能有两个: 每个Follower都作为Leader的储备,当Leader故障时重新选举Leader,避免单点故障。 处理读请求,并配合Leader一起进行写请求处理。 Observer Observer不参与选举和写请求的投票,只负责处理读请求、并向Leader转发写请求,避免系统处理能力浪费。 Client ZooKeeper集群的客户端,对ZooKeeper集群进行读写操作。例如HBase可以作为ZooKeeper集群的客户端,利用ZooKeeper集群的仲裁功能,控制其HMaster的“Active”和“Standby”状态。 如果集群启用了安全服务,在连接ZooKeeper时需要进行身份认证,认证方式有以下两种: keytab方式:需要从MRS集群管理员处获取一个“人机”用户,用于登录MRS平台并通过认证,并且获取到该用户的keytab文件。 票据方式:从MRS集群管理员处获取一个“人机”用户,用于后续的安全登录,开启Kerberos服务的renewable和forwardable开关并且设置票据刷新周期,开启成功后重启kerberos及相关组件。 默认情况下,用户的密码有效期是90天,所以获取的keytab文件的有效期是90天。 Kerberos服务的renewable、forwardable开关和票据刷新周期的设置在Kerberos服务的配置页面的“系统”标签下,票据刷新周期的修改可以根据实际情况修改“kdc_renew_lifetime”和“kdc_max_renewable_life”的值。
  • ZooKeeper原理 写请求 Follower或Observer接收到写请求后,转发给Leader。 Leader协调各Follower,通过投票机制决定是否接受该写请求。 如果超过半数以上的Leader、Follower节点返回写入成功,那么Leader提交该请求并返回成功,否则返回失败。 Follower或Observer返回写请求处理结果。 只读请求 客户端直接向Leader、Follower或Observer读取数据。
  • ZooKeeper常见规格 ZooKeeper服务的常见系统规格如ZooKeeper常见规格所示。 表2 ZooKeeper常见规格 指标名称 规格 说明 单集群ZooKeeper最大实例数 9 ZooKeeper最大实例数 每个ZooKeeper实例,单个IP最大连接数 2000 - 每个ZooKeeper实例,最大连接总数 20000 - 默认参数情况下,最大ZNode数 2000000 ZNode数量过大会对服务稳定性造成影响,降低组件读写性能。 一般业务场景下建议ZNode数量在200w以内,如果集群仅部署了ClickHouse,ZNode数量可以扩大到600w以内。 单个ZNode大小 4MB -
  • DBService简介 DBService是一个高可用性的关系型数据库存储系统,适用于存储小量数据(10GB左右),比如:组件元数据。DBService仅提供给集群内部的组件使用,提供数据存储、查询、删除等功能。 DBService是集群的基础组件,Hive、Hue、Oozie、Loader、CDL、Flink、HetuEngine、Kafka、Metadata、Ranger等组件将元数据存储在DBService上,并由DBService提供这些元数据的备份与恢复功能。 更多关于DBService组件操作指导,请参考使用DBService。
  • DBService结构 DBService组件在集群中采用主备模式部署两个DBServer实例,每个DBServer实例包含三个模块:HA、Database和FloatIP。 其逻辑结构如图1所示。 图1 DBService结构 图1中各模块的说明如表1所示。 表1 模块说明 名称 描述 HA 高可用性管理模块,主备DBServer通过HA进行管理。 Database 数据库模块,存储Client模块的元数据。 FloatIP 浮动IP,对外提供访问功能,只在主DBServer实例上启动浮动IP,Client模块通过该IP访问Database。 Client 使用DBService组件的客户端,部署在组件实例节点上,通过Floatip连接数据库,执行元数据的增加、删除、修改等操作。
  • 参数获取方式 区域表 表5 区域表 区 域名 称 区域 华北-北京二 cn-north-2 华北-北京四 cn-north-4 华北-北京一 cn-north-1 华东-上海二 cn-east-2 华东-上海一 cn-east-3 华南-广州 cn-south-1 华南-深圳 cn-south-2 西南-贵阳一 cn-southwest-2 亚太-新加坡 ap-southeast-3 日志组ID:在 云日志 服务控制台,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。 日志流ID:单击日志组名称对应的,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。
  • 日志上报方式 支持以下上报方式:标准日志、结构化日志(新版)、结构化日志。推荐您使用结构化日志(新版)上报,更加灵活可变,性能更好。 标准日志: 批量上报日志,一次可以上报多条日志。 字段log表示原始日志,即一条日志是一个字符串。 字段log_time_ns表示此条日志上报时间,单位ns纳秒,便于在LTS页面查看日志时按照时间排序。 可以将日志按照不同的labels标签进行分类上报。 上报一批日志结构体如下: [{ "contents": [{ "log": "log content1", "log_time_ns": 1737527157333902200 }, { "log": "log content2", "log_time_ns": 1737527157333914100 }], "labels": "{\"lts-test-count\":\"2\"}" }, { "contents": [{ "log": "log content3", "log_time_ns": 1737527157333986200 }, { "log": "log content4", "log_time_ns": 1737527157333987400 }], "labels": "{\"lts-test-count\":\"2\"}" }] 推荐使用结构化日志(新版),最低SDK版本为1.1.3。 目前该方式仅支持白名单用户使用,如有需要,请提交工单申请开通。 支持批量日志上报、单条日志上报。 字段mContents表示一条日志。 字段mKey表示当前这条日志中的一个key。 字段mValue表示当前这条日志中的一个key对应的value值。 字段mLogTime表示此条日志上报时间,单位ms毫秒。 上报一批日志结构体如下: [{ "mContents": [{ "mKey": "content_key_1", "mValue": "content_value_1" }, { "mKey": "content_key_2", "mValue": "content_value_2" }, { "mKey": "content", "mValue": "sdk-new-struct-log1" }], "mLogTime": 1744159440780 }, { "mContents": [{ "mKey": "content_key_1", "mValue": "content_value_1" }, { "mKey": "content_key_2", "mValue": "content_value_2" }, { "mKey": "content", // 选填,云日志服务内部字段,此字段代表原始日志 "mValue": "sdk-new-struct-log2" }], "mLogTime": 1744159440780 }] 结构化日志: 目前该方式仅支持白名单用户使用,如有需要,请提交工单申请开通。 批量上报日志,一次可以上报多条日志。 字段contents表示某几条日志。单条日志使用K-V结构,即一条日志是一个JSON体。content为LTS保留字段,用来表示原始日志,可以不上报。 字段time表示某几条日志上报时间,单位ms毫秒,LTS会将这几条日志在ms毫秒的基础上拓展为ns纳秒保存。这样会丢失这几条日志的先后顺序,可能会影响到您在LTS页面查看日志的先后顺序。 字段labels表示这批日志公共的标签。 字段path表示这批日志的路径。 字段source表示这批日志的来源。 上报一批日志结构体如下: { "labels": { "label": "label" }, "logs": [{ "contents": [{ "k1": "v1", "k2": "v2", "content": "log content1" }, { "k3": "v3", "k4": "v4", "content": "log content2" }], "time": 1721784021037 }, { "contents": [{ "k5": "v5", "k6": "v6", "content": "log content3" }], "time": 1721784021038 }, ], "path": "path", "source": "source" }
  • Producer性能基线 上报日志时,请参考如下参数的测试性能基线,若超出基线值,可能会导致日志上报异常。 ECS虚拟机配置参考如下: 实例规格:通用计算增强型c7.xlarge.2 CPU:4 vCPU 内存:8 GB 基准带宽:100 Mbit/s OS:Huawei Cloud EulerOS release 2.0 JVM:OpenJDK 64-Bit Server VM (build 17.0.7+7, mixed mode) 测试程序说明(单个producer): totalSizeInBytes: 104857600 maxBlockMs:0 batchSizeThresholdInBytes: 1048576 batchCountThreshold:40960 lingerMs:2000 ioThreadCount:具体用例中调整 JVM初始堆/最大堆大小:1GB 发送日志总条数:100,000,000 发送日志总大小:约50GB 按照参数基线值设置后,使用华为云ECS机器作为日志上报环境,通过华为云内网LB网络服务入口进行上报。 标准日志SDK上报性能基线测试结果参考: 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。为了模拟数据的随机性,测试使用的日志数据为随机字符串,单条日志大小约为510字节。 [ { "contents" : [{ "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 }, { "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 } ] }, { "contents" : [{ "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 }, { "log" : "随机字符串510字节", "log_time_ns" : 1737527157333987400 } ] } ] 性能基线参考如下: 表2 标准日志SDK上报性能基线 IO 线程数量 数据吞吐量 数据吞吐速率 CPU 使用率 2 7.8 MB/S 1.5W 条/S 9 % 4 15.4 MB/S 3.1W 条/S 19 % 6 19.7 MB/S 3.9W 条/S 27 % 结构化日志SDK上报性能基线测试结果参考: 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。 单条日志包含10个键值对以及content、time字段。 为了模拟数据的随机性,测试使用的数据为随机字符串,单条日志大小约为550字节。 { "logs" : [ { "contents" : [ { "content": "sdk-log-new-struct-1", "content_key_1": "XmGFubcemwrceBWbZYRBTgohfxfFih", "..." : "...", "content_key_10": "amchrqwPdigHopmAkNLvJtNxgiPUzh" },{ "content": "sdk-log-new-struct-2", "content_key_1": "zOPDFRCNYsVznSgtnFejWFbaxklkMQ", "..." : "...", "content_key_10": "mLzpbYcumXsIgYtQIbzizoACLtUgwS" } ], "time": 1645374890235 },{ "contents" : [ { "content": "sdk-log-new-struct-3", "content_key_1": "SaGsfDrQskJaHlciNAUXFyxiqCAqXe", "..." : "...", "content_key_10": "wMQNuoVWonxVSsRsocQoDkEjcjiPio" },{ "content": "sdk-log-new-struct-4", "content_key_1": "bHDjNmAvdiLAvWdxoETANqCYxhVMMk", "..." : "...", "content_key_10": "scsxtrXrPUFYVARzOvbCxSofYZBsFV" } ], "time": 1645374890235 } ] } 性能基线参考如下: 表3 结构化日志SDK上报性能基线 IO 线程数量 数据吞吐量 数据吞吐速率 CPU 使用率 2 18.7 MB/S 3.5W 条/S 13 % 4 34.6 MB/S 6.4W 条/S 25 % 6 42.7 MB/S 7.8W 条/S 32 % 新版本结构化日志SDK上报性能基线测试结果参考: 上报日志格式:测试中使用的日志包含10个键值对以及content、logTime 字段。为了模拟数据的随机性,测试使用的数据为随机字符串。 单条日志大小约为540字节。 content: sdk-log-new-struct-1 content_key_1: sshyaqKCfrAPCMpdlxroPuCedeuJ content_key_2: RVFtqFBjBtTAHVvdHBYQsDsoogJc ... content_key_9: zLKeuxDnzGtupeZrQKKIlkQemXvX content_key_10: fYxmtYxKNfBhRfqMbZEOfimlsAIo logTime: 1645390242169 性能基线参考如下: 表4 新版本结构化日志SDK上报性能基线 IO 线程数量 数据吞吐量 数据吞吐速率 CPU 使用率 2 23.7 MB/S 4.5W 条/S 11 % 4 45.6 MB/S 8.7W 条/S 23 % 6 54.7 MB/S 10.3W 条/S 30 % 总结如下: CPU时间主要花费在对象的序列化和压缩上,在吞吐量较高的情况下CPU使用率比较高。但在日常环境中,单机数据流量均值为100KB/S,因此造成的CPU消耗几乎可以忽略不计。 增加IO线程数量可以显著提高吞吐量,尤其是当IO线程数量少于可用处理器个数时。 调整totalSizeInBytes对吞吐量影响不够显著,增加totalSizeInBytes会造成更多的CPU消耗,建议使用默认值。 当日志上报量超过单个producer时: 建议拆分日志流,使用多个producer上报日志,分摊流量,以保障SDK处于正常上报状态。 如果maxBlockMs为0时,SDK处于非阻塞状态,会触发保护机制自动降级,可能会对部分日志做丢弃处理。 如果maxBlockMs大于0时,SDK处于阻塞状态,阻塞时间为maxBlockMs,可能会造成producer.send()发送日志方法处于阻塞状态。
  • 步骤五:关闭Producer 当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。 安全关闭(推荐):建议您使用安全关闭。安全关闭对应的方法是close(),SDK会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行、返回future全部被设置后才会关闭Producer。 有限关闭:如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全