云服务器内容精选

  • 开始导入 下面我们通过几个实际的场景示例来看Broker Load的使用。 数据样例: '100','101','102','103','104','105',100.00,100.01,100.02,'100',200,100.08,2022-04-01 '101','102','103','104','105','105',100.00,100.01,100.02,'100',200,100.08,2022-04-02 '102','103','104','105','106','105',100.00,100.01,100.02,'100',200,100.08,2022-04-03 准备工作: 在本地创建示例数据文件source_text.txt,并上传至hdfs的/tmp/。 在hive中创建ods_source表。 CREATE TABLE `ods_source`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` string, `actual_price` double, `day ` string ) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile; 将hdfs创建的txt文件导入到ods_source表。 load data inpath '/tmp/source_text.txt' into table ods_source;
  • 相关系统配置 FE配置。 下面几个配置属于Broker load的系统级别配置,也就是作用于所有Broker load导入任务的配置。主要通过修改FE配置项来调整配置值。 max_bytes_per_broker_scanner/max_broker_concurrency max_bytes_per_broker_scanner配置限制了单个BE处理的数据量的最大值。max_broker_concurrency配置限制了一个作业的最大的导入并发数。最小处理的数据量(默认64M),最大并发数,源文件的大小和当前集群BE的个数 共同决定了本次导入的并发数。 本次导入并发数=Math.min(源文件大小/最小处理量(默认64M),最大并发数,当前BE节点个数)。 本次导入单个BE的处理量=源文件大小/本次导入的并发数。 通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner*BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。 默认配置: 参数名:max_broker_concurrency, 默认10。 参数名:max_bytes_per_broker_scanner,默认3G,单位bytes。
  • 作业调度 系统会限制一个集群内正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。 首先,FE的配置参数:desired_max_waiting_jobs会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过这个阈值,新提交的作业将会被直接拒绝。 一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。 FE的配置参数async_pending_load_task_pool_size用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入LOADING状态开始执行,而其他作业处于PENDING等待状态。 FE的配置参数async_loading_load_task_pool_size用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1 pending task和多个loading task(等于LOAD语句中DATA INFILE子句的个数)。所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size。
  • 基本原理 用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。 BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。 + | 1. user create broker load v +----+----+ | | | FE | | | +----+----+ | | 2. BE etl and load the data +--------------------------+ | | | +---v---+ +--v----+ +---v---+ | | | | | | | BE | | BE | | BE | | | | | | | +---+-^-+ +---+-^-+ +--+-^--+ | | | | | | | | | | | | 3. pull data from broker +---v-+-+ +---v-+-+ +--v-+--+ | | | | | | |Broker | |Broker | |Broker | | | | | | | +---+-^-+ +---+-^-+ +---+-^-+ | | | | | | +---v-+-----------v-+----------v-+-+ | HDFS/BOS/AFS cluster | | | +----------------------------------+
  • 基本原理 下图展示了Stream load的主要流程,省略了一些导入细节。 ^ + | | | | 1A. User submit load to FE | | | +--v-----------+ | | FE | 5. Return result to user | +--+-----------+ | | | | 2. Redirect to BE | | | +--v-----------+ +---+Coordinator BE| 1B. User submit load to BE +-+-----+----+-+ | | | +-----+ | +-----+ | | | 3. Distrbute data | | | +-v-+ +-v-+ +-v-+ |BE | |BE | |BE | +---+ +---+ +---+ Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给用户。