云服务器内容精选

  • 上传UDF 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“UDF管理”进入UDF管理页面。 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。 填写UDF名称以及描述信息后,单击“确定”。 “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。 (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业。
  • UDF java代码及SQL样例 UDF java使用样例 package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); } } UDF SQL使用样例 CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
  • UDF java代码及SQL样例 UDF java使用样例 package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); } } UDF SQL使用样例 CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
  • 上传UDF至FlinkServer 准备UDF jar文件,大小不能超过200MB。 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“UDF管理”进入UDF管理页面。 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。 填写UDF名称以及描述信息后,单击“确定”。 “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。 (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考如何创建FlinkServer作业。
  • 使用示例 自定义connector依赖 参考上传依赖包上传自定义connector依赖。 如上传依赖名称为“kafka”,自定义connector jar包名称为“flink-connector-kafka-customization.jar”。 参考如何创建FlinkServer作业新建SQL作业,该SQL中的“connector”需填写为对应的依赖名称,如'connector' = 'kafka'。 CREATE TABLE KafkaSinkTable (`user_id` INT, `name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink6', 'properties.bootstrap.servers' = '192.168.20.134:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TABLE datagen (`user_id` INT, `name` VARCHAR) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.user_id.kind' = 'sequence', 'fields.user_id.start' = '1', 'fields.user_id.end' = '1000' ); insert INTO KafkaSinkTable select * from datagen;
  • 上传依赖包 登录 FusionInsight Manager,访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“依赖管理”进入依赖管理页面。 单击“添加依赖”,可参考如下添加依赖。 表1 添加依赖 参数 描述 示例 是否自定义connector 是否自定义connector,根据实际需求选择: 是:文件为自定义connector依赖包。 否:文件为非自定义connector依赖包。 是 名称 添加的依赖名称,需与上传的依赖包中connector的连接名一致。不支持上传同名依赖包。 kafka 注册jar jar包的上传方式: 上传文件:添加本地的jar包 指定路径:已准备好的依赖文件的HDFS路径 上传文件 上传文件 注册jar选择为“上传文件”时,需通过该项上传本地jar文件。 - 指定路径 注册jar选择为“指定路径”时,需通过该项输入依赖文件的HDFS路径(需提前准备好jar包上传至HDFS)。 /flink_upload_test/flink-connector-kafka-customization.jar 描述信息 添加的依赖的描述信息。 - 单击“确定”。