华为云用户手册

  • 修改标注 当数据完成标注后,您还可以进入已标注页签,对已标注的数据进行修改。 基于图片修改 在标注作业详情页面,单击“已标注”页签,然后在图片列表中选中待修改的图片(选择一个或多个)。在右侧标签信息区域中对图片信息进行修改。 修改标签:在“选中文件标签”区域中,单击操作列的编辑图标,然后在文本框中输入正确的标签名,然后单击确定图标完成修改。 删除标签:在“选中文件标签”区域中,单击操作列的删除图标删除该标签。此操作仅删除选中图片中的标签。 图2 编辑标签 基于标签修改 在标注作业详情页,单击右侧区域的“标签管理”,显示全部标签列表。 修改标签:单击操作列的“修改”,然后在弹出的对话框中输入修改后的标签名,然后单击“确定”完成修改。修改后,之前添加了此标签的图片,都将被标注为新的标签名称。 删除标签:单击操作列“删除”,之前添加了此标签的图片,都将删除此标签。 图3 标签管理 图4 全部标签的信息 单击标注作业操作列的“标签”,可跳转至标签管理页。 单击操作列的“修改”,即可完成标签的修改。 单击操作列的“删除”,即可删除该标签。 当数据完成标注后,您还可以进入已标注页签,对已标注的数据进行修改。 基于文本修改 在标注作业详情页,单击“已标注”页签,然后在文本列表中选中待修改的文本。 在文本列表中,单击文本,当文本背景变为蓝色时,表示已选择。当文本有多个标签时,可以单击文本标签上方的删除单个标签。 基于标签修改 在标注作业详情页,单击“已标注”页签,在图片列表右侧,显示全部标签的信息。 批量修改:在“全部标签”区域中,单击操作列的编辑图标,然后在文本框中修改标签名称,选择标签颜色,单击“确定”完成修改。 批量删除:在“全部标签”区域中,单击操作列的删除图标,在弹出对话框中,可选择“仅删除标签”或“删除标签及仅包含此标签的标注对象”,然后单击“确定”。 当数据完成标注后,您还可以进入“已标注”页签,对已标注的数据进行修改。 基于音频修改 在标注作业详情页面,单击“已标注”页签,然后在音频列表中选中待修改的音频(选择一个或多个)。在右侧标签信息区域中对标签进行修改。 修改标签:在“选中文件标签”区域中,单击操作列的编辑图标,然后在文本框中输入正确的标签名,然后单击确定图标完成修改。 删除标签:在“选中文件标签”区域中,单击操作列的删除图标删除该标签。 基于标签修改 在标注作业详情页面,单击“已标注”页签,在音频列表右侧,显示全部标签的信息。 图5 全部标签信息 修改标签:单击操作列的编辑图标,然后在弹出的对话框中输入修改后的标签名,然后单击“确定”完成修改。修改后,之前添加了此标签的音频,都将被标注为新的标签名称。 删除标签:单击操作列的删除图标,在弹出的对话框中,根据提示框选择需要删除的对象,然后单击“确定”完成删除。
  • 同步新数据 ModelArts会自动将数据集中新增的数据同步至标注作业,包含数据及当前标注作业支持的标注信息。 为了快速获取数据集中最新数据,可在标注作业详情页的“全部”、“未标注”或“已标注”页签中,单击“同步新数据”,快速将数据集中的数据添加到标注作业中。 问题现象: 将已标注好的数据上传至OBS,同步数据后,显示为未标注。 原因分析: 可能是OBS桶设置了自动加密导致此问题。 解决方法: 需要新建OBS桶重新上传数据,或者取消桶加密后,重新上传数据。
  • 本地构建镜像 以linux x86_x64架构的主机为例,您可以购买相同规格的E CS 或者应用本地已有的主机进行自定义镜像的制作。 购买ECS服务器的具体操作请参考购买并登录弹性云服务器。镜像选择公共镜像,推荐使用ubuntu18.04的镜像。 图1 创建ECS服务器-选择X86架构的公共镜像 登录主机后,安装Docker,可参考Docker官方文档。也可执行以下命令安装docker。 curl -fsSL get.docker.com -o get-docker.sh sh get-docker.sh 获取基础镜像。本示例以Ubuntu18.04为例。 docker pull ubuntu:18.04 新建文件夹“self-define-images”,在该文件夹下编写自定义镜像的“Dockerfile”文件和应用服务代码“test_app.py”。本样例代码中,应用服务代码采用了flask框架。 文件结构如下所示 self-define-images/ --Dockerfile --test_app.py “Dockerfile” From ubuntu:18.04 # 配置华为云的源,安装 python、python3-pip 和 Flask RUN cp -a /etc/apt/sources.list /etc/apt/sources.list.bak && \ sed -i "s@http://.*security.ubuntu.com@http://repo.huaweicloud.com@g" /etc/apt/sources.list && \ sed -i "s@http://.*archive.ubuntu.com@http://repo.huaweicloud.com@g" /etc/apt/sources.list && \ apt-get update && \ apt-get install -y python3 python3-pip && \ pip3 install --trusted-host https://repo.huaweicloud.com -i https://repo.huaweicloud.com/repository/pypi/simple Flask # 复制应用服务代码进镜像里面 COPY test_app.py /opt/test_app.py # 指定镜像的启动命令 CMD python3 /opt/test_app.py “test_app.py” from flask import Flask, request import json app = Flask(__name__) @app.route('/greet', methods=['POST']) def say_hello_func(): print("----------- in hello func ----------") data = json.loads(request.get_data(as_text=True)) print(data) username = data['name'] rsp_msg = 'Hello, {}!'.format(username) return json.dumps({"response":rsp_msg}, indent=4) @app.route('/goodbye', methods=['GET']) def say_goodbye_func(): print("----------- in goodbye func ----------") return '\nGoodbye!\n' @app.route('/', methods=['POST']) def default_func(): print("----------- in default func ----------") data = json.loads(request.get_data(as_text=True)) return '\n called default func !\n {} \n'.format(str(data)) # host must be "0.0.0.0", port must be 8080 if __name__ == '__main__': app.run(host="0.0.0.0", port=8080) 进入“self-define-images”文件夹,执行以下命令构建自定义镜像“test:v1”。 docker build -t test:v1 . 您可以使用“docker images”查看您构建的自定义镜像。
  • 将自定义镜像创建为模型 参考从容器镜像中选择元模型导入元模型,您需要特别关注以下参数: 元模型来源:选择“从容器镜像中选择” 容器镜像所在的路径:选择已制作好的自有镜像 图4 选择已制作好的自有镜像 容器调用接口:指定模型启动的协议和端口号。请确保协议和端口号与自定义镜像中提供的协议和端口号保持一致。 镜像复制:选填,选择是否将容器镜像中的模型镜像复制到ModelArts中。 健康检查:选填,用于指定模型的健康检查。仅当自定义镜像中配置了健康检查接口,才能配置“健康检查”,否则会导致模型创建失败。 apis定义:选填,用于编辑自定义镜像的apis定义。模型apis定义需要遵循ModelArts的填写规范,参见模型配置文件说明。 本样例的配置文件如下所示: [{ "url": "/", "method": "post", "request": { "Content-type": "application/json" }, "response": { "Content-type": "application/json" } }, { "url": "/greet", "method": "post", "request": { "Content-type": "application/json" }, "response": { "Content-type": "application/json" } }, { "url": "/goodbye", "method": "get", "request": { "Content-type": "application/json" }, "response": { "Content-type": "application/json" } } ]
  • 制作流程 场景一: 预置镜像的环境软件满足要求,只需要导入模型包,就能用于创建模型,通过镜像保存功能制作。具体案例参考在Notebook中通过镜像保存功能制作自定义镜像用于推理。 图1 模型的自定义镜像制作场景一 场景二: 预置镜像既不满足软件环境要求,同时需要放入模型包,在Notebook中通过Dockerfile制作。具体案例参考在Notebook中通过Dockerfile从0制作自定义镜像用于推理。 图2 模型的自定义镜像制作场景二 场景三:预置镜像既不满足软件环境要求,同时需要放入模型包,新的镜像超过35G,在服务器(如ECS)上制作。具体案例参考在ECS中通过Dockerfile从0制作自定义镜像用于推理。 图3 模型的自定义镜像制作场景三
  • 自定义镜像的配置规范 镜像对外接口 设置镜像的对外服务接口,推理接口需与config.json文件中apis定义的url一致,当镜像启动时可以直接访问。下面是mnist镜像的访问示例,该镜像内含mnist数据集训练的模型,可以识别手写数字。其中listen_ip为容器IP,您可以通过启动自定义镜像,在容器中获取容器IP。 请求示例 curl -X POST \ http://{listen_ip}:8080/ \ -F images=@seven.jpg 图4 listen_ip获取示例 返回示例 {"mnist_result": 7} (可选)健康检查接口 如果在滚动升级时要求不中断业务,那么必须在config.json文件中配置健康检查的接口,供ModelArts调用,在config.json文件中配置。当业务可提供正常服务时,健康检查接口返回健康状态,否则返回异常状态。 如果要实现无损滚动升级,必须配置健康检查接口。 自定义镜像如果需要在“在线服务”模块使用OBS外部存储挂载功能,需要新建一个OBS挂载专属目录如“/obs-mount/”,避免选择存量目录覆盖已有文件。OBS挂载仅开放对挂载目录文件新增、查看、修改功能,如果需要删除文件请到OBS并行文件系统中手动删除。 健康检查接口示例如下。 URI GET /health 请求示例curl -X GET \ http://{listen_ip}:8080/health 响应示例 {"health": "true"} 状态码 表1 状态码 状态码 编码 状态码说明 200 OK 请求成功 日志文件输出 为保证日志内容可以正常显示,日志信息需要打印到标准输出。 镜像启动入口 如果需要部署批量服务,镜像的启动入口文件需要为“/home/run.sh”,采用CMD设置默认启动路径,例如Dockerfile配置如下: CMD ["sh", "/home/run.sh"] 镜像依赖组件 如果需要部署批量服务,镜像内需要集成python、jre/jdk、zip等组件包。 (可选)保持Http长链接,无损滚动升级 如果需要支持滚动升级的过程中不中断业务,那么需要将服务的Http的“keep-alive”参数设置为200s。以gunicorn服务框架为例,gunicorn缺省情形下不支持keep-alive,需要同时安装gevent并配置启动参数“--keep-alive 200 -k gevent”。不同服务框架参数设置有区别,请以实际情况为准。 (可选)处理SIGTERM信号,容器优雅退出 如果需要支持滚动升级的过程中不中断业务,那么需要在容器中捕获SIGTERM信号,并且在收到SIGTERM信号之后等待60秒再优雅退出容器。提前优雅退出容器可能会导致在滚动升级的过程中业务概率中断。要保证容器优雅退出,从收到SIGTERM信号开始,业务需要将收到的请求全部处理完毕再结束,这个处理时长最多不超过90秒。例如run.sh如下所示: #!/bin/bash gunicorn_pid="" handle_sigterm() { echo "Received SIGTERM, send SIGTERM to $gunicorn_pid" if [ $gunicorn_pid != "" ]; then sleep 60 kill -15 $gunicorn_pid # 传递 SIGTERM 给gunicorn进程 wait $gunicorn_pid # 等待gunicorn进程完全终止 fi } trap handle_sigterm TERM
  • 设置告警规则 通过设置ModelArts在线服务和模型负载告警规则,用户可自定义监控目标与通知策略,及时了解ModelArts在线服务和模型负载状况,从而起到预警作用。 设置ModelArts服务和模型的告警规则包括设置告警规则名称、监控对象、监控指标、告警阈值、监控周期和是否发送通知等参数。本节介绍了设置ModelArts服务和模型告警规则的具体方法。 只有“运行中”的在线服务,支持对接 CES 监控。
  • ModelArts的自定义镜像使用场景 当用户对深度学习引擎、开发库有特殊需求场景的时候,预置镜像已经不能满足用户需求。ModelArts提供自定义镜像功能支持用户自定义运行引擎。 ModelArts底层采用容器技术,自定义镜像指的是用户自行制作容器镜像并在ModelArts上运行。自定义镜像功能支持自由文本形式的命令行参数和环境变量,灵活性比较高,便于支持任意计算引擎的作业启动需求。 在制作自定义镜像的时候,可以把ModelArts提供的预置镜像作为基础镜像,通过在Dockerfile中使用预置镜像的SWR地址来拉取预置镜像后进行改造。可在ModelArts预置镜像列表里获取镜像的SWR地址,参考ModelArts支持的预置镜像列表章节。 制作自定义镜像用于创建Notebook 当Notebook预置镜像不能满足需求时,用户可以制作自定义镜像。在镜像中自行安装与配置环境依赖软件及信息,并制作为自定义镜像,用于创建新的Notebook实例。同时也支持用户在Notebook中,基于已有镜像制作新的自定义镜像。 制作自定义镜像用于训练模型 如果您已经在本地完成模型开发或训练脚本的开发,且您使用的AI引擎是ModelArts不支持的框架。您可以制作自定义镜像,并上传至SWR服务。您可以在ModelArts使用此自定义镜像创建训练作业,使用ModelArts提供的资源训练模型。 制作自定义镜像用于推理 如果您使用了ModelArts不支持的AI引擎开发模型,可以通过制作自定义镜像,导入ModelArts创建为模型,并支持进行统一管理和部署为服务。 用户制作的自定义镜像,使用的场景不同,镜像规则也不同,具体如下: 通用规则:SWR镜像类型为“私有”时,才可以共享给他人,适用于开发环境、训练作业、模型。 开发环境:SWR镜像类型为“公开”时,其他用户才可以在ModelArts镜像管理页面注册使用。 训练作业:SWR镜像类型为“公开”时,在使用自定义镜像创建训练作业时,在“镜像”输入框内直接填写“组织名称/镜像名称:版本名称”即可。例如:公开镜像的SWR地址为“swr.cn-north-4.myhuaweicloud.com/test-image/tensorflow2_1_1:1.1.1”,则在创建训练作业的“镜像”输入框里内直接填“test-images/tensorflow2_1_1:1.1.1”。
  • ModelArts的预置镜像使用场景 ModelArts给用户提供了一组预置镜像,用户可以直接使用预置镜像创建Notebook实例,在实例中进行依赖安装与配置后,保存为自定义镜像,可直接用于ModelArts训练,而不需要做适配。同时也可以使用预置镜像直接提交训练作业、创建模型等。 ModelArts提供的预置镜像版本是依据用户反馈和版本稳定性决定的。当用户的功能开发基于ModelArts提供的版本能够满足的时候,比如用户开发基于MindSpore1.X,建议用户使用预置镜像,这些镜像经过充分的功能验证,并且已经预置了很多常用的安装包,用户无需花费过多的时间来配置环境即可使用。 ModelArts默认提供了一组预置镜像供开发使用,这些镜像有以下特点: 零配置,即开即用,面向特定的场景,将AI开发过程中常用的依赖环境进行固化,提供合适的软件、操作系统、网络等配置策略,通过在硬件上的充分测试,确保其兼容性和性能最合适。 方便自定义,预置镜像已经在SWR仓库中,通过对预置镜像的扩展完成自定义镜像注册。 安全可信,基于安全加固最佳实践,访问策略、用户权限划分、开发软件 漏洞扫描 、操作系统安全加固等方式,确保镜像使用的安全性。
  • 自定义镜像功能关联服务介绍 容器镜像服务 容器 镜像服务 (Software Repository for Container,SWR)是一种支持镜像全生命周期管理的服务, 提供简单易用、安全可靠的镜像管理功能,帮助您快速部署容器化服务。您可以通过界面、社区CLI和原生API上传、下载和管理容器镜像。 您制作的自定义镜像需要上传至SWR服务。ModelArts开发环境、训练和创建模型使用的自定义镜像需要从SWR服务管理列表获取。 图1 获取镜像列表 对象存储服务 对象存储服务(Object Storage Service,OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。 在使用ModelArts时存在与OBS的数据交互,您需要使用的数据可以存储至OBS。 弹性云服务器 弹性云服务器(Elastic Cloud Server,ECS)是由CPU、内存、操作系统、云硬盘组成的基础的计算组件。弹性云服务器创建成功后,您就可以像使用自己的本地PC或物理服务器一样,使用弹性云服务器。 在制作自定义镜像时,您可以在本地环境或者ECS上完成自定义镜像制作。
  • 利用MoXing使h5py.File支持OBS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import os import h5py import numpy as np import moxing as mox h5py_File_class = h5py.File class OBSFile(h5py_File_class): def __init__(self, name, *args, **kwargs): self._tmp_name = None self._target_name = name if name.startswith('obs://'): self._tmp_name = name.replace('/', '_') if mox.file.exists(name): mox.file.copy(name, os.path.join('cache', 'h5py_tmp', self._tmp_name)) name = self._tmp_name super(OBSFile, self).__init__(name, *args, **kwargs) def close(self): if self._tmp_name: mox.file.copy(self._tmp_name, self._target_name) super(OBSFile, self).close() setattr(h5py, 'File', OBSFile) arr = np.random.randn(1000) with h5py.File('obs://bucket/random.hdf5', 'r') as f: f.create_dataset("default", data=arr) with h5py.File('obs://bucket/random.hdf5', 'r') as f: print(f.require_dataset("default", dtype=np.float32, shape=(1000,)))
  • 利用文件对象读取图片 使用opencv打开一张图片时,无法传入一个OBS路径,需要利用文件对象读取,考虑以下代码是无法读取到该图片的。 1 2 import cv2 cv2.imread('obs://bucket_name/xxx.jpg', cv2.IMREAD_COLOR) 修改为如下代码: 1 2 3 4 import cv2 import numpy as np import moxing as mox img = cv2.imdecode(np.fromstring(mox.file.read('obs://bucket_name/xxx.jpg', binary=True), np.uint8), cv2.IMREAD_COLOR)
  • 将一个不支持OBS路径的API改造成支持OBS路径的API pandas中对h5的文件读写to_hdf和read_hdf既不支持OBS路径,也不支持输入一个文件对象,考虑以下代码会出现错误。 1 2 3 4 import pandas as pd df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}, index=['a', 'b', 'c']) df.to_hdf('obs://wolfros-net/hdftest.h5', key='df', mode='w') pd.read_hdf('obs://wolfros-net/hdftest.h5') 通过重写pandas源码API的方式,将该API改造成支持OBS路径的形式。 写h5到OBS = 写h5到本地缓存 + 上传本地缓存到OBS + 删除本地缓存 从OBS读h5 = 下载h5到本地缓存 + 读取本地缓存 + 删除本地缓存 即将以下代码写在运行脚本的最前面,就能使运行过程中的to_hdf和read_hdf支持OBS路径。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import os import moxing as mox import pandas as pd from pandas.io import pytables from pandas.core.generic import NDFrame to_hdf_origin = getattr(NDFrame, 'to_hdf') read_hdf_origin = getattr(pytables, 'read_hdf') def to_hdf_override(self, path_or_buf, key, **kwargs): tmp_dir = '/cache/hdf_tmp' file_name = os.path.basename(path_or_buf) mox.file.make_dirs(tmp_dir) local_file = os.path.join(tmp_dir, file_name) to_hdf_origin(self, local_file, key, **kwargs) mox.file.copy(local_file, path_or_buf) mox.file.remove(local_file) def read_hdf_override(path_or_buf, key=None, mode='r', **kwargs): tmp_dir = '/cache/hdf_tmp' file_name = os.path.basename(path_or_buf) mox.file.make_dirs(tmp_dir) local_file = os.path.join(tmp_dir, file_name) mox.file.copy(path_or_buf, local_file) result = read_hdf_origin(local_file, key, mode, **kwargs) mox.file.remove(local_file) return result setattr(NDFrame, 'to_hdf', to_hdf_override) setattr(pytables, 'read_hdf', read_hdf_override) setattr(pd, 'read_hdf', read_hdf_override)
  • 利用pandas读或写一个OBS文件 利用pandas读一个OBS文件。 1 2 3 4 import pandas as pd import moxing as mox with mox.file.File("obs://bucket_name/b.txt", "r") as f: csv = pd.read_csv(f) 利用pandas写一个OBS文件。 1 2 3 4 5 import pandas as pd import moxing as mox df = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) with mox.file.File("obs://bucket_name/b.txt", "w") as f: df.to_csv(f)
  • 读取完毕后将文件关闭 当读取OBS文件时,实际调用的是HTTP连接读取网络流,注意要记得在读取完毕后将文件关闭。为了防止忘记文件关闭操作,推荐使用with语句,在with语句退出时会自动调用mox.file.File对象的close()方法: 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'r') as f: data = f.readlines()
  • 删除操作 删除一个OBS文件。 例如删除“obs://bucket_name/obs_file.txt”。 1 2 import moxing as mox mox.file.remove('obs://bucket_name/obs_file.txt') 删除一个OBS目录,并且递归的删除这个目录下的所有内容。如果这个目录不存在,则会报错。 例如删除“obs://bucket_name/sub_dir_0”下的所有内容。 1 2 import moxing as mox mox.file.remove('obs://bucket_name/sub_dir_0', recursive=True)
  • 移动和复制操作 移动一个OBS文件或文件夹。移动操作本身是用“复制+删除”来实现的。 一个OBS文件移动到另一个OBS文件,例如将“obs://bucket_name/obs_file.txt”移动到“obs://bucket_name/obs_file_2.txt”。 1 2 import moxing as mox mox.file.rename('obs://bucket_name/obs_file.txt', 'obs://bucket_name/obs_file_2.txt') 移动和复制操作不可以跨桶,必须在同一个桶内操作。 从OBS移动到本地,例如将“obs://bucket_name/obs_file.txt”移动到“/tmp/obs_file.txt”。 1 2 import moxing as mox mox.file.rename('obs://bucket_name/obs_file.txt', '/tmp/obs_file.txt') 从本地移动到OBS,例如将“/tmp/obs_file.txt”移动到“obs://bucket_name/obs_file.txt”。 1 2 import moxing as mox mox.file.rename('/tmp/obs_file.txt', 'obs://bucket_name/obs_file.txt') 从本地移动到本地,例如将“/tmp/obs_file.txt”移动到“/tmp/obs_file_2.txt”,该操作相当于os.rename。 1 2 import moxing as mox mox.file.rename('/tmp/obs_file.txt', '/tmp/obs_file_2.txt') 所有的移动操作均可以操作文件夹,如果操作的是文件夹,那么则会递归移动文件夹下所有的内容。 复制一个文件。mox.file.copy仅支持对文件操作,如果要对文件夹进行操作,请使用mox.file.copy_parallel。 从OBS复制到OBS,例如将“obs://bucket_name/obs_file.txt”复制到“obs://bucket_name/obs_file_2.txt”。 1 2 import moxing as mox mox.file.copy('obs://bucket_name/obs_file.txt', 'obs://bucket_name/obs_file_2.txt') 将OBS文件复制到本地,也就是下载一个OBS文件。例如下载“obs://bucket_name/obs_file.txt”到本地“/tmp/obs_file.txt”。 1 2 import moxing as mox mox.file.copy('obs://bucket_name/obs_file.txt', '/tmp/obs_file.txt') 将本地文件复制到OBS,也就是上传一个OBS文件,例如上传“/tmp/obs_file.txt”到“obs://bucket_name/obs_file.txt”。 1 2 import moxing as mox mox.file.copy('/tmp/obs_file.txt', 'obs://bucket_name/obs_file.txt') 将本地文件复制到本地,操作等价于shutil.copyfile,例如将“/tmp/obs_file.txt”复制到“/tmp/obs_file_2.txt”。 1 2 import moxing as mox mox.file.copy('/tmp/obs_file.txt', '/tmp/obs_file_2.txt') 复制一个文件夹。mox.file.copy_parallel仅支持对文件夹操作,如果要对文件进行操作,请使用mox.file.copy。 从OBS复制到OBS,例如将obs://bucket_name/sub_dir_0复制到obs://bucket_name/sub_dir_1 1 2 import moxing as mox mox.file.copy_parallel('obs://bucket_name/sub_dir_0', 'obs://bucket_name/sub_dir_1') 将OBS文件夹复制到本地,也就是下载一个OBS文件夹。例如下载“obs://bucket_name/sub_dir_0”到本地“/tmp/sub_dir_0”。 1 2 import moxing as mox mox.file.copy_parallel('obs://bucket_name/sub_dir_0', '/tmp/sub_dir_0') 将本地文件夹复制到OBS,也就是上传一个OBS文件夹,例如上传“/tmp/sub_dir_0”到“obs://bucket_name/sub_dir_0”。 1 2 import moxing as mox mox.file.copy_parallel('/tmp/sub_dir_0', 'obs://bucket_name/sub_dir_0') 将本地文件夹复制到本地,操作等价于shutil.copytree,例如将“/tmp/sub_dir_0”复制到“/tmp/sub_dir_1”。 1 2 import moxing as mox mox.file.copy_parallel('/tmp/sub_dir_0', '/tmp/sub_dir_1')
  • 列举操作 列举一个OBS目录,只返回顶层结果(相对路径),不做递归列举。 例如列举“obs://bucket_name/object_dir”,返回该目录下所有的文件和文件夹,不会递归查询。 假设“obs://bucket_name/object_dir”中有如下结构 1 2 3 4 5 bucket_name |- object_dir |- dir0 |- file00 |- file1 调用如下代码: 1 2 import moxing as mox mox.file.list_directory('obs://bucket_name/object_dir') 返回一个list: ['dir0', 'file1'] 递归列举一个OBS目录,返回目录中所有的文件和文件夹(相对路径),并且会做递归查询。 假设obs://bucket_name/object_dir中有如下结构。 1 2 3 4 5 bucket_name |- object_dir |- dir0 |- file00 |- file1 调用如下代码: 1 2 import moxing as mox mox.file.list_directory('obs://bucket_name/object_dir', recursive=True) 返回一个list: ['dir0', 'dir0/file00', 'file1']
  • 查询操作 判断一个OBS文件是否存在,如果存在则返回True,如果不存在则返回False。 1 2 import moxing as mox mox.file.exists('obs://bucket_name/sub_dir_0/file.txt') 判断一个OBS文件夹是否存在,如果存在则返回True,如果不存在则返回False。 1 2 import moxing as mox mox.file.exists('obs://bucket_name/sub_dir_0/sub_dir_1') 由于OBS允许同名的文件和文件夹(Unix操作系统不允许),如果存在同名的文件和文件夹,例如“obs://bucket_name/sub_dir_0/abc”,当调用mox.file.exists时,不论abc是文件还是文件夹,都会返回True。 判断一个OBS路径是否为文件夹,如果是则返回True,否则返回False。 1 2 import moxing as mox mox.file.is_directory('obs://bucket_name/sub_dir_0/sub_dir_1') 由于OBS允许同名的文件和文件夹(Unix操作系统不允许),如果存在同名的文件和文件夹,例如obs://bucket_name/sub_dir_0/abc,当调用mox.file.is_directory时,会返回True。 获取一个OBS文件的大小,单位为bytes。 例如获取“obs://bucket_name/obs_file.txt”的文件大小。 1 2 import moxing as mox mox.file.get_size('obs://bucket_name/obs_file.txt') 递归获取一个OBS文件夹下所有文件的大小,单位为bytes。 例如获取“obs://bucket_name/object_dir”目录下所有文件大小的总和。 1 2 import moxing as mox mox.file.get_size('obs://bucket_name/object_dir', recursive=True) 获取一个OBS文件或文件夹的stat信息,stat信息中包含如下信息。 length:文件大小。 mtime_nsec:创建时间戳。 is_directory:是否为目录。 例如查询一个OBS文件“obs://bucket_name/obs_file.txt”,此文件地址也可以替换成一个文件夹地址。 1 2 3 4 5 import moxing as mox stat = mox.file.stat('obs://bucket_name/obs_file.txt') print(stat.length) print(stat.mtime_nsec) print(stat.is_directory)
  • 读写操作 读取一个OBS文件。 例如读取“obs://bucket_name/obs_file.txt”文件内容,返回string(字符串类型)。 1 2 import moxing as mox file_str = mox.file.read('obs://bucket_name/obs_file.txt') 也可以使用打开文件对象并读取的方式来实现,两者是等价的。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'r') as f: file_str = f.read() 从文件中读取一行,返回string,以换行符结尾。同样可以打开OBS的文件对象。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'r') as f: file_line = f.readline() 从文件中读取所有行,返回一个list,每个元素是一行,以换行符结尾。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'r') as f: file_line_list = f.readlines() 以二进制模式读取一个OBS文件。 例如读取“obs://bucket_name/obs_file.bin”文件内容,返回bytes(字节类型)。 1 2 import moxing as mox file_bytes = mox.file.read('obs://bucket_name/obs_file.bin', binary=True) 也可以使用打开文件对象并读取的方式来实现,两者是等价的。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.bin', 'rb') as f: file_bytes = f.read() 以二进制模式打开的文件也支持读取一行或者读取所有行,用法不变。 将字符串写入一个文件。 例如将字符串“Hello World!”写入OBS文件“obs://bucket_name/obs_file.txt”中。 1 2 import moxing as mox mox.file.write('obs://bucket_name/obs_file.txt', 'Hello World!') 也可以使用打开文件对象并写入的方式来实现,两者是等价的。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'w') as f: f.write('Hello World!') 用写入模式打开文件或者调用mox.file.write时,如果被写入文件不存在,则会创建,如果已经存在,则会覆盖。 追加一个OBS文件。 例如将字符串“Hello World!”追加到“obs://bucket_name/obs_file.txt”文件中。 1 2 import moxing as mox mox.file.append('obs://bucket_name/obs_file.txt', 'Hello World!') 也可以使用打开文件对象并追加的方式来实现,两者是等价的。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/obs_file.txt', 'a') as f: f.write('Hello World!') 用追加模式打开文件或者调用mox.file.append时,如果被写入文件不存在,则会创建,如果已经存在,则直接追加。 当被追加的源文件比较大时,例如“obs://bucket_name/obs_file.txt”文件大小超过5MB时,追加一个OBS文件的性能比较低。 如果以写入模式或追加模式打开文件,当调用write方法时,待写入内容只是暂时的被存在的缓冲区,直到关闭文件对象(退出with语句时会自动关闭文件对象)或者主动调用文件对象的close()方法或flush()方法时,文件内容才会被写入。
  • API对应关系 Python:指本地使用Python对本地文件的操作接口。支持一键切换为对应的MoXing文件操作接口(mox.file)。 mox.file:指MoXing框架中用于文件操作的接口,其与python接口一一对应关系。 tf.gfile:指MoXing文件操作接口一一对应的TensorFlow相同功能的接口,在MoXing中,无法自动将文件操作接口自动切换为TensorFlow的接口,下表呈现内容仅表示功能类似,帮助您更快速地了解MoXing文件操作接口的功能。 表1 API对应关系 Python(本地文件操作接口) mox.file(MoXing文件操作接口) tf.gfile(TensorFlow文件操作接口) glob.glob mox.file.glob tf.gfile.Glob os.listdir mox.file.list_directory(..., recursive=False) tf.gfile.ListDirectory os.makedirs mox.file.make_dirs tf.gfile.MakeDirs os.mkdir mox.file.mk_dir tf.gfile.MkDir os.path.exists mox.file.exists tf.gfile.Exists os.path.getsize mox.file.get_size - os.path.isdir mox.file.is_directory tf.gfile.IsDirectory os.remove mox.file.remove(..., recursive=False) tf.gfile.Remove os.rename mox.file.rename tf.gfile.Rename os.scandir mox.file.scan_dir - os.stat mox.file.stat tf.gfile.Stat os.walk mox.file.walk tf.gfile.Walk open mox.file.File tf.gfile.FastGFile(tf.gfile.Gfile) shutil.copyfile mox.file.copy tf.gfile.Copy shutil.copytree mox.file.copy_parallel - shutil.rmtree mox.file.remove(..., recursive=True) tf.gfile.DeleteRecursively
  • 调用mox.file 输入如下代码,实现如下几个简单的功能。 引入MoXing Framework。 在已有的“modelarts-test08/moxing”目录下,创建一个“test01”文件夹。 调用代码检查“test01”文件夹是否存在,如果存在,表示上一个操作已成功。 1 2 3 4 import moxing as mox mox.file.make_dirs('obs://modelarts-test08/moxing/test01') mox.file.exists('obs://modelarts-test08/moxing/test01') 执行结果如图3所示。注意,每输入一行代码,单击下“Run”运行。您也可以进入OBS管理控制台,检查“modelarts-test08/moxing”目录,查看“test01”文件夹是否已创建成功。更多MoXing的常用操作请参见MoXing常用操作的样例代码。 图3 运行示例
  • 引入MoXing Framework的相关说明 在引入MoXing模块后,Python的标准logging模块会被设置为INFO级别,并打印版本号信息。可以通过以下API重新设置logging的等级。 1 2 3 4 import logging from moxing.framework.util import runtime runtime.reset_logger(level=logging.WARNING) 可以在引入MoXing之前,配置环境变量MOX_SILENT_MODE=1,来防止MoXing打印版本号。使用如下Python代码来配置环境变量,需要在import moxing之前就将环境变量配置好。 1 2 3 import os os.environ['MOX_SILENT_MODE'] = '1' import moxing as mox
  • 为什么要用mox.file 使用Python打开一个本地文件,如下所示: 1 2 with open('/tmp/a.txt', 'r') as f: print(f.read()) OBS目录以“obs://”开头,比如“obs://bucket/XXX.txt”。用户无法直接使用open方法打开OBS文件,上面描述的打开本地文件的代码将会报错。 OBS提供了很多方式和工具给用户使用,如SDK、API、console、OBS Browser等,ModelArts mox.file提供了一套更为方便地访问OBS的API,允许用户通过一系列模仿操作本地文件系统的API来操作OBS文件。例如,可以使用以下代码来打开一个OBS上的文件。 1 2 3 import moxing as mox with mox.file.File('obs://bucket_name/a.txt', 'r') as f: print(f.read()) 例如,列举一个本地路径会使用如下Python代码。 1 2 import os os.listdir('/tmp/my_dir/') 如果要列举一个OBS路径,mox.file则需要如下代码: 1 2 import moxing as mox mox.file.list_directory('obs://bucket_name/my_dir/')
  • 创建OBS操作步骤 登录OBS管理控制台,在桶列表页面右上角单击“创建桶”,创建OBS桶。 图2 创建桶 创建桶的区域需要与ModelArts所在的区域一致。例如:当前ModelArts在华北-北京四区域,在对象存储服务创建桶时,请选择华北-北京四。 如何查看OBS桶与ModelArts的所处区域,请参见查看OBS桶与ModelArts是否在同一区域。 请勿开启桶加密,ModelArts不支持加密的OBS桶,会导致ModelArts读取OBS中的数据失败。 在桶列表页面,单击桶名称,进入该桶的概览页面。 图3 桶列表 单击左侧导航的“对象”,在对象页面单击新建文件夹,创建OBS文件夹。例如,在已创建的OBS桶“c-flowers”中新建一个文件夹“flowers”。 图4 新建文件夹 在OBS桶中创建完文件夹,即可以上传文件,上传文件操作请参见OBS上传操作。
  • 创建训练作业 本案例创建训练作业时,需要配置如下参数。 表1 创建训练作业的配置说明 参数名称 说明 “创建方式” 选择“自定义算法”。 “启动方式” 选择“自定义”。 “镜像” 选择用于训练的自定义镜像。 “代码目录” 执行本次训练作业所需的代码目录。本文示例的代码目录为“obs://test-modelarts/ascend/code/”。 “启动命令” 镜像的Python启动命令。本文示例的启动命令为“bash ${MA_JOB_DIR}/code/run_torch_ddp_npu.sh”。其中,启动脚本的完整代码请参见代码示例。
  • 代码示例 文件目录结构如下所示,将以下文件上传至OBS桶中: code # 代码根目录 └─torch_ddp.py # PyTorch DDP训练代码文件 └─main.py # 使用PyTorch预置框架功能,通过mp.spawn命令启动训练的启动文件 └─torchlaunch.sh # 使用自定义镜像功能,通过torch.distributed.launch命令启动训练的启动文件 └─torchrun.sh # 使用自定义镜像功能,通过torch.distributed.run命令启动训练的启动文件 torch_ddp.py内容如下: import os import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.nn.parallel import DistributedDataParallel as DDP # 用于通过 mp.spawn 启动 def init_from_arg(local_rank, base_rank, world_size, init_method): rank = base_rank + local_rank dist.init_process_group("nccl", rank=rank, init_method=init_method, world_size=world_size) ddp_train(local_rank) # 用于通过 torch.distributed.launch 或 torch.distributed.run 启动 def init_from_env(): dist.init_process_group(backend='nccl', init_method='env://') local_rank=int(os.environ["LOCAL_RANK"]) ddp_train(local_rank) def cleanup(): dist.destroy_process_group() class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.net1 = nn.Linear(10, 10) self.relu = nn.ReLU() self.net2 = nn.Linear(10, 5) def forward(self, x): return self.net2(self.relu(self.net1(x))) def ddp_train(device_id): # create model and move it to GPU with id rank model = ToyModel().to(device_id) ddp_model = DDP(model, device_ids=[device_id]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) optimizer.zero_grad() outputs = ddp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(device_id) loss_fn(outputs, labels).backward() optimizer.step() cleanup() if __name__ == "__main__": init_from_env() main.py内容如下: import argparse import torch import torch.multiprocessing as mp parser = argparse.ArgumentParser(description='ddp demo args') parser.add_argument('--world_size', type=int, required=True) parser.add_argument('--rank', type=int, required=True) parser.add_argument('--init_method', type=str, required=True) args, unknown = parser.parse_known_args() if __name__ == "__main__": n_gpus = torch.cuda.device_count() world_size = n_gpus * args.world_size base_rank = n_gpus * args.rank # 调用 DDP 示例代码中的启动函数 from torch_ddp import init_from_arg mp.spawn(init_from_arg, args=(base_rank, world_size, args.init_method), nprocs=n_gpus, join=True) torchlaunch.sh内容如下: #!/bin/bash # 系统默认环境变量,不建议修改 MASTER_HOST="$VC_WORKER_HOSTS" MASTER_ADDR="${VC_WORKER_HOSTS%%,*}" MASTER_PORT="6060" JOB_ID="1234" NNODES="$MA_NUM_HOSTS" NODE_RANK="$VC_TASK_INDEX" NGPUS_PER_NODE="$MA_NUM_GPUS" # 自定义环境变量,指定python脚本和参数 PYTHON_SCRIPT=${MA_JOB_DIR}/code/torch_ddp.py PYTHON_ARGS="" CMD="python -m torch.distributed.launch \ --nnodes=$NNODES \ --node_rank=$NODE_RANK \ --nproc_per_node=$NGPUS_PER_NODE \ --master_addr $MASTER_ADDR \ --master_port=$MASTER_PORT \ --use_env \ $PYTHON_SCRIPT \ $PYTHON_ARGS " echo $CMD $CMD torchrun.sh内容如下: PyTorch 2.1版本需要将“rdzv_backend”参数设置为“static:--rdzv_backend=static”。 #!/bin/bash # 系统默认环境变量,不建议修改 MASTER_HOST="$VC_WORKER_HOSTS" MASTER_ADDR="${VC_WORKER_HOSTS%%,*}" MASTER_PORT="6060" JOB_ID="1234" NNODES="$MA_NUM_HOSTS" NODE_RANK="$VC_TASK_INDEX" NGPUS_PER_NODE="$MA_NUM_GPUS" # 自定义环境变量,指定python脚本和参数 PYTHON_SCRIPT=${MA_JOB_DIR}/code/torch_ddp.py PYTHON_ARGS="" if [[ $NODE_RANK == 0 ]]; then EXT_ARGS="--rdzv_conf=is_host=1" else EXT_ARGS="" fi CMD="python -m torch.distributed.run \ --nnodes=$NNODES \ --node_rank=$NODE_RANK \ $EXT_ARGS \ --nproc_per_node=$NGPUS_PER_NODE \ --rdzv_id=$JOB_ID \ --rdzv_backend=c10d \ --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \ $PYTHON_SCRIPT \ $PYTHON_ARGS " echo $CMD $CMD
  • 创建训练作业 方式一:使用PyTorch预置框架功能,通过mp.spawn命令启动训练作业。 创建训练作业的关键参数如表1所示。 表1 创建训练作业(预置框架) 参数名称 说明 创建方式 选择“自定义算法”。 启动方式 选择“预置框架”,引擎选择“PyTorch”,PyTorch版本根据训练要求选择。 代码目录 选择OBS桶中训练code文件夹所在路径,例如“obs://test-modelarts/code/”。 启动文件 选择代码目录中训练作业的Python启动脚本。例如“obs://test-modelarts/code/main.py”。 超参 当资源规格为单机多卡时,需要指定超参world_size和rank。 当资源规格为多机时(即实例数大于 1),无需设置超参world_size和rank,超参会由平台自动注入。 方式二:使用自定义镜像功能,通过torch.distributed.launch命令启动训练作业。 创建训练作业的关键参数如表2所示。 表2 创建训练作业(自定义镜像+torch.distributed.launch命令) 参数名称 说明 创建方式 选择“自定义算法”。 启动方式 选择“自定义”。 镜像 选择用于训练的PyTorch镜像。 代码目录 选择OBS桶中训练code文件夹所在路径,例如“obs://test-modelarts/code/”。 启动命令 输入镜像的Python启动命令,例如: bash ${MA_JOB_DIR}/code/torchlaunch.sh 方式三:使用自定义镜像功能,通过torch.distributed.run命令启动训练作业。 创建训练作业的关键参数如表3所示。 表3 创建训练作业(自定义镜像+torch.distributed.run命令) 参数名称 说明 创建方式 选择“自定义算法”。 启动方式 选择“自定义”。 镜像 选择用于训练的PyTorch镜像。 代码目录 选择OBS桶中训练code文件夹所在路径,例如“obs://test-modelarts/code/”。 启动命令 输入镜像的Python启动命令,例如: bash ${MA_JOB_DIR}/code/torchrun.sh
  • 分布式训练完整代码示例 以下对resnet18在cifar10数据集上的分类任务,给出了分布式训练改造(DDP)的完整代码示例。 训练启动文件main.py内容如下(如果需要执行单机单卡训练作业,则将分布式改造的代码删除): import datetime import inspect import os import pickle import random import logging import argparse import numpy as np from sklearn.metrics import accuracy_score import torch from torch import nn, optim import torch.distributed as dist from torch.utils.data import TensorDataset, DataLoader from torch.utils.data.distributed import DistributedSampler file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename) def load_pickle_data(path): with open(path, 'rb') as file: data = pickle.load(file, encoding='bytes') return data def _load_data(file_path): raw_data = load_pickle_data(file_path) labels = raw_data[b'labels'] data = raw_data[b'data'] filenames = raw_data[b'filenames'] data = data.reshape(10000, 3, 32, 32) / 255 return data, labels, filenames def load_cifar_data(root_path): train_root_path = os.path.join(root_path, 'cifar-10-batches-py/data_batch_') train_data_record = [] train_labels = [] train_filenames = [] for i in range(1, 6): train_file_path = train_root_path + str(i) data, labels, filenames = _load_data(train_file_path) train_data_record.append(data) train_labels += labels train_filenames += filenames train_data = np.concatenate(train_data_record, axis=0) train_labels = np.array(train_labels) val_file_path = os.path.join(root_path, 'cifar-10-batches-py/test_batch') val_data, val_labels, val_filenames = _load_data(val_file_path) val_labels = np.array(val_labels) tr_data = torch.from_numpy(train_data).float() tr_labels = torch.from_numpy(train_labels).long() val_data = torch.from_numpy(val_data).float() val_labels = torch.from_numpy(val_labels).long() return tr_data, tr_labels, val_data, val_labels def get_data(root_path, custom_data=False): if custom_data: train_samples, test_samples, img_size = 5000, 1000, 32 tr_label = [1] * int(train_samples / 2) + [0] * int(train_samples / 2) val_label = [1] * int(test_samples / 2) + [0] * int(test_samples / 2) random.seed(2021) random.shuffle(tr_label) random.shuffle(val_label) tr_data, tr_labels = torch.randn((train_samples, 3, img_size, img_size)).float(), torch.tensor(tr_label).long() val_data, val_labels = torch.randn((test_samples, 3, img_size, img_size)).float(), torch.tensor( val_label).long() tr_set = TensorDataset(tr_data, tr_labels) val_set = TensorDataset(val_data, val_labels) return tr_set, val_set elif os.path.exists(os.path.join(root_path, 'cifar-10-batches-py')): tr_data, tr_labels, val_data, val_labels = load_cifar_data(root_path) tr_set = TensorDataset(tr_data, tr_labels) val_set = TensorDataset(val_data, val_labels) return tr_set, val_set else: try: import torchvision from torchvision import transforms tr_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transforms) val_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms) return tr_set, val_set except Exception as e: raise Exception( f"{e}, you can download and unzip cifar-10 dataset manually, " "the data url is http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz") class Block(nn.Module): def __init__(self, in_channels, out_channels, stride=1): super().__init__() self.residual_function = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False), nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True), nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(out_channels) ) self.shortcut = nn.Sequential() if stride != 1 or in_channels != out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(out_channels) ) def forward(self, x): out = self.residual_function(x) + self.shortcut(x) return nn.ReLU(inplace=True)(out) class ResNet(nn.Module): def __init__(self, block, num_classes=10): super().__init__() self.conv1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), nn.BatchNorm2d(64), nn.ReLU(inplace=True)) self.conv2 = self.make_layer(block, 64, 64, 2, 1) self.conv3 = self.make_layer(block, 64, 128, 2, 2) self.conv4 = self.make_layer(block, 128, 256, 2, 2) self.conv5 = self.make_layer(block, 256, 512, 2, 2) self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) self.dense_layer = nn.Linear(512, num_classes) def make_layer(self, block, in_channels, out_channels, num_blocks, stride): strides = [stride] + [1] * (num_blocks - 1) layers = [] for stride in strides: layers.append(block(in_channels, out_channels, stride)) in_channels = out_channels return nn.Sequential(*layers) def forward(self, x): out = self.conv1(x) out = self.conv2(out) out = self.conv3(out) out = self.conv4(out) out = self.conv5(out) out = self.avg_pool(out) out = out.view(out.size(0), -1) out = self.dense_layer(out) return out def setup_seed(seed): torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) np.random.seed(seed) random.seed(seed) torch.backends.cudnn.deterministic = True def obs_transfer(src_path, dst_path): import moxing as mox mox.file.copy_parallel(src_path, dst_path) logging.info(f"end copy data from {src_path} to {dst_path}") def main(): seed = datetime.datetime.now().year setup_seed(seed) parser = argparse.ArgumentParser(description='Pytorch distribute training', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--enable_gpu', default='true') parser.add_argument('--lr', default='0.01', help='learning rate') parser.add_argument('--epochs', default='100', help='training iteration') parser.add_argument('--init_method', default=None, help='tcp_port') parser.add_argument('--rank', type=int, default=0, help='index of current task') parser.add_argument('--world_size', type=int, default=1, help='total number of tasks') parser.add_argument('--custom_data', default='false') parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir')) parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir')) args, unknown = parser.parse_known_args() args.enable_gpu = args.enable_gpu == 'true' args.custom_data = args.custom_data == 'true' args.lr = float(args.lr) args.epochs = int(args.epochs) if args.custom_data: logging.warning('you are training on custom random dataset, ' 'validation accuracy may range from 0.4 to 0.6.') ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ### dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank) ### 分布式改造,DDP初始化进程,其中init_method, rank和world_size参数均由平台自动入参 ### tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data) batch_per_gpu = 128 gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1 batch = batch_per_gpu * gpus_per_node tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False) ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ### tr_sampler = DistributedSampler(tr_set, num_replicas=args.world_size, rank=args.rank) tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True) ### 分布式改造,构建DDP分布式数据sampler,确保不同进程加载到不同的数据 ### val_loader = DataLoader(val_set, batch_size=batch, shuffle=False) lr = args.lr * gpus_per_node * args.world_size max_epoch = args.epochs model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block) ### 分布式改造,构建DDP分布式模型 ### model = nn.parallel.DistributedDataParallel(model) ### 分布式改造,构建DDP分布式模型 ### optimizer = optim.Adam(model.parameters(), lr=lr) loss_func = torch.nn.CrossEntropyLoss() os.makedirs(args.output_dir, exist_ok=True) for epoch in range(1, max_epoch + 1): model.train() train_loss = 0 ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ### tr_sampler.set_epoch(epoch) ### 分布式改造,DDP sampler, 基于当前的epoch为其设置随机数,避免加载到重复数据 ### for step, (tr_x, tr_y) in enumerate(tr_loader): if args.enable_gpu: tr_x, tr_y = tr_x.cuda(), tr_y.cuda() out = model(tr_x) loss = loss_func(out, tr_y) optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader))) val_loss = 0 pred_record = [] real_record = [] model.eval() with torch.no_grad(): for step, (val_x, val_y) in enumerate(val_loader): if args.enable_gpu: val_x, val_y = val_x.cuda(), val_y.cuda() out = model(val_x) pred_record += list(np.argmax(out.cpu().numpy(), axis=1)) real_record += list(val_y.cpu().numpy()) val_loss += loss_func(out, val_y).item() val_accu = accuracy_score(real_record, pred_record) print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n') if args.rank == 0: # save ckpt every epoch torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth')) if __name__ == '__main__': main()
  • 常见问题 示例代码中如何使用不同的数据集? 上述代码如果使用cifar10数据集,则将数据集下载并解压后,上传至OBS桶中,文件目录结构如下: DDP |--- main.py |--- input_dir |------ cifar-10-batches-py |-------- data_batch_1 |-------- data_batch_2 |-------- ... 其中“DDP”为创建训练作业时的“代码目录”,“main.py”为上文代码示例(即创建训练作业时的“启动文件”),“cifar-10-batches-py”为解压后的数据集文件夹(放在input_dir文件夹下)。 如果使用自定义的随机数据,则将代码示例中的参数“custom_data”改为“true”,修改后内容如下: parser.add_argument('--custom_data', default='true') 然后直接运行代码示例“main.py”即可,创建训练作业的参数与上图相同。 为什么DDP可以不输入主节点ip? “parser.add_argument('--init_method', default=None, help='tcp_port')”中的init method参数值会包含主节点的ip和端口,由平台自动入参,不需要用户输入主节点的ip和端口。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全