"""货物高度测量服务。 该服务封装一次完整测量流程: 1. 从深度相机连续采样; 2. 在 ROI 内计算最近距离; 3. 通过中位数汇总结果; 4. 保存对应彩色图与深度标注图。 """ import os import threading import time from typing import Any, Dict, Optional import cv2 import numpy as np from api_config import ApiConfig from depth_common import ( TemporalFilter, compute_roi_bounds, extract_depth_data, find_nearest_point, init_depth_pipeline, nearest_distance_in_roi, ) from utils import frame_to_bgr_image class CargoHeightService: """面向 API 的测高服务对象。""" def __init__(self, config: ApiConfig) -> None: # 配置对象由外部注入,便于统一管理参数。 self.config = config # 以下对象在 startup() 中初始化,在 shutdown() 中释放。 self._pipeline = None self._depth_intrinsics = None self._temporal_filter = None # 使用互斥锁避免并发请求同时访问同一相机管线。 self._lock = threading.Lock() def startup(self) -> None: """初始化相机与滤波器。""" if self._pipeline is not None: return try: pipeline, depth_intrinsics, _ = init_depth_pipeline() except Exception as exc: raise RuntimeError(f"Failed to init depth camera: {exc}") from exc self._pipeline = pipeline self._depth_intrinsics = depth_intrinsics self._temporal_filter = TemporalFilter(alpha=0.5) def shutdown(self) -> None: """释放相机资源。""" if self._pipeline is None: return self._pipeline.stop() self._pipeline = None def measure_height(self) -> Optional[Dict[str, Any]]: """执行一次测量并返回结果。 返回 `None` 表示在规定时间内没有采够有效样本。 """ start_time = time.time() samples = [] first_valid_sample = None first_color_frame = None # 相机读取与状态更新必须在锁内进行,确保线程安全。 with self._lock: while len(samples) < self.config.sample_count and (time.time() - start_time) < self.config.sample_timeout_sec: sample = self._measure_once() if sample is None: continue # 记录最近距离样本用于最终统计。 samples.append(sample["nearest_distance"]) # 保留第一份有效样本,用于后续生成标注深度图。 if first_valid_sample is None: first_valid_sample = sample # 优先复用测量过程中采到的第一帧彩色图。 if first_color_frame is None and sample.get("color_frame") is not None: first_color_frame = sample.get("color_frame") # 若测量期间没拿到彩色帧,再额外尝试抓取几次。 if first_color_frame is None: for _ in range(5): frames = self._pipeline.wait_for_frames(self.config.frame_timeout_ms) if frames is None: continue color_frame = frames.get_color_frame() if color_frame is not None: first_color_frame = color_frame break image_paths = None if first_valid_sample is not None: if first_color_frame is not None: first_valid_sample["color_frame"] = first_color_frame image_paths = self._save_current_sample_images(first_valid_sample) if len(samples) < self.config.sample_count: return None # 用中位数抵抗离群值,比均值更稳健。 median_value = int(np.median(np.array(samples, dtype=np.int32))) return { "height_mm": median_value, "samples": samples, "unit": "mm", "sample_count": self.config.sample_count, "image_paths": image_paths, } def _measure_once(self) -> Optional[Dict[str, Any]]: """采集并计算单帧测量结果。""" frames = self._pipeline.wait_for_frames(self.config.frame_timeout_ms) if frames is None: return None color_frame = frames.get_color_frame() depth_frame = frames.get_depth_frame() # 深度帧预处理:格式检查、量纲转换、噪声过滤、时间滤波。 depth_data = extract_depth_data(depth_frame, self.config.settings, self._temporal_filter) if depth_data is None: return None # 根据中心距离动态计算 ROI 像素范围。 bounds = compute_roi_bounds(depth_data, self._depth_intrinsics, self.config.settings) if bounds is None: return None x_start, x_end, y_start, y_end, center_distance = bounds roi = depth_data[y_start:y_end, x_start:x_end] # 计算 ROI 中最近距离(或分位距离)。 nearest_distance = nearest_distance_in_roi(roi, self.config.settings) if nearest_distance is None: return None return { "nearest_distance": nearest_distance, "color_frame": color_frame, "depth_data": depth_data, "bounds": bounds, "center_distance": center_distance, } def _save_current_sample_images(self, sample: Dict[str, Any]) -> Dict[str, Optional[str]]: """保存当前样本的彩色图和深度标注图,并返回可访问路径。""" save_image_dir = os.path.join(os.getcwd(), "sample_images") os.makedirs(save_image_dir, exist_ok=True) # 生成包含日期时间和毫秒的文件名,降低重名概率。 now = time.localtime() time_str = time.strftime("%Y%m%d_%H%M%S", now) millis = int((time.time() % 1) * 1000) timestamp = f"{time_str}_{millis:03d}" color_relative_path = None color_frame = sample.get("color_frame") if color_frame is not None: color_image = frame_to_bgr_image(color_frame) if color_image is not None: color_height, color_width = color_image.shape[:2] color_name = f"color_{color_width}x{color_height}_{timestamp}.png" color_file = os.path.join( save_image_dir, color_name, ) cv2.imwrite(color_file, color_image) color_relative_path = f"/sample_images/{color_name}" # 下面构建带标注的深度可视化图。 depth_data = sample["depth_data"] x_start, x_end, y_start, y_end, center_distance = sample["bounds"] nearest_distance = sample["nearest_distance"] roi = depth_data[y_start:y_end, x_start:x_end] nearest_point = find_nearest_point(roi, x_start, y_start, self.config.settings, nearest_distance) depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET) cv2.rectangle( depth_image, (x_start, y_start), (x_end - 1, y_end - 1), (0, 255, 0), 2, ) if nearest_point is not None: cv2.circle(depth_image, nearest_point, 4, (0, 0, 0), -1) cv2.circle(depth_image, nearest_point, 6, (0, 255, 255), 2) cv2.putText( depth_image, f"nearest: {nearest_distance} mm", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) cv2.putText( depth_image, f"center: {int(center_distance)} mm", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA, ) depth_h, depth_w = depth_image.shape[:2] depth_name = f"depth_annotated_{depth_w}x{depth_h}_{timestamp}.png" depth_file = os.path.join( save_image_dir, depth_name, ) cv2.imwrite(depth_file, depth_image) # 控制目录体积,超上限时删除最旧图片。 self._prune_saved_images(save_image_dir, self.config.max_saved_images) return { "color": color_relative_path, "depth": f"/sample_images/{depth_name}", } @staticmethod def _prune_saved_images(save_dir: str, max_images: int) -> None: """删除超出上限的旧图片文件。""" png_files = [ os.path.join(save_dir, name) for name in os.listdir(save_dir) if name.lower().endswith(".png") ] if len(png_files) <= max_images: return # 按修改时间升序排序,优先删除最旧文件。 png_files.sort(key=os.path.getmtime) for file_path in png_files[: len(png_files) - max_images]: try: os.remove(file_path) except OSError: # 删除失败时忽略,避免影响主流程。 pass