import os import time import threading import cv2 import numpy as np from fastapi import FastAPI, HTTPException import uvicorn from depth_common import ( Settings, TemporalFilter, compute_roi_bounds, extract_depth_data, find_nearest_point, init_depth_pipeline, nearest_distance_in_roi, ) from utils import frame_to_bgr_image # 采样参数 SAMPLE_COUNT = 10 FRAME_TIMEOUT_MS = 200 SAMPLE_TIMEOUT_SEC = 8 MAX_SAVED_IMAGES = int(os.getenv("MAX_SAVED_IMAGES", "1000")) # 从环境变量加载测量配置 SETTINGS = Settings.from_env() app = FastAPI(title="Cargo Height API") # 相机相关的全局状态(由锁保护) _pipeline = None _depth_intrinsics = None _temporal_filter = None _lock = threading.Lock() def _init_camera(): # 延迟初始化相机,避免重复启动 global _pipeline, _depth_intrinsics, _temporal_filter if _pipeline is not None: return try: pipeline, depth_intrinsics, _ = init_depth_pipeline() except Exception as exc: raise RuntimeError(f"Failed to init depth camera: {exc}") from exc _pipeline = pipeline _depth_intrinsics = depth_intrinsics _temporal_filter = TemporalFilter(alpha=0.5) def _shutdown_camera(): # 关闭相机资源 global _pipeline if _pipeline is None: return _pipeline.stop() _pipeline = None def _measure_once(): # 单次采样:获取一帧并在 ROI 内计算最近距离 frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS) if frames is None: return None color_frame = frames.get_color_frame() depth_frame = frames.get_depth_frame() depth_data = extract_depth_data(depth_frame, SETTINGS, _temporal_filter) if depth_data is None: return None bounds = compute_roi_bounds(depth_data, _depth_intrinsics, SETTINGS) if bounds is None: return None x_start, x_end, y_start, y_end, center_distance = bounds roi = depth_data[y_start:y_end, x_start:x_end] nearest_distance = nearest_distance_in_roi(roi, SETTINGS) if nearest_distance is None: return None return { "nearest_distance": nearest_distance, "color_frame": color_frame, "depth_data": depth_data, "bounds": bounds, "center_distance": center_distance, } def _save_current_sample_images(sample): save_image_dir = os.path.join(os.getcwd(), "sample_images") os.makedirs(save_image_dir, exist_ok=True) now = time.localtime() time_str = time.strftime("%Y%m%d_%H%M%S", now) millis = int((time.time() % 1) * 1000) timestamp = f"{time_str}_{millis:03d}" color_frame = sample.get("color_frame") if color_frame is not None: color_image = frame_to_bgr_image(color_frame) if color_image is not None: color_height, color_width = color_image.shape[:2] color_file = os.path.join( save_image_dir, f"color_{color_width}x{color_height}_{timestamp}.png", ) cv2.imwrite(color_file, color_image) depth_data = sample["depth_data"] x_start, x_end, y_start, y_end, center_distance = sample["bounds"] nearest_distance = sample["nearest_distance"] roi = depth_data[y_start:y_end, x_start:x_end] nearest_point = find_nearest_point(roi, x_start, y_start, SETTINGS, nearest_distance) depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET) cv2.rectangle( depth_image, (x_start, y_start), (x_end - 1, y_end - 1), (0, 255, 0), 2, ) if nearest_point is not None: cv2.circle(depth_image, nearest_point, 4, (0, 0, 0), -1) cv2.circle(depth_image, nearest_point, 6, (0, 255, 255), 2) cv2.putText( depth_image, f"nearest: {nearest_distance} mm", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) cv2.putText( depth_image, f"center: {int(center_distance)} mm", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) depth_h, depth_w = depth_image.shape[:2] depth_file = os.path.join( save_image_dir, f"depth_annotated_{depth_w}x{depth_h}_{timestamp}.png", ) cv2.imwrite(depth_file, depth_image) _prune_saved_images(save_image_dir, MAX_SAVED_IMAGES) def _prune_saved_images(save_dir, max_images): png_files = [ os.path.join(save_dir, name) for name in os.listdir(save_dir) if name.lower().endswith(".png") ] if len(png_files) <= max_images: return png_files.sort(key=os.path.getmtime) for file_path in png_files[: len(png_files) - max_images]: try: os.remove(file_path) except OSError: pass @app.on_event("startup") def on_startup(): # 服务启动时初始化相机 _init_camera() @app.on_event("shutdown") def on_shutdown(): # 服务关闭时释放相机 _shutdown_camera() @app.get("/height") def get_height(): # 采集多次样本并返回中位数高度 start_time = time.time() samples = [] first_valid_sample = None first_color_frame = None with _lock: while len(samples) < SAMPLE_COUNT and (time.time() - start_time) < SAMPLE_TIMEOUT_SEC: sample = _measure_once() if sample is not None: samples.append(sample["nearest_distance"]) if first_valid_sample is None: first_valid_sample = sample if first_color_frame is None and sample.get("color_frame") is not None: first_color_frame = sample.get("color_frame") # If no color frame arrived during valid depth sampling, try a few extra pulls. if first_color_frame is None: for _ in range(5): frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS) if frames is None: continue color_frame = frames.get_color_frame() if color_frame is not None: first_color_frame = color_frame break if first_valid_sample is not None: if first_color_frame is not None: first_valid_sample["color_frame"] = first_color_frame _save_current_sample_images(first_valid_sample) if len(samples) < SAMPLE_COUNT: raise HTTPException(status_code=503, detail="Insufficient valid samples from depth camera") median_value = int(np.median(np.array(samples, dtype=np.int32))) return { "height_mm": median_value, "samples": samples, "unit": "mm", "sample_count": SAMPLE_COUNT, } @app.get("/health") def health(): # 健康检查接口 return {"status": "ok"} def main(): # 读取监听地址并启动 API 服务 host = os.getenv("API_HOST", "127.0.0.1") port = int(os.getenv("API_PORT", "8080")) uvicorn.run("api:app", host=host, port=port, log_level="info") if __name__ == "__main__": main()