华为云用户手册

  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testSingleColumnValueFilter方法中。 public void testSingleColumnValueFilter() { LOG .info("Entering testSingleColumnValueFilter."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the filter criteria. SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Xu Bing")); scan.setFilter(filter); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Single column value filter successfully."); } catch (IOException e) { LOG.error("Single column value filter failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testSingleColumnValueFilter."); }
  • 准备本地应用开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持集群自带的OpenJDK,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的: X86客户端: Oracle JDK:支持1.8版本; IBM JDK:支持1.8.0.7.20和1.8.0.6.15版本。 ARM客户端: OpenJDK:支持1.8.0_272版本(集群自带JDK,可通过集群客户端安装目录中“JDK”文件夹下获取)。 毕昇JDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情可参考https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 毕昇JDK详细信息可参考https://www.hikunpeng.com/zh/developer/devkit/compiler/jdk。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 父主题: 准备Doris应用开发环境
  • 操作步骤 以客户端安装用户,登录安装HBase客户端的节点。 进入HBase客户端安装目录: 例如:cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建HBase表的权限,具体请参见创建角色配置拥有对应权限的角色,参考创建用户为用户绑定对应角色。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS 集群用户 例如,kinit hbaseuser。 直接执行Phoenix客户端命令。 sqlline.py 建表: CREATE TABLE TEST (id VARCHAR PRIMARY KEY, name VARCHAR); 插入数据: UPSERT INTO TEST(id,name) VALUES ('1','jamee'); 查询数据: SELECT * FROM TEST; 删表: DROP TABLE TEST; 退出Phoenix命令行。 !quit
  • 场景说明 当不同的多个Manager系统下安全模式的集群需要互相访问对方的资源时,管理员可以设置互信的系统,使外部系统的用户可以在本系统中使用。每个系统用户安全使用的范围定义为“域”,不同的Manager系统需要定义唯一的 域名 。跨Manager访问实际上就是用户跨域使用。集群配置互信具体操作步骤请参考集群互信管理章节。 多集群互信场景下,以符合跨域访问的用户身份,使用从其中一个manager系统中获取到的用于Kerberos安全认证的keytab文件和principal文件,以及多个Manager系统各自的客户端配置文件,可实现一次认证登录后访问调用多集群的HBase服务。 以下代码在hbase-example样例工程的“com.huawei.bigdata.hbase.examples”包的“TestMultipleLogin”类中。
  • 操作场景 在程序代码完成开发后,您可以在Windows环境中运行应用。本地和集群业务平面网络互通时,您可以直接在本地进行调测。 MapReduce应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 在IntelliJ IDEA中查看应用程序运行情况。 通过MapReduce日志获取应用程序运行情况。 登录MapReduce WebUI查看应用程序运行情况。 登录Yarn WebUI查看应用程序运行情况。 如果Windows运行环境中使用IBM JDK,不支持在Windows环境中直接运行应用程序。 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。
  • 通过Python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import os import logging import sys from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableEnvironment, EnvironmentSettings def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10) ) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json' ); create TABLE datagen_source_table ( age int, name varchar(10) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink_table SELECT * FROM datagen_source_table;
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import os import logging import sys from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableEnvironment, EnvironmentSettings def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10) ) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json' ); create TABLE datagen_source_table ( age int, name varchar(10) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink_table SELECT * FROM datagen_source_table;
  • 通过Python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
  • 开发思路 根据上述的业务场景进行功能分解,需要开发的功能点如表2所示。 表2 在HBase中开发的功能 序号 步骤 代码实现 1 根据表1中的信息创建表。 请参见创建HBase表。 2 导入用户数据。 请参见向HBase表中插入数据。 3 增加“教育信息”列族,在用户信息中新增用户的学历、职称等信息。 请参见修改HBase表。 4 根据用户编号查询用户姓名和地址。 请参见使用Get API读取HBase表数据。 5 根据用户姓名进行查询。 请参见使用Filter过滤器读取HBase表数据。 6 为提升查询性能,创建二级索引或者删除二级索引。 请参见创建HBase表二级索引和基于二级索引查询HBase表数据。 7 用户销户,删除用户信息表中该用户的数据。 请参见删除HBase表数据。 8 A业务结束后,删除用户信息表。 请参见删除HBase表。
  • 场景说明 假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,如表1所示,A业务操作流程如下: 创建用户信息表。 在用户信息中新增用户的学历、职称等信息。 根据用户编号查询用户姓名和地址。 根据用户姓名进行查询。 查询年龄段在[20-29]之间的用户信息。 数据统计,统计用户信息表的人员数、年龄最大值、年龄最小值、平均年龄。 用户销户,删除用户信息表中该用户的数据。 A业务结束后,删除用户信息表。 表1 用户信息 编号 姓名 性别 年龄 地址 12005000201 张三 男 19 广东省深圳市 12005000202 李婉婷 女 23 河北省石家庄市 12005000203 王明 男 26 浙江省宁波市 12005000204 李刚 男 18 湖北省襄阳市 12005000205 赵恩如 女 21 江西省上饶市 12005000206 陈龙 男 32 湖南省株洲市 12005000207 周微 女 29 河南省南阳市 12005000208 杨艺文 女 30 重庆市开县 12005000209 徐兵 男 26 陕西省渭南市 12005000210 肖凯 男 25 辽宁省大连市
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testGet方法中。 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = { Bytes.toBytes("name"), Bytes.toBytes("address") }; // Specify RowKey. byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Create the Table instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testGet."); }
  • 功能简介 通过调用“org.apache.hadoop.hbase.hindex.global.GlobalIndexAdmin”中的方法进行HBase全局二级索引的管理,该类中addIndices用于创建全局二级索引。 全局二级索引的创建需要指定索引列、覆盖列(可选)、索引表预分区(可选,建议指定)。 在已有存量数据的表上创建全局二级索引,需要创建索引预分区,防止索引表出现热点,索引表数据的rowkey由索引列构成,并且包含分隔符,格式为“\x01索引值\x00”,因此预分区需要指定成对应格式,例如,当使用id列和age列作为索引列时,两个列均为整数,使用id列完成预分区,可以指定索引表预分区点为: \x010,\x011,\x012....
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testCreateTable方法中。 public void testCreateTable() { LOG.info("Entering testCreateTable."); // Specify the table descriptor. TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(tableName);(1) // Set the column family name to info. ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"));(2) // Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SNAANPPY hcd.setCompressionType(Compression.Algorithm.SNAPPY);//注[1] htd.setColumnFamily(hcd.build()); (3) Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); (4) if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd.build());//注[2] (5) LOG.info(admin.getClusterMetrics().toString()); LOG.info(admin.listNamespaceDescriptors().toString()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin " ,e); } } } LOG.info("Exiting testCreateTable."); }
  • 注意事项 注[1] 可以设置列族的压缩方式,代码片段如下: //设置编码算法,HBase提供了DIFF,FAST_DIFF,PREFIX三种编码算法。 hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); //设置文件压缩方式,HBase默认提供了GZ和SNAPPY两种压缩算法 //其中GZ的压缩率高,但压缩和解压性能低,适用于冷数据 //SNAPPY压缩率低,但压缩解压性能高,适用于热数据 //建议默认开启SNAPPY压缩 hcd.setCompressionType(Compression.Algorithm.SNAPPY); 注[2] 可以通过指定起始和结束RowKey,或者通过RowKey数组预分Region两种方式建表,代码片段如下: // 创建一个预划分region的表 byte[][] splits = new byte[4][]; splits[0] = Bytes.toBytes("A"); splits[1] = Bytes.toBytes("H"); splits[2] = Bytes.toBytes("O"); splits[3] = Bytes.toBytes("U"); admin.createTable(htd, splits);
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式(强烈建议采用预分Region建表方式): 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 创建Doris连接 以下代码片段在“JDBCExample”类的“createConnection”方法中。 USER和PASSWD为在创建连接时用于进行安全认证的用户名和密码。 Class.forName(JDBC_DRIVER); String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT); connection = DriverManager.getConnection(dbUrl, USER, PASSWD); 父主题: Doris JDBC接口调用样例程序
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单地互通,而需要在通信之前进行相互认证,以确保通信的安全性。HBase应用开发需要进行ZooKeeper和Kerberos安全认证。用于ZooKeeper认证的文件为“jaas.conf”,用于Kerberos安全认证文件为keytab文件和krb5.conf文件。具体使用方法在样例代码的“README.md”中会有详细说明。 jaas.conf文件请参考获取MRS应用开发样例工程,进入“src/hbase-examples/hbase-zk-example/src/main/resources/”路径下获取。 keytab和krb5.conf文件获取方法请参考准备MRS应用开发用户。 安全认证主要采用代码认证方式。支持Oracle JAVA平台和IBM JAVA平台。 以下代码在“com.huawei.bigdata.hbase.examples”包的“TestMain”类中。
  • 代码样例 以下为代码示例: hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件 hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件 hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限 hbase.log.file=hbase-client.log //日志文件名 hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效 hbase.log.maxbackupindex=20 //最多保存的日志文件数目 # Security audit appender hbase.security.log.file=hbase-client-audit.log //审计日志文件命令
  • 准备开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发HBase应用程序的工具,版本要求:2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Junit插件 开发环境的基本配置。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行配置。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-Zip 16.04版本。
  • 查看Windows调测结果 Doris应用程序运行完成后,可通过如下方式查看运行情况。 通过IntelliJ IDEA运行结果查看应用程序运行情况。 通过Doris日志获取应用程序运行情况。 各样例程序运行结果如下: “doris-jdbc-example”样例运行成功后,显示信息如下: 2023-08-17 23:13:13,473 | INFO | main | Start execute doris example. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:41) 2023-08-17 23:13:13,885 | INFO | main | Start create database. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:44) 2023-08-17 23:13:13,949 | INFO | main | Database created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:46) 2023-08-17 23:13:13,950 | INFO | main | Start create table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:49) 2023-08-17 23:13:14,132 | INFO | main | Table created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:51) 2023-08-17 23:13:14,133 | INFO | main | Start to insert data into the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:54) 2023-08-17 23:13:14,733 | INFO | main | Inserting data to the table succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:56) 2023-08-17 23:13:14,733 | INFO | main | Start to query table data. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:59) 2023-08-17 23:13:15,079 | INFO | main | Start to print query result. | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:121) 2023-08-17 23:13:15,079 | INFO | main | c1 c2 c3 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:126) 2023-08-17 23:13:15,079 | INFO | main | 0 0 0 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 1 10 100 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 2 20 200 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 3 30 300 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 4 40 400 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 5 50 500 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 6 60 600 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 7 70 700 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 8 80 800 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 9 90 900 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | Querying table data succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:61) 2023-08-17 23:13:15,081 | INFO | main | Start to delete the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:64) 2023-08-17 23:13:15,114 | INFO | main | Table deleted successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:66) 2023-08-17 23:13:15,124 | INFO | main | Doris example execution successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:71) Process finished with exit code 0 Doris对接SpringBoot运行结果 在浏览器中访问链接“http://样例运行节点IP地址:8080/doris/example/executesql”,IDEA正常打印日志,请求返回如下图所示: 图7 返回样例运行信息
  • 查看Linux调测结果 “doris-jdbc-example”样例运行成功后,显示信息如下: 2023-08-17 23:13:13,473 | INFO | main | Start execute doris example. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:41) 2023-08-17 23:13:13,885 | INFO | main | Start create database. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:44) 2023-08-17 23:13:13,949 | INFO | main | Database created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:46) 2023-08-17 23:13:13,950 | INFO | main | Start create table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:49) 2023-08-17 23:13:14,132 | INFO | main | Table created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:51) 2023-08-17 23:13:14,133 | INFO | main | Start to insert data into the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:54) 2023-08-17 23:13:14,733 | INFO | main | Inserting data to the table succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:56) 2023-08-17 23:13:14,733 | INFO | main | Start to query table data. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:59) 2023-08-17 23:13:15,079 | INFO | main | Start to print query result. | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:121) 2023-08-17 23:13:15,079 | INFO | main | c1 c2 c3 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:126) 2023-08-17 23:13:15,079 | INFO | main | 0 0 0 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 1 10 100 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 2 20 200 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 3 30 300 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 4 40 400 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 5 50 500 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 6 60 600 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 7 70 700 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 8 80 800 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 9 90 900 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | Querying table data succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:61) 2023-08-17 23:13:15,081 | INFO | main | Start to delete the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:64) 2023-08-17 23:13:15,114 | INFO | main | Table deleted successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:66) 2023-08-17 23:13:15,124 | INFO | main | Doris example execution successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:71) Doris对接SpringBoot运行结果 在浏览器中访问链接“http://样例运行节点IP地址:8080/doris/example/executesql”,IDEA正常打印日志,请求返回如下图所示: 图3 返回样例运行信息
  • 操作场景 在程序代码完成开发后,您可以在Windows环境中运行应用。本地和集群业务平面网络互通时,您可以直接在本地进行调测。 MapReduce应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 在IntelliJ IDEA中查看应用程序运行情况。 通过MapReduce日志获取应用程序运行情况。 登录MapReduce WebUI查看应用程序运行情况。 登录Yarn WebUI查看应用程序运行情况。 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。
  • 运行多组件样例程序 将hive-site.xml、hbase-site.xml、hiveclient.properties放入工程的conf目录。 确保样例工程依赖的所有Hive、HBase相关jar包已正常获取。 打开MultiComponentLocalRunner.java,确认代码中System.setProperty("HADOOP_USER_NAME", "root");设置了用户为root,请确保场景说明中上传的数据的用户为root,或者在代码中将root修改为上传数据的用户名。 在IntelliJ IDEA开发环境中,选中“MultiComponentLocalRunner.java”工程,单击运行对应的应用程序工程。或者右键工程,选择“Run MultiComponentLocalRunner.main()”运行应用工程。 如果集群开启了ZooKeeper SSL,则运行该样例前,需要检查配置文件mapred-site.xml(准备运行环境中样例工程的“conf”配置文件目录中获取)的配置项“mapreduce.admin.map.child.java.opts”和“mapreduce.admin.reduce.child.java.opts”是否包含如下内容: -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty 如果不包含,将上述内容添加到配置项末尾处。
  • 删除Doris表 本章节介绍删除Doris表样例代码。 以下代码片段在“JDBCExample”类中。 以Java JDBC方式执行SQl语句删除集群中的dbName.tableName表。 String dropSql = "drop table " + dbName + "." + tableName; public static void execDDL(Connection connection, String sql) throws Exception { try (PreparedStatement statement = connection.prepareStatement(sql)) { statement.execute(); } catch (Exception e) { logger.error("Execute sql {} failed.", sql, e); throw new Exception(e); } } 父主题: Doris JDBC接口调用样例程序
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“GlobalSecondaryIndexSample”类中。 本样例用于查询指定id对应的id、age、name信息,并命中idx_id_age索引,由于查询结果被完全覆盖,且无需回原表查询,能够达到最优的查询性能。 /** * Scan data by secondary index. */ public void testScanDataByIndex() { LOG.info("Entering testScanDataByIndex."); Scan scan = new Scan(); // Create a filter for indexed column. SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("id"), CompareOperator.EQUAL, Bytes.toBytes("3")); filter.setFilterIfMissing(true); scan.setFilter(filter); // Specify returned columns // If returned columns not included in index table, will query back user table, // it's not the fast way to get data, suggest to cover all needed columns. // If you want to confirm whether using index for scanning, please set hbase client log level to DEBUG. scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id")); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age")); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); LOG.info("Scan indexed data."); try (Table table = conn.getTable(tableName); ResultScanner scanner = table.getScanner(scan)) { for (Result result : scanner) { for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data by index successfully."); } catch (IOException e) { LOG.error("Scan data by index failed ", e); } LOG.info("Exiting testScanDataByIndex."); }
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseMapPartitionExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseMapPartitionExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseMapPartitionExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为 、spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample SparkOnHbaseJavaExample.jar table2 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar table2 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testModifyTable方法中。 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = admin.getTableDescriptor(tableName); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. TableDescriptor tableBuilder = TableDescriptorBuilder.newBuilder(htd) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build()).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(tableBuilder); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting testModifyTable."); }
共100000条