AI开发平台MODELARTS-构建条件节点控制分支执行:简单示例
简单示例
- 通过参数配置实现
import modelarts.workflow as wf left_value = wf.Placeholder(name="left_value", placeholder_type=wf.PlaceholderType.BOOL, default=True) # 条件对象 condition = wf.steps.Condition(condition_type=wf.steps.ConditionTypeEnum.EQ, left=left_value, right=True) # 条件对象,包含类型以及左右值 # 条件节点 condition_step = wf.steps.ConditionStep( name="condition_step_test", # 条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 conditions=condition, # 条件对象,允许多个条件,条件之间的关系为&& if_then_steps="job_step_1", # 当condition结果为true时,名称为job_step_1的节点允许执行,名称为job_step_2的节点跳过不执行 else_then_steps="job_step_2" # 当condition结果为false时,名称为job_step_2的节点允许执行,名称为job_step_1的节点跳过不执行 ) # 该节点仅作为示例使用,其他字段需自行补充 job_step_1 = wf.steps.JobStep( name="job_step_1", depend_steps=condition_step ) # 该节点仅作为示例使用,其他字段需自行补充 model_step_1 = wf.steps.ModelStep( name="model_step_1", depend_steps=job_step_1 ) # 该节点仅作为示例使用,其他字段需自行补充 job_step_2 = wf.steps.JobStep( name="job_step_2", depend_steps=condition_step ) # 该节点仅作为示例使用,其他字段需自行补充 model_step_2 = wf.steps.ModelStep( name="model_step_2", depend_steps=job_step_2 ) workflow = wf.Workflow( name="condition-demo", desc="this is a demo workflow", steps=[condition_step, job_step_1, job_step_2, model_step_1, model_step_2] )
场景说明:job_step_1和job_step_2表示两个训练节点,并且均直接依赖于condition_step。condition_step通过参数配置决定后继节点的执行行为。
执行情况分析:- 参数left_value默认值为True,则condition逻辑表达式计算结果为True:job_step_1执行,job_step_2跳过,并且以job_step_2为唯一根节点的分支所包含的所有节点也将跳过,即model_step_2会跳过,因此最终执行的节点有condition_step、job_step_1、model_step_1。
- 如果设置left_value的值为Fasle,则condition逻辑表达式计算结果为False:job_step_2执行,job_step_1跳过,并且以job_step_1为唯一根节点的分支所包含的所有节点也将跳过,即model_step_1会跳过,因此最终执行的节点有condition_step、job_step_2、model_step_2。
- 通过获取JobStep输出的相关metric指标信息实现
from modelarts import workflow as wf # 构建一个OutputStorage对象,对训练输出目录做统一管理 storage = wf.data.Storage(name="storage_name", title="title_info", with_execution_id=True, create_dir=True, description="description_info") # name字段必填,title, description可选填 # 定义输入的OBS对象 obs_data = wf.data.OBSPlaceholder(name="obs_placeholder_name", object_type="directory") # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step = wf.steps.JobStep( name="training_job", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), outputs=[ wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path"))), wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中) ], spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ) # 训练资源规格信息 ) # 定义条件对象 condition_lt = wf.steps.Condition( condition_type=wf.steps.ConditionTypeEnum.LT, left=wf.steps.MetricInfo(job_step.outputs["metrics"].as_input(), "accuracy"), right=0.5 ) condition_step = wf.steps.ConditionStep( name="condition_step_test", # 条件节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 conditions=condition_lt, # 条件对象,允许多个条件,条件之间的关系为&& if_then_steps="training_job_retrain", # 当condition结果为true时,名称为training_job_retrain的节点允许执行,名称为model_registration的节点跳过不执行 else_then_steps="model_registration", # 当condition结果为false时,名称为model_registration的节点允许执行,名称为training_job_retrain的节点跳过不执行 depend_steps=job_step ) # 通过JobStep来定义一个训练节点,并将训练结果输出到OBS job_step_retrain = wf.steps.JobStep( name="training_job_retrain", # 训练节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="图像分类重新训练训练", # 标题信息,不填默认使用name algorithm=wf.AIGalleryAlgorithm( subscription_id="subscription_id", # 算法订阅ID item_version_id="item_version_id", # 算法订阅版本ID,也可直接填写版本号 parameters=[] ), # 训练使用的算法对象,示例中使用AIGallery订阅的算法;部分算法超参的值如果无需修改,则在parameters字段中可以不填写,系统自动填充相关超参值 inputs=wf.steps.JobInput(name="data_url", data=obs_data), outputs=[ wf.steps.JobOutput(name="train_url",obs_config=wf.data.OBSOutputConfig(obs_path=storage.join("directory_path_retrain"))), wf.steps.JobOutput(name="metrics", metrics_config=wf.data.MetricsConfig(metric_files=storage.join("directory_path_retrain/metrics.json", create_dir=False))) # 指定metric的输出路径,相关指标信息由作业脚本代码根据指定的数据格式自行输出(示例中需要将metric信息输出到训练输出目录下的metrics.json文件中) ], spec=wf.steps.JobSpec( resource=wf.steps.JobResource( flavor=wf.Placeholder(name="train_flavor_retrain", placeholder_type=wf.PlaceholderType.JSON, description="训练资源规格") ) ), # 训练资源规格信息 depend_steps=condition_step ) # 定义模型名称参数 model_name = wf.Placeholder(name="placeholder_name", placeholder_type=wf.PlaceholderType.STR) model_step = wf.steps.ModelStep( name="model_registration", # 模型注册节点的名称,命名规范(只能包含英文字母、数字、下划线(_)、中划线(-),并且只能以英文字母开头,长度限制为64字符),一个Workflow里的两个step名称不能重复 title="模型注册", # 标题信息 inputs=wf.steps.ModelInput(name='model_input', data=job_step.outputs["train_url"].as_input()), # job_step的输出作为输入 outputs=wf.steps.ModelOutput(name='model_output', model_config=wf.steps.ModelConfig(model_name=model_name, model_type="TensorFlow")), # ModelStep的输出 depend_steps=condition_step, ) workflow = wf.Workflow( name="condition-demo", desc="this is a demo workflow", steps=[job_step, condition_step, job_step_retrain, model_step], storages=storage )
案例中ConditionStep节点通过获取job_step输出的accuracy指标信息与预置的值进行比较,决定重新训练还是模型注册。当job_step输出的accuracy指标数据小于阈值0.5时,condition_lt的计算结果为True,此时job_step_retrain运行,model_step跳过;反之job_step_retrain跳过,model_step执行。
job_step输出的metric文件格式要求可参考创建Workflow训练作业节点部分,并且在Condition中只支持使用type为float类型的指标数据作为输入。
此案例中metrics.json的内容示例如下:
[ { "key": "loss", "title": "loss", "type": "float", "data": { "value": 1.2 } }, { "key": "accuracy", "title": "accuracy", "type": "float", "data": { "value": 0.8 } } ]