云服务器内容精选

  • 加载场景 Octopus平台支持用户在仿真器中加载场景库中的场景,在线编辑、运行并回放。 单击进入在线仿真页面,单击页面右上方的“加载场景”。 选择加载场景方式。 选择需要加载的场景、泛化场景或测试用例。 图2 加载场景 如果场景数量多,用户也可根据场景的标签进行过滤,并选择需要加载的场景。 图3 标签筛选 在搜索框中输入搜索内容,单击搜索,并在筛选出的场景、测试用例或泛化场景列表中勾选中需要加载的场景。 多项搜索:可根据需要决定是否启用多项搜索,输入多个关键字,中间用“;”隔开,可搜索多个关键字。 单击“确认”,仿真场景加载成功。 查看加载场景过程文件。 使用仿真器在线加载场景后,会在“/home/{user}/workspace/Data/Project/Current/Scenarios/”目录下出现加载过的场景文件,打开指定日期的场景文件夹。 图4 场景文件 单击打开该场景文件夹下的“.xml”文件,即可查看下载下来的“osgb”以及“xodr”文件所在路径。 图5 文件所在路径
  • 前提条件 当前Octopus支持仿真器A、仿真器B和仿真器C三类在线仿真器。在使用仿真器之前需要提前购买扩展资源包。 表1 扩展资源包列表 名称 描述 仿真场景编辑器 支持自动驾驶仿真静态路网和动态交通参与物场景编辑。 感知规控仿真引擎-在线 支持用户通过网页/图形化界面方式运行仿真引擎,支持自动驾驶感知决策规划控制算法在线图形化开发调试,支持高精度渲染引擎集成。 规控仿真引擎-在线 支持用户通过图形化界面方式运行仿真引擎,支持自动驾驶决策规划控制算法在线图形化开发调试。 说明: 不支持加载场景、重播场景和回放场景。
  • 工作空间相关操作 表1 工作空间相关操作 任务 操作步骤 编辑工作空间 单击操作栏中的“编辑”,编辑工作空间。支持修改空间名称、描述和授权对象。 删除工作空间 单击操作栏中的“删除”,删除工作空间。工作空间删除后会默认清理该工作空间下的所有资源。删除操作无法恢复,请谨慎操作。 查询工作空间 支持按状态和输入关键字两种方式查询工作空间。 按状态查询:在“请选择状态”下拉列表选择工作空间状态,即可查询该状态的工作空间。 输入关键字查询:在搜索输入框中输入搜索条件,按回车键即可查询工作空间。 查看工作空间详情 单击工作空间名称,查看工作空间详情。
  • 新建工作空间 在左侧菜单栏中,单击“工作空间”。 单击“新建工作空间”。 图1 新建工作空间 完成工作空间基本信息。 图2 新建工作空间 空间名称:输入工作空间的名称,只能包含数字、英文、中文、下划线、中划线。 描述:简单描述空间名称,最大长度为255。 授权类型:当前可选择“Internal”。 工作空间授权类型分为“Public”和“Internal”。 “Public”仅在租户(主账号和所有子账号)内部访问。 “Internal”为创建者、主账号、指定的子账号可访问。当授权类型为Internal时,需要制定可访问的子账号的账号名或者账号id,可选择多个。 授权对象:请选择需要授权的对象。当创建的 IAM 子用户没有IAM管理权限时,无法选择授权对象,推荐为IAM子用户添加IAM ReadOnlyAccess策略。 单击“创建”,在工作空间列表查看创建好的工作空间。
  • 工作空间简介 在使用Octopus自动驾驶云服务之前,如果您想实现资源隔离管理,您可以创建工作空间,实现资源隔离。 default工作空间为系统预置,Public授权类型,无法创建、编辑和删除,默认为主工作空间。 创建不同的工作空间后,您可以在控制台左侧导航栏中,单击“当前工作空间”下方的下拉框,单击工作空间,进入所选工作空间内使用Octopus自动驾驶云服务。 图1 当前工作空间 父主题: 工作空间
  • 示例代码 下面是rosbag脱敏的算子示例: # mask.py import json import logging import multiprocessing as mp import os import shutil import time from pathlib import Path from typing import cast import av import numpy as np import open3d from rosbags.highlevel import AnyReader from rosbags.interfaces import ConnectionExtRosbag1, ConnectionExtRosbag2 from rosbags.rosbag1 import Writer as Writer1 from rosbags.rosbag2 import Writer as Writer2 from rosbags.serde import cdr_to_ros1, serialize_cdr from rosbags.typesys import get_types_from_msg, register_types from rosbags.typesys.types import builtin_interfaces__msg__Time as Time from rosbags.typesys.types import \ sensor_msgs__msg__CompressedImage as CompressedImage from rosbags.typesys.types import std_msgs__msg__Header as Header logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', ) LOG = logging.getLogger(__file__) # Octopus数据服务拉起镜像时灌入的环境变量 # 获取环境变量 input_path = os.getenv('input_path', 'data/hangyan-move.bag.bak') raw_dir = os.getenv('raw_dir', 'empty_dir/raw') # 抽取的文件存放目录 desens_dir = os.getenv('desensitized_dir', 'empty_dir/desens') # 脱敏后的文件存放目录 output_dir = os.getenv('output_dir', 'empty_dir/output') lidar_process_num = os.getenv('lidar_process_num', 5) # lidar数据进程数 # 用户自定义环境变量 rosbag_version = os.getenv('rosbag_version', '1') # rosbag版本,取值为'1'或'2' image_topics = [ x.strip(' ') for x in os.getenv('image_topics', '/camera_encoded_1').split(',') ] # 图像数据的topic列表 gnss_topic = os.getenv('gnss_topic', '/inspvax') # gnss数据的topic,gnss数据只能有一个topic lidar_topics = [ x.strip(' ') for x in os.getenv('lidar_topic', '/pandar').split(',') ] # 点云数据的topic列表 # 注册自定义消息类型 Video_encoded_data_text = Path('msgs/Video_encoded_data.msg').read_text() NovatelMessageHeader_text = Path('msgs/NovatelMessageHeader.msg').read_text() NovatelExtendedSolutionStatus_text = Path( 'msgs/NovatelExtendedSolutionStatus.msg').read_text() NovatelReceiverStatus_text = Path('msgs/NovatelReceiverStatus.msg').read_text() Inspvax_text = Path('msgs/Inspvax.msg').read_text() add_types = {} add_types.update( get_types_from_msg( Video_encoded_data_text, 'kyber_msgs/msg/Video_encoded_data', )) add_types.update( get_types_from_msg( NovatelMessageHeader_text, 'novatel_gps_msgs/msg/NovatelMessageHeader', )) add_types.update( get_types_from_msg( NovatelExtendedSolutionStatus_text, 'novatel_gps_msgs/msg/NovatelExtendedSolutionStatus', )) add_types.update( get_types_from_msg( NovatelReceiverStatus_text, 'novatel_gps_msgs/msg/NovatelReceiverStatus', )) add_types.update( get_types_from_msg( Inspvax_text, 'novatel_gps_msgs/msg/Inspvax', )) register_types(add_types) # create gnss file gnss_file_path = Path(raw_dir, 'gnss') / f'{gnss_topic}.json'.strip('/') Path.mkdir(gnss_file_path.parent, parents=True, exist_ok=True) def extract_image(input_rosbag): '''从原始rosbag中抽取图像数据.''' LOG.info('Start extracting image.') codec_ctx = av.codec.Codec('hevc', 'r') h265_code = codec_ctx.create() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic in image_topics: deserialized_data = reader.deserialize(data, connection.msgtype) try: data = deserialized_data.raw_data packet = av.packet.Packet(data) out = h265_code.decode(packet) img = None for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() # 图像存放路径 file_name = f'{timestamp}.jpg' f_path = Path(raw_dir, 'image') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_image') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) img.save(tmp_file) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) except Exception as e: LOG.info("%s frame can not trans to jpg, message: %s", timestamp, str(e)) LOG.info('Finish extracting image.') def extract_lidar(task_id, task_num, input_rosbag): '''从原始rosbag中抽取点云数据.''' LOG.info('Start extracting pcd.') with AnyReader([Path(input_rosbag)]) as reader: for i, (connection, timestamp, data) in enumerate(reader.messages()): if i % task_num != task_id: continue topic = connection.topic if topic in lidar_topics: deserialized_data = reader.deserialize(data, connection.msgtype) pcd = open3d.geometry.PointCloud() reshaped = deserialized_data.data.reshape( int(len(deserialized_data.data) / 3), 3) pcd.points = open3d.utility.Vector3dVector(reshaped) file_name = f'{timestamp}.pcd' f_path = Path(raw_dir, 'lidar') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_lidar') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) open3d.io.write_point_cloud(str(tmp_file), pcd) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) LOG.info('Finish extracting pcd.') def extract_gnss(input_rosbag): '''从原始rosbag中抽取gnss数据.''' LOG.info('Start extracting rosbag.') gnss_file = open(gnss_file_path, 'w') gnss = dict() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) # 这里以msgytpe为NavSatFix为例 latitude = deserialized_data.latitude longitude = deserialized_data.longitude altitude = deserialized_data.altitude gnss[timestamp] = { 'latitude': latitude, 'longitude': longitude, 'altitude': altitude } gnss_file.write(json.dumps(gnss)) gnss_file.close() LOG.info('Finish extracting gnss.') def _get_masked_image(topic, timestamp): '''从脱敏后的图像数据中获取目标图像数据.''' file = Path(desens_dir, 'image') / topic.strip('/') / f'{timestamp}.jpg' if file.is_file(): return np.fromfile(file, dtype='uint8') else: return None def _get_conn_map(rosbag_version: int, reader, writer): '''构建connection的索引.''' conn_map = {} if rosbag_version == '1': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag1, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, conn.msgdef, conn.md5sum, ext.callerid, ext.latching, ) elif rosbag_version == '2': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag2, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, ext.serialization_format, ext.offered_qos_profiles, ) return conn_map def _serialize_data(rosbag_version, data, msgtype): '''对数据进行序列化.''' if rosbag_version == '1': return cdr_to_ros1(serialize_cdr(data, msgtype), msgtype) elif rosbag_version == '2': return serialize_cdr(data, msgtype) def generate_rosbag(input_rosbag, output_rosbag): '''生成脱敏后rosbag.''' LOG.info('Start generating rosbag.') gnss_file = open( Path(desens_dir, 'gnss') / f'{gnss_topic}.json'.strip('/'), 'r') gnss_data = json.load(gnss_file) gnss_file.close() Writer = Writer1 if rosbag_version == '1' else Writer2 with AnyReader([Path(input_rosbag) ]) as reader, Writer(Path(output_rosbag)) as writer: conn_map = _get_conn_map(rosbag_version, reader, writer) for connection, timestamp, data in reader.messages(): topic = connection.topic # 当topic为图像数据的topic时,读取脱敏后图像数据 if topic in image_topics: masked_data = _get_masked_image(topic, timestamp) if masked_data is None: # 没有解析出图像文件时,不要该帧了 continue deserialized_data = CompressedImage( Header( stamp=Time( sec=int(timestamp // 10**9), nanosec=int(timestamp % 10**9), ), frame_id='0', ), format='jpg', data=masked_data, ) data = _serialize_data( rosbag_version, deserialized_data, CompressedImage.__msgtype__, ) # 当topic为gnss数据时,读取脱敏后gnss数据 elif topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) deserialized_data.latitude = gnss_data.get( str(timestamp)).get('latitude') deserialized_data.longitude = gnss_data.get( str(timestamp)).get('longitude') deserialized_data.altitude = gnss_data.get( str(timestamp)).get('altitude') data = _serialize_data( rosbag_version, deserialized_data, connection.msgtype, ) # 当topic为点云数据时,读取脱敏后点云数据 elif topic in lidar_topics: deserialized_data = reader.deserialize( data, connection.msgtype, ) file = Path( desens_dir, 'lidar', ) / topic.strip('/') / f'{timestamp}.pcd' point_cloud = open3d.io.read_point_cloud(str(file)) deserialized_data.data = np.asarray( point_cloud.points).flatten() writer.write(conn_map[connection.id], timestamp, data) # 生成_SUC CES S文件标识完成数据抽取 Path(output_dir, '_SUCCESS').touch() LOG.info('Finish generating rosbag.') if __name__ == "__main__": LOG.info('Start user operator.') process_image = mp.Process(target=extract_image, args=(input_path, )) pool_lidar = mp.Pool(processes=lidar_process_num) for i in range(lidar_process_num): pool_lidar.apply_async(extract_lidar, args=(i, lidar_process_num, input_path)) process_gnss = mp.Process(target=extract_gnss, args=(input_path, )) # 启动子进程 process_image.start() pool_lidar.close() process_gnss.start() process_image.join() pool_lidar.join() process_gnss.join() LOG.info('Child processes exit.') # 生成_SUCCESS文件标识完成数据抽取 Path(raw_dir, '_SUCCESS').touch() # 后面输出的rosbag文件与输入的rosbag文件保持同名 output_rosbag_file = Path(output_dir, Path(input_path).name) # 如果输出文件夹不存在,先创建文件夹 Path.mkdir(output_rosbag_file.parent, parents=True, exist_ok=True) # 检测到脱敏任务结束后,生成新的rosbag文件 while time.sleep(1) is None: if Path(desens_dir).joinpath('_SUCCESS').is_file(): generate_rosbag(Path(input_path), output_rosbag_file) break
  • 作业输入输出规范 用户完成自定义脱敏算子创建,运行作业容器时Octopus平台向其中注入以下环境变量: input_file:待脱敏的文件路径 raw_dir:抽取的image,gnss,lidar数据存放路径 desensitized_dir:脱敏后的image,gnss,lidar数据存放路径 output_dir:脱敏后的文件存放路径 用户根据需要可以自定义环境变量,以rosbag文件为例,可以定义如下环境变量: rosbag_version:robag版本 image_topics:图像数据topic列表 gnss_topic:gnss数据topic lidar_topics:lidar数据topic列表 用户作业容器需要将input_file中的image,gnss,lidar数据抽取到raw_dir,待系统内置算子脱敏完成,并将脱敏后的数据存放到desensitized_dir后,用户算子根据input_file和desensitized_dir中的脱敏数据生成新的数据文件存放到output_dir。 文件结构如下: -raw --image ---topic0 ----timestamp1.jpg ----timestamp2.jpg ---topic1 ----timestamp1.jpg --gnss ---gnss_topic.json --lidar ---pcd_topic0 ----timestamp0.pcd ----timestamp1.pcd --SUCCESS -desensitized --image ---topic0 ----timestamp1.jpg ----timestamp2.jpg ---topic1 ----timestamp1.jpg --gnss ---gnss_topic.json --lidar ---pcd_topic0 ----timestamp0.pcd ----timestamp1.pcd --SUCCESS -output --test.bag --SUCCESS 其中,“SUCCESS”文件为标识文件,标识所在阶段的任务结束。
  • 构建镜像 Dockerfile示例 FROM python:3.10 COPY mask.py /home/main/ WORKDIR /home/main/ RUN pip install rosbags requests numpy USER root 构建镜像 sudo docker build --no-cache -f Dockerfile -t rosbagmask:0.1 . 本地调试 准备一个待处理的rosbag包。 sudo docker run -v ${HOME}/data/test.bag:/home/main/test.bag --env input_file=/home/main/test.bag –env raw_dir=/home/main/raw_dir –env desensitized_dir=/home/main/desensitized_dir –env output_dir=/home/main/output_dir -it rosbagmask:0.1 /bin/sh -c "python mask.py"
  • 创建脱敏作业 查看数据递送订单详情,选择待脱敏的数据包,单击创建脱敏作业。 图3 创建脱敏作业 完成算子基本配置。 图4 算子基本信息 处理算子:根据需要选择数据脱敏算子。 资源规格:选择合适的资源规格。 优先级:设定任务的优先级,数值取[-50,50]的整数,数字越大,优先级越高。 环境变量:配置算子的环境变量。允许添加的环境变量个数不超过10个。 Key:只能由英文、数字、和特殊符号(,-_)组成,且需要以字母开头 。长度不超过64个字符。 Value:只能由英文、数字和特殊符号(\/,.[]-_)组成 。长度不超过512个字符。 合规数据ID:展示选择数据包的ID。 单击“创建”,执行数据包脱敏任务。脱敏完成后,数据包状态变为脱敏完成。
  • 数据合规简介 数据合规服务是自动驾驶云服务为用户提供合规监管服务,包含合规室管理、数据流监管、账号管理、权限管理、日志审计服务能力。支持对上传合规服务的数据包进行脱敏处理,提供合规脱敏能力,支持将数据包中人脸、车牌、点云脱敏。支持用户使用合规服务创建脱敏算子、下发脱敏任务。 在使用数据合规服务之前,需要先购买合规服务和数据服务。如何购买服务? 在使用数据合规脱敏能力之前,需要先开通合规脱敏服务。 图1 使用合规服务和数据服务时序图 父主题: 数据合规
  • junction 简述:地图场景为交叉口.lead_vehicle和主车Ego一前一后分别以LeadVehicle_TargetSpeed_Ve0和Ego_TargetSpeed_Ve0的初始速度向交叉口行驶,Ego设定了目标在右转车道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position).另一路段上有一辆车vehicle1,也以LeadVehicle_TargetSpeed_Ve0的速度朝交叉口行驶..控制器有时会根据环境车的位置更改主车Ego的速度. 地图文件(odr) scenario Junction: m_scene: scenery lane_num: int = [2, 3, 4] bikeway: bool = false sidewalk: bool = false junction_type: junction_type = ["crossroad", "T-junction"] junction_1: junction with: keep(it.lane_num == lane_num) keep(it.bikeway == bikeway) keep(it.sidewalk == sidewalk) keep(it.junction_type == junction_type) 场景文件(osc) import standard scenario Junction: # map map: map map.set_map_file("./junction.odr") # parameter Ego_TargetSpeed_Ve0: speed = [50kph..60kph] Ego_InitPosition_RoadId: string = '1' Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..100m] Ego_Odr: odr_point = map.create_odr_point(road_id: Ego_InitPosition_RoadId, lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 225m, y: -100m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) LeadVehicle_TargetSpeed_Ve0: speed = 45kph Vehicle1_Odr: odr_point = map.create_odr_point(road_id: '2', lane_id: '2', s: 100.0m, t: 0.0m) m_orientation: orientation_3d with: keep(it.roll == 0.0rad) keep(it.pitch == 0.0rad) keep(it.yaw == -1.57rad) Vehicle1_InitPosition: pose_3d with: keep(it.odr_point == Vehicle1_Odr) keep(it.orientation == m_orientation) m_profile: dynamics_shape = [sinusoidal, linear, step] m_rate_peak: acceleration = [5kmphps, 10kmphps] Duration: time = [100s, 120s] # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") vehicle1: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_TargetSpeed_Ve0) lead_vehicle.assign_init_position() with: lane(same_as: Ego) position(distance: 30.0m, ahead_of: Ego) lead_vehicle.assign_init_speed(LeadVehicle_TargetSpeed_Ve0) vehicle1.assign_init_position(position: Vehicle1_InitPosition) vehicle1.assign_init_speed(LeadVehicle_TargetSpeed_Ve0) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • split 简述:地图场景为匝道分流.lead_vehicle和主车Ego在主道的同一车道上分别以35kph和Ego_InitSpeed_Ve0的初始速度一前一后行驶,Ego设定了目标在匝道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position).控制器有时会根据lead_vehicle的位置更改主车Ego的速度. 使用xyz坐标创建终点时,由于匝道地图泛化会使终点偏移,建议在创建测评任务时为检测终点设置合适的的半径,例如"到达半径5m". 地图文件(odr) scenario Split: m_scene: scenery lane_width: length = [3m..4m] main_speed: speed = 120kph ramp_speed: speed = 60kph ramp_length: length = [300m..350m] road_aids_type: road_aids_type = "DType-2" split_1: split with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == 3) keep(it.ramp_lane_num == 1) keep(it.main_speed == main_speed) keep(it.ramp_speed == ramp_speed) keep(it.radius_of_curvature == 200m) keep(it.ramp_length == ramp_length) keep(it.road_aids_type == road_aids_type) 场景文件(osc) import standard scenario Split: # map map: map map.set_map_file("./split.odr") # parameter Ego_InitSpeed_Ve0: speed = [90kph..110kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..100m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) m_InitDistance: length = [60m..100m] Target_xyz: xyz_point = map.create_xyz_point(x: 445m, y: -46.5m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) lead_vehicle.assign_init_position() with: lane(same_as: Ego) position(distance: m_InitDistance, ahead_of: Ego) lead_vehicle.assign_init_speed(35kph) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • merge 简述:地图场景为匝道合流.主车Ego在主道行驶,初始速度为Ego_InitSpeed_Ve0,Ego设定了目标在主道右侧2车道上的目标点Target_position,仿真开始后激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position),从车side_vehicle在匝道行驶,初始速度为SideVehicle_InitSpeed_Ve0.side_vehicle从匝道汇入主道.控制器有时会根据side_vehicle的位置更改主车Ego的速度. 地图文件(odr) scenario Merge: m_scene: scenery lane_width: length = [3m, 4m] radius_of_curvature: length = [200m..1000m] ramp_lane_num: int = 1 ramp_length: length = [200m..300m] main_speed: speed = 120kph ramp_speed: speed = 60kph road_aids_type: road_aids_type = ["DType-1", "DType-2", "PType"] merge_1: merge with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == 2) keep(it.ramp_lane_num == ramp_lane_num) keep(it.main_speed == main_speed) keep(it.ramp_speed == ramp_speed) keep(it.radius_of_curvature == radius_of_curvature) keep(it.ramp_length == ramp_length) keep(it.road_aids_type == road_aids_type) 场景文件(osc) import standard scenario Merge: # map map: map map.set_map_file("./merge.odr") # parameter Ego_InitSpeed_Ve0: speed = [90kph..110kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) SideVehicle_InitSpeed_Ve0: speed = [45kph, 50kph, 55kph] SideVehicle_s: length = [30.0m..80.0m] SideVehicle_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: '-1', s: SideVehicle_s, t: 0.0m) SideVehicle_InitPosition: pose_3d with: keep(it.odr_point == SideVehicle_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 530m, y: -2m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") side_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) side_vehicle.assign_init_position(position: SideVehicle_InitPosition) side_vehicle.assign_init_speed(SideVehicle_InitSpeed_Ve0) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • straight 简述:地图场景为直道.lead_vehicle和主车Ego在主道上分别以40kph和Ego_InitSpeed_Ve0的初始速度一前一后行驶,Ego设定了目标在主道右2车道上的目标点Target_position,同时激活Ego控制器(控制器会影响Ego去往Target_position的寻路算法,但目前仿真器B尚不支持寻路动作acquire_position),控制器有时会根据lead_vehicle的位置更改主车Ego的速度. 地图文件(odr) scenario Straight: m_scene: scenery lane_width: length = [3m..4m] right_lane_num: int = [2, 3] bikeway: bool = [true, false] sidewalk: bool = [true, false] main_speed: speed = 60kph road_length: length = [550m, 600m] straight_1: straight with: keep(it.lane_width == lane_width) keep(it.left_lane_num == 0) keep(it.right_lane_num == right_lane_num) keep(it.bikeway == bikeway) keep(it.sidewalk == sidewalk) keep(it.main_speed == main_speed) keep(it.road_length == road_length) 场景文件(osc) import standard scenario Straight: # map map: map map.set_map_file("./straight.odr") # parameter Ego_InitSpeed_Ve0: speed = [55kph..60kph] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) m_distance: length = [50m..80m] LeadVehicle_Odr: odr_point = map.create_odr_point(road_id: '1', lane_id: '-1', s: m_distance, t: 0.0m) LeadVehicle_InitPosition: pose_3d with: keep(it.odr_point == LeadVehicle_Odr) Target_xyz: xyz_point = map.create_xyz_point(x: 450m, y: -4.5m ,z: 0.0m) Target_position: pose_3d with: keep(it.xyz_point == Target_xyz) Duration: time = 100s # entity Ego: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") lead_vehicle: vehicle with: keep(it.name == "Saimo") keep(it.initial_bm == "默认驾驶员") # storyboard do parallel(duration: Duration): # init Ego.assign_init_position(position: Ego_InitPosition) Ego.assign_init_speed(Ego_InitSpeed_Ve0) lead_vehicle.assign_init_position(position: LeadVehicle_InitPosition) lead_vehicle.assign_init_speed(40kph) Ego.activate_controller(true, true) Ego.acquire_position(target: Target_position) 父主题: 种子地图的逻辑场景样例(仿真器B)
  • 基于样例的拓展 本章的场景样例可以结合第一章介绍的各模块代码样例进行修改和拓展.以straigh场景的osc为例,设定初始位置时,除了使用st坐标系(odr_point,即osc1.0中的LanePosition)还可以使用xyz坐标系(xyz_point,即osc1.0中的WorldPosition)来替代: st坐标系(odr_point)写法: Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: '10', lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) 对应xyz坐标系(xyz_point)写法,其中y值-1.5m和-4.5m分别对应车道宽度为3m时的'-1'和'-2'车道的中心位置: y: length = [-1.5m, -4.5m] x: length = [0m..30m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 以上对等替换仅存在于odr_point的road_id固定的情形,当road_id(仿真器A中的TrackId)是一个可泛化的值时,起点可以出现在多条道路上,对应多段x和y值的组合区间,无法使用一个xyz_point直接对等泛化.此时可以使用多个动态场景来实现同等的泛化,例如: 假设地图上存在多条road,且对odr_point的road_id进行了泛化: Ego_InitPosition_Road_id: string = ['10', '1'] Ego_InitPosition_LaneId: string = ['-1', '-2'] Ego_InitPosition_s: length = [0m..30m] Ego_Odr: odr_point = map.create_odr_point(road_id: Ego_InitPosition_Road_id, lane_id: Ego_InitPosition_LaneId, s: Ego_InitPosition_s, t: 0.0m) Ego_InitPosition: pose_3d with: keep(it.odr_point == Ego_Odr) 可以在两个动态场景文件中分别使用一组xyz_point来替代,例如: 第一组对应road_id为'10'的情况(同前例): y: length = [-1.5m, -4.5m] x: length = [0m..30m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 第二组对应road_id为'1'的情况,该道路与road_id为'10'的道路垂直,x值的101.5m和104.5m分别对应车道宽度为3m时的'-1'和'-2'车道的中心位置: y: length = [-40m..-10m] x: length = [101.5m, 104.5m] Ego_xyz: xyz_point = map.create_xyz_point(x: x, y: y, z: 0.0m) Ego_InitPosition: pose_3d with: keep(it.xyz_point == Ego_xyz) 父主题: 种子地图的逻辑场景样例(仿真器A)