| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import time
- import threading
- import numpy as np
- import cv2
- from fastapi import FastAPI, HTTPException
- from pyorbbecsdk import *
# Not referenced by any code visible in this file.
ESC_KEY = 27  # ASCII code of the Escape key
PRINT_INTERVAL = 1  # seconds

# Depth readings outside this range are treated as invalid and zeroed.
MIN_DEPTH = 500  # mm
MAX_DEPTH = 4000  # mm

# Physical size of the region of interest centred on the image; it is
# converted to pixels using the depth intrinsics and the centre distance.
ROI_WIDTH_CM = 10.0  # cm
ROI_HEIGHT_CM = 12.0  # cm

# Per-frame noise suppression. Each stage runs only when its kernel size is a
# non-zero odd number.
MEDIAN_BLUR_KSIZE = 5  # odd number, 0 to disable
MORPH_OPEN_KSIZE = 3  # odd number, 0 to disable
NEAREST_PERCENTILE = 5  # use low percentile to suppress isolated noise (0 for raw min)

# Sampling policy of the /height endpoint.
SAMPLE_COUNT = 10  # valid measurements required per request
FRAME_TIMEOUT_MS = 200  # passed to Pipeline.wait_for_frames for every frame
SAMPLE_TIMEOUT_SEC = 8  # time budget for collecting SAMPLE_COUNT measurements

# Application object on which the routes and lifecycle hooks below are registered.
app = FastAPI(title="Cargo Height API")
class TemporalFilter:
    """Exponential moving average over successive frames.

    Every call to :meth:`process` blends the new frame with the previous
    *output*: ``alpha * frame + (1 - alpha) * previous``. The first frame, and
    any frame whose shape or dtype differs from the stored history, is passed
    through unchanged and becomes the new history.

    Pixels equal to ``invalid_value`` mean "no reading". Blending such a pixel
    with a real reading would fabricate a value between the two (for depth
    data: a phantom surface closer than anything actually seen), so those
    pixels are never blended:

    * no reading in the current frame -> output stays ``invalid_value``;
    * no reading in the history       -> output is the current reading.

    Args:
        alpha: Weight of the newest frame, expected in ``[0, 1]``.
        invalid_value: Pixel value meaning "no reading". Pass ``None`` to
            blend every pixel unconditionally.
    """

    def __init__(self, alpha, invalid_value=0):
        self.alpha = alpha
        self.invalid_value = invalid_value
        self.previous_frame = None

    def process(self, frame):
        """Blend ``frame`` into the running average and return the result.

        Args:
            frame: NumPy image; must keep the same shape and dtype between
                calls for the history to be used.

        Returns:
            The filtered image. It is ``frame`` itself when there is no usable
            history.
        """
        previous = self.previous_frame
        if (
            previous is None
            or previous.shape != frame.shape
            or previous.dtype != frame.dtype
        ):
            # cv2.addWeighted rejects mismatched inputs; restart the history
            # instead of raising on a resolution/format change.
            result = frame
        else:
            result = cv2.addWeighted(frame, self.alpha, previous, 1 - self.alpha, 0)
            if self.invalid_value is not None:
                unreliable = (frame == self.invalid_value) | (previous == self.invalid_value)
                # Where either side has no reading the weighted sum is
                # meaningless: fall back to the current frame's pixel.
                result = np.where(unreliable, frame, result)
        self.previous_frame = result
        return result
# Camera state shared by the functions below; the first three are set by
# _init_camera().
_pipeline = None  # started Pipeline, or None while the camera is not running
_depth_intrinsics = None  # intrinsics of the default depth profile (fx / fy size the ROI)
_temporal_filter = None  # TemporalFilter created together with the pipeline
_lock = threading.Lock()  # held by the /height handler while it samples frames
def _init_camera():
    """Start the depth stream and publish the shared camera state.

    Does nothing when a pipeline is already stored in ``_pipeline``. Otherwise
    the default depth profile is enabled, its intrinsics are saved in
    ``_depth_intrinsics``, the pipeline is started and a fresh
    ``TemporalFilter`` is installed.

    Raises:
        RuntimeError: If the depth profile cannot be obtained or enabled. The
            underlying error is chained as ``__cause__``.
    """
    global _pipeline, _depth_intrinsics, _temporal_filter

    if _pipeline is not None:
        return  # camera already running

    stream_config = Config()
    camera = Pipeline()
    try:
        profiles = camera.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
        if profiles is None:
            raise RuntimeError("depth profile list is empty")

        default_profile = profiles.get_default_video_stream_profile()
        if default_profile is None:
            raise RuntimeError("default depth profile is empty")

        _depth_intrinsics = default_profile.get_intrinsic()
        stream_config.enable_stream(default_profile)
    except Exception as exc:
        # The two RuntimeErrors raised above are deliberately caught here too,
        # so every configuration failure surfaces with the same prefix.
        raise RuntimeError(f"Failed to init depth camera: {exc}") from exc

    camera.start(stream_config)
    _pipeline = camera
    _temporal_filter = TemporalFilter(alpha=0.5)
def _shutdown_camera():
    """Stop the depth pipeline, if one is running, and forget its handle.

    Calling this when no pipeline is stored is a no-op. Any exception raised
    by ``stop()`` still propagates to the caller.
    """
    global _pipeline
    if _pipeline is None:
        return
    try:
        _pipeline.stop()
    finally:
        # Drop the handle even when stop() fails; otherwise a later
        # _init_camera() would see a non-None _pipeline and return early
        # without starting a working camera.
        _pipeline = None
def _decode_depth_mm(depth_frame):
    """Turn a Y16 depth frame into a uint16 image with out-of-range pixels zeroed.

    Returns None when the frame buffer does not hold exactly width * height
    samples, so a malformed frame is dropped instead of raising from reshape.
    """
    width = depth_frame.get_width()
    height = depth_frame.get_height()
    scale = depth_frame.get_depth_scale()
    raw = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
    if raw.size != width * height:
        return None
    scaled = raw.reshape((height, width)).astype(np.float32) * scale
    in_range = (scaled > MIN_DEPTH) & (scaled < MAX_DEPTH)
    return np.where(in_range, scaled, 0).astype(np.uint16)


def _denoise_depth(depth_mm):
    """Apply the configured median blur and morphological opening of the valid mask."""
    if MEDIAN_BLUR_KSIZE and MEDIAN_BLUR_KSIZE % 2 == 1:
        depth_mm = cv2.medianBlur(depth_mm, MEDIAN_BLUR_KSIZE)
    if MORPH_OPEN_KSIZE and MORPH_OPEN_KSIZE % 2 == 1:
        kernel = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, (MORPH_OPEN_KSIZE, MORPH_OPEN_KSIZE)
        )
        valid_mask = (depth_mm > 0).astype(np.uint8)
        # Opening removes isolated specks of valid pixels from the mask.
        valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
        depth_mm = np.where(valid_mask > 0, depth_mm, 0).astype(np.uint16)
    return depth_mm


def _center_roi(depth_mm):
    """Return the image window covering ROI_WIDTH_CM x ROI_HEIGHT_CM at the centre distance.

    Returns None when the centre pixel has no reading or the window collapses
    to less than one pixel on either side of the centre.
    """
    height, width = depth_mm.shape
    center_y = height // 2
    center_x = width // 2
    center_distance = depth_mm[center_y, center_x]
    if center_distance == 0:
        return None

    # Pinhole projection: pixels = focal_length_px * size_m / distance_m.
    center_distance_m = center_distance / 1000.0
    half_width_m = (ROI_WIDTH_CM / 100.0) / 2.0
    half_height_m = (ROI_HEIGHT_CM / 100.0) / 2.0
    half_width_px = int(_depth_intrinsics.fx * half_width_m / center_distance_m)
    half_height_px = int(_depth_intrinsics.fy * half_height_m / center_distance_m)

    # Keep the window inside the image; a non-positive half-size stays
    # non-positive after clamping, so one check covers both cases.
    half_width_px = min(half_width_px, center_x, width - center_x - 1)
    half_height_px = min(half_height_px, center_y, height - center_y - 1)
    if half_width_px <= 0 or half_height_px <= 0:
        return None

    rows = slice(center_y - half_height_px, center_y + half_height_px + 1)
    cols = slice(center_x - half_width_px, center_x + half_width_px + 1)
    return depth_mm[rows, cols]


def _nearest_depth(roi):
    """Return the nearest in-range depth inside ``roi`` as an int, or None if there is none."""
    valid_values = roi[(roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)]
    if valid_values.size == 0:
        return None
    if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100:
        # A low percentile instead of the raw minimum ignores a few stray
        # near pixels.
        return int(np.percentile(valid_values, NEAREST_PERCENTILE))
    return int(valid_values.min())


def _measure_once():
    """Grab one depth frame and estimate the nearest distance in the centre ROI.

    Reads the module-level ``_pipeline``, ``_depth_intrinsics`` and
    ``_temporal_filter``, which must already be initialised.

    Returns:
        The nearest valid depth in the ROI as an ``int`` (same unit as
        MIN_DEPTH / MAX_DEPTH), or ``None`` when no usable measurement could
        be taken from this frame (no frame in time, unexpected format,
        malformed buffer, no reading at the centre, empty ROI).
    """
    frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS)
    if frames is None:
        return None
    depth_frame = frames.get_depth_frame()
    if depth_frame is None:
        return None
    if depth_frame.get_format() != OBFormat.Y16:
        return None

    depth_mm = _decode_depth_mm(depth_frame)
    if depth_mm is None:
        return None
    depth_mm = _denoise_depth(depth_mm)

    measured_now = depth_mm > 0
    smoothed = _temporal_filter.process(depth_mm)
    # A pixel with no reading in this frame must stay empty: after temporal
    # blending it would otherwise carry a decayed fraction of an older value,
    # which can fall inside the valid range and pose as a nearer surface.
    depth_mm = np.where(measured_now, smoothed, 0).astype(np.uint16, copy=False)

    roi = _center_roi(depth_mm)
    if roi is None:
        return None
    return _nearest_depth(roi)
@app.on_event("startup")
def on_startup() -> None:
    """Start the depth camera when the application starts."""
    _init_camera()
@app.on_event("shutdown")
def on_shutdown() -> None:
    """Stop the depth camera when the application shuts down."""
    _shutdown_camera()
@app.get("/height")
def get_height():
    """Collect SAMPLE_COUNT valid measurements and report their median.

    The camera is sampled under ``_lock`` so that concurrent requests do not
    interleave frame reads.

    Returns:
        A dict with the median (``height_mm``), the individual ``samples``,
        the ``unit`` and the configured ``sample_count``.

    Raises:
        HTTPException: 503 when fewer than SAMPLE_COUNT valid measurements
            were obtained within SAMPLE_TIMEOUT_SEC.
    """
    samples = []
    with _lock:
        # Start the time budget only once the camera is ours. Time spent
        # queued behind another request must not count, or a waiting request
        # could fail with 503 without reading a single frame. monotonic() is
        # immune to wall-clock adjustments.
        deadline = time.monotonic() + SAMPLE_TIMEOUT_SEC
        while len(samples) < SAMPLE_COUNT and time.monotonic() < deadline:
            value = _measure_once()
            if value is not None:
                samples.append(value)

    if len(samples) < SAMPLE_COUNT:
        raise HTTPException(
            status_code=503,
            detail="Insufficient valid samples from depth camera",
        )

    median_value = int(np.median(np.array(samples, dtype=np.int32)))
    return {
        "height_mm": median_value,
        "samples": samples,
        "unit": "mm",
        "sample_count": SAMPLE_COUNT,
    }
@app.get("/health")
def health():
    """Liveness probe; returns a fixed payload and does not touch the camera."""
    return {"status": "ok"}
|