云服务器内容精选

  • 工作空间相关操作 表1 工作空间相关操作 任务 操作步骤 编辑工作空间 单击操作栏中的“编辑”,编辑工作空间。支持修改空间名称、描述和授权对象。 删除工作空间 单击操作栏中的“删除”,删除工作空间。工作空间删除后会默认清理该工作空间下的所有资源。删除操作无法恢复,请谨慎操作。 查询工作空间 支持按状态和输入关键字两种方式查询工作空间。 按状态查询:在“请选择状态”下拉列表选择工作空间状态,即可查询该状态的工作空间。 输入关键字查询:在搜索输入框中输入搜索条件,按回车键即可查询工作空间。 查看工作空间详情 单击工作空间名称,查看工作空间详情。
  • 新建工作空间 在左侧菜单栏中,单击“工作空间”。 单击“新建工作空间”。 图1 新建工作空间 完成工作空间基本信息。 图2 新建工作空间 空间名称:输入工作空间的名称,只能包含数字、英文、中文、下划线、中划线。 描述:简单描述空间名称,最大长度为255。 授权类型:当前可选择“Internal”。 工作空间授权类型分为“Public”和“Internal”。 “Public”仅在租户(主账号和所有子账号)内部访问。 “Internal”为创建者、主账号、指定的子账号可访问。当授权类型为Internal时,需要制定可访问的子账号的账号名或者账号id,可选择多个。 授权对象:请选择需要授权的对象。当创建的 IAM 子用户没有IAM管理权限时,无法选择授权对象,推荐为IAM子用户添加IAM ReadOnlyAccess策略。 单击“创建”,在工作空间列表查看创建好的工作空间。
  • 工作空间简介 在使用Octopus自动驾驶云服务之前,如果您想实现资源隔离管理,您可以创建工作空间,实现资源隔离。 default工作空间为系统预置,Public授权类型,无法创建、编辑和删除,默认为主工作空间。 创建不同的工作空间后,您可以在控制台左侧导航栏中,单击“当前工作空间”下方的下拉框,单击工作空间,进入所选工作空间内使用Octopus自动驾驶云服务。 图1 当前工作空间 父主题: 工作空间
  • 示例代码 下面是rosbag脱敏的算子示例: # mask.py import json import logging import multiprocessing as mp import os import shutil import time from pathlib import Path from typing import cast import av import numpy as np import open3d from rosbags.highlevel import AnyReader from rosbags.interfaces import ConnectionExtRosbag1, ConnectionExtRosbag2 from rosbags.rosbag1 import Writer as Writer1 from rosbags.rosbag2 import Writer as Writer2 from rosbags.serde import cdr_to_ros1, serialize_cdr from rosbags.typesys import get_types_from_msg, register_types from rosbags.typesys.types import builtin_interfaces__msg__Time as Time from rosbags.typesys.types import \ sensor_msgs__msg__CompressedImage as CompressedImage from rosbags.typesys.types import std_msgs__msg__Header as Header logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', ) LOG = logging.getLogger(__file__) # Octopus数据服务拉起镜像时灌入的环境变量 # 获取环境变量 input_path = os.getenv('input_path', 'data/hangyan-move.bag.bak') raw_dir = os.getenv('raw_dir', 'empty_dir/raw') # 抽取的文件存放目录 desens_dir = os.getenv('desensitized_dir', 'empty_dir/desens') # 脱敏后的文件存放目录 output_dir = os.getenv('output_dir', 'empty_dir/output') lidar_process_num = os.getenv('lidar_process_num', 5) # lidar数据进程数 # 用户自定义环境变量 rosbag_version = os.getenv('rosbag_version', '1') # rosbag版本,取值为'1'或'2' image_topics = [ x.strip(' ') for x in os.getenv('image_topics', '/camera_encoded_1').split(',') ] # 图像数据的topic列表 gnss_topic = os.getenv('gnss_topic', '/inspvax') # gnss数据的topic,gnss数据只能有一个topic lidar_topics = [ x.strip(' ') for x in os.getenv('lidar_topic', '/pandar').split(',') ] # 点云数据的topic列表 # 注册自定义消息类型 Video_encoded_data_text = Path('msgs/Video_encoded_data.msg').read_text() NovatelMessageHeader_text = Path('msgs/NovatelMessageHeader.msg').read_text() NovatelExtendedSolutionStatus_text = Path( 'msgs/NovatelExtendedSolutionStatus.msg').read_text() NovatelReceiverStatus_text = Path('msgs/NovatelReceiverStatus.msg').read_text() Inspvax_text = Path('msgs/Inspvax.msg').read_text() add_types = {} add_types.update( get_types_from_msg( Video_encoded_data_text, 'kyber_msgs/msg/Video_encoded_data', )) add_types.update( get_types_from_msg( NovatelMessageHeader_text, 'novatel_gps_msgs/msg/NovatelMessageHeader', )) add_types.update( get_types_from_msg( NovatelExtendedSolutionStatus_text, 'novatel_gps_msgs/msg/NovatelExtendedSolutionStatus', )) add_types.update( get_types_from_msg( NovatelReceiverStatus_text, 'novatel_gps_msgs/msg/NovatelReceiverStatus', )) add_types.update( get_types_from_msg( Inspvax_text, 'novatel_gps_msgs/msg/Inspvax', )) register_types(add_types) # create gnss file gnss_file_path = Path(raw_dir, 'gnss') / f'{gnss_topic}.json'.strip('/') Path.mkdir(gnss_file_path.parent, parents=True, exist_ok=True) def extract_image(input_rosbag): '''从原始rosbag中抽取图像数据.''' LOG.info('Start extracting image.') codec_ctx = av.codec.Codec('hevc', 'r') h265_code = codec_ctx.create() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic in image_topics: deserialized_data = reader.deserialize(data, connection.msgtype) try: data = deserialized_data.raw_data packet = av.packet.Packet(data) out = h265_code.decode(packet) img = None for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() # 图像存放路径 file_name = f'{timestamp}.jpg' f_path = Path(raw_dir, 'image') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_image') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) img.save(tmp_file) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) except Exception as e: LOG.info("%s frame can not trans to jpg, message: %s", timestamp, str(e)) LOG.info('Finish extracting image.') def extract_lidar(task_id, task_num, input_rosbag): '''从原始rosbag中抽取点云数据.''' LOG.info('Start extracting pcd.') with AnyReader([Path(input_rosbag)]) as reader: for i, (connection, timestamp, data) in enumerate(reader.messages()): if i % task_num != task_id: continue topic = connection.topic if topic in lidar_topics: deserialized_data = reader.deserialize(data, connection.msgtype) pcd = open3d.geometry.PointCloud() reshaped = deserialized_data.data.reshape( int(len(deserialized_data.data) / 3), 3) pcd.points = open3d.utility.Vector3dVector(reshaped) file_name = f'{timestamp}.pcd' f_path = Path(raw_dir, 'lidar') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_lidar') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) open3d.io.write_point_cloud(str(tmp_file), pcd) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) LOG.info('Finish extracting pcd.') def extract_gnss(input_rosbag): '''从原始rosbag中抽取gnss数据.''' LOG.info('Start extracting rosbag.') gnss_file = open(gnss_file_path, 'w') gnss = dict() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) # 这里以msgytpe为NavSatFix为例 latitude = deserialized_data.latitude longitude = deserialized_data.longitude altitude = deserialized_data.altitude gnss[timestamp] = { 'latitude': latitude, 'longitude': longitude, 'altitude': altitude } gnss_file.write(json.dumps(gnss)) gnss_file.close() LOG.info('Finish extracting gnss.') def _get_masked_image(topic, timestamp): '''从脱敏后的图像数据中获取目标图像数据.''' file = Path(desens_dir, 'image') / topic.strip('/') / f'{timestamp}.jpg' if file.is_file(): return np.fromfile(file, dtype='uint8') else: return None def _get_conn_map(rosbag_version: int, reader, writer): '''构建connection的索引.''' conn_map = {} if rosbag_version == '1': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag1, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, conn.msgdef, conn.md5sum, ext.callerid, ext.latching, ) elif rosbag_version == '2': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag2, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, ext.serialization_format, ext.offered_qos_profiles, ) return conn_map def _serialize_data(rosbag_version, data, msgtype): '''对数据进行序列化.''' if rosbag_version == '1': return cdr_to_ros1(serialize_cdr(data, msgtype), msgtype) elif rosbag_version == '2': return serialize_cdr(data, msgtype) def generate_rosbag(input_rosbag, output_rosbag): '''生成脱敏后rosbag.''' LOG.info('Start generating rosbag.') gnss_file = open( Path(desens_dir, 'gnss') / f'{gnss_topic}.json'.strip('/'), 'r') gnss_data = json.load(gnss_file) gnss_file.close() Writer = Writer1 if rosbag_version == '1' else Writer2 with AnyReader([Path(input_rosbag) ]) as reader, Writer(Path(output_rosbag)) as writer: conn_map = _get_conn_map(rosbag_version, reader, writer) for connection, timestamp, data in reader.messages(): topic = connection.topic # 当topic为图像数据的topic时,读取脱敏后图像数据 if topic in image_topics: masked_data = _get_masked_image(topic, timestamp) if masked_data is None: # 没有解析出图像文件时,不要该帧了 continue deserialized_data = CompressedImage( Header( stamp=Time( sec=int(timestamp // 10**9), nanosec=int(timestamp % 10**9), ), frame_id='0', ), format='jpg', data=masked_data, ) data = _serialize_data( rosbag_version, deserialized_data, CompressedImage.__msgtype__, ) # 当topic为gnss数据时,读取脱敏后gnss数据 elif topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) deserialized_data.latitude = gnss_data.get( str(timestamp)).get('latitude') deserialized_data.longitude = gnss_data.get( str(timestamp)).get('longitude') deserialized_data.altitude = gnss_data.get( str(timestamp)).get('altitude') data = _serialize_data( rosbag_version, deserialized_data, connection.msgtype, ) # 当topic为点云数据时,读取脱敏后点云数据 elif topic in lidar_topics: deserialized_data = reader.deserialize( data, connection.msgtype, ) file = Path( desens_dir, 'lidar', ) / topic.strip('/') / f'{timestamp}.pcd' point_cloud = open3d.io.read_point_cloud(str(file)) deserialized_data.data = np.asarray( point_cloud.points).flatten() writer.write(conn_map[connection.id], timestamp, data) # 生成_SUC CES S文件标识完成数据抽取 Path(output_dir, '_SUCCESS').touch() LOG.info('Finish generating rosbag.') if __name__ == "__main__": LOG.info('Start user operator.') process_image = mp.Process(target=extract_image, args=(input_path, )) pool_lidar = mp.Pool(processes=lidar_process_num) for i in range(lidar_process_num): pool_lidar.apply_async(extract_lidar, args=(i, lidar_process_num, input_path)) process_gnss = mp.Process(target=extract_gnss, args=(input_path, )) # 启动子进程 process_image.start() pool_lidar.close() process_gnss.start() process_image.join() pool_lidar.join() process_gnss.join() LOG.info('Child processes exit.') # 生成_SUCCESS文件标识完成数据抽取 Path(raw_dir, '_SUCCESS').touch() # 后面输出的rosbag文件与输入的rosbag文件保持同名 output_rosbag_file = Path(output_dir, Path(input_path).name) # 如果输出文件夹不存在,先创建文件夹 Path.mkdir(output_rosbag_file.parent, parents=True, exist_ok=True) # 检测到脱敏任务结束后,生成新的rosbag文件 while time.sleep(1) is None: if Path(desens_dir).joinpath('_SUCCESS').is_file(): generate_rosbag(Path(input_path), output_rosbag_file) break
  • 作业输入输出规范 用户完成自定义脱敏算子创建,运行作业容器时Octopus平台向其中注入以下环境变量: input_file:待脱敏的文件路径 raw_dir:抽取的image,gnss,lidar数据存放路径 desensitized_dir:脱敏后的image,gnss,lidar数据存放路径 output_dir:脱敏后的文件存放路径 用户根据需要可以自定义环境变量,以rosbag文件为例,可以定义如下环境变量: rosbag_version:robag版本 image_topics:图像数据topic列表 gnss_topic:gnss数据topic lidar_topics:lidar数据topic列表 用户作业容器需要将input_file中的image,gnss,lidar数据抽取到raw_dir,待系统内置算子脱敏完成,并将脱敏后的数据存放到desensitized_dir后,用户算子根据input_file和desensitized_dir中的脱敏数据生成新的数据文件存放到output_dir。 文件结构如下: -raw --image ---topic0 ----timestamp1.jpg ----timestamp2.jpg ---topic1 ----timestamp1.jpg --gnss ---gnss_topic.json --lidar ---pcd_topic0 ----timestamp0.pcd ----timestamp1.pcd --SUCCESS -desensitized --image ---topic0 ----timestamp1.jpg ----timestamp2.jpg ---topic1 ----timestamp1.jpg --gnss ---gnss_topic.json --lidar ---pcd_topic0 ----timestamp0.pcd ----timestamp1.pcd --SUCCESS -output --test.bag --SUCCESS 其中,“SUCCESS”文件为标识文件,标识所在阶段的任务结束。
  • 构建镜像 Dockerfile示例 FROM python:3.10 COPY mask.py /home/main/ WORKDIR /home/main/ RUN pip install rosbags requests numpy USER root 构建镜像 sudo docker build --no-cache -f Dockerfile -t rosbagmask:0.1 . 本地调试 准备一个待处理的rosbag包。 sudo docker run -v ${HOME}/data/test.bag:/home/main/test.bag --env input_file=/home/main/test.bag –env raw_dir=/home/main/raw_dir –env desensitized_dir=/home/main/desensitized_dir –env output_dir=/home/main/output_dir -it rosbagmask:0.1 /bin/sh -c "python mask.py"
  • 创建脱敏作业 查看数据递送订单详情,选择待脱敏的数据包,单击创建脱敏作业。 图3 创建脱敏作业 完成算子基本配置。 图4 算子基本信息 处理算子:根据需要选择数据脱敏算子。 资源规格:选择合适的资源规格。 优先级:设定任务的优先级,数值取[-50,50]的整数,数字越大,优先级越高。 环境变量:配置算子的环境变量。允许添加的环境变量个数不超过10个。 Key:只能由英文、数字、和特殊符号(,-_)组成,且需要以字母开头 。长度不超过64个字符。 Value:只能由英文、数字和特殊符号(\/,.[]-_)组成 。长度不超过512个字符。 合规数据ID:展示选择数据包的ID。 单击“创建”,执行数据包脱敏任务。脱敏完成后,数据包状态变为脱敏完成。
  • 数据合规简介 数据合规服务是自动驾驶云服务为用户提供合规监管服务,包含合规室管理、数据流监管、账号管理、权限管理、日志审计服务能力。支持对上传合规服务的数据包进行脱敏处理,提供合规脱敏能力,支持将数据包中人脸、车牌、点云脱敏。支持用户使用合规服务创建脱敏算子、下发脱敏任务。 在使用数据合规服务之前,需要先购买合规服务和数据服务。如何购买服务? 在使用数据合规脱敏能力之前,需要先开通合规脱敏服务。 图1 使用合规服务和数据服务时序图 父主题: 数据合规
  • junction 简述:地图场景为交叉口.lead_vehicle和主车Ego一前一后分别以LeadVehicle_TargetSpeed_Ve0和Ego_TargetSpeed_Ve0的初始速度向交叉口行驶,Ego设定了目标在右转车道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position).另一路段上有一辆车vehicle1,也以LeadVehicle_TargetSpeed_Ve0的速度朝交叉口行驶..控制器有时会根据环境车的位置更改主车Ego的速度. 地图文件(odr) scenario Junction: m_scene: scenery lane_num: int = [2, 3, 4] bikeway: bool = false sidewalk: bool = false junction_type: junction_type = ["crossroad", "T-junction"] junction_1: junction with: keep(it.lane_num == lane_num) keep(it.bikeway == bikeway) keep(it.sidewalk == sidewalk) keep(it.junction_type == junction_type) 场景文件(osc) import standard scenario Junction: # map map: map map.set_map_file("./junction.odr") # parameter Ego_TargetSpeed_Ve0: speed = [50kph..60kph] Ego_InitPosition_RoadId: string = '1' Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..100m] Ego_Odr: odr_point = map.create_odr_point(road_id: Ego_InitPosition_RoadId, lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 225m, y: -100m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) LeadVehicle_TargetSpeed_Ve0: speed = 45kph Vehicle1_Odr: odr_point = map.create_odr_point(road_id: '2', lane_id: '2', s: 100.0m, t: 0.0m) m_orientation: orientation_3d with: keep(it.roll == 0.0rad) keep(it.pitch == 0.0rad) keep(it.yaw == -1.57rad) Vehicle1_InitPosition: pose_3d with: keep(it.odr_point == Vehicle1_Odr) keep(it.orientation == m_orientation) m_profile: dynamics_shape = [sinusoidal, linear, step] m_rate_peak: acceleration = [5kmphps, 10kmphps] Duration: time = [100s, 120s] # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") vehicle1: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_TargetSpeed_Ve0) lead_vehicle.assign_init_position() with: lane(same_as: Ego) position(distance: 30.0m, ahead_of: Ego) lead_vehicle.assign_init_speed(LeadVehicle_TargetSpeed_Ve0) vehicle1.assign_init_position(position: Vehicle1_InitPosition) vehicle1.assign_init_speed(LeadVehicle_TargetSpeed_Ve0) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • split 简述:地图场景为匝道分流.lead_vehicle和主车Ego在主道的同一车道上分别以35kph和Ego_InitSpeed_Ve0的初始速度一前一后行驶,Ego设定了目标在匝道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position).控制器有时会根据lead_vehicle的位置更改主车Ego的速度. 使用xyz坐标创建终点时,由于匝道地图泛化会使终点偏移,建议在创建测评任务时为检测终点设置合适的的半径,例如"到达半径5m". 地图文件(odr) scenario Split: m_scene: scenery lane_width: length = [3m..4m] main_speed: speed = 120kph ramp_speed: speed = 60kph ramp_length: length = [300m..350m] road_aids_type: road_aids_type = "DType-2" split_1: split with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == 3) keep(it.ramp_lane_num == 1) keep(it.main_speed == main_speed) keep(it.ramp_speed == ramp_speed) keep(it.radius_of_curvature == 200m) keep(it.ramp_length == ramp_length) keep(it.road_aids_type == road_aids_type) 场景文件(osc) import standard scenario Split: # map map: map map.set_map_file("./split.odr") # parameter Ego_InitSpeed_Ve0: speed = [90kph..110kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..100m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) m_InitDistance: length = [60m..100m] Target_xyz: xyz_point = map.create_xyz_point(x: 445m, y: -46.5m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) lead_vehicle.assign_init_position() with: lane(same_as: Ego) position(distance: m_InitDistance, ahead_of: Ego) lead_vehicle.assign_init_speed(35kph) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • merge 简述:地图场景为匝道合流.主车Ego在主道行驶,初始速度为Ego_InitSpeed_Ve0,Ego设定了目标在主道右侧2车道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position),从车side_vehicle在匝道行驶,初始速度为SideVehicle_InitSpeed_Ve0.side_vehicle从匝道汇入主道.控制器有时会根据side_vehicle的位置更改主车Ego的速度. 地图文件(odr) scenario Merge: m_scene: scenery lane_width: length = [3m, 4m] radius_of_curvature: length = [200m..1000m] ramp_lane_num: int = 1 ramp_length: length = [200m..300m] main_speed: speed = 120kph ramp_speed: speed = 60kph road_aids_type: road_aids_type = ["DType-1", "DType-2", "PType"] merge_1: merge with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == 2) keep(it.ramp_lane_num == ramp_lane_num) keep(it.main_speed == main_speed) keep(it.ramp_speed == ramp_speed) keep(it.radius_of_curvature == radius_of_curvature) keep(it.ramp_length == ramp_length) keep(it.road_aids_type == road_aids_type) 场景文件(osc) import standard scenario Merge: # map map: map map.set_map_file("./merge.odr") # parameter Ego_InitSpeed_Ve0: speed = [90kph..110kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) SideVehicle_InitSpeed_Ve0: speed = [45kph, 50kph, 55kph] SideVehicle_s: length = [30.0m..80.0m] SideVehicle_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: '-1', s: SideVehicle_s, t: 0.0m) SideVehicle_InitPosition: pose_3d with: keep(it.odr_point == SideVehicle_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 530m, y: -2m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") side_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) side_vehicle.assign_init_position(position: SideVehicle_InitPosition) side_vehicle.assign_init_speed(SideVehicle_InitSpeed_Ve0) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • straight 简述:地图场景为直道.lead_vehicle和主车Ego在主道上分别以40kph和Ego_InitSpeed_Ve0的初始速度一前一后行驶,Ego设定了目标在主道右2车道上的目标点Target_position,同时激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position),控制器有时会根据lead_vehicle的位置更改主车Ego的速度. 地图文件(odr) scenario Straight: m_scene: scenery lane_width: length = [3m..4m] right_lane_num: int = [2, 3] bikeway: bool = [true, false] sidewalk: bool = [true, false] main_speed: speed = 60kph road_length: length = [550m, 600m] straight_1: straight with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == right_lane_num) keep(it.bikeway == bikeway) keep(it.sidewalk == sidewalk) keep(it.main_speed == main_speed) keep(it.road_length == road_length) 场景文件(osc) import standard scenario Straight: # map map: map map.set_map_file("./straight.odr") # parameter Ego_InitSpeed_Ve0: speed = [55kph..60kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) m_distance: length = [50m..80m] LeadVehicle_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: '-1', s: m_distance, t: 0.0m) LeadVehicle_InitPosition: pose_3d with: keep(it.odr_point == LeadVehicle_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 450m, y: -4.5m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) lead_vehicle.assign_init_position(position: LeadVehicle_InitPosition) lead_vehicle.assign_init_speed(40kph) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • 基于样例的拓展 本章的场景样例可以结合第一章介绍的各模块代码样例进行修改和拓展.以straigh场景的osc为例,设定初始位置时,除了使用st坐标系(odr_point,即osc1.0中的LanePosition)还可以使用xyz坐标系(xyz_point,即osc1.0中的WorldPosition)来替代: st坐标系(odr_point)写法: Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) 对应xyz坐标系(xyz_point)写法,其中y值-1.5m和-4.5m分别对应车道宽度为3m时的'-1'和'-2'车道的中心位置: y: length = [-1.5m, -4.5m] x: length = [0m..30m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 以上对等替换仅存在于odr_point的road_id固定的情形,当road_id(仿真器A中的TrackId)是一个可泛化的值时,起点可以出现在多条道路上,对应多段x和y值的组合区间,无法使用一个xyz_point直接对等泛化.此时可以使用多个动态场景来实现同等的泛化,例如: 假设地图上存在多条road,且对odr_point的road_id进行了泛化: Ego_InitPosition_Road_id: string = ['10', '1'] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: Ego_InitPosition_Road_id, lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) 可以在两个动态场景文件中分别使用一组xyz_point来替代,例如: 第一组对应road_id为'10'的情况(同前例): y: length = [-1.5m, -4.5m] x: length = [0m..30m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 第二组对应road_id为'1'的情况,该道路与road_id为'10'的道路垂直,x值的101.5m和104.5m分别对应车道宽度为3m时的'-1'和'-2'车道的中心位置: y: length = [-40m..-10m] x: length = [101.5m, 104.5m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 父主题: 种子地图的逻辑场景样例(仿真器A)
  • 种子地图的逻辑场景样例(仿真器A) 配合静态场景的种子场景,在本节提供了对应的适配仿真器A的逻辑场景样例.同样,考虑到仿真器A的支持程度,建议在转换时选择osc1.0版本. 使用仿真器A打开osc1.0场景时,由于编辑器显示问题,车辆会在道路初始位置重叠,但这不会影响场景运行,运行场景后车辆会从场景文件设置的初始位置开始行驶. straight merge split junction one way junction 基于样例的拓展 父主题: 动静态配套样例
  • road_aids_type 匝道类型,用于静态场景的split场景和merge场景 road_aids_type list ENUM_ROAD_AIDS_TYPE = ("DType-1", "DType-2", "PType") DType-1(直接式1): DType-1(直接式1)是指匝道合入(分流)时直接汇入(离开)主路,并且匝道部分逐渐收缩(扩展)直到合入(离开)主路,如下图所示: DType-2(直接式2): DType-2(直接式2)是指匝道合入(分流)时直接汇入(离开)主路,主路的车道数也随之发生变化,匝道部分不进行收缩(扩展),如下图所示: PType(平行式): PType(平行式)是指匝道合入(分流)时先有一段平行于主路,然后匝道再逐渐收缩(扩展)直到合入(离开)主路,如下图所示: 父主题: Enum Lists