云服务器内容精选

  • 一个事件触发器可以设置多个前缀或者后缀吗? 如果您需要使用一个工作流处理多种后缀的文件,可以为该工作流创建多条事件触发器来实现支持多个前缀或者后缀。 当前一个事件触发器支持设置一个前缀或者后缀,一个桶支持绑定10个触发器。 例如:对于媒资转码工作流,用户需要对后缀mp4和mov两种格式的视频进行数据处理。用户需要为该给工作流创建两个触发器,一个用于触发mp4格式的数据处理,一个用于触发mov格式的数据处理。
  • 场景1:触发器源桶和函数执行输出目标桶是同一个桶的无限循环 案例1:使用DWR内置函数触发“无限循环” 用户创建一条工作流workflowA,在工作流中使用了内置的“媒资转码”函数,且配置“媒资转码”函数的转码输出桶为桶A。然后,在桶A上配置触发器关联工作流workflowA。如果自定义函数输出的对象前缀或后缀匹配触发器triggerA的前后缀条件(比如:前缀和后缀不配置,或者对象满足触发器条件),则转码的输出会再次触发工作流,从而导致用户工作流不停的执行,出现“无限循环”。 图1 工作流配置workflowA 图2 工作流触发器triggerA配置 案例2:自定义函数触发“无限循环” 用户创建一条工作流workflowA,在工作流中使用了自定义函数,且自定义函数会向桶A输出对象。然后,在桶A上配置触发触发器triggerA关联工作流workflowA。如果自定义函数输出的对象前缀或后缀匹配触发器triggerA的前后缀条件(比如:前缀和后缀不配置,或者对象满足触发器条件),则自定义函数的输出会再次触发工作流,从而导致用户工作流不停的执行,出现“无限循环”。 图3 工作流配置workflowA 图4 触发器配置triggerA 后缀.mp4可以匹配所有后缀是mp4的对象。
  • 场景2:触发器源桶和函数执行输出目标桶是不同桶的无限循环 案例1:使用内置函数触发“无限循环” 用户创建一条工作流workflowA,在工作流中使用了内置的“媒资转码”函数,且配置“媒资转码”函数的转码输出桶为桶A。用户创建一条工作流workflowB,在工作流中使用了内置的“媒资转码”函数,且配置“媒资转码”函数的转码输出桶为桶B。然后,在桶B上配置触发器triggerA关联工作流workflowA,在桶A上配置触发器triggerB关联工作流workflowB。如果,triggerA触发workflowA生成的对象匹配上了triggerB的条件(比如:两个触发器的前缀和后缀不配置),最终,转码的输出会再次触发工作流,从而导致用户工作流workflowA和workflowB不停的执行,出现“无限循环”。 说明:triggerA和triggerB都不指定前后缀 图5 工作流配置workflowA 图6 工作流配置workflowB 图7 工作流触发器triggerA配置 图8 触发器triggerB配置
  • 示例流程 图1 给用户授予DWR权限流程 创建用户组并授权 在 IAM 控制台创建用户组,并授予数据工坊只读权限“DWR ReadOnlyAccess”。 创建用户并加入用户组 在IAM控制台创建用户,并将其加入1中创建的用户组。 用户登录并验证权限 新创建的用户登录控制台,切换至授权区域,验证权限:在“服务列表”中选择数据工坊,进入DWR工作流页面,单击右上角“创建工作流”,尝试创建工作流,如果无法创建(假设当前权限仅包含DWR ReadOnlyAccess),表示“DWR ReadOnlyAccess”已生效。
  • DWR如何实现数据处理 DWR提供的近数据处理能力,可以对OBS内存储的数据,按照用户编排的工作流进行自动化处理(如解析、转码、截图等)。 DWR基于 函数工作流 FunctionGraph的函数能力,将复杂的业务处理逻辑编排为工作流,通过事件触发器或API驱动,自动化完成多项复杂的数据处理任务。DWR提供图形化界面,方便用户直观便捷的构建数据处理流程,同时提供了预置的算子和自定义函数能力,覆盖数据处理的各种场景。预置算子的详细介绍,请参见官方算子一览。用户在自行开发自定义函数时,函数的输入参数和输出参数需要遵守自定义函数开发规范。 DWR支持异步和同步两种方式启动工作流,其中同步方式支持直接返回数据: 通过事件触发器启动工作流(异步方式) 在OBS桶上配置事件触发器,指定工作流触发的条件,如桶内什么数据在执行某类操作后开始处理,当事件触发时异步执行满足条件的复杂任务。通常这类复杂任务处理逻辑相同,可以对一类对象进行操作。比较典型的场景是:用户上传视频对象后,可以根据工作流自动完成视频解析或者转码。 通过API启动工作流(同步和异步都支持) 在少数场景下,用户对单个对象或者一类对象进行的复杂操作是有区别的,这就要求用户通过API调用方式来实现单个对象粒度的复杂任务处理,可以指定某个对象立即执行某个特定的工作流。 图1 数据处理工作流
  • 算子参数配置示例 Inputs参数 Input结构体参数说明参见创建工作流API。 regex参数设置的正则表达式请使用regexploit工具校验。 1 2 3 4 5 6 7 8 9101112131415 [{ //算子所在工作流输入列表"parameter_name": "bucket","parameter_value": "","value_type": "","default": "","type": "string","label": "Body","constraints": {"regex": ".*"//正则表达式请使用regexploit工具校验},"invisible": false,"description": "doc destination bucket name"}] 动态参数 12345 {"bucket": {"get_input": "$.inputs.bucket"//该值需要跟inputs参数中的parameter_name取值保持一致}} 权限版本 obs授权参见对象相关授权项和桶相关授权。 1 2 3 4 5 6 7 8 9101112131415161718 [ { //1.1版本支持细粒度授权,可以精确到具体服务的操作、资源以及请求条件等 "version": "1.1", "statement": [ { //对IAM用户组授予OBS指定资源的指定操作权限 "action": [ "obs:bucket:HeadBucket", "obs:bucket:ListBucketMultipartUploads", "obs:object:AbortMultipartUpload", "obs:object:PutObject", "obs:bucket:GetBucketAcl", "obs:object:GetObject" ] } ] } ]
  • 前提条件 算子已开发完成,开发规范参考自定义函数开发规范,建议使用Go、Python开发(算子的冷启动效果更好)。 算子已在华为云云商店上架,上架指导参考发布API类商品操作指导。 开发的新算子如果要支持同步工作流,需要保证同步工作流最后一个算子返回方式为以下的一种: 返回方式1:字符串数据 { "execution_name":"84a3dd2bd67f43aa9b98cdd74604ca68", //工作流实例名称 "graph_name":"test_workflow", //工作流名称 "Records":[ // 处理对象 ], "dynamic_source": {//执行算子的输出结果 "tasks": [ {body}, // 直接返回body字符串 ] }} 返回方式2:文件流数据 { "execution_name":"84a3dd2bd67f43aa9b98cdd74604ca68", //工作流实例名称 "graph_name":"test_workflow", //工作流名称 "Records":[ // 处理对象 ], "dynamic_source":{ //执行算子的输出结果 "tasks":[ { "output":{ // 同步返回的输出文件地址:桶名、对象名、区域 "bucket":"bucketname", "object":"objectname", "location":"cn-north-4" } } ] }}
  • 结构体示例(GO语言) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 package maintype CreateTranscodeDynamicSourceBody struct {Transcodes []*CreateTranscodeTaskBody `json:"transcodes"`}type CreateTranscodeTaskBody struct {//源文件存储地址。Input *FileAddr `json:"input,omitempty"`//转码后的视频文件存储地址。Output *FileAddr `json:"output"`//转码模板ID,数组TransTemplateID []int `json:"trans_template_id,omitempty"`//支持图片水印和文字水印,最多支持20个。Watermarks []*Watermark `json:"watermarks,omitempty"`//任务优先级。Priority string `json:"priority,omitempty"`//输出文件名称,每一路转码输出对应一个名称,需要与转码模板ID数组的顺序对应。OutputFilenames []string `json:"output_filenames,omitempty"`}type Watermark struct {Input *FileAddr `json:"input,omitempty"`TemplateID int `json:"template_id,omitempty"`TextContext string `json:"text_context,omitempty"`ImageWatermark *ImageWatermark `json:"image_watermark,omitempty"`TextWatermark *TextWatermark `json:"text_watermark,omitempty"`}type TextWatermark struct {Dx string `json:"dx,omitempty"`Dy string `json:"dy,omitempty"`ReferPos string `json:"referpos,omitempty"`TimelineStart string `json:"timeline_start,omitempty"`TimelineDuration string `json:"timeline_duration,omitempty"`FontName string `json:"font_name,omitempty"`FontSize string `json:"font_size,omitempty"`FontColor string `json:"font_color,omitempty"`Base string `json:"base,omitempty"`}type ImageWatermark struct {Dx string `json:"dx,omitempty"`Dy string `json:"dy,omitempty"`ReferPos string `json:"referpos,omitempty"`TimelineStart string `json:"timeline_start,omitempty"`TimelineDuration string `json:"timeline_duration,omitempty"`ImageProcess string `json:"image_process,omitempty"`Width string `json:"width,omitempty"`Height string `json:"height,omitempty"`Base string `json:"base,omitempty"`}type CreateThumbnailDynamicSourceBody struct {Thumbnails []*ThumbnailCreateTaskBody `json:"thumbnails"`}//FileAddr 文件路径结构定义type FileAddr struct {Location string `json:"location"`BucketName string `json:"bucket"`Object string `json:"object"`}type ThumbnailCreateTaskBody struct {//源文件地址。Input *FileAddr `json:"input"`//输出地址。Output *FileAddr `json:"output"`//是否压缩抽帧图片生成tar包。Tar int `json:"tar,omitempty"`//是否同步处理,同步处理是指不下载全部文件,快速定位到截图位置进行截图。Mode int `json:"sync,omitempty"`//截图参数ThumbnailParam *ThumbnailParam `json:"thumbnail_para"`}type ThumbnailParam struct {Type string `json:"type"`Time int64 `json:"time,omitempty"`StartTime int64 `json:"start_time,omitempty"`Duration int64 `json:"duration,omitempty"`Dots []int64 `json:"dots,omitempty"`Format int64 `json:"format,omitempty"`AspectRatio int64 `json:"aspect_ratio,omitempty"`Width int64 `json:"width,omitempty"`Height int64 `json:"height,omitempty"`MaxLength int64 `json:"max_length,omitempty"`OutputFileName string `json:"output_filename,omitempty"`}type OBSMessages struct {Records []OBSRecord `json:"Records"`}// OBSRecord OBS消息格式type OBSRecord struct {EventVersion string `json:"eventVersion"`EventSource string `json:"eventSource"`EventRegion string `json:"eventRegion"`EventTime string `json:"eventTime"`EventName string `json:"eventName"`UserIdentity UserIdentity `json:"userIdentity"`RequestParameters RequestParameters `json:"requestParameters"`ResponseElements ResponseElements `json:"responseElements"`Obs *OBSInfo `json:"obs"`}// UserIdentity 用户idtype UserIdentity struct {ID string `json:"ID,omitempty"`}//RequestParameters 原始请求参数type RequestParameters struct {SourceIPAddress string `json:"sourceIPAddress,omitempty"`}//ResponseElements 响应参数type ResponseElements struct {OBSRequestID string `json:"x-obs-request-id"`OBSID2 string `json:"x-obs-id-2"`}//OBSInfo OBS信息type OBSInfo struct {Version string `json:"Version"`ConfigurationID string `json:"configurationId"`Bucket BucketInfo `json:"bucket"`Object ObjectInfo `json:"object"`}//BucketInfo 桶信息type BucketInfo struct {Name string `json:"name"`OwnerIdentity UserIdentity `json:"ownerIdentity"`Bucket string `json:"bucket"`}//ObjectInfo 对象信息type ObjectInfo struct {Key string `json:"key"`Tag string `json:"eTag"`Size uint64 `json:"size"`VersionID string `json:"versionId"`Sequencer string `json:"sequencer"`}type Payload struct {ExecutionName string `json:"execution_name"`GraphName string `json:"graph_name"`OBSMessagesDynamicSource interface{} `json:"dynamic_source"`Inputs map[string]interface{} `json:"inputs"`}
  • 函数输出参数 函数输出参数的JSON格式的结构体定义如下: 表3 函数输出的JSON格式体 名称 是否必选 参数类型 说明 约束 execution_name 是 String 工作流实例名称。 继承函数输入参数的execution_name。 graph_name 是 String 工作流名称。 继承函数输入参数的graph_name。 Records 是 Array 工作流触发的事件源事件消息。 如果没有变化,则继承函数输入参数的records。 inputs 否 Map[String]String 用户可修改参数列表。 如果没有新增,则继承函数输入参数的inputs。 dynamic_source 否 Map 函数的输出参数,可用于传递给下一个执行的函数。 - operation_name 否 String 函数操作名。 系统内置的工作流函数操作名有: 视频解析:MPC.Metadata 视频截图:MPC.Thumbnail 视频转码:MPC.Transcode SMN 消息通知:SMN.Publish
  • 对接截图函数示例(GO语言) 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 package mainimport ("encoding/json""errors""go-runtime/go-api/context")func DemoHandler(jsonData []byte, ctx context.RuntimeContext) (interface{}, error) {var eventMsg Payloaderr := json.Unmarshal(jsonData, &eventMsg)if err != nil {return nil, errors.New("not correct format")}// 存储输入桶和对象值record := eventMsg.Records[0]// 定义输出resp := struct {OBSMessagesInputs map[string]interface{} `json:"inputs"`ExecutionName string `json:"execution_name"`GraphName string `json:"graph_name"`DynamicSource struct {*CreateThumbnailDynamicSourceBody} `json:"dynamic_source"`}{}// 配置截图参数,为下游截图任务提供参数配置resp.DynamicSource.CreateThumbnailDynamicSourceBody = &CreateThumbnailDynamicSourceBody{Thumbnails: []*ThumbnailCreateTaskBody{&ThumbnailCreateTaskBody{//源文件地址。Input: &FileAddr{Location: "cn-north-1",BucketName: record.Obs.Bucket.Name,Object: record.Obs.Object.Key,},//输出地址。Output: &FileAddr{Location: "cn-north-1",BucketName: record.Obs.Bucket.Name,Object: "thumb_out",},//是否压缩抽帧图片生成tar包。Tar: 0,//是否同步处理,同步处理是指不下载全部文件,快速定位到截图位置进行截图。Mode: 0,//截图参数ThumbnailParam: &ThumbnailParam{Type: "DOTS",MaxLength: 0,Dots: []int64{2, 10, 14}, // 截图的位置(s)OutputFileName: "default_cover.jpg",},},},}// 以下参数需要继承传递,方便工作流下游函数获取对应参数值resp.Inputs = eventMsg.Inputsresp.Records = eventMsg.Recordsresp.GraphName = eventMsg.GraphNameresp.ExecutionName = eventMsg.ExecutionNamereturn resp, nil}
  • 对接转码函数示例(GO语言) 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 package mainimport ("encoding/json""errors""go-runtime/go-api/context")func DemoTranscodeHandler(jsonData []byte, ctx context.RuntimeContext) (interface{}, error) {var eventMsg Payloaderr := json.Unmarshal(jsonData, &eventMsg)if err != nil {return nil, errors.New("not correct format")}// 存储输入桶和对象值record := eventMsg.Records[0]// 定义输出resp := struct {OBSMessagesInputs map[string]interface{} `json:"inputs"`ExecutionName string `json:"execution_name"`GraphName string `json:"graph_name"`DynamicSource struct {*CreateTranscodeDynamicSourceBody} `json:"dynamic_source"`}{}// 配置截图参数,为下游截图任务提供参数配置resp.DynamicSource.CreateTranscodeDynamicSourceBody = &CreateTranscodeDynamicSourceBody{Transcodes: []*CreateTranscodeTaskBody{&CreateTranscodeTaskBody{//源文件地址。Input: &FileAddr{Location: "cn-north-4",BucketName: record.Obs.Bucket.Name,Object: record.Obs.Object.Key,},//输出地址。Output: &FileAddr{Location: "cn-north-4",BucketName: record.Obs.Bucket.Name,Object: "transcode_out",},TransTemplateID: []int{7000523, 7000524, 7000526, 7000528, 7000530, 7000538},OutputFilenames: []string{"out_file1", "out_file2", "out_file3", "out_file4", "out_file5", "out_file6"},},},}// 以下参数需要继承传递,方便工作流下游函数获取对应参数值resp.Inputs = eventMsg.Inputsresp.Records = eventMsg.Recordsresp.GraphName = eventMsg.GraphNameresp.ExecutionName = eventMsg.ExecutionNamereturn resp, nil}
  • 函数输入参数 工作流执行自定义函数时,函数输入参数的JSON格式的结构体和环境变量的定义如下: 表1 函数输入的JSON格式体 名称 是否必选 参数类型 说明 execution_name 是 String 工作流实例名称。 graph_name 是 String 工作流名称。 Records 是 Array 工作流触发的事件源事件消息。 inputs 否 Map[String]String 用户可修改参数列表,可以为空。 dynamic_source 否 Map 函数执行必须的参数,可用于传入调用的服务。 表2 函数的环境变量 名称 是否必选 参数类型 说明 region 否 String 当前区 域名 称。 函数输入的JSON示例 1 2 3 4 5 6 7 8 91011121314151617181920212223242526272829303132333435363738394041424344454647484950 { "execution_name": "84a3dd2bd67f43aa9b98cdd74604ca68",//工作流实例名称 "graph_name": "test_workflow",//工作流名称 "Records": [ { "eventName": "ObjectCreated:Put",//触发事件通知的事件名 "eventRegion": "cn-north-4",//事件所在的region "eventSource": "OBS",//消息源,固定为"OBS" "eventTime": "2021-12-23T14:50:22.957Z",//事件时间,格式为ISO-8601,示例:2020-07-10T09:24:11.418Z "eventVersion": "3.0",//版本号,目前为"3.0" "obs": { "Version": "1.0", "bucket": { "bucket": "examplebucket",//桶名 "name": "examplebucket",//桶名 "ownerIdentity": { "ID": "08b4efe0fc00d3ce0f17c01b948f6e80"//桶拥有者的账号ID } }, "configurationId":"test-trigger",//此事件匹配的OBS中事件触发器的名称 "object": { "eTag": "fc85a07cff68977bf5b2108e7436ca2d",//对象的etag "key": "exampleobject.docx",//对象名 "oldpsxpth": "",//文件在并行文件系统中rename前的路径 "sequencer": "1",//确定某个特定对象事件顺序的标识 "size": "524298",//对象的大小 "versionId": "G001017DE60E176D0000401106696610null"//对象的版本ID } }, "requestParameters": { "sourceIPAddress": "x.x.x.x"//请求的源IP }, "responseElements": { "x-obs-id-2": "",//帮助定位问题的特殊符号 "x-obs-request-id": "84a3dd2bd67f43aa9b98cdd74604ca68"//请求对应的requestid }, "userIdentity": { "ID": "08b4efe0fc00d3ce0f17c01b948f6e80"//触发事件的用户对应的计费ID } } ], "inputs": { //执行工作流的输入参数 "parametername": "parametervalue", "parametername": "parametervalue" }, "dynamic_source": { //执行自定义函数的输入参数 "parametername": "parametervalue", "parametername": "parametervalue" }}
  • 算子市场介绍 算子市场即为DWR提供的算子库,提供方包括华为和第三方。 华为云自有算子的能力源是华为云数据处理相关的云服务,如 媒体处理 MPC、图像识别Image等,DWR将云服务提供的各种数据处理能力通过函数生成算子并在算子市场发布。 第三方算子是基于DWR的算子注册能力,由第三方开发者创建,专业人员审核发布的公共算子,您也可以将自己创建的算子发布为第三方算子,开放给所有华为云用户使用,详见发布算子。 图1 算子市场 父主题: 算子管理
  • DIS 消息通知 模板作用:发送消息到您指定的DIS通道。该模板实际调用的是DIS服务的上传数据接口。 表20 DIS消息通知属性配置说明 属性类别 参数名称 参数说明 基本属性 名称 任务的名称,修改后将体现在工作流编排区域。 必须以字母或数字开头 只能由字母、数字、下划线和中划线组成 长度范围为1~20个字符 不能和同一工作流中的其他任务重名 超时(秒) 任务超时时间,即任务执行的最长时间。 支持设置0~300秒的超时时间,如果设置为0,则表示超时时间为默认值30秒。 算子提供方 函数模板的提供方。 错误处理 可定义不同类型错误发生时的重试次数、重试间隔,以及重试失败后跳转到的目标任务。 错误类型包括:匹配所有、执行失败、权限不合法、参数不合法、函数不存在、请求太频繁、函数不可用、函数异常 动态参数 stream_name 已创建的DIS通道名称。 partition_id DIS通道分区的唯一标识符。 partition_key 数据将写入的DIS通道分区。
  • 自定义 用户可自定义函数,满足不同场景的任务定制需求。 自定义函数属性配置说明见下表,另外需要遵循自定义函数开发规范。 表23 自定义函数属性配置说明 属性类别 参数名称 参数说明 基本属性 名称 任务的名称,修改后将体现在工作流编排区域。 必须以字母或数字开头 只能由字母、数字、下划线和中划线组成 长度范围为1~20个字符 不能和同一工作流中的其他任务重名 超时(秒) 任务超时时间,即任务执行的最长时间。 支持设置0~300秒的超时时间,如果设置为0,则表示超时时间为默认值30秒。 函数唯一标识 选择需要执行的FunctionGraph中的函数。函数需通过FunctionGraph页面创建。 FunctionGraph服务的操作指导请参见《函数工作流用户指南》中“创建并初始化函数”章节的内容。 下拉列表中仅展示与DWR工作流同区域且同项目的函数。工作流所属区域为创建工作流的桶所属区域。例如工作流A是在桶A中创建的,则桶A的区域即为工作流A的区域。 动态参数 若自定义函数中存在动态参数,可以指定动态参数的参数名和取值,作为函数的输入。