华为云用户手册

  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testSelect方法中。 /** * Select Data */ public void testSelect() { LOG .info("Entering testSelect."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Query String querySQL = "SELECT * FROM TEST WHERE id = ?"; Connection conn = null; PreparedStatement preStat = null; Statement stat = null; ResultSet result = null; try { // Create Connection conn = DriverManager.getConnection(url, props); // Create Statement stat = conn.createStatement(); // Create PrepareStatement preStat = conn.prepareStatement(querySQL); // Execute query preStat.setInt(1, 1); result = preStat.executeQuery(); // Get result while (result.next()) { int id = result.getInt("id"); String name = result.getString(1); System.out.println("id: " + id); System.out.println("name: " + name); } LOG.info("Select successfully."); } catch (Exception e) { LOG.error("Select failed.", e); } finally { if (null != result) { try { result.close(); } catch (Exception e2) { LOG.error("Result close failed.", e2); } } if (null != stat) { try { stat.close(); } catch (Exception e2) { LOG.error("Stat close failed.", e2); } } if (null != conn) { try { conn.close(); } catch (Exception e2) { LOG.error("Connection close failed.", e2); } } } LOG.info("Exiting testSelect."); }
  • 回答 由于checkpoint中包含了spark应用的对象序列化信息、task执行状态信息、配置信息等,因此,当存在以下问题时,从checkpoint恢复spark应用将会失败。 业务代码变更且变更类未明确指定SerialVersionUID。 spark内部类变更,且变更类未明确指定SerialVersionUID。 另外,由于checkpoint保存了部分配置项,因此可能导致业务修改了部分配置项后,从checkpoint恢复时,配置项依然保持为旧值的情况。当前只有以下部分配置会在从checkpoint恢复时重新加载。 "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port", "spark.master", "spark.yarn.jars", "spark.yarn.keytab", "spark.yarn.principal", "spark.yarn.credentials.file", "spark.yarn.credentials.renewalTime", "spark.yarn.credentials.updateTime", "spark.ui.filters", "spark.mesos.driver.frameworkId", "spark.yarn.jars"
  • Impala SQL编写之不支持隐式类型转换 查询语句使用字段的值做过滤时,不支持使用Hive类似的隐式类型转换来编写Impala SQL: Impala示例: select * from default.tbl_src where id = 10001; select * from default.tbl_src where name = 'TestName'; Hive示例(支持隐式类型转换): select * from default.tbl_src where id = '10001'; select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考开发Storm应用,完整代码请参考Strom-examples示例工程。
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Impala表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Impala数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Impala数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见Impala样例程序指导。
  • 部署运行及结果查看 导出本地jar包,请参见打包Strom样例工程应用。 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Strom应用业务。 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml 如果设置业务以本地模式启动,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml 如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。 如果使用了properties文件,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties 拓扑提交成功后请自行登录storm UI查看。
  • 代码样例 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。 下面代码片段仅为演示,具体代码参见SparkHbasetoHbasePythonExample: # -*- coding:utf-8 -*- from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession,设置kryo序列化 spark = SparkSession\ .builder\ .appName("SparkHbasetoHbase") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .config("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator") \ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.SparkHbasetoHbase') # 创建类实例并调用方法 spark._jvm.SparkHbasetoHbase().hbasetohbase(spark._jsc) # 停止SparkSession spark.stop()
  • 操作步骤 在运行调测环境上创建一个目录作为运行目录,如“/opt/impala_examples”,并在该目录下创建子目录“conf”,将样例代码文件夹impala-examples-normal中的client.properties上传到linux系统上的样例代码文件夹中的/opt/impala_examples/conf文件夹,并在client.properties中填入impalad的ip地址。 在cmd或Intellij中执行mvn package ,在工程target目录下获取jar包,比如“impala-examples-mrs-2.1-jar-with-dependencies.jar”,复制到“/opt/impala_examples”下。 source客户端中的bigdata_env,在Linux环境下执行如下命令运行样例程序。 chmod +x /opt/impala_examples -R cd /opt/impala_examples java -cp impala-examples-mrs-2.1-jar-with-dependencies.jar com.huawei.bigdata.impala.example.ExampleMain 在命令行终端查看样例代码中的Impala SQL所查询出的结果。 Linux环境运行成功结果会有如下信息。 Create table success! _c0 0 Delete table success! 如果出现如下报错提示,请客户根据log报错信息自行配置log4j2信息。
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 问题现象 Spark能对接很多的第三方工具,因此在使用过程中经常会依赖一堆的三方包。而有一些包 MRS 已经自带,这样就有可能造成代码使用的jar包版本和集群自带的jar包版本不一致,在使用过程中就有可能出现jar包冲突的情况。 常见的jar包冲突报错有: 1、报错类找不到:java.lang.NoClassDefFoundError 2、报错方法找不到:java.lang.NoSuchMethodError
  • 原因分析 以自定义UDF为例: 报错信息显示是找不到类。 首先需要确认的是这个类属于的jar包是否在jvm的classpath里面, spark自带的jar都在“spark客户端目录/jars/”。 确认是否存在多个jar包拥有这个类。 如果是其他依赖包,可能是没有使用--jars添加到任务里面。 如果是已经添加到任务里面,但是依旧没有取到,可能是因为配置文件的driver或者executor的classpath配置不正确,可以查看日志确认是否加载到环境。 另外可能报错是类初始化失败导致后面使用这个类的时候出现上述报错,需要确认是否在之前就有初始化失败或者其他报错的情况发生。 报错信息显示找不到方法。 确认这个方法对应的类所在的jar包是否加载到jvm的classpath里面,spark自带的类都在“spark客户端目录/jars/”。 确认是否有多个jar包包含这个类(尤其注意相同工具的不同版本)。 如果报错是Hadoop相关的包,有可能是因为使用的Hadoop版本不一致导致部分方法已经更改。 如果报错的是三方包里面的类,可能是因为Spark已经自带了相关的jar包,但是和代码中使用的版本不一致。
  • 代码样例 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。 下面代码片段仅为演示,具体代码参见SparkHbasetoHbasePythonExample: # -*- coding:utf-8 -*- from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession,设置kryo序列化 spark = SparkSession\ .builder\ .appName("SparkHbasetoHbase") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .config("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator") \ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.SparkHbasetoHbase') # 创建类实例并调用方法 spark._jvm.SparkHbasetoHbase().hbasetohbase(spark._jsc) # 停止SparkSession spark.stop()
  • 安全认证 图1 Flink系统认证方式 Flink整个系统存在三种认证方式,需参考下表进行配置: 使用kerberos认证:Flink yarn client与Yarn Resource Manager、JobManager与Zookeeper、JobManager与HDFS、TaskManager与HDFS、Kafka与TaskManager、TaskManager和Zookeeper。 使用security cookie进行认证:Flink yarn client与Job Manager、JobManager与TaskManager、TaskManager与TaskManager。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 Flink的JobManager与YARN的AM是在同一个进程下。 如果用户安装安全模式需要使用kerberos认证和security cookie认证。 表1 安全认证方式 安全认证方式 配置方法 Kerberos认证(当前只支持keytab认证方式) 从 FusionInsight Manager上下载准备集群认证用户信息创建的用户keytab文件,并放置到Flink客户端所在节点的某个目录下。 在“客户端安装路径/Flink/flink/conf/flink-conf.yaml”上配置: 配置客户端安装节点的业务IP和Master节点IP到“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”配置项中。 jobmanager.web.access-control-allow-origin: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx jobmanager.web.allow-access-address: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx 说明: 集群外节点业务IP为安装客户端所在的弹性云服务器的IP。集群内节点业务IP获取方式如下: 登录 MapReduce服务 管理控制台,选择“现有集群”,选中当前的集群并单击集群名,进入集群信息页面。在“节点管理”中查看安装客户端所在的节点IP。 keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/flinkuser.keytab 说明: “/home/flinkuser/keytab/”表示的是用户保存keytab文件的目录。 principal名为用于运行作业的用户名。 security.kerberos.login.principal: flinkuser 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。 zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient Security Cookie认证 将“generate_keystore.sh”脚本放置到Flink客户端“bin”目录中并调用“generate_keystore.sh”脚本,生成“Security Cookie”、“flink.keystore”文件和“flink.truststore”文件。具体操作可参考认证和加密。 执行sh generate_keystore.sh,输入用户自定义密码。密码不允许包含#。 说明: 执行脚本后,在Flink客户端的“conf”目录下生成“flink.keystore”和“flink.truststore”文件,并且在客户端配置文件“flink-conf.yaml”中将以下配置项进行了默认赋值。 将配置项“security.ssl.keystore”设置为“flink.keystore”文件所在绝对路径。 将配置项“security.ssl.truststore”设置为“flink.truststore”文件所在的绝对路径。 将配置项“security.cookie”设置为“generate_keystore.sh”脚本自动生成的一串随机规则密码。 默认“flink-conf.yaml”中“security.ssl.encrypt.enabled: false”,“generate_keystore.sh”脚本将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值设置为调用“generate_keystore.sh”脚本时输入的密码。 MRS 3.x及之后版本,如果需要使用密文时,设置“flink-conf.yaml”中“security.ssl.encrypt.enabled: true”,“generate_keystore.sh”脚本不会配置“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”的值,需要使用Manager明文加密API进行获取,执行curl -k -i -u user name:password -X POST -HContent-type:application/json -d '{"plainText":"password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt' 其中user name:password分别为当前系统登录用户名和密码;"plainText"的password为调用“generate_keystore.sh”脚本时的密码;x.x.x.x为集群Manager的浮动IP。 打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功,例如: security.cookie: ae70acc9-9795-4c48-ad35-8b5adc8071744f605d1d-2726-432e-88ae-dd39bfec40a9 YARN内部认证方式 该方式是YARN内部的认证方式,不需要用户配置。 当前一个Flink集群只支持一个用户,一个用户可以创建多个Flink集群。
  • 注意事项 注[1]:创建联合索引 HBase支持在多个字段上创建二级索引,例如在列name和age上。 HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);
  • 代码样例 下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中: 样例:使用二级索引查找数据 public void testScanDataByIndex() { LOG.info("Entering testScanDataByIndex."); Table table = null; ResultScanner scanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, "Li Gang".getBytes()); Scan scan = new Scan(); scan.setFilter(filter); scanner = table.getScanner(scan); LOG.info("Scan indexed data."); for (Result result : scanner) { for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data by index successfully."); } catch (IOException e) { LOG.error("Scan data by index failed."); } finally { if (scanner != null) { // Close the scanner object. scanner.close(); } try { if (table != null) { table.close(); } } catch (IOException e) { LOG.error("Close table failed."); } } LOG.info("Exiting testScanDataByIndex."); }
  • 功能介绍 针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。 HIndex支持的Filter类型为“SingleColumnValueFilter”,“SingleColumnValueExcludeFilter”以及“SingleColumnValuePartitionFilter”。 HIndex支持的Comparator为“BinaryComparator”,“BitComparator”,“LongComparator”,“DecimalComparator”,“DoubleComparator”,“FloatComparator”,“IntComparator”,“NullComparator”。 二级索引的使用规则如下: 针对某一列或者多列创建了单索引的场景下: 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1) 针对多个列创建的联合索引场景下: 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1)) 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。
  • 场景说明 访问安全集群环境中的服务,需要先通过Kerberos安全认证。所以Kudu应用程序中需要有安全认证代码,确保Kudu程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交Kudu应用程序运行前,在Kudu客户端执行如下命令进行认证。 kinit 组件业务用户 该方式仅适用于Linux操作系统,且安装了Kudu的客户端。 代码认证: 通过获取客户端的principal配置文件和keytab文件进行认证。
  • 在Linux调测程序 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。 运行Consumer样例工程的命令如下。 java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.Consumer
  • ClickHouse应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 ClickHouse应用程序开发流程 表1 ClickHouse应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,ClickHouse的应用程序支持多种语言开发,推荐使用Java语言,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备ClickHouse应用开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群信息文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts 域名 信息。 准备ClickHouse应用运行环境 配置并导入样例工程 ClickHouse提供了不同场景下的样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 导入并配置ClickHouse样例工程 根据业务场景开发程序 提供样例工程,帮助用户快速了解ClickHouse各部件的编程接口。 开发ClickHouse应用 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 在本地Windows环境中调测ClickHouse应用 在Linux环境中调测ClickHouse应用 父主题: ClickHouse应用开发简介
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式(强烈建议采用预分Region建表方式): 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 注意事项 注[1] 可以设置列族的压缩方式,代码片段如下: //设置编码算法,HBase提供了DIFF,FAST_DIFF,PREFIX三种编码算法 hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); //设置文件压缩方式,HBase默认提供了GZ和SNAPPY两种压缩算法 //其中GZ的压缩率高,但压缩和解压性能低,适用于冷数据 //SNAPPY压缩率低,但压缩解压性能高,适用于热数据 //建议默认开启SNAPPY压缩 hcd.setCompressionType(Compression.Algorithm.SNAPPY); 注[2] 可以通过指定起始和结束RowKey,或者通过RowKey数组预分Region两种方式建表,代码片段如下: // 创建一个预划分region的表 byte[][] splits = new byte[4][]; splits[0] = Bytes.toBytes("A"); splits[1] = Bytes.toBytes("H"); splits[2] = Bytes.toBytes("O"); splits[3] = Bytes.toBytes("U"); admin.createTable(htd, splits);
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testCreateTable方法中。 public void testCreateTable() { LOG.info("Entering testCreateTable."); // Specify the table descriptor. TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(tableName);(1) // Set the column family name to info. ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"));(2) // Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SNAANPPY hcd.setCompressionType(Compression.Algorithm.SNAPPY);//注[1] htd.setColumnFamily(hcd.build()); (3) Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); (4) if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd.build());//注[2] (5) LOG.info(admin.getClusterMetrics().toString()); LOG.info(admin.listNamespaceDescriptors().toString()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin " ,e); } } } LOG.info("Exiting testCreateTable."); }
  • 开发思路 通过典型场景,可以快速学习和掌握Kudu的开发过程,并对关键的接口函数有所了解。 作为存储引擎,通常情况下Kudu会和计算引擎一起协同工作: 首先在计算引擎上(比如Impala)用SQL语句创建表对象; 然后通过Kudu的驱动往这个表里写数据; 在计算引擎上直接查询这个表里的数据。 在本开发程序示例中,为了不引入额外的计算引擎,将以Kudu为主,全部通过Java API接口来进行描述: 建立Kudu连接 创建Kudu表 写Kudu数据 修改Kudu表 删除Kudu表
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Strom Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Strom Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Strom Bolt 4 创建topology 请参见创建Strom Topology 部分代码请参考开发Storm应用,完整代码请参考Strom-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • Oozie应用开发常用概念 流程定义文件 描述业务逻辑的XML文件,包括“workflow.xml”、“coordinator.xml”、“bundle.xml”三类,最终由Oozie引擎解析并执行。 流程属性文件 流程运行期间的参数配置文件,对应文件名为“job.properties”,每个流程定义有且仅有一个该属性文件。 Client 客户端直接面向用户,可通过Java API、Shell API、 REST API或者Web UI访问Oozie服务端。 Oozie WebUI界面 通过“https://Oozie服务器IP地址:21003/oozie”登录Oozie WebUI界面。 父主题: Oozie应用开发概述
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见创建Hive用户自定义函数。
共100000条