import time import cv2 import numpy as np from pyorbbecsdk import * ESC_KEY = 27 PRINT_INTERVAL = 1 # seconds MIN_DEPTH = 500 # mm MAX_DEPTH = 4000 # mm ROI_WIDTH_CM = 10.0 # cm ROI_HEIGHT_CM = 12.0 # cm MEDIAN_BLUR_KSIZE = 5 # odd number, 0 to disable MORPH_OPEN_KSIZE = 3 # odd number, 0 to disable NEAREST_PERCENTILE = 5 # use low percentile to suppress isolated noise (0 for raw min) class TemporalFilter: def __init__(self, alpha): self.alpha = alpha self.previous_frame = None def process(self, frame): if self.previous_frame is None: result = frame else: result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0) self.previous_frame = result return result def main(): config = Config() pipeline = Pipeline() temporal_filter = TemporalFilter(alpha=0.5) try: profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR) assert profile_list is not None depth_profile = profile_list.get_default_video_stream_profile() assert depth_profile is not None print("depth profile: ", depth_profile) depth_intrinsics = depth_profile.get_intrinsic() config.enable_stream(depth_profile) except Exception as e: print(e) return pipeline.start(config) last_print_time = time.time() while True: try: frames = pipeline.wait_for_frames(100) if frames is None: continue depth_frame = frames.get_depth_frame() if depth_frame is None: continue depth_format = depth_frame.get_format() if depth_format != OBFormat.Y16: print("depth format is not Y16") continue width = depth_frame.get_width() height = depth_frame.get_height() scale = depth_frame.get_depth_scale() depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16) depth_data = depth_data.reshape((height, width)) depth_data = depth_data.astype(np.float32) * scale depth_data = np.where((depth_data > MIN_DEPTH) & (depth_data < MAX_DEPTH), depth_data, 0) depth_data = depth_data.astype(np.uint16) if MEDIAN_BLUR_KSIZE and MEDIAN_BLUR_KSIZE % 2 == 1: depth_data = cv2.medianBlur(depth_data, MEDIAN_BLUR_KSIZE) if MORPH_OPEN_KSIZE and MORPH_OPEN_KSIZE % 2 == 1: kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_OPEN_KSIZE, MORPH_OPEN_KSIZE)) valid_mask = (depth_data > 0).astype(np.uint8) valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel) depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16) # Apply temporal filtering depth_data = temporal_filter.process(depth_data) center_y = height // 2 center_x = width // 2 center_distance = depth_data[center_y, center_x] if center_distance == 0: continue center_distance_m = center_distance / 1000.0 half_width_m = (ROI_WIDTH_CM / 100.0) / 2.0 half_height_m = (ROI_HEIGHT_CM / 100.0) / 2.0 half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m) half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m) if half_width_px <= 0 or half_height_px <= 0: continue half_width_px = min(half_width_px, center_x, width - center_x - 1) half_height_px = min(half_height_px, center_y, height - center_y - 1) if half_width_px <= 0 or half_height_px <= 0: continue x_start = center_x - half_width_px x_end = center_x + half_width_px + 1 y_start = center_y - half_height_px y_end = center_y + half_height_px + 1 roi = depth_data[y_start:y_end, x_start:x_end] valid_values = roi[(roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)] if valid_values.size == 0: nearest_distance = 0 else: if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100: nearest_distance = int(np.percentile(valid_values, NEAREST_PERCENTILE)) else: nearest_distance = int(valid_values.min()) # Find nearest point in ROI for visualization nearest_point = None if nearest_distance > 0: roi_mask = (roi >= MIN_DEPTH) & (roi <= MAX_DEPTH) roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max) if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100: roi_candidate = np.where(roi_candidate <= nearest_distance, roi_candidate, np.iinfo(np.uint16).max) min_idx = np.argmin(roi_candidate) min_val = roi_candidate.flat[min_idx] if min_val != np.iinfo(np.uint16).max: min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape) nearest_point = (x_start + min_x, y_start + min_y) current_time = time.time() if current_time - last_print_time >= PRINT_INTERVAL: print(f"nearest distance in {ROI_WIDTH_CM}cm x {ROI_HEIGHT_CM}cm area: ", nearest_distance) last_print_time = current_time depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET) cv2.rectangle( depth_image, (x_start, y_start), (x_end - 1, y_end - 1), (0, 255, 0), 2, ) if nearest_point is not None: cv2.circle(depth_image, nearest_point, 4, (0, 0, 0), -1) cv2.circle(depth_image, nearest_point, 6, (0, 255, 255), 2) label = f"nearest: {nearest_distance} mm" cv2.putText( depth_image, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) center_label = f"center: {int(center_distance)} mm" cv2.putText( depth_image, center_label, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) cv2.imshow("Depth Viewer", depth_image) key = cv2.waitKey(1) if key == ord('q') or key == ESC_KEY: break except KeyboardInterrupt: break cv2.destroyAllWindows() pipeline.stop() if __name__ == "__main__": main()