AI开发平台MODELARTS-配置节点参数控制分支执行:控制多分支的部分执行

时间:2025-01-09 16:29:25

控制多分支的部分执行

from modelarts import workflow as wf

# 构建一个OutputStorage对象,对训练输出目录做统一管理
storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info")  # name字段必填,title, description可选填

# 定义输入的OBS对象
obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory")

condition_equal_a = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_a_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)

# 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
job_step_a = wf.steps.JobStep(
    name="training_job_a",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="图像分类训练",  # 标题信息,不填默认使用name
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="subscription_id",  # 算法订阅ID
        item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
        parameters=[]

    ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
    inputs=wf.steps.JobInput(name="data_url", data=obs_data),
    outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_a")))],
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")

        )
    ),  # 训练资源规格信息
    policy=wf.steps.StepPolicy(skip_conditions=condition_equal_a)
)

condition_equal_b = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=wf.Placeholder(name="job_step_b_is_skip", placeholder_type=wf.PlaceholderType.BOOL), right=True)

# 通过JobStep来定义一个训练节点,并将训练结果输出到OBS
job_step_b = wf.steps.JobStep(
    name="training_job_b",  # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="图像分类训练",  # 标题信息,不填默认使用name
    algorithm=wf.AIGalleryAlgorithm(
        subscription_id="subscription_id",  # 算法订阅ID
        item_version_id="item_version_id",  # 算法订阅版本ID,也可直接填写版本号
        parameters=[]

    ),  # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值
    inputs=wf.steps.JobInput(name="data_url", data=obs_data),
    outputs=[wf.steps.JobOutput(name="train_url", obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_b")))],
    spec=wf.steps.JobSpec(
        resource=wf.steps.JobResource(
            flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格")

        )
    ),  # 训练资源规格信息
    policy=wf.steps.StepPolicy(skip_conditions=condition_equal_b)
)

# 定义模型名称参数
model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR)

model_step = wf.steps.ModelStep(
    name="model_registration",  # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复
    title="模型注册",  # 标题信息
    inputs=wf.steps.ModelInput(name='model_input', data=wf.data.DataConsumptionSelector(data_list=[job_step_a.outputs["train_url"].as_input(), job_step_b.outputs["train_url"].as_input()])),  # 选择job_step_a或者job_step_b的输出作为输入
    outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")),  # ModelStep的输出
    depend_steps=[job_step_a, job_step_b],  # 依赖的作业类型节点对象
)

workflow = wf.Workflow(
    name="new-condition-demo",
    desc="this is a demo workflow",
    steps=[job_step_a, job_step_b, model_step],
    storages=storage
)
support.huaweicloud.com/usermanual-standard-modelarts/modelarts_workflow_0398.html