AI开发平台MODELARTS-配置节点参数控制分支执行:控制单节点的执行

时间:2024-12-16 16:12:03

控制单节点的执行

  • 通过参数配置实现
    from modelarts import workflow as wf
    
    condition_equal = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)
    
    # 构建一个OutputStorage对象,对训练输出目录做统一管理
    storage = wf.data.OutputStorage(name="storage_name", title="title_info",
                                    description="description_info")  # name字段必填,title, description可选填
    
    # 定义输入的OBS对象
    obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
    
    # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
    job_step = wf.steps.JobStep(
        name="training_job",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="图像分类训练",  # 标题信息,不填默认使用name
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # 算法订阅ID
            item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
            parameters=[]
    
        ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
    
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        # JobStep的输入在运行时配置;data字段也可使用data=wf.data.OBSPath(obs_path="fake_obs_path")表示
        outputs=wf.steps.JobOutput(name="train_url",
                                   obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
        # JobStep的输出
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")
    
            )
        ), # 训练资源规格信息
        policy=wf.steps.StepPolicy(
            skip_conditions=[condition_equal] # 通过skip_conditions中的计算结果决定job_step是否跳过
        )
    )
    
    workflow = wf.Workflow(
        name="new-condition-demo",
        desc="this is a demo workflow",
        steps=[job_step],
        storages=storage
    )

    案例中job_step配置了相关的跳过策略,并且通过一个bool类型的参数进行控制。当name为is_skip的Placeholder参数配置为True时,condition_equal的计算结果为True,此时job_step会被置为跳过,反之job_step正常执行,其中Condition对象详情可参考构建条件节点控制分支执行

  • 通过获取JobStep输出的相关metric指标信息实现
    from modelarts import workflow as wf
    
    # 构建一个OutputStorage对象,对训练输出目录做统一管理
    storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # name字段必填,title, description可选填
    
    # 定义输入的OBS对象
    obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")
    
    # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
    job_step = wf.steps.JobStep(
        name="training_job",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="图像分类训练",  # 标题信息,不填默认使用name
        algorithm=wf.AIGalleryAlgorithm(
            subscription_id="subscription_id",  # 算法订阅ID
            item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
            parameters=[]
    
        ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
        inputs=wf.steps.JobInput(name="data_url", data=obs_data),
        outputs=[
            wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))),
            wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中)
        ],
        spec=wf.steps.JobSpec(
            resource=wf.steps.JobResource(
                flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")
            )
        )  # 训练资源规格信息
    )
    
    # 定义模型名称参数
    model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)
    
    # 定义条件对象
    condition_lt = wf.steps.Condition(
        condition_type=wf.steps.ConditionTypeEnum.LT,
        left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"),
        right=0.5
    )
    
    model_step = wf.steps.ModelStep(
        name="model_registration",  # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
        title="模型注册",  # 标题信息
        inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()),  # job_step的输出作为输入
        outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep的输出
        depend_steps=[job_step],  # 依赖的作业类型节点对象
        policy=wf.steps.StepPolicy(skip_conditions=condition_lt) # 通过skip_conditions中的计算结果决定model_step是否跳过
    )
    
    workflow = wf.Workflow(
        name="new-condition-demo",
        desc="this is a demo workflow",
        steps=[job_step, model_step],
        storages=storage
    )

    案例中model_step配置了相关的跳过策略,并且通过获取job_step输出的accuracy指标信息与预置的值进行比较,决定是否需要进行模型注册。当job_step输出的accuracy指标数据小于阈值0.5时,condition_lt的计算结果为True,此时model_step会被置为跳过,反之model_step正常执行。

    job_step输出的metric文件格式要求可参考创建Workflow训练作业节点部分,并且在Condition中只支持使用type为float类型的指标数据作为输入。

    此案例中metrics.json的内容示例如下:
    [
        {
            "key": "loss",
            "title": "loss",
            "type": "float",
            "data": {
                "value": 1.2
            }
        },
        {
            "key": "accuracy",
            "title": "accuracy",
            "type": "float",
            "data": {
                "value": 0.8
            }
        }   
    ]
support.huaweicloud.com/usermanual-standard-modelarts/modelarts_workflow_0398.html