华为云用户手册

  • D CS 过期Key扫描机制 基于开源Redis以上机制,分布式缓存服务提供了一种通用的方式,来定时释放所有已经过期Key占用的内存,通过自行配置定时任务,在任务执行期间,会对所有缓存实例的主节点进行扫描操作,扫描操作会遍历整个实例的键空间,触发Redis引擎中对Key过期的判断,从而释放已过期的Key。 只有Redis 4.0、Redis 5.0、和Redis 6.0基础版实例支持过期key扫描。 建议在业务低峰时段执行过期Key扫描,降低CPU被用满的可能。 不支持查询已释放的过期Key。
  • 自动扫描参数配置 在执行过期key扫描时,若您想设置自动扫描,单击“自动扫描”右侧的,弹出“自动扫描设置”页面,进行相应设置后,单击“确定”自动扫描配置完成。 自动扫描配置参数说明如下表1。 表1 自动扫描配置参数 配置参数 参数含义 取值范围 默认值 备注 首次扫描时间 设定的第一次扫描时间,须设定在当前时间之后。 取值格式:YYYY/MM/DD hh:mm:ss - - 扫描间隔 从首次扫描时间开始,每隔一个间隔时间,便启动一次扫描。 0~43,200,单位:分 1440 如果到达启动时刻,上一次扫描还未结束,则本次轮空。 启动扫描的时机有五分钟冗余量,即超过本次启动时刻,不足五分钟,仍然会启动,不至于轮空。 说明: 连续扫描可能使cpu占用率较高,建议根据实例中key总量以及key增长情况来配置,可参考下面性能说明和配置建议。 扫描超时 此参数的目的在于避免不可知原因造成的扫描超时,导致后面的定时任务无法执行。设定此参数,超过超时时间后,返回失败,以便能继续进行下一轮扫描。 1~86,400,单位:分 2880 时间不少于扫描间隔时间的2倍。 可根据每次过期key扫描的时间,以及使用场景所能承受的最大超时时间,设定一个经验值。 迭代扫描key数量 SCAN命令用于迭代当前数据库中的key集合。 COUNT选项的作用就是让用户告知迭代命令, 在每次迭代中应该从数据集里返回多少元素。具体参见scan命令介绍。迭代式扫描可降低一次扫描过多key而造成扫描时间过长,影响redis性能的问题。 10~1,000,单位:个 10 举例:redis中有1000万个key,迭代扫描key数量设为1000,则迭代10000次可完成全库扫描。 性能说明: 数据面底层SCAN扫描间隔5ms,相当于1秒钟扫描200次。迭代扫描key数量设为10/50/100/1000时,每秒钟扫描2000/10000/20000/200000个key。 每秒钟扫描key数量越大,cpu占用率也相应增加。 测试参考: 使用主备实例测试,在有1000万不过期和500万过期的key,过期时间为1-10秒的场景下,完成一次全库扫描,测试数据如下: 以下测试结果仅供参考,不同局点环境和网络波动等客观条件可能产生差异。 自然删除,每秒删除1万条过期key,删除500万过期key,耗时约为8分钟,cpu占用率约为5%。 “迭代扫描key数量”设为10,耗时约为 1500万/0.2万/60秒 = 125分,cpu占用率约为8%。 “迭代扫描key数量”设为50,耗时约为 1500万/1万/60秒 = 25分, 删除key时cpu占用率约10%。 “迭代扫描key数量”设为100,耗时约为 1500万/2万/60秒 = 12.5分, 删除key时cpu占用率约20%。 “迭代扫描key数量”设为1000,耗时约为 1500万/20万/60秒 = 1.25分,删除key时cpu占用率约为25%。 建议配置:
  • DCS的Memcached过期数据清除策略是什么? DCS的Memcached作为缓存产品是允许用户根据业务需求设置在其中存放数据的过期时间的。例如在执行add操作的时候可以设置expire过期时间。 DCS的Memcached默认策略为不逐出(noeviction)。支持通过修改Memcached实例配置参数(maxmemory-policy)修改实例的数据逐出策略。 Memcached实例支持的数据逐出策略和配置参数的方式,请参考修改DCS实例配置参数。 父主题: Memcached使用
  • 操作流程 以B账号将委托分配给 IAM 用户进行管理为例,说明使用IAM的用户授权功能,实现分配委托以及对委托进行精细授权的操作方法,委托权限分配完成后,B账号中的IAM用户通过切换角色的方式,可以切换到A账号中,管理委托方授权的资源。B账号需要提前获取委托公司的华为账号名称、所创建的委托名称以及委托的ID。 创建用户组并授权。 B账号在 统一身份认证 服务左侧导航窗格中,单击“用户组”。 在“用户组”界面中,单击“创建用户组”。 输入“用户组名称”,例如“委托管理”。 单击“确定”。 返回用户组列表,用户组列表中显示新创建的用户组。 单击新建用户组右侧的“授权”,进入授权界面。 如果需要用户仅管理一个特定的委托,请执行以下步骤对委托进行精细授权。 如果需要用户管理所有委托,请跳过该步骤,直接执行下一步。 在选择策略页面,单击权限列表右上角“新建策略”。 输入“策略名称”,例如“管理A公司的委托1”。 “策略配置方式”选择“JSON视图”。 在“策略内容”区域,填入以下内容: { "Version": "1.1", "Statement": [ { "Action": [ "iam:agencies:assume" ], "Resource": { "uri": [ "/iam/agencies/b36b1258b5dc41a4aa8255508xxx..." ] }, "Effect": "Allow" } ] } "b36b1258b5dc41a4aa8255508xxx..."需要替换为待授权委托的ID,需要提前向委托方获取,其他内容不需修改,直接拷贝即可。 单击“下一步”,继续完成授权。 选择上一步中创建的自定义策略“管理A公司的委托1”,或者“Agent Operator”权限。 自定义策略:用户仅能管理指定ID的委托,不能管理其他委托。 “Agent Operator”权限:用户可以管理所有委托。 选择授权作用范围。 单击“确定”。 创建用户并加入用户组。 在统一身份认证服务左侧导航窗格中,单击“用户” 在“用户”界面,单击“创建用户”。 在“创建用户”界面,输入“用户名”“邮箱”。 “访问方式”选择“管理控制台访问”。 “凭证类型”选择“首次登录时设置”。 “登录保护”选择“开启”,并选择身份验证方式,单击“下一步”。 在“加入用户组”页面,选择步骤2中创建的用户组“委托管理”,单击“创建用户”。 切换角色。 使用步骤3创建的用户,使用“IAM用户登录”方式,登录华为云。登录方法,请参见:IAM用户登录。 在控制台页面,右上方的用户名中,选择“切换角色”。 图1 切换角色 在“切换角色”页面中,输入委托方的账号名称,输入账号名称后,系统将会按照顺序自动匹配委托名称。 如果自动匹配的是没有授权的委托,系统将提示没有权限访问,可以删除委托名称,在下拉框中选择已授权的委托名称。 单击“确定”,切换至委托方账号中。
  • 被委托方跨账号管理 当A账号与B账号创建委托关系后,即B账号为被委托方,B账号通过切换角色的方法,可以切换到A账号中,管理委托方授权的资源。B账号需要提前获取A账号的华为账号名称以及所创建的委托名称。 B账号登录华为云,进入控制台。 在右上方的用户名中,选择“切换角色”。 图3 切换角色 在“切换角色”页面中,输入委托方的账号名称,输入账号名称后,系统将会按照顺序自动匹配委托名称。 图4 切换角色 单击“确定”,切换至委托方A账号中。
  • 解决方案 针对以上企业需求,可以使用IAM的委托功能来实现跨账号的资源授权与管理。 A账号在IAM控制台创建一个委托,指定委托的使用者为B账号,并将需要代运维的资源授权给这个委托。 B账号进一步授权,将A账号委托的资源分配给账号下专职管理委托的IAM用户,让IAM用户帮助管理。 当合作关系发生变更时,A账号随时可以修改或者删除这个委托,B账号以及账号下可以管理该委托的用户对该委托的使用权限将自动修改或者撤销。 图1 跨账号授权模型
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略一来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locator所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个......)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 - HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 如果需要对colocation上传的文件做balance操作,为避免colocation失效,可以通过 MRS Manager界面中的oi.dfs.colocation.file.pattern参数进行设置,设置该参数值为对应数据文件块的路径,多个路径之间以逗号分开。例如/test1,/test2。
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要将HDFS用户绑定supergroup用户组。 初始化 使用Colocation前需要进行kerberos安全认证。 private static void init() throws IOException { LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 public static void main(String[] args) throws IOException { init(); dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); createGroup(); put(); delete(); deleteGroup(); dfs.close(); dfsAdmin.close(); } 创建group 样例:创建一个gid01组,组中包含3个locator。 private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path("/testfile.txt"), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入到HDFS的数据. byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 public static void delete() throws IOException { dfs.delete(new Path("/testfile.txt")); } 删除group 样例:删除gid01。 private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • Client Alluxio Client主要包括三种方式:Java API、Shell、HTTP REST API。 Java API 提供Alluxio文件系统的应用接口,本开发指南主要介绍如何使用Java API进行Alluxio客户端的开发。 Shell 提供shell命令完成Alluxio文件系统的基本操作。 HTTP REST API 提供除Shell、Java API以外的其他接口,可通过此接口查询信息,具体请参考Alluxio API接口介绍。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的addIndicesExample方法中: addIndices(): 将索引添加到没有数据的表中 public void addIndicesExample() { LOG .info("Entering Adding a Hindex."); // Create index instance TableIndices tableIndices = new TableIndices(); HIndexSpecification spec = new HIndexSpecification(indexNameToAdd); spec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.STRING); tableIndices.addIndex(spec); Admin admin = null; HIndexAdmin iAdmin = null; try { admin = conn.getAdmin(); iAdmin = HIndexClient.newHIndexAdmin(admin); // add index to the table iAdmin.addIndices(tableName, tableIndices); // Alternately, add the specified indices with data // iAdmin.addIndicesWithData(tableName, tableIndices); LOG.info("Successfully added indices to the table " + tableName); } catch (IOException e) { LOG.error("Add Indices failed for table " + tableName + "." + e); } finally { if (iAdmin != null) { try { // Close the HIndexAdmin object. iAdmin.close(); } catch (IOException e) { LOG.error("Failed to close HIndexAdmin ", e); } } if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting Adding a Hindex."); } 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的addIndicesExampleWithData方法中: addIndicesWithData():将索引添加到具有大量预先存在数据的表中 public void addIndicesExampleWithData() { LOG.info("Entering Adding a Hindex With Data."); // Create index instance TableIndices tableIndices = new TableIndices(); HIndexSpecification spec = new HIndexSpecification(indexNameToAdd); spec.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.STRING); tableIndices.addIndex(spec); Admin admin = null; HIndexAdmin iAdmin = null; try { admin = conn.getAdmin(); iAdmin = HIndexClient.newHIndexAdmin(admin); // add index to the table iAdmin.addIndicesWithData(tableName, tableIndices); // Alternately, add the specified indices with data // iAdmin.addIndicesWithData(tableName, tableIndices); LOG.info("Successfully added indices to the table " + tableName); } catch (IOException e) { LOG.error("Add Indices failed for table " + tableName + "." + e); } finally { if (iAdmin != null) { try { // Close the HIndexAdmin object. iAdmin.close(); } catch (IOException e) { LOG.error("Failed to close HIndexAdmin ", e); } } if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting Adding a Hindex With Data."); }
  • Kafka应用开发常用概念 Topic Kafka维护的同一类的消息称为一个Topic。 Partition 每一个Topic可以被分为多个Partition,每个Partition对应一个可持续追加的、有序不可变的log文件。 Producer 将消息发往Kafka topic中的角色称为Producer。 Consumer 从Kafka topic中获取消息的角色称为Consumer。 Broker Kafka集群中的每一个节点服务器称为Broker。 父主题: Kafka应用开发概述
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Impala表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Impala数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Impala数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见分析Impala数据。
  • 常用CLI Flink常用的CLI如下所示: yarn-session.sh 可以使用yarn-session.sh启动一个常驻的Flink集群,接受来自客户端提交的任务。启动一个有3个TaskManager实例的Flink集群示例如下: bin/yarn-session.sh -n 3 yarn-session.sh的其他参数可以通过以下命令获取: bin/yarn-session.sh -help Flink 使用flink命令可以提交Flink作业,作业既可以被提交到一个常驻的Flink集群上,也可以使用单机模式运行。 提交到常驻Flink集群上的一个示例如下: bin/flink run examples/streaming/WindowJoin.jar 用户在用该命令提交任务前需要先用yarn-session启动Flink集群。 以单机模式运行作业的一个示例如下: bin/flink run -m yarn-cluster -yn 2 examples/streaming/WindowJoin.jar 通过参数-m yarn-cluster使作业以单机模式运行,-yn表示TaskManager的数量。 flink脚本的其他参数可以通过以下命令获取: bin/flink --help
  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -n 3 -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar 如果yarn-session.sh不使用-z配置特定的zookeeper的namespace,则在使用flink run时不要使用-yz指定特定的zookeeper的namespace。 举例: bin/yarn-session.sh -n 3 bin/flink run examples/streaming/WindowJoin.jar 如果使用flink run -m yarn-cluster时启动集群则可以使用-yz指定一个zookeeper的namespace。 不能同时启动两个或两个以上的集群来共享一个namespace。 用户在启动集群或提交作业时如果使用了-z配置项,则在删除、停止及查询作业、触发savepoint时也要使用-z配置项指明namespace。
  • 问题 使用运行的Spark Streaming任务回写kafka时,kafka上接收不到回写的数据,且kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从kafka中读取数据,执行对应处理之后,然后将结果数据回写至kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将kafka的阈值调大,建议在MRS Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 功能分解 根据上述场景进行功能分解,如表1所示。 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考创建Storm Spout,创建Storm Bolt,创建Storm Topology,完整代码请参考Storm-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见开发Hive用户自定义函数。
  • Presto应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Presto应用程序开发流程 表1 Presto应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Presto的基本概念。 Presto应用开发常用概念 准备开发和运行环境 Presto的应用程序支持使用Java进行开发。推荐使用Eclipse工具,请根据指导完成开发环境配置。 Presto应用开发环境简介 根据场景开发工程 提供了Java语言的样例工程和数据查询的样例工程。 Presto样例程序开发思路 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 JDBC客户端运行及结果查看 父主题: Presto应用开发概述
  • 简介 Storm是一个分布式的、可靠的、容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm的目标是提供对大数据流的实时处理,可以可靠地处理无限的数据流。 Storm有很多适用的场景:实时分析、在线机器学习、持续计算和分布式ETL等,易扩展、支持容错,可确保数据得到处理,易于构建和操控。 Storm有如下几个特点: 适用场景广泛 易扩展,可伸缩性高 保证无数据丢失 容错性好 多语言 易于构建和操控
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testGet方法中 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = {Bytes.toBytes("name"), Bytes.toBytes("address")}; // Specify RowKey. byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testGet."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的scanDataByHIndex方法中 public void scanDataByHIndex() { LOG.info("Entering HIndex-based Query."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("26")); filter.setFilterIfMissing(true); Scan scan = new Scan(); scan.setFilter(filter); rScanner = table.getScanner(scan); // Scan the data LOG.info("Scan data using indices.."); for (Result result : rScanner) { LOG.info("Scanned row is:"); for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Successfully scanned data using indices for table " + tableName + "."); } catch (IOException e) { LOG.error("Failed to scan data using indices for table " + tableName + "." + e); } finally { if (rScanner != null) { rScanner.close(); } if (table != null) { try { table.close(); } catch (IOException e) { LOG.error("failed to close table, ", e); } } } LOG.info("Entering HIndex-based Query."); }
  • 在Linux中调测Hive JDBC应用 执行mvn package生成jar包,在工程目录target目录下获取,比如:hive-examples-1.0.jar。 在运行调测环境上创建一个目录作为运行目录,如“/opt/hive_examples”(Linux环境),并在该目录下创建子目录“conf”。 将1导出的hive-examples-1.0.jar拷贝到“/opt/hive_examples”下。 将客户端下的配置文件拷贝到“conf”下,开启Kerberos认证的安全集群下把从5获取的user.keytab和krb5.conf拷贝到的/opt/hive_examples/conf下,未开启Kerberos认证集群可不必拷贝user.keytab和krb5.conf文件。复制${HIVE_HOME}/../config/hiveclient.properties文件到/opt/hive_examples/conf目录下。 cd /opt/hive_examples/conf cp /opt/client/Hive/config/hiveclient.properties . 准备样例程序相关依赖jar包。 在调测环境上创建一个目录作为存放依赖jar包的目录,如"/opt/hive_examples/lib"(Linux环境),将${HIVE_HOME}/lib/下面的包全部复制到该目录下,然后删除里面的derby-10.10.2.0.jar(jar包版本号以实际为准)。 mkdir /opt/hive_examples/lib cp ${HIVE_HOME}/lib/* /opt/hive_examples/lib rm -f /opt/hive_examples/lib/derby-10.10.2.0.jar 在Linux环境下执行如下命令运行样例程序。 chmod +x /opt/hive_examples -R cd /opt/hive_examples source /opt/client/bigdata_env java -cp .:hive-examples-1.0.jar:/opt/hive_examples/conf:/opt/hive_examples/lib/*:/opt/client/HDFS/hadoop/lib/* com.huawei.bigdata.hive.example.ExampleMain 在命令行终端查看样例代码中的HiveQL所查询出的结果。 Linux环境运行成功结果会有如下信息。 Create table success! _c0 0 Delete table success! 父主题: 调测Hive应用
  • 操作步骤 登录linux环境,创建运行OpenTSDB样例的工作目录,比如“/opt/opentsdb-example”,配置文件存放目录,比如“/opt/opentsdb-example/conf”,并编辑配置文件“/opt/opentsdb-example/conf/opentsdb.properties”使其对应于实际环境中的信息。 mkdir -p /opt/opentsdb-example/conf [root@node-master1rLqO ~]# cat /opt/opentsdb-example/conf/opentsdb.properties tsd_hostname = node-ana-corejnWt tsd_port = 4242 tsd_protocol = https 执行mvn package生成jar包,在工程目录target目录下获取,比如:opentsdb-examples-mrs-xxx-jar-with-dependencies.jar,其中mrs-xxx表示MRS的版本号,将获取的包上传到“/opt/opentsdb-example”目录下。 执行Jar包。 加载环境变量。 source /opt/client/bigdata_env 认证集群用户(未启用kerberos的集群可跳过此步骤)。 人机用户:kinit kerberos用户 机机用户: kinit -kt 认证文件路径 kerberos用户 运行opentsdb样例程序。 java -cp /opt/opentsdb-example/conf:/opt/opentsdb-example/opentsdb-examples-mrs-xxx-jar-with-dependencies.jar com.huawei.bigdata.opentsdb.examples.OpentsdbExample
  • 基于API的Glob路径模式以获取LocatedFileStatus和从FileStatus打开文件 在DistributedFileSystem中添加了以下API,以获取具有块位置的FileStatus,并从FileStatus对象打开文件。这些API将减少从客户端到Namenode的RPC调用的数量。 表6 FileSystem API接口说明 Interface接口 Description说明 public LocatedFileStatus[] globLocatedStatus(Path, PathFilter, boolean) throws IOException 返回一个LocatedFileStatus对象数组,其对应文件路径符合路径过滤规则。 public FSDataInputStream open(FileStatus stat) throws IOException 如果stat对象是LocatedFileStatusHdfs的实例,该实例已具有位置信息,则直接创建InputStream而不联系Namenode。
  • 操作步骤 安装Eclipse程序。安装要求Eclipse使用3.0及以上版本。 安装JDK程序。安装要求JDK使用1.7及或者1.8版本,支持IBM JDK和Oracle JDK。 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。
  • 操作步骤 按安装客户端时编译并运行HBase应用中的方式修改样例。 执行mvn package生成jar包,在工程目录target目录下获取,比如:hbase-examples-2.0.jar。 准备依赖的Jar包和配置文件。 在Linux环境新建目录,例如“/opt/test”,并创建子目录“lib”和“conf”。将集群中任一master节点“/opt/client/HBase/hbase/lib”目录下的jar包,以及2中导出的Jar包,上传到当前Linux环境新建目录“/opt/test”的“lib”目录下。将集群中任一master节点“/opt/client/HBase/hbase/conf”目录下的hbase-site.xml,hdfs-site.xml,core-site.xml文件拷贝到“/opt/test”中“conf”目录下。 将准备HBase应用开发用户中获取的krb5.conf和user.keytab文件拷贝上传至“/opt/test/conf”目录中,并新建hbaseclient.properties文件,文件中user.name对应新建的用户hbaseuser,userKeytabName和krb5ConfName路径对应从准备HBase应用开发用户中获取的认证相关文件名称(未开启Kerberos认证集群可跳过此步)。 user.name=hbaseuser userKeytabName=user.keytab krb5ConfName=krb5.conf 在“/opt/test”根目录新建脚本“run.sh”,修改内容如下并保存: 当前以com.huawei.bigdata.hbase.examples.TestMain为举例,具体以实际样例代码为准。 #!/bin/sh BASEDIR=`pwd` cd ${BASEDIR} for file in ${BASEDIR}/lib/*.jar do i_cp=$i_cp:$file echo "$file" done if [ -d ${BASEDIR}/lib/client-facing-thirdparty ]; then for file in ${BASEDIR}/lib/client-facing-thirdparty/*.jar do i_cp=$i_cp:$file done fi java -cp ${BASEDIR}/conf:${i_cp} com.huawei.bigdata.hbase.examples.TestMain 切换到“/opt/test”,执行以下命令,运行Jar包。 sh run.sh
  • JDBC客户端运行及结果查看 执行mvn clean compile assembly:single生成jar包,在工程目录target目录下获取,比如:presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar。 在运行调测环境上创建一个目录作为运行目录,如或“/opt/presto_examples”(Linux环境),并在该目录下创建子目录“conf”。 将1导出的presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar拷贝到“/opt/presto_examples”下。 开启Kerberos认证集群需要将4获取的user.keytab和krb5.conf拷贝到的/opt/presto_examples/conf下,并修改样例代码中conf目录下的presto.preperties。未开启Kerberos认证集群无须执行此步骤。 表1 presto.preperties参数说明 参数 说明 user 用于Kerberos认证的用户名,即准备Presto应用开发用户中创建的开发用户的用户名。 KerberosPrincipal 用于认证的名字,即认证准备Presto应用开发用户中创建的开发用户的用户名。 KerberosConfigPath krb5.conf的路径。 KerberosKeytabPath user.keytab的路径。 presto.preperties样例 user = prestouser SSL = true KerberosRemoteServiceName = HTTP KerberosPrincipal = prestouser KerberosConfigPath = /opt/presto_examples/conf/krb5.conf KerberosKeytabPath = /opt/presto_examples/conf/user.keytab 在Linux环境下执行运行样例程序。 chmod +x /opt/presto_examples -R cd /opt/presto_examples java -jar presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar 在命令行终端查看样例代码所查询出的结果。 Linux环境运行成功结果会有如下信息: NY Metro : 2 Mid Atlantic : 6 Mid Atlantic : 6 North Midwest : 1 North Midwest : 3 North Midwest : 7
共100000条