import time import threading import numpy as np import cv2 from fastapi import FastAPI, HTTPException from pyorbbecsdk import * ESC_KEY = 27 PRINT_INTERVAL = 1 # seconds MIN_DEPTH = 500 # mm MAX_DEPTH = 4000 # mm ROI_WIDTH_CM = 10.0 # cm ROI_HEIGHT_CM = 12.0 # cm MEDIAN_BLUR_KSIZE = 5 # odd number, 0 to disable MORPH_OPEN_KSIZE = 3 # odd number, 0 to disable NEAREST_PERCENTILE = 5 # use low percentile to suppress isolated noise (0 for raw min) SAMPLE_COUNT = 10 FRAME_TIMEOUT_MS = 200 SAMPLE_TIMEOUT_SEC = 8 app = FastAPI(title="Cargo Height API") class TemporalFilter: def __init__(self, alpha): self.alpha = alpha self.previous_frame = None def process(self, frame): if self.previous_frame is None: result = frame else: result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0) self.previous_frame = result return result _pipeline = None _depth_intrinsics = None _temporal_filter = None _lock = threading.Lock() def _init_camera(): global _pipeline, _depth_intrinsics, _temporal_filter if _pipeline is not None: return config = Config() pipeline = Pipeline() try: profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR) if profile_list is None: raise RuntimeError("depth profile list is empty") depth_profile = profile_list.get_default_video_stream_profile() if depth_profile is None: raise RuntimeError("default depth profile is empty") _depth_intrinsics = depth_profile.get_intrinsic() config.enable_stream(depth_profile) except Exception as exc: raise RuntimeError(f"Failed to init depth camera: {exc}") from exc pipeline.start(config) _pipeline = pipeline _temporal_filter = TemporalFilter(alpha=0.5) def _shutdown_camera(): global _pipeline if _pipeline is None: return _pipeline.stop() _pipeline = None def _measure_once(): frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS) if frames is None: return None depth_frame = frames.get_depth_frame() if depth_frame is None: return None depth_format = depth_frame.get_format() if depth_format != OBFormat.Y16: return None width = depth_frame.get_width() height = depth_frame.get_height() scale = depth_frame.get_depth_scale() depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16) depth_data = depth_data.reshape((height, width)) depth_data = depth_data.astype(np.float32) * scale depth_data = np.where((depth_data > MIN_DEPTH) & (depth_data < MAX_DEPTH), depth_data, 0) depth_data = depth_data.astype(np.uint16) if MEDIAN_BLUR_KSIZE and MEDIAN_BLUR_KSIZE % 2 == 1: depth_data = cv2.medianBlur(depth_data, MEDIAN_BLUR_KSIZE) if MORPH_OPEN_KSIZE and MORPH_OPEN_KSIZE % 2 == 1: kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_OPEN_KSIZE, MORPH_OPEN_KSIZE)) valid_mask = (depth_data > 0).astype(np.uint8) valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel) depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16) depth_data = _temporal_filter.process(depth_data) center_y = height // 2 center_x = width // 2 center_distance = depth_data[center_y, center_x] if center_distance == 0: return None center_distance_m = center_distance / 1000.0 half_width_m = (ROI_WIDTH_CM / 100.0) / 2.0 half_height_m = (ROI_HEIGHT_CM / 100.0) / 2.0 half_width_px = int(_depth_intrinsics.fx * half_width_m / center_distance_m) half_height_px = int(_depth_intrinsics.fy * half_height_m / center_distance_m) if half_width_px <= 0 or half_height_px <= 0: return None half_width_px = min(half_width_px, center_x, width - center_x - 1) half_height_px = min(half_height_px, center_y, height - center_y - 1) if half_width_px <= 0 or half_height_px <= 0: return None x_start = center_x - half_width_px x_end = center_x + half_width_px + 1 y_start = center_y - half_height_px y_end = center_y + half_height_px + 1 roi = depth_data[y_start:y_end, x_start:x_end] valid_values = roi[(roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)] if valid_values.size == 0: return None if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100: return int(np.percentile(valid_values, NEAREST_PERCENTILE)) return int(valid_values.min()) @app.on_event("startup") def on_startup(): _init_camera() @app.on_event("shutdown") def on_shutdown(): _shutdown_camera() @app.get("/height") def get_height(): start_time = time.time() samples = [] with _lock: while len(samples) < SAMPLE_COUNT and (time.time() - start_time) < SAMPLE_TIMEOUT_SEC: value = _measure_once() if value is not None: samples.append(value) if len(samples) < SAMPLE_COUNT: raise HTTPException(status_code=503, detail="Insufficient valid samples from depth camera") median_value = int(np.median(np.array(samples, dtype=np.int32))) return { "height_mm": median_value, "samples": samples, "unit": "mm", "sample_count": SAMPLE_COUNT, } @app.get("/health") def health(): return {"status": "ok"}