云服务器内容精选

  • Flink WebUI应用流程 Flink WebUI应用流程参考如下步骤: 图1 Flink WebUI应用流程 表2 Flink WebUI应用流程说明 阶段 说明 参考章节 创建应用 通过应用来隔离不同的上层业务。 创建FlinkServer应用 创建集群连接 通过集群连接配置访问不同的集群。 创建FlinkServer集群连接 创建数据连接 通过数据连接,访问不同的数据服务,包括HDFS、Kafka等。 创建FlinkServer数据连接 创建流表 通过数据表,定义源表、维表、输出表的基本属性和字段信息。 创建FlinkServer流表源 创建SQL/JAR作业(流式/批作业) 定义Flink作业的API,包括Flink SQL和Flink Jar作业。 创建FlinkServer作业 作业管理 管理创建的作业,包括作业启动、开发、停止、删除和编辑等。 创建FlinkServer作业
  • Flink WebUI特点 Flink WebUI主要有以下特点: 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 “数据连接类型”为“Kafka”时,认证类型不支持“KERBEROS”。 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 图形化作业管理:简单易用。
  • Flink WebUI关键能力 Flink WebUI关键能力如表1: 表1 Flink WebUI关键能力 关键能力分类 描述 批流一体 支持一套FlinkSQL定义批作业和流作业。 Flink SQL内核能力 Flink SQL支持自定义大小窗、24小时以内流计算、超出24小时批处理。 FlinkSQL支持Kafka、HDFS读取;支持写入Kafka和HDFS。 支持同一个作业定义多个FlinkSQL,多个指标合并在一个作业计算。当一个作业是相同主键、相同的输入和输出时,该作业支持多个窗口的计算。 支持AVG、SUM、COUNT、MAX和MIN统计方法。 Flink SQL可视化定义 集群连接管理,配置Kafka、HDFS等服务所属的集群信息。 数据连接管理,配置Kafka、HDFS等服务信息。 数据表管理,定义Sql访问的数据表信息,用于生成DDL语句。 FlinkSQL作业定义,根据用户输入的Sql,校验、解析、优化、转换成Flink作业并提交运行。 Flink作业可视化管理 支持可视化定义流作业和批作业。 支持作业资源、故障恢复策略、Checkpoint策略可视化配置。 流作业和批作业的状态监控。 Flink作业运维能力增强,包括原生监控页面跳转。 性能&可靠性 流处理支持24小时窗口聚合计算,毫秒级性能。 批处理支持90天窗口聚合计算,分钟级计算完成。 支持对流处理和批处理的数据进行过滤配置,过滤无效数据。 读取HDFS数据时,提前根据计算周期过滤。 作业定义平台故障、服务降级,不支持再定义作业,但是不影响已有作业计算。 作业故障有自动重启机制,重启策略可配置。
  • 缓冲区超时设置 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例如下: env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  • 解决办法 从 MRS 上下载用户的keytab认证文件,并放置到Flink客户端所在节点的某个目录下。 在“flink-conf.yaml”文件中配置: keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab “/home/flinkuser/keytab/abc222.keytab”表示的是用户目录,为1中放置目录。 请确保客户端用户具备对应目录权限。 principal名。 security.kerberos.login.principal: abc222 对于HA模式,如果配置了ZooKeeper,还需要设置ZooKeeper Kerberos认证相关的配置。 zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka Client和Kafka Broker之间也需要做Kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient
  • 原因分析 在安全集群环境下,Flink需要进行安全认证。当前客户端未进行相关安全认证设置。 Flink整个系统有两种认证方式: 使用kerberos认证:Flink yarn client、Yarn Resource Manager、JobManager、HDFS、TaskManager、Kafka和Zookeeper。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 如果用户安装安全集群需要使用kerberos认证和security cookie认证。根据日志提示,发现配置文件中“security.kerberos.login.keytab :”配置项错误,未进行安全配置。
  • 问题背景与现象 客户端安装成功,执行客户端命令例如yarn-session.sh时报错,提示如下: [root@host01 bin]# yarn-session.sh 2018-10-25 01:22:06,454 | ERROR | [main] | Error while trying to split key and value in configuration file /opt/flinkclient/Flink/flink/conf/flink-conf.yaml:80: "security.kerberos.login.keytab: " | org.apache.flink.configuration.GlobalConfiguration (GlobalConfiguration.java:160) Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Error while parsing YAML configuration file :80: "security.kerberos.login.keytab: "
  • Flink WebUI应用流程 Flink WebUI应用流程参考如下步骤: 图3 Flink WebUI应用流程 表2 Flink WebUI应用流程说明 阶段 说明 参考章节 创建应用 通过应用来隔离不同的上层业务。 创建FlinkServer应用 创建集群连接 通过集群连接配置访问不同的集群。 创建FlinkServer集群连接 创建数据连接 通过数据连接,访问不同的数据服务,包括HDFS、Kafka等。 创建FlinkServer数据连接 创建流表 通过数据表,定义源表、维表、输出表的基本属性和字段信息。 创建FlinkServer流表源 创建SQL/JAR作业(流式/批作业) 定义Flink作业的API,包括Flink SQL和Flink Jar作业。 如何创建FlinkServer作业 作业管理 管理创建的作业,包括作业启动、开发、停止、删除和编辑等。 如何创建FlinkServer作业
  • Flink WebUI关键能力 FlinkWebUI关键能力如表1: 表1 Flink WebUI关键能力 关键能力分类 描述 批流一体 支持一套FlinkSQL定义批作业和流作业。 Flink SQL内核能力 Flink SQL支持自定义大小窗、24小时以内流计算、超出24小时批处理。 FlinkSQL支持Kafka、HDFS读取;支持写入Kafka和HDFS。 支持同一个作业定义多个FlinkSQL,多个指标合并在一个作业计算。当一个作业是相同主键、相同的输入和输出时,该作业支持多个窗口的计算。 支持AVG、SUM、COUNT、MAX和MIN统计方法。 Flink SQL可视化定义 集群连接管理,配置Kafka、HDFS等服务所属的集群信息。 数据连接管理,配置Kafka、HDFS等服务信息。 数据表管理,定义Sql访问的数据表信息,用于生成DDL语句。 FlinkSQL作业定义,根据用户输入的Sql,校验、解析、优化、转换成Flink作业并提交运行。 Flink作业可视化管理 支持可视化定义流作业和批作业。 支持作业资源、故障恢复策略、Checkpoint策略可视化配置。 流作业和批作业的状态监控。 Flink作业运维能力增强,包括原生监控页面跳转。 性能&可靠性 流处理支持24小时窗口聚合计算,毫秒级性能。 批处理支持90天窗口聚合计算,分钟级计算完成。 支持对流处理和批处理的数据进行过滤配置,过滤无效数据。 读取HDFS数据时,提前根据计算周期过滤。 作业定义平台故障、服务降级,不支持再定义作业,但是不影响已有作业计算。 作业故障有自动重启机制,重启策略可配置。
  • Flink WebUI特点 Flink WebUI主要有以下特点: 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 图1 可视化运维 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 “数据连接类型”为“Kafka”时,认证类型不支持“KERBEROS”。 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 图形化作业管理:简单易用。 图2 图形化作业管理
  • Flink客户端执行命令报错security.kerberos.login.keytab 客户端安装成功,执行客户端命令例如yarn-session.sh时报错,提示如下: [root@host01 bin]# yarn-session.sh 2018-10-25 01:22:06,454 | ERROR | [main] | Error while trying to split key and value in configuration file /opt/flinkclient/Flink/flink/conf/flink-conf.yaml:80: "security.kerberos.login.keytab: " | org.apache.flink.configuration.GlobalConfiguration (GlobalConfiguration.java:160) Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Error while parsing YAML configuration file :80: "security.kerberos.login.keytab: " 在安全集群环境下,Flink需要进行安全认证。当前客户端未进行相关安全认证设置。 Flink整个系统有两种认证方式: 使用kerberos认证:Flink yarn client、Yarn Resource Manager、JobManager、HDFS、TaskManager、Kafka和Zookeeper。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 如果用户安装安全集群需要使用kerberos认证和security cookie认证。根据日志提示,发现配置文件中“security.kerberos.login.keytab :”配置项错误,未进行安全配置。 解决方法如下: 从MRS上下载用户的keytab认证文件,并放置到Flink客户端所在节点的某个目录下。 在“flink-conf.yaml”文件中配置: keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab “/home/flinkuser/keytab/abc222.keytab”表示的是用户目录,为1中放置目录。 请确保客户端用户具备对应目录权限。 principal名。 security.kerberos.login.principal: abc222 对于HA模式,如果配置了ZooKeeper,还需要设置ZooKeeper Kerberos认证相关的配置。 zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka Client和Kafka Broker之间也需要做Kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient
  • 使用不同用户执行yarn-session创建Flink集群失败 使用Flink过程中,具有两个相同权限用户testuser和bdpuser。使用用户testuser创建Flink集群正常,但是切换至bdpuser用户创建Fllink集群时,执行yarn-session.sh命令报错: 2019-01-02 14:28:09,098 | ERROR | [main] | Ensure path threw exception | org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl (CuratorFrameworkImpl.java:566) org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /flink/application_1545397824912_0022 原因是高可用配置项未修改。由于在Flink的配置文件中,“high-availability.zookeeper.client.acl”默认为“creator”,仅创建者有权限访问,新用户无法访问ZooKeeper上的目录导致yarn-session.sh执行失败。 解决方法如下: 修改客户端配置文件“conf/flink-conf.yaml”中配置项“high-availability.zookeeper.path.root”,例如: high-availability.zookeeper.path.root: flink2 重新提交Flink任务。
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime(ROWS number PRECEDING) |(RANGE (BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW | UNBOUNDED preceding)) ) 语法说明 表3 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 同一select里所有聚合函数定义的窗口都必须保持一致。 当前Over窗口只支持前向计算(preceding),不支持following计算。 必须指定ORDER BY 按processing time或event time。 不支持对常量做聚合操作,如sum(2)。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: time_attr可以设置processing-time或者event-time。 time_attr设置为event-time时参数类型为bigint或者timestamp类型。 time_attr设置为processing-time时无需指定类型。 interval设置窗口周期。 分组函数 表1 分组函数表 函数名 说明 TUMBLE(time_attr, interval) 跳跃窗口。 HOP(time_attr, interval, interval) 拓展的跳跃窗口(等价于datastream的滑动窗口),可以分别设置输出触发周期和窗口周期。 SESSION(time_attr, interval) 会话窗口,interval表示多长时间没有记录则关闭窗口。 窗口函数 表2 窗口函数表 函数名 说明 TUMBLE_START(time_attr, interval) 返回跳跃窗口开始时间。为UTC时区。 TUMBLE_END(time_attr, interval) 返回跳跃窗口结束时间。为UTC时区。 HOP_START(time_attr, interval, interval) 返回拓展的跳跃窗口开始时间。为UTC时区。 HOP_END(time_attr, interval, interval) 返回拓展的跳跃窗口结束时间。为UTC时区。 SESSION_START(time_attr, interval) 返回会话窗口开始时间。为UTC时区。 SESSION_END(time_attr, interval) 返回会话窗口结束时间。为UTC时区。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • 配置Event Time Event Time是指事件产生的时间,即数据产生时自带时间戳。 语法格式 1 2 3 CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY {attr_name}.rowtime SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval}); 语法说明 设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。 由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到 DLI 服务,中间有一个过程。 Watermark有两种设置策略: 按时间周期 1 SET WATERMARK(range interval {time_unit}, interval {time_unit}) 按事件个数 1 SET WATERMARK(rows literal, interval {time_unit}) 一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。 注意事项 无。 示例 time2事件产生时间开始,每10s发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (RANGE interval 10 second, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores; 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (ROWS 10, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;