"""保存图像示例。 从相机采集少量彩色帧和深度帧并落盘,便于离线分析。 """ import os import cv2 import numpy as np from pyorbbecsdk import * from utils import frame_to_bgr_image def save_depth_frame(frame: DepthFrame, index): """保存单帧深度数据为原始 .raw 文件。""" if frame is None: return width = frame.get_width() height = frame.get_height() timestamp = frame.get_timestamp() scale = frame.get_depth_scale() depth_format = frame.get_format() if depth_format != OBFormat.Y16: print("depth format is not Y16") return # 深度数据转毫米后按 uint16 存储。 data = np.frombuffer(frame.get_data(), dtype=np.uint16) data = data.reshape((height, width)) data = data.astype(np.float32) * scale data = data.astype(np.uint16) save_image_dir = os.path.join(os.getcwd(), "depth_images") if not os.path.exists(save_image_dir): os.mkdir(save_image_dir) raw_filename = save_image_dir + "/depth_{}x{}_{}_{}.raw".format(width, height, index, timestamp) data.tofile(raw_filename) def save_color_frame(frame: ColorFrame, index): """保存单帧彩色图为 PNG 文件。""" if frame is None: return width = frame.get_width() height = frame.get_height() timestamp = frame.get_timestamp() save_image_dir = os.path.join(os.getcwd(), "color_images") if not os.path.exists(save_image_dir): os.mkdir(save_image_dir) filename = save_image_dir + "/color_{}x{}_{}_{}.png".format(width, height, index, timestamp) image = frame_to_bgr_image(frame) if image is None: print("failed to convert frame to image") return cv2.imwrite(filename, image) def main(): """采集并保存若干帧样例图片。""" pipeline = Pipeline() config = Config() saved_color_cnt: int = 0 saved_depth_cnt: int = 0 has_color_sensor = False try: # 优先尝试启用彩色流。 profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR) if profile_list is not None: color_profile: VideoStreamProfile = profile_list.get_default_video_stream_profile() config.enable_stream(color_profile) has_color_sensor = True except OBError as e: print(e) # 启用深度流。 depth_profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR) if depth_profile_list is not None: depth_profile = depth_profile_list.get_default_video_stream_profile() config.enable_stream(depth_profile) pipeline.start(config) while True: try: frames = pipeline.wait_for_frames(100) if frames is None: continue # 已达到目标帧数则结束采集。 if has_color_sensor: if saved_color_cnt >= 5 and saved_depth_cnt >= 5: break elif saved_depth_cnt >= 5: break color_frame = frames.get_color_frame() if color_frame is not None and saved_color_cnt < 5: save_color_frame(color_frame, saved_color_cnt) saved_color_cnt += 1 depth_frame = frames.get_depth_frame() if depth_frame is not None and saved_depth_cnt < 5: save_depth_frame(depth_frame, saved_depth_cnt) saved_depth_cnt += 1 except KeyboardInterrupt: break if __name__ == "__main__": main()