检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
具体操作请参考创建DLI程序包。 Flink1.15推荐配置OBS中的程序包,不推荐使用DLI程序包。Flink1.15以上版本将不再支持读取DLI程序包。 主类 指定加载的Jar包类名,如KafkaMessageStreaming。 默认:根据Jar包文件的Manifest文件指定。
当写入obs时必须填写该字段。 connector.partitioned-by 否 分区字段,多个字段以“,”分隔 示例 从kafka中读取数据以parquet的格式写到obs的bucketName桶下的fileName目录中。 create table kafkaSource(
sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") 读取HBase上的数据 sparkSession.sql("select * from testhbase").show() 通过DataFrame
authenticationmode=aksk时必须配置 - 由于是对接DLI,所以servicename=dli。 catalog 否 dli 配置执行作业读取的元数据类型。 在“添加数据源配置”页面工具栏中单击“测试连接”,测试通过后,单击“保存”,填写数据源名称,保存该数据源。 目前没有根目录保存权限,需保存到已建文件夹目录下。
请求格式正确,但是由于含有语义错误,无法响应。 429 TooManyRequests 表明请求超出了客户端访问频率的限制或者服务端接收到多于它能处理的请求。建议客户端读取相应的Retry-After首部,然后等待该首部指出的时间后再重试。 500 InternalServerError 系统异常,表明服务端
描述,确认该算子具体功能,以进行下一步优化。 所有算子反压都正常(绿色),但存在数据堆积 该场景说明性能瓶颈点在Source,主要是受数据读取速度影响,此时可以通过增加Kafka分区数并增加source并发解决。 作业一个算子反压高(红色),而其后续的多个并行算子都不存在反压(绿色)
schema) 导入数据到OpenTSDB 1 dataFrame.write.insertInto("opentsdb_test") 读取OpenTSDB上的数据 1 2 3 4 5 6 7 jdbdDF = sparkSession.read .format("opentsdb")\
储位置,请在DLI管理控制台的“全局配置 > 工程配置”中配置桶信息。当作业完成后,系统会自动将结果存储到这个默认桶中。 使用DLI作业桶读取查询结果,需具备以下条件: 在DLI管理控制台“全局配置 > 工程配置”中完成作业桶的配置。作业桶配置请参考配置DLI作业桶。 提交工单申请开启查询结果写入桶特性的白名单。
用户自定义是否永久运行。当前示例选择为:否。 拉取数据超时时间 持续拉取数据多长时间超时,单位分钟。当前示例配置为:15。 等待时间 可选参数,超出等待时间还是无法读取到数据,则不再读取数据,单位秒。当前示例不配置该参数。 消费组ID 用户指定消费组ID。当前使用MRS Kafka默认的消息组ID:“example-group1”。
minute等。 connector.write.max-retries 否 写数据失败时的最大尝试次数,默认值为:3。 示例 从dis中读取数据,并将数据插入到数据库为flinktest、表名为test的ClickHouse数据库中。 创建dis数据源表disSource。 1
option("model","binary")”进行保存 如果需要指定数据过期时间:“.option("ttl",1000)”;秒为单位 读取redis上的数据 1 sparkSession.read.format("redis").option("host", host).option("port"
.option("collection",collection)\ .mode("Overwrite")\ .save() 读取Mongo上的数据 1 2 3 4 5 6 7 8 9 10 jdbcDF = sparkSession.read
number ARRAY array MAP / MULTISET object ROW object 示例 该示例是从kafka的一个topic中读取数据,并使用kafka sink将数据写入到kafka的另一个topic中。 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所
开始的毫秒数,long 值。 ARRAY 不支持 MAP / MULTISET 不支持 ROW 不支持 示例 该示例是从Kafka数据源中读取数据,将HBase表作为维表,从而生成宽表,并将结果写入到Kafka结果表中,其具体步骤如下(该示例中HBase的版本为1.3.1和2.2
开始的毫秒数,long 值。 ARRAY 不支持 MAP / MULTISET 不支持 ROW 不支持 示例 该示例是从DMS Kafka数据源中读取数据,将HBase表作为维表,从而生成宽表,并将结果写入到Kafka结果表中,其具体步骤如下(该示例中HBase的版本2.2.3): 参考
Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据 1 2 3 4 5 6 7 8 9 jdbcDF = sparkSession.read \ .format("jdbc")
createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test") 读取OpenTSDB上的数据 1 2 3 4 5 val map = new mutable.HashMap[String, String]()
Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取RDS上的数据 1 2 3 4 5 6 7 8 9 jdbcDF = sparkSession.read \ .format("jdbc")
mode' = 'earliest-offset', 'value.format' = 'ogg-json' ); 示例 使用ogg-json读取kafka中的ogg记录,并输出到print中。 根据kafka所在的虚拟私有云和子网创建相应的跨源,并绑定所要使用的队列。然后设置安全组
创建提交Flink作业所需的计算资源。 步骤3:使用DEW管理访问凭据 跨源分析场景中,使用DEW管理数据源的访问凭证。 步骤4:创建自定义委托允许DLI访问DEW读取凭证 创建允许DLI访问DEW的委托。 步骤5:创建Flink Jar作业并配置作业信息 创建Flink Jar作业分析数据。 准备工作