华为云用户手册

  • 数据类型表 表1 数据类型表 分类 关键字 数据类型 描述 整数类型 Int8 Int8 取值范围:【-128,127】 Int16 Int16 取值范围:【-32768,32767】 Int32 Int32 取值范围:【-2147483648,2147483647】 Int64 Int64 取值范围:【-9223372036854775808,9223372036854775807】 浮点类型 Float32 单精度浮点数 同C语言Float类型,单精度浮点数在机内占4个字节,用32位二进制描述。 Float64 双精度浮点数 同C语言Double类型,双精度浮点数在机内占8个字节,用64位二进制描述。 Decimal类型 Decimal Decimal 有符号的定点数,可在加、减和乘法运算过程中保持精度。支持几种写法: Decimal(P, S) Decimal32(S) Decimal64(S) Decimal128(S) 说明: P:精度,有效范围:[1:38],决定可以有多少个十进制数字(包括分数)。 S:规模,有效范围:[0:P],决定数字的小数部分中包含的小数位数。 字符串类型 String 字符串 字符串可以是任意长度的。它可以包含任意的字节集,包含空字节。因此,字符串类型可以代替其他DBMSs中的VARCHAR、BLOB、CLOB等类型。 FixedString 固定字符串 当数据的长度恰好为N个字节时,FixedString类型是高效的。 在其他情况下,这可能会降低效率。可以有效存储在FixedString类型的列中的值的示例: 二进制表示的IP地址(IPv6使用FixedString(16)) 语言代码(ru_RU, en_US … ) 货币代码(USD, RUB … ) 二进制表示的哈希值(MD5使用FixedString(16),SHA256使用FixedString(32)) 时间日期类型 Date 日期 用两个字节存储,表示从1970-01-01(无符号)到当前的日期值。日期中没有存储时区信息。 DateTime 时间戳 用四个字节(无符号的)存储Unix时间戳。允许存储与日期类型相同的范围内的值。最小值为1970-01-01 00:00:00。时间戳类型值精确到秒(没有闰秒)。时区使用启动客户端或服务器时的系统时区。 DateTime64 DateTime64 此类型允许以日期(date)加时间(time)的形式来存储一个时刻的时间值。 布尔型 Boolean Boolean ClickHouse没有单独的类型来存储布尔值。可以使用UInt8 类型,取值限制为0或1。 数组类型 Array Array Array(T),由T类型元素组成的数组。T可以是任意类型,包含数组类型。但不推荐使用多维数组,ClickHouse对多维数组的支持有限。例如,不能在MergeTree表中存储多维数组。 元组类型 Tuple Tuple Tuple(T1, T2, ...),元组,其中每个元素都有单独的类型,不能在表中存储元组(除了内存表)。它们可以用于临时列分组。在查询中,IN表达式和带特定参数的lambda函数可以来对临时列进行分组。 Domains数据类型 Domains Domains Domains类型是特定实现的类型: IPv4是与UInt32类型保持二进制兼容的Domains类型,用于存储IPv4地址的值。它提供了更为紧凑的二进制存储的同时支持识别可读性更加友好的输入输出格式。 IPv6是与FixedString(16)类型保持二进制兼容的Domain类型,用于存储IPv6地址的值。它提供了更为紧凑的二进制存储的同时支持识别可读性更加友好的输入输出格式。 枚举类型 Enum8 Enum8 取值范围:【-128,127】 Enum保存 'string'= integer的对应关系,例如:Enum8('hello' = 1, 'world' = 2) Enum16 Enum16 取值范围:【-32768,32767】 可为空 Nullable Nullable 除非在ClickHouse服务器配置中另有说明,否则NULL是任何Nullable类型的默认值。Nullable类型字段不能包含在表索引中。 可以与TypeName的正常值存放一起。例如,Nullable(Int8) 类型的列可以存储Int8类型值,而没有值的行将存储NULL。 嵌套类型 nested nested 嵌套的数据结构就像单元格内的表格。嵌套数据结构的参数(列名和类型)的指定方式与CREATE TABLE查询中的指定方式相同。每个表行都可以对应于嵌套数据结构中的任意数量的行。 示例:Nested(Name1 Type1, Name2 Type2, …)
  • Distributed表引擎 Distributed表引擎本身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据到集群中的各个节点,分布式表需要和其他本地数据表一起协同工作。分布式表会将接收到的读写任务分发到各个本地表,而实际上数据的存储在各个节点的本地表中。 图2 Distributed Distributed表引擎创建模板: ENGINE = Distributed(cluster_name, database_name, table_name, [sharding_key]) 表6 Distributed表参数说明 参数 说明 cluster_name 集群名称,在对分布式表执行读写的过程中,使用集群的配置信息查找对应的ClickHouse实例节点。 database_name 数据库名称。 table_name 数据库下对应的本地表名称,用于将分布式表映射到本地表上。 sharding_key 分片键(可选参数),分布式表会按照这个规则,将数据分发到各个本地表中。 使用示例。 先创建一个表名为demo的ReplicatedMergeTree本地表。 CREATE TABLE default.demo ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/demo', '{replica}') PARTITION BY toYYYYMM(EventDate) ORDER BY id; 基于本地表demo创建表名为demo_all的Distributed表。 CREATE TABLE default.demo_all ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = Distributed(default_cluster, default, demo, rand()); 分布式表创建规则。 创建Distributed表时需加上on cluster cluster_name,这样建表语句在某一个ClickHouse实例上执行一次即可分发到集群中所有实例上执行。 分布式表通常以本地表加“_all”命名。它与本地表形成一对多的映射关系,之后可以通过分布式表代理操作多张本地表。 分布式表的表结构尽量和本地表的结构一致。如果不一致,在建表时不会报错,但在查询或者插入时可能会抛出异常。
  • VersionedCollapsingMergeTree 为了解决CollapsingMergeTree表引擎乱序写入导致无法正常折叠(删除)问题,云数据库ClickHouse提供了VersionedCollapsingMergeTree表引擎,在建表语句中新增一列Version,用于在乱序情况下记录状态行与取消行的对应关系。后台Compaction时会将主键相同、Version相同、Sign相反的行折叠(删除)。 建表语句。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = VersionedCollapsingMergeTree(sign, version) [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...]
  • AggregatingMergeTree AggregatingMergeTree表引擎也是预先聚合引擎的一种,用于提升聚合计算的性能。 建表语句。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = AggregatingMergeTree() [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [TTL expr] [SETTINGS name=value, ...]
  • Replicated*MergeTree引擎 ClickHouse中的所有MergeTree家族引擎前面加上Replicated就成了支持副本的合并树引擎。 图1 合并树引擎图 Replicated表引擎的创建模板: ENGINE = Replicated*MergeTree('ZooKeeper存储路径','副本名称', ...) 表5 参数表 参数 说明 ZooKeeper存储路径 ZooKeeper中该表相关数据的存储路径,建议规范化,如:/clickhouse/tables/{shard}/数据库名/表名。 副本名称 一般用{replica}即可。
  • SummingMergeTree SummingMergeTree表引擎用于对主键列进行预先聚合,将所有相同主键的行合并为一行,从而大幅度降低存储空间占用,提升聚合计算性能。 建表语句。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = SummingMergeTree([columns]) [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...] 使用示例。 创建一个SummingMergeTree表testTable。 CREATE TABLE testTable(id UInt32,value UInt32)ENGINE = SummingMergeTree() ORDER BY id; testTable表中插入数据。 INSERT INTO testTable Values(5,9),(5,3),(4,6),(1,2),(2,5),(1,4),(3,8); INSERT INTO testTable Values(88,5),(5,5),(3,7),(3,5),(1,6),(2,6),(4,7),(4,6),(43,5),(5,9),(3,6); 在未合并parts查询所有数据。 SELECT * FROM testTable; 查询结果。 ┌─id─┬─value─┐ │ 1 │ 6 │ │ 2 │ 5 │ │ 3 │ 8 │ │ 4 │ 6 │ │ 5 │ 12 │ └────┴───────┘ ┌─id─┬─value─┐ │ 1 │ 6 │ │ 2 │ 6 │ │ 3 │ 18 │ │ 4 │ 13 │ │ 5 │ 14 │ │ 43 │ 5 │ │ 88 │ 5 │ └────┴───────┘ ClickHouse还没有汇总所有行,需要通过ID进行汇总聚合,需要用到sum和GROUP BY子句。 SELECT id, sum(value) FROM testTable GROUP BY id; 查询结果。 ┌─id─┬─sum(value)─┐ │ 4 │ 19 │ │ 3 │ 26 │ │ 88 │ 5 │ │ 2 │ 11 │ │ 5 │ 26 │ │ 1 │ 12 │ │ 43 │ 5 │ └────┴────────────┘ 手工执行合并操作。 OPTIMIZE TABLE testTable; 查询表数据。 SELECT * FROM testTable; 查询结果。 ┌─id─┬─value─┐ │ 1 │ 12 │ │ 2 │ 11 │ │ 3 │ 26 │ │ 4 │ 19 │ │ 5 │ 26 │ │ 43 │ 5 │ │ 88 │ 5 │ └────┴───────┘ SummingMergeTree根据ORDER BY排序键作为聚合数据的条件Key。即如果排序key是相同的,则会合并成一条数据,并对指定的合并字段进行聚合。 后台执行合并操作时才会进行数据的预先聚合,而合并操作的执行时机无法预测,所以可能存在部分数据已经被预先聚合、部分数据尚未被聚合的情况。因此,在执行聚合计算时,SQL中仍需要使用GROUP BY子句。
  • ReplacingMergeTree 为了解决MergeTree表引擎相同主键无法去重的问题,云数据库ClickHouse提供了ReplacingMergeTree表引擎,用于删除主键值相同的重复项。 建表语句。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = ReplacingMergeTree([ver]) [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...]
  • CollapsingMergeTree CollapsingMergeTree表引擎用于消除ReplacingMergeTree表引擎的功能限制。该表引擎要求在建表语句中指定一个标记列Sign,按照Sign的值将行分为两类:Sign=1的行称为状态行,用于新增状态。Sign=-1的行称为取消行,用于删除状态。 建表语句。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = CollapsingMergeTree(sign) [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name=value, ...]
  • MergeTree 建表语法。 CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1, INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2 ) ENGINE = MergeTree() ORDER BY expr [PARTITION BY expr] [PRIMARY KEY expr] [SAMPLE BY expr] [TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...] [SETTINGS name=value, ...] 使用示例。 CREATE TABLE default.test (name1 DateTime,name2 String,name3 String,name4 String,name5 Date) ENGINE = MergeTree() PARTITION BY toYYYYMM(name5) ORDER BY (name1, name2) SETTINGS index_granularity = 8192; 示例参数说明: 表2 参数说明 参数 说明 ENGINE = MergeTree() MergeTree表引擎。 PARTITION BY toYYYYMM(name5) 分区,示例数据将以月份为分区,每个月份一个文件夹。 ORDER BY 排序字段,支持多字段的索引排序,第一个相同的时候按照第二个排序依次类推。 index_granularity = 8192 排序索引的颗粒度,每8192条数据记录一个排序索引值。 如果被查询的数据存在于分区或排序字段中,能极大降低数据查找时间。
  • 概述 表引擎即表的类型,在云数据库ClickHouse中决定了如何存储和读取数据、是否支持索引、是否支持主备复制等。云数据库ClickHouse支持的表引擎,请参见下表。 MergeTree引擎为单副本,无法保证高可用和数据可靠性,建议只在测试环境中使用。Replicated*MergeTree引擎用于生产环境。 表1 表引擎 系列 描述 表引擎 特点 MergeTree MergeTree系列引擎适用于高负载任务,支持大数据量的快速写入并进行后续的数据处理,通用程度高且功能强大。 该系列引擎的共同特点是支持数据副本、分区、数据采样等特性。 MergeTree 基于分区键(partitioning key)的数据分区分块存储。 数据索引排序(基于primary key和order by)。 支持数据复制(带Replicated前缀的表引擎)。 支持数据抽样。 在写入数据时,该系列引擎表会按照分区键将数据分成不同的文件夹,文件夹内每列数据为不同的独立文件,以及创建数据的序列化索引排序记录文件。该结构使得数据读取时能够减少数据检索时的数据量,极大的提高查询效率。 RelacingMergeTree 用于解决MergeTree表引擎相同主键无法去重的问题,可以删除主键值相同的重复项。 CollapsingMergeTree CollapsingMergeTree它通过定义一个sign标记位字段记录数据行的状态。如果sign标记为1,则表示这是一行有效的数据。如果sign标记为-1,则表示这行数据需要被删除。 VersionedCollapsingMergeTree 在建表语句中新增Version列,用于解决CollapsingMergeTree表引擎乱序写入导致无法正常折叠(删除)的问题。 SummigMergeTree 用于对主键列进行预先聚合,将所有相同主键的行合并为一行,从而大幅度降低存储空间占用,提升聚合计算性能。 AggregatingMergeTree AggregatingMergeTree是预先聚合引擎的一种,用于提升聚合计算的性能。AggregatingMergeTree引擎能够在合并分区时,按照预先定义的条件聚合数据,同时根据预先定义的聚合函数计算数据并通过二进制的格式存入表内。 GraphiteMergeTree 用于存储Graphite数据并进行汇总,可以减少存储空间,提高Graphite数据的查询效率。 Replicated*MergeTree ClickHouse中的所有MergeTree家族引擎前面加上Replicated就成了支持副本的合并树引擎。 Replicated*MergeTree系列 Replicated系列引擎借助ZooKeeper实现数据的同步,创建Replicated复制表时通过注册到ZooKeeper上的信息实现同一个分片的所有副本数据进行同步。 Distributed - Distributed 本身不存储数据,可以在多个服务器上进行分布式查询。
  • JDBC Connector 如果使用mysql jdbc connector来连接Doris,可以使用jdbc的自动重试机制: private static String URL = "jdbc:mysql:loadbalance://" + "cloudtable-2e68-ya-frontend-2-1-M07K7np5.mycloudtable.com:9030,cloudtable-2e68-ya-frontend-1-1-y5R8YNiy.mycloudtable.com:9030,cloudtable-2e68-ya-frontend-3-1-fGg7P4tA.mycloudtable.com:9030/demo?" + "loadBalanceConnectionGroup=first&ha.enableJMX=true"; 样例代码: public class Test { private static String URL = "jdbc:mysql:loadbalance://" + "FE1:9030,FE2:9030,FE3:9030/demo?" + "loadBalanceConnectionGroup=first&ha.enableJMX=true"; static Connection getNewConnection() throws SQLException, ClassNotFoundException { Class.forName("com.mysql.cj.jdbc.Driver"); // 认证用的密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量 String password = System.getenv("USER_PASSWORD"); return DriverManager.getConnection(URL, "admin", password); } public static void main(String[] args) throws Exception { Connection c = getNewConnection(); while (true) { try { String query = "your sqlString"; c.setAutoCommit(false); Statement s = c.createStatement(); ResultSet resultSet = s.executeQuery(query); System.out.println("begin print"); while(resultSet.next()) { int id = resultSet.getInt(1); System.out.println("id is: "+id); } System.out.println("end print"); Thread.sleep(Math.round(100 * Math.random())); } catch (Exception e) { e.printStackTrace(); } } } }
  • 代码样例 不指定HOT_ONLY参数来查询数据。在这种情况下,将会查询冷存储中的数据。 public void testScanData() { LOG .info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); byte[] startRow = Bytes.toBytes(Shenzhen#Longgang#2017/7/1 00:00:00); byte[] stopRow = Bytes.toBytes(Shenzhen#Longgang#2017/7/3 00:00:00); scan.setStartRow(startRow); scan.setStopRow(stopRow); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("temp")); // Set the cache size. scan.setCaching(1000); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testScanData."); } 通过指定HOT_ONLY参数来查询数据。在这种情况下,只会查询热存储中的数据。 public void testScanData() { LOG.info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); byte[] startRow = Bytes.toBytes(Shenzhen#Longgang#2017/7/1 00:00:00); byte[] stopRow = Bytes.toBytes(Shenzhen#Longgang#2017/7/3 00:00:00); scan.setStartRow(startRow); scan.setStopRow(stopRow); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("temp")); // Set HOT_ONLY. scan.setAttribute(HBaseConstants.HOT_ONLY, Bytes.toBytes(true)); // Set the cache size. scan.setCaching(1000); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testScanData."); }
  • 代码样例 不指定HOT_ONLY参数来查询数据。在这种情况下,将会查询冷存储中的数据。 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = { Bytes.toBytes("temp"), Bytes.toBytes("hum") }; // Specify RowKey. byte[] rowKey = Bytes.toBytes("Shenzhen#Longgang#2017/7/1 03:00:00"); Table table = null; try { // Create the Table instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testGet."); } 通过指定HOT_ONLY参数来查询数据。在这种情况下,只会查询热存储中的数据。 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = { Bytes.toBytes("temp"), Bytes.toBytes("hum") }; // Specify RowKey. byte[] rowKey = Bytes.toBytes("Shenzhen#Longgang#2017/7/2 10:00:00"); Table table = null; try { // Create the Table instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set HOT_ONLY. get.setAttribute(HBaseConstants.HOT_ONLY, Bytes.toBytes(true)); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testGet."); }
  • 代码样例 public void testPut() { LOG.info("Entering testPut."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifiers = { Bytes.toBytes("temp"), Bytes.toBytes("hum") }; Table table = null; try { // Instantiate an HTable object. table = conn.getTable(tableName); // Instantiate a Put object. Every Hour insert one data. Put put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 00:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 01:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("53.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 02:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("52.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 03:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); puts.add(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 04:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 05:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 06:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 07:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 08:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 09:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 10:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 11:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 12:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 13:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("33.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 14:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("33.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 15:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 16:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("31.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 17:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 18:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 19:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 20:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("52.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 21:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("53.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 22:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/1 23:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 00:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 01:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("53.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 02:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("52.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 03:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); puts.add(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 04:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 05:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 06:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 07:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("27.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 08:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 09:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("46.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 10:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 11:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("48.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 12:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 13:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("33.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("49.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 14:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("33.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 15:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("32.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("50.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 16:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("31.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 17:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 18:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("30.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); puts.clear(); puts.add(put); table.put(puts); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 19:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("51.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 20:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("52.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 21:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("29.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("53.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 22:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); put = new Put(Bytes.toBytes("Shenzhen#Longgang#2017/7/2 23:00:00")); put.addColumn(familyName, qualifiers[0], Bytes.toBytes("28.0")); put.addColumn(familyName, qualifiers[1], Bytes.toBytes("54.0")); table.put(put); LOG.info("Put successfully."); } catch (IOException e) { LOG.error("Put failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testPut."); }
  • 代码样例 取消冷热时间线。 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. HTableDescriptor htd = admin.getTableDescriptor(tableName); // Check whether the column family is specified before modification. if (!htd.hasFamily(familyName)) { // Create the column descriptor. HColumnDescriptor hcd = new HColumnDescriptor(familyName); //Disable hot and cold separation. hcd .setValue(HColumnDescriptor.COLD_BOUNDARY, null); htd.addFamily(hcd); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName); // Submit a modifyTable request. admin.modifyTable(tableName, htd); //注[1] // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting testModifyTable."); } 注意事项。 注[1] 只有在调用disableTable接口后, 再调用modifyTable接口才能将表修改成功。之后,请调用enableTable接口重新启用表。 注[1] 指的是代码样例中的“admin.modifyTable(tableName, htd); //注[1]”。
  • 代码样例 public void dropTable() { LOG.info("Entering dropTable."); Admin admin = null; try { admin = conn.getAdmin(); if (admin.tableExists(tableName)) { // Disable the table before deleting it. admin.disableTable(tableName); // Delete table. admin.deleteTable(tableName);//注[1] } LOG.info("Drop table successfully."); } catch (IOException e) { LOG.error("Drop table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting dropTable."); }
  • 注意事项 注[1]只有在调用disableTable接口后, 再调用deleteTable接口才能将表删除成功。 因此,deleteTable常与disableTable,enableTable,tableExists,isTableEnabled,isTableDisabled结合在一起使用。 注[1]指的是代码样例中的“admin.deleteTable(tableName);//注[1]”。
  • 功能介绍 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名、冷热时间线。 创建表有两种方式(强烈建议采用预分Region建表方式): 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 代码样例 public void testCreateTable() { LOG.info("Entering testCreateTable."); // Specify the table descriptor. HTableDescriptor htd = new HTableDescriptor(tableName); // (1) // Set the column family name to info. HColumnDescriptor hcd = new HColumnDescriptor("info"); // (2) // Set hot and cold data boundary hcd.setValue(HColumnDescriptor.COLD_BOUNDARY, "86400"); htd.addFamily(hcd); // (3) Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // (4) if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd); // 注[1] (5) LOG.info(admin.getClusterStatus()); LOG.info(admin.listNamespaceDescriptors()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed.", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting testCreateTable."); } 代码编号解释 (1)创建表描述符。 (2)创建列族描述符。 (3)添加列族描述符到表描述符中。 (4)获取Admin对象,Admin提供了建表、创建列族、检查表是否存在、修改表结构和列族结构以及删除表等功能。 (5)调用Admin的建表方法。 注意事项 注[1] 表和列族其它属性设置可以参考开发HBase应用。 注[1] 指的是代码样例中的“admin.createTable(htd); // 注[1] (5)”。
  • 功能介绍 HBase通过ConnectionFactory.createConnection(configuration)方法创建Connection对象。传递的参数为上一步创建的Configuration。 Connection封装了底层与各实际服务器的连接以及与ZooKeeper的连接。Connection通过ConnectionFactory类实例化。创建Connection是重量级操作,而且Connection是线程安全的,因此,多个客户端线程可以共享一个Connection。 典型的用法,一个客户端程序共享一个单独的Connection,每一个线程获取自己的Admin或Table实例,然后调用Admin对象或Table对象提供的操作接口。不建议缓存或者池化Table、Admin。Connection的生命周期由调用者维护,调用者通过调用close(),释放资源。 建议业务代码连接同一个CloudTable集群时,多线程创建并复用同一个Connection,不必每个线程都创建各自Connection。Connection是连接CloudTable集群的连接器,创建过多连接会加重Zookeeper负载,并损耗业务读写性能。
  • 代码样例 以下代码片段是创建Connection对象的示例: private TableName tableName = null; private Connection conn = null; public HBaseSample(Configuration conf) throws IOException { this.tableName = TableName.valueOf("hbase_sample_table"); this.conn = ConnectionFactory.createConnection(conf); }
  • 代码样例 下面代码片段在com.huawei.cloudtable.hbase.examples.coldhotexample包中。 private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); // 注[1] String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; Path hbaseSite = new Path(userdir + "hbase-site.xml"); if (new File(hbaseSite.toString()).exists()) { conf.addResource(hbaseSite); } }
  • 场景说明 假定用户开发一个应用程序,用于实时记录和查询城市的气象信息,记录数据如下表: 表1 原始数据 城市 区域 时间 温度 湿度 Shenzhen Longgang 2017/7/1 00:00:00 28 54 Shenzhen Longgang 2017/7/1 01:00:00 27 53 Shenzhen Longgang 2017/7/1 02:00:00 27 52 Shenzhen Longgang 2017/7/1 03:00:00 27 51 Shenzhen Longgang 2017/7/1 04:00:00 27 50 Shenzhen Longgang 2017/7/1 05:00:00 27 49 Shenzhen Longgang 2017/7/1 06:00:00 27 48 Shenzhen Longgang 2017/7/1 07:00:00 27 46 Shenzhen Longgang 2017/7/1 08:00:00 29 46 Shenzhen Longgang 2017/7/1 09:00:00 30 48 Shenzhen Longgang 2017/7/1 10:00:00 32 48 Shenzhen Longgang 2017/7/1 11:00:00 32 49 Shenzhen Longgang 2017/7/1 12:00:00 33 49 Shenzhen Longgang 2017/7/1 13:00:00 33 50 Shenzhen Longgang 2017/7/1 14:00:00 32 50 Shenzhen Longgang 2017/7/1 15:00:00 32 50 Shenzhen Longgang 2017/7/1 16:00:00 31 51 Shenzhen Longgang 2017/7/1 17:00:00 30 51 Shenzhen Longgang 2017/7/1 18:00:00 30 51 Shenzhen Longgang 2017/7/1 19:00:00 29 51 Shenzhen Longgang 2017/7/1 20:00:00 29 52 Shenzhen Longgang 2017/7/1 21:00:00 29 53 Shenzhen Longgang 2017/7/1 22:00:00 28 54 Shenzhen Longgang 2017/7/1 23:00:00 28 54 Shenzhen Longgang 2017/7/2 00:00:00 28 54 Shenzhen Longgang 2017/7/2 01:00:00 27 53 Shenzhen Longgang 2017/7/2 02:00:00 27 52 Shenzhen Longgang 2017/7/2 03:00:00 27 51 Shenzhen Longgang 2017/7/2 04:00:00 27 50 Shenzhen Longgang 2017/7/2 05:00:00 27 49 Shenzhen Longgang 2017/7/2 06:00:00 27 48 Shenzhen Longgang 2017/7/2 07:00:00 27 46 Shenzhen Longgang 2017/7/2 08:00:00 29 46 Shenzhen Longgang 2017/7/2 09:00:00 30 48 Shenzhen Longgang 2017/7/2 10:00:00 32 48 Shenzhen Longgang 2017/7/2 11:00:00 32 49 Shenzhen Longgang 2017/7/2 12:00:00 33 49 Shenzhen Longgang 2017/7/2 13:00:00 33 50 Shenzhen Longgang 2017/7/2 14:00:00 32 50 Shenzhen Longgang 2017/7/2 15:00:00 32 50 Shenzhen Longgang 2017/7/2 16:00:00 31 51 Shenzhen Longgang 2017/7/2 17:00:00 30 51 Shenzhen Longgang 2017/7/2 18:00:00 30 51 Shenzhen Longgang 2017/7/2 19:00:00 29 51 Shenzhen Longgang 2017/7/2 20:00:00 29 52 Shenzhen Longgang 2017/7/2 21:00:00 29 53 Shenzhen Longgang 2017/7/2 22:00:00 28 54 Shenzhen Longgang 2017/7/2 23:00:00 28 54
  • 基本原理 下图展示了Stream load的主要流程,省略了一些导入细节。 ^ + | | | | 1A. User submit load to FE | | | +--v-----------+ | | FE | 5. Return result to user | +--+-----------+ | | | | 2. Redirect to BE | | | +--v-----------+ +---+Coordinator BE| 1B. User submit load to BE +-+-----+----+-+ | | | +-----+ | +-----+ | | | 3. Distrbute data | | | +-v-+ +-v-+ +-v-+ |BE | |BE | |BE | +---+ +---+ +---+ Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给用户。
  • 开始导入 下面我们通过几个实际的场景示例来看Broker Load的使用。 数据样例: '100','101','102','103','104','105',100.00,100.01,100.02,'100',200,100.08,2022-04-01 '101','102','103','104','105','105',100.00,100.01,100.02,'100',200,100.08,2022-04-02 '102','103','104','105','106','105',100.00,100.01,100.02,'100',200,100.08,2022-04-03 准备工作: 在本地创建示例数据文件source_text.txt,并上传至hdfs的/tmp/。 在hive中创建ods_source表。 CREATE TABLE `ods_source`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` string, `actual_price` double, `day ` string ) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile; 将hdfs创建的txt文件导入到ods_source表。 load data inpath '/tmp/source_text.txt' into table ods_source;
  • 相关系统配置 FE配置。 下面几个配置属于Broker load的系统级别配置,也就是作用于所有Broker load导入任务的配置。主要通过修改FE配置项来调整配置值。 max_bytes_per_broker_scanner/max_broker_concurrency max_bytes_per_broker_scanner配置限制了单个BE处理的数据量的最大值。max_broker_concurrency配置限制了一个作业的最大的导入并发数。最小处理的数据量(默认64M),最大并发数,源文件的大小和当前集群BE的个数 共同决定了本次导入的并发数。 本次导入并发数=Math.min(源文件大小/最小处理量(默认64M),最大并发数,当前BE节点个数)。 本次导入单个BE的处理量=源文件大小/本次导入的并发数。 通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner*BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。 默认配置: 参数名:max_broker_concurrency, 默认10。 参数名:max_bytes_per_broker_scanner,默认3G,单位bytes。
  • 作业调度 系统会限制一个集群内正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。 首先,FE的配置参数:desired_max_waiting_jobs会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过这个阈值,新提交的作业将会被直接拒绝。 一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。 FE的配置参数async_pending_load_task_pool_size用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入LOADING状态开始执行,而其他作业处于PENDING等待状态。 FE的配置参数async_loading_load_task_pool_size用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1 pending task和多个loading task(等于LOAD语句中DATA INFILE子句的个数)。所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size。
  • 基本原理 用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。 BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。 + | 1. user create broker load v +----+----+ | | | FE | | | +----+----+ | | 2. BE etl and load the data +--------------------------+ | | | +---v---+ +--v----+ +---v---+ | | | | | | | BE | | BE | | BE | | | | | | | +---+-^-+ +---+-^-+ +--+-^--+ | | | | | | | | | | | | 3. pull data from broker +---v-+-+ +---v-+-+ +--v-+--+ | | | | | | |Broker | |Broker | |Broker | | | | | | | +---+-^-+ +---+-^-+ +---+-^-+ | | | | | | +---v-+-----------v-+----------v-+-+ | HDFS/BOS/AFS cluster | | | +----------------------------------+
  • 导入的原子性保证 Doris的每一个导入任务,不论是使用Broker Load进行批量导入,还是使用INSERT语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。 同时,每个导入任务都会有一个Label。这个Label在数据库(Database)中是唯一的,用于唯一标识一个导入任务。Label可以由用户指定,部分导入功能也会由系统自动生成。 Label是用于保证对应的导入任务,仅能成功导入一次。一个被成功导入的Label,再次使用时,会被拒绝并报错Label already used。通过这个机制,可以在Doris侧做到At-Most-Once语义。如果结合上游系统的At-Least-Once语义,则可以实现导入数据的Exactly-Once语义。
  • 同步和异步 Doris目前的导入方式分为两类,同步和异步。如果是外部程序接入Doris的导入功能,需要判断使用导入方式是哪类再确定接入逻辑。 同步 同步导入方式即用户创建导入任务,Doris同步执行导入,执行完成后返回用户导入结果。用户可直接根据创建导入任务命令返回的结果同步判断导入是否成功。 异步 异步导入方式即用户创建导入任务后,Doris直接返回创建成功。创建成功不代表数据已经导入。导入任务会被异步执行,用户在创建成功后,需要通过轮询的方式发送命令查看导入作业的状态。如果创建失败,则可以根据失败信息,判断是否需要再次创建。 无论是异步还是同步的导入类型,都不应该在Doris返回导入失败或导入创建失败后,无休止的重试。外部系统在有限次数重试并失败后,保留失败信息,大部分多次重试均失败问题都是使用方法问题或数据本身问题。
共100000条