云服务器内容精选

  • 取整脱敏 参照操作步骤进入“脱敏规则”页面。 选择“取整脱敏”,进入“取整脱敏”的页面。 系统设置了“日期取整”和“数值取整”两种算法。 “日期取整”算法对应关系型数据库中timestamp,time,data,datatime等与时间相关的字段。 “数值取整”算法对应double,float,int,long等数值类型,脱敏成功后,保持原字段类型不变。 图9 数值脱敏页面 单击“编辑测试”,配置“取整值”。 脱敏原理:结果值取靠近“取整值”倍数的向下值。例如:“取整值”设置为5,“原始数据”为14,5的倍数向下靠近14的数为10,则原始数据14按此规则脱敏后为10,即“脱敏结果”为10。 图10 数值取整 输入“原始数据”,单击“测试”,在“脱敏结果”文本框中展示已完成脱敏的数据。 测试确认无误后,单击“保存”。
  • 仿真脱敏 在敏感数据识别到匹配敏感内容后,会使用仿真的数据替换匹配敏感内容,目前仅适用于OBS脱敏任务。 表1 支持的仿真脱敏类型 编号 敏感数据规则名称 仿真脱敏的类型 1 身份证号(中国内地) 身份证号 2 生日 随机日期(指定范围) 3 日期 随机日期(指定范围) 4 手机号码(中国内地) 手机号码 5 邮箱 邮箱 6 邮政编码(中国内地) 邮政编码 7 地址(中国内地) 地址 8 精确地址(中国) 地址 9 唯一设备识别码IMEI IMEI 10 IPv4地址 ipv4 11 IPv6地址 ipv6 12 银行卡号 银行卡号 13 姓名(简体中文) 人名 14 车牌号(中国内地) 车牌号 15 护照号(中国内地) 护照号
  • 删除脱敏 系统内置“Null脱敏”和“空值脱敏”两种算法。 Null脱敏:将任意类型字段设置为NULL。对于属性设置为“NOT NULL”的字段,该算法在拷贝时将该属性修改为“NULL”。 空值脱敏:将指定字段内容设置为空值。具体来说,将字符型的字段设置为空串,数值类的字段设置为0,日期类的字段设置为1970,时间类的字段设置为零点。 删除脱敏为DSC内置的脱敏规则,不需要配置,可参考以下方法查看脱敏规则。 参照操作步骤进入“脱敏规则”页面。 选择“删除脱敏”页签,进入“删除脱敏”的规则展示页面。 图8 删除脱敏
  • 加密脱敏 通过加密算法和加密主密钥生成一种加密配置,达到数据脱敏的效果。加密脱敏的结果中,初始向量IV为加密字符串的前16个字节,剩余部分是加密的密文。 参照操作步骤进入“脱敏规则”页面。 选择“加密脱敏”页签,进入“加密脱敏”页面。 “主密钥算法”:在下拉框选择加密算法,DSC提供了“AES256”和“SM4”加密算法供您选择。 “KMS加密”可以选择“从KMS密钥中选择”和“输入KMS密钥ID”两种方式: “从KMS密钥中选择”:单击下拉框选择已有的KMS主密钥,如果没有可选择的主密钥,单击“创建KMS主密钥”进行创建,创建方式详情请参见创建KMS主密钥。 默认使用凭据管理为您创建的默认主密钥csm/default作为当前凭证的加密主密钥,您可以前往KMS服务页面创建用户密钥,使用自定义加密密钥。 “输入KMS密钥ID”:输入位于当前Region的密钥。 单击下拉框选择“数据密钥长度”,有128、192和256三种供选择。 配置完成后,单击“生成加密配置”。 如果您需要删除已配置的加密脱敏规则,可在目标规则所在列的“操作”列,单击“删除”。 单击打开轮换策略,轮换周期到期后会更新当前加密配置提升安全性。
  • 关键字替换 利用自定义的字符串替换数据中匹配到的关键字,达到脱敏的效果。例如:原始数据为abcdefgbcdefgkjkoij,“关键字”配置为“bcde”,“替换字符串”配置为12,则“脱敏结果”显示为a12fg12fgkjkoij。 参照操作步骤进入“脱敏规则”页面。 选择“关键字替换”页签,进入“关键字替换”页面。 图6 关键字替换 设置需要替换的“关键字”,以及“替换字符串”。 配置后,“原始数据”中匹配到的“关键字”将被设置的“替换字符串”替换,以完成数据脱敏。 图7 添加关键字 输入“原始数据”,单击“测试”,在“脱敏结果”文本框中展示已完成脱敏的数据。 测试确认无误后,单击“保存”。 如果您想修改已配置的脱敏规则,可以在关键字替换规则列表的操作列,单击“编辑测试”进行修改。 如果您想删除已配置的脱敏规则,可以在关键字替换规则列表的操作列,单击“删除”。
  • 字符掩盖 使用指定字符“*”或随机字符,按照指定方式遮盖部分内容。 支持“保留前n后m”、“保留自x至y”、“遮盖前n后m”、“遮盖自x至y”、“特殊字符前遮盖”和“特殊字符后遮盖”六种字符掩盖的方式。 参照操作步骤进入“脱敏规则”页面。 选择“字符掩盖”页签,进入“字符掩盖”页面。 图4 字符掩盖页面 单击“添加”,配置字符脱敏规则。 图5 添加字符脱敏 输入“原始数据”,单击“测试”,在“脱敏结果”文本框中展示已完成脱敏的数据。 测试确认无误后,单击“保存”。 数据安全中心 服务中已预置多种字符脱敏规则。内置的脱敏规则不支持删除,自定义的规则可以在规则列表的“操作”列,单击“删除”,删除规则。 所有的规则都支持编辑,在规则列表的“操作”列,单击“编辑测试”,修改规则。
  • Hash脱敏 将字符串类型字段用Hash值代替。在关系型数据库中,当该字段长度小于Hash长度时,会将目标库中该字段的长度与Hash值长度设置相同,保证Hash值完整写入目标库。DSC默认配置了SHA256和SHA512两种Hash脱敏的算法。 Hash脱敏为DSC内置的脱敏规则,不需要配置,如果您需要测试脱敏效果,可参考以下方法查看脱敏结果。 参照操作步骤进入“脱敏规则”页面。 选择“Hash脱敏”,进入Hash脱敏的页面。 图2 Hash脱敏 在选择的SHA256或SHA512算法所在列,单击“测试”。 在弹出的页面中输入“原始数据”,并单击“测试”,在“脱敏结果”文本框中展示已完成脱敏的数据。 图3 Hash脱敏测试
  • DSC支持的内置识别规则有哪些? 数据安全中心根据行业敏感信息内置了包含敏感图片信息、个人敏感信息、企业敏感信息等七类规则,具体内置规则如表1所示。 表1 内置规则 敏感数据分类 类型 个人敏感图片信息 驾照图片(中国内地) 银行卡图片(中国内地) 身份证图片(中国内地) 机动车登记证书图片(中国内地) 护照图片(中国内地) 车险保单图片(中国内地) 机动车行驶证图片(中国内地) 个人敏感信息 身份证号(中国内地) 护照号(中国内地) 驾照号(中国内地) 港澳通行证 台胞证 车牌号(中国内地) 军官证号 美国社会保险号码SSN ITIN 社保相关信息 车辆识别代码 姓名(简体中文) 姓名(英文) 国籍 性别 民族 生日 出生地 教育程度 工作单位 工作行业 电话号码(中国内地) 手机号码(中国内地) 邮箱 微信号 QQ号 婚姻状况 家庭成员关系 宗教信仰 银行卡号 信用卡号 万事达信用卡 VISA信用卡 信用卡安全码 企业敏感图片信息 营业执照图片 企业敏感信息 工商注册号 统一社会信用代码 纳税人识别号(税号) 组织机构代码 企业类型 经营状态 企业交付信息 企业规划信息 企业需求信息 设备敏感信息 唯一设备识别码IMEI 移动设备识别码MEID MAC地址 SIM卡IMSI信息 IPv4地址 IPv6地址 Linux-Passwd文件 Linux-Shadow文件 密钥敏感信息 SSL Certificate Access_Key_Id Secret_Access_Key AWS_AC CES S_KEY AWS_SECRET_KEY Facebook_SECRET IAM 账号密码 GitHub_KEY DSA私钥 EC私钥 加密私钥 RSA私钥 位置敏感信息 GPS信息 精确地址(中国) 省份(中国内地) 邮政编码(中国内地) 城市(中国内地) 直辖市(中国) 地址(中国内地) 系统敏感信息 URL链接 LDAP OS类型 通用敏感信息 日期 父主题: 数据识别和数据脱敏
  • 示例代码 用户可以自定义一个脱敏算子,构建为一个合规脱敏镜像,在Octopus平台镜像仓库中导入自己构建的合规脱敏镜像。在合规服务新建算子时,选择镜像仓库中自己导入的合规脱敏镜像,即可使用脱敏算子对数据进行脱敏处理。 下面是rosbag脱敏的算子示例: # mask.py import json import logging import multiprocessing as mp import os import shutil import time from pathlib import Path from typing import cast import av import numpy as np import open3d from rosbags.highlevel import AnyReader from rosbags.interfaces import ConnectionExtRosbag1, ConnectionExtRosbag2 from rosbags.rosbag1 import Writer as Writer1 from rosbags.rosbag2 import Writer as Writer2 from rosbags.serde import cdr_to_ros1, serialize_cdr from rosbags.typesys import get_types_from_msg, register_types from rosbags.typesys.types import builtin_interfaces__msg__Time as Time from rosbags.typesys.types import \ sensor_msgs__msg__CompressedImage as CompressedImage from rosbags.typesys.types import std_msgs__msg__Header as Header logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', ) LOG = logging.getLogger(__file__) # Octopus数据服务拉起镜像时灌入的环境变量 # 获取环境变量 input_path = os.getenv('input_path', 'data/hangyan-move.bag.bak') raw_dir = os.getenv('raw_dir', 'empty_dir/raw') # 抽取的文件存放目录 desens_dir = os.getenv('desensitized_dir', 'empty_dir/desens') # 脱敏后的文件存放目录 output_dir = os.getenv('output_dir', 'empty_dir/output') lidar_process_num = os.getenv('lidar_process_num', 5) # lidar数据进程数 # 用户自定义环境变量 rosbag_version = os.getenv('rosbag_version', '1') # rosbag版本,取值为'1'或'2' image_topics = [ x.strip(' ') for x in os.getenv('image_topics', '/camera_encoded_1').split(',') ] # 图像数据的topic列表 gnss_topic = os.getenv('gnss_topic', '/inspvax') # gnss数据的topic,gnss数据只能有一个topic lidar_topics = [ x.strip(' ') for x in os.getenv('lidar_topic', '/pandar').split(',') ] # 点云数据的topic列表 # 注册自定义消息类型 Video_encoded_data_text = Path('msgs/Video_encoded_data.msg').read_text() NovatelMessageHeader_text = Path('msgs/NovatelMessageHeader.msg').read_text() NovatelExtendedSolutionStatus_text = Path( 'msgs/NovatelExtendedSolutionStatus.msg').read_text() NovatelReceiverStatus_text = Path('msgs/NovatelReceiverStatus.msg').read_text() Inspvax_text = Path('msgs/Inspvax.msg').read_text() add_types = {} add_types.update( get_types_from_msg( Video_encoded_data_text, 'kyber_msgs/msg/Video_encoded_data', )) add_types.update( get_types_from_msg( NovatelMessageHeader_text, 'novatel_gps_msgs/msg/NovatelMessageHeader', )) add_types.update( get_types_from_msg( NovatelExtendedSolutionStatus_text, 'novatel_gps_msgs/msg/NovatelExtendedSolutionStatus', )) add_types.update( get_types_from_msg( NovatelReceiverStatus_text, 'novatel_gps_msgs/msg/NovatelReceiverStatus', )) add_types.update( get_types_from_msg( Inspvax_text, 'novatel_gps_msgs/msg/Inspvax', )) register_types(add_types) # create gnss file gnss_file_path = Path(raw_dir, 'gnss') / f'{gnss_topic}.json'.strip('/') Path.mkdir(gnss_file_path.parent, parents=True, exist_ok=True) def extract_image(input_rosbag): '''从原始rosbag中抽取图像数据.''' LOG.info('Start extracting image.') codec_ctx = av.codec.Codec('hevc', 'r') h265_code = codec_ctx.create() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic in image_topics: deserialized_data = reader.deserialize(data, connection.msgtype) try: data = deserialized_data.raw_data packet = av.packet.Packet(data) out = h265_code.decode(packet) img = None for frame in out: if frame.format.name != 'rgb24': frame = frame.reformat(format='rgb24') img = frame.to_image() # 图像存放路径 file_name = f'{timestamp}.jpg' f_path = Path(raw_dir, 'image') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_image') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) img.save(tmp_file) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) except Exception as e: LOG.info("%s frame can not trans to jpg, message: %s", timestamp, str(e)) LOG.info('Finish extracting image.') def extract_lidar(task_id, task_num, input_rosbag): '''从原始rosbag中抽取点云数据.''' LOG.info('Start extracting pcd.') with AnyReader([Path(input_rosbag)]) as reader: for i, (connection, timestamp, data) in enumerate(reader.messages()): if i % task_num != task_id: continue topic = connection.topic if topic in lidar_topics: deserialized_data = reader.deserialize(data, connection.msgtype) pcd = open3d.geometry.PointCloud() reshaped = deserialized_data.data.reshape( int(len(deserialized_data.data) / 3), 3) pcd.points = open3d.utility.Vector3dVector(reshaped) file_name = f'{timestamp}.pcd' f_path = Path(raw_dir, 'lidar') / topic.strip('/') tmp_path = Path(raw_dir, 'tmp_lidar') / topic.strip('/') Path.mkdir(tmp_path, parents=True, exist_ok=True) tmp_file = tmp_path / file_name file = f_path / file_name # 当未建立目录时,先基于topic名称建立目录 Path.mkdir(file.parent, parents=True, exist_ok=True) open3d.io.write_point_cloud(str(tmp_file), pcd) os.chmod(tmp_file, 0o777) os.chmod(file.parent, 0o777) shutil.move(tmp_file, file) LOG.info('Finish extracting pcd.') def extract_gnss(input_rosbag): '''从原始rosbag中抽取gnss数据.''' LOG.info('Start extracting rosbag.') gnss_file = open(gnss_file_path, 'w') gnss = dict() with AnyReader([Path(input_rosbag)]) as reader: for connection, timestamp, data in reader.messages(): topic = connection.topic if topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) # 这里以msgytpe为NavSatFix为例 latitude = deserialized_data.latitude longitude = deserialized_data.longitude altitude = deserialized_data.altitude gnss[timestamp] = { 'latitude': latitude, 'longitude': longitude, 'altitude': altitude } gnss_file.write(json.dumps(gnss)) gnss_file.close() LOG.info('Finish extracting gnss.') def _get_masked_image(topic, timestamp): '''从脱敏后的图像数据中获取目标图像数据.''' file = Path(desens_dir, 'image') / topic.strip('/') / f'{timestamp}.jpg' if file.is_file(): return np.fromfile(file, dtype='uint8') else: return None def _get_conn_map(rosbag_version: int, reader, writer): '''构建connection的索引.''' conn_map = {} if rosbag_version == '1': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag1, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, conn.msgdef, conn.md5sum, ext.callerid, ext.latching, ) elif rosbag_version == '2': for conn in reader.connections: if conn.topic in image_topics: conn_map[conn.id] = writer.add_connection( '/image', CompressedImage.__msgtype__, ) else: ext = cast(ConnectionExtRosbag2, conn.ext) conn_map[conn.id] = writer.add_connection( conn.topic, conn.msgtype, ext.serialization_format, ext.offered_qos_profiles, ) return conn_map def _serialize_data(rosbag_version, data, msgtype): '''对数据进行序列化.''' if rosbag_version == '1': return cdr_to_ros1(serialize_cdr(data, msgtype), msgtype) elif rosbag_version == '2': return serialize_cdr(data, msgtype) def generate_rosbag(input_rosbag, output_rosbag): '''生成脱敏后rosbag.''' LOG.info('Start generating rosbag.') gnss_file = open( Path(desens_dir, 'gnss') / f'{gnss_topic}.json'.strip('/'), 'r') gnss_data = json.load(gnss_file) gnss_file.close() Writer = Writer1 if rosbag_version == '1' else Writer2 with AnyReader([Path(input_rosbag) ]) as reader, Writer(Path(output_rosbag)) as writer: conn_map = _get_conn_map(rosbag_version, reader, writer) for connection, timestamp, data in reader.messages(): topic = connection.topic # 当topic为图像数据的topic时,读取脱敏后图像数据 if topic in image_topics: masked_data = _get_masked_image(topic, timestamp) if masked_data is None: # 没有解析出图像文件时,不要该帧了 continue deserialized_data = CompressedImage( Header( stamp=Time( sec=int(timestamp // 10**9), nanosec=int(timestamp % 10**9), ), frame_id='0', ), format='jpg', data=masked_data, ) data = _serialize_data( rosbag_version, deserialized_data, CompressedImage.__msgtype__, ) # 当topic为gnss数据时,读取脱敏后gnss数据 elif topic == gnss_topic: deserialized_data = reader.deserialize(data, connection.msgtype) deserialized_data.latitude = gnss_data.get( str(timestamp)).get('latitude') deserialized_data.longitude = gnss_data.get( str(timestamp)).get('longitude') deserialized_data.altitude = gnss_data.get( str(timestamp)).get('altitude') data = _serialize_data( rosbag_version, deserialized_data, connection.msgtype, ) # 当topic为点云数据时,读取脱敏后点云数据 elif topic in lidar_topics: deserialized_data = reader.deserialize( data, connection.msgtype, ) file = Path( desens_dir, 'lidar', ) / topic.strip('/') / f'{timestamp}.pcd' point_cloud = open3d.io.read_point_cloud(str(file)) deserialized_data.data = np.asarray( point_cloud.points).flatten() writer.write(conn_map[connection.id], timestamp, data) # 生成_SUCCESS文件标识完成数据抽取 Path(output_dir, '_SUCCESS').touch() LOG.info('Finish generating rosbag.') if __name__ == "__main__": LOG.info('Start user operator.') process_image = mp.Process(target=extract_image, args=(input_path, )) pool_lidar = mp.Pool(processes=lidar_process_num) for i in range(lidar_process_num): pool_lidar.apply_async(extract_lidar, args=(i, lidar_process_num, input_path)) process_gnss = mp.Process(target=extract_gnss, args=(input_path, )) # 启动子进程 process_image.start() pool_lidar.close() process_gnss.start() process_image.join() pool_lidar.join() process_gnss.join() LOG.info('Child processes exit.') # 生成_SUCCESS文件标识完成数据抽取 Path(raw_dir, '_SUCCESS').touch() # 后面输出的rosbag文件与输入的rosbag文件保持同名 output_rosbag_file = Path(output_dir, Path(input_path).name) # 如果输出文件夹不存在,先创建文件夹 Path.mkdir(output_rosbag_file.parent, parents=True, exist_ok=True) # 检测到脱敏任务结束后,生成新的rosbag文件 while time.sleep(1) is None: if Path(desens_dir).joinpath('_SUCCESS').is_file(): generate_rosbag(Path(input_path), output_rosbag_file) break 父主题: 数据脱敏作业
  • 如何同时启动多个敏感数据识别规则组? DSC根据不同的场景预置了100+条敏感数据识别和脱敏规则,可对个人敏感信息(身份证、银行卡、姓名、手机号、邮箱等)、企业敏感信息(营业执照号码、税务登录证号码等)、密钥敏感信息(PEM证书、HEY私钥等)、设备敏感信息(IP地址、MAC地址、IPV6地址等)、位置敏感信息(省份、城市、GPS位置、地址等)和通用敏感信息(日期)等敏感信息进行识别和脱敏。同时,DSC也支持自定义识别规则,自定义识别规则的操作方法请参见新增敏感数据规则。 在为同一个资产创建扫描任务时,可添加多个“识别规则组”,同时启动多个敏感数据识别规则,实现为同一资产配置多场景的扫描任务,如图1所示。具体的操作请参见创建敏感数据识别任务。 图1 新建任务 父主题: 数据识别和数据脱敏