| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- """货物高度测量服务。
- 该服务封装一次完整测量流程:
- 1. 从深度相机连续采样;
- 2. 在 ROI 内计算最近距离;
- 3. 通过中位数汇总结果;
- 4. 保存对应彩色图与深度标注图。
- """
- import os
- import threading
- import time
- from typing import Any, Dict, Optional
- import cv2
- import numpy as np
- from api_config import ApiConfig
- from depth_common import (
- TemporalFilter,
- compute_roi_bounds,
- extract_depth_data,
- find_nearest_point,
- init_depth_pipeline,
- nearest_distance_in_roi,
- )
- from utils import frame_to_bgr_image
- class CargoHeightService:
- """面向 API 的测高服务对象。"""
- def __init__(self, config: ApiConfig) -> None:
- # 配置对象由外部注入,便于统一管理参数。
- self.config = config
- # 以下对象在 startup() 中初始化,在 shutdown() 中释放。
- self._pipeline = None
- self._depth_intrinsics = None
- self._temporal_filter = None
- # 使用互斥锁避免并发请求同时访问同一相机管线。
- self._lock = threading.Lock()
- def startup(self) -> None:
- """初始化相机与滤波器。"""
- if self._pipeline is not None:
- return
- try:
- pipeline, depth_intrinsics, _ = init_depth_pipeline()
- except Exception as exc:
- raise RuntimeError(f"Failed to init depth camera: {exc}") from exc
- self._pipeline = pipeline
- self._depth_intrinsics = depth_intrinsics
- self._temporal_filter = TemporalFilter(alpha=0.5)
- def shutdown(self) -> None:
- """释放相机资源。"""
- if self._pipeline is None:
- return
- self._pipeline.stop()
- self._pipeline = None
- def measure_height(self) -> Optional[Dict[str, Any]]:
- """执行一次测量并返回结果。
- 返回 `None` 表示在规定时间内没有采够有效样本。
- """
- start_time = time.time()
- samples = []
- first_valid_sample = None
- first_color_frame = None
- # 相机读取与状态更新必须在锁内进行,确保线程安全。
- with self._lock:
- while len(samples) < self.config.sample_count and (time.time() - start_time) < self.config.sample_timeout_sec:
- sample = self._measure_once()
- if sample is None:
- continue
- # 记录最近距离样本用于最终统计。
- samples.append(sample["nearest_distance"])
- # 保留第一份有效样本,用于后续生成标注深度图。
- if first_valid_sample is None:
- first_valid_sample = sample
- # 优先复用测量过程中采到的第一帧彩色图。
- if first_color_frame is None and sample.get("color_frame") is not None:
- first_color_frame = sample.get("color_frame")
- # 若测量期间没拿到彩色帧,再额外尝试抓取几次。
- if first_color_frame is None:
- for _ in range(5):
- frames = self._pipeline.wait_for_frames(self.config.frame_timeout_ms)
- if frames is None:
- continue
- color_frame = frames.get_color_frame()
- if color_frame is not None:
- first_color_frame = color_frame
- break
- image_paths = None
- if first_valid_sample is not None:
- if first_color_frame is not None:
- first_valid_sample["color_frame"] = first_color_frame
- image_paths = self._save_current_sample_images(first_valid_sample)
- if len(samples) < self.config.sample_count:
- return None
- # 用中位数抵抗离群值,比均值更稳健。
- median_value = int(np.median(np.array(samples, dtype=np.int32)))
- return {
- "height_mm": median_value,
- "samples": samples,
- "unit": "mm",
- "sample_count": self.config.sample_count,
- "image_paths": image_paths,
- }
- def _measure_once(self) -> Optional[Dict[str, Any]]:
- """采集并计算单帧测量结果。"""
- frames = self._pipeline.wait_for_frames(self.config.frame_timeout_ms)
- if frames is None:
- return None
- color_frame = frames.get_color_frame()
- depth_frame = frames.get_depth_frame()
- # 深度帧预处理:格式检查、量纲转换、噪声过滤、时间滤波。
- depth_data = extract_depth_data(depth_frame, self.config.settings, self._temporal_filter)
- if depth_data is None:
- return None
- # 根据中心距离动态计算 ROI 像素范围。
- bounds = compute_roi_bounds(depth_data, self._depth_intrinsics, self.config.settings)
- if bounds is None:
- return None
- x_start, x_end, y_start, y_end, center_distance = bounds
- roi = depth_data[y_start:y_end, x_start:x_end]
- # 计算 ROI 中最近距离(或分位距离)。
- nearest_distance = nearest_distance_in_roi(roi, self.config.settings)
- if nearest_distance is None:
- return None
- return {
- "nearest_distance": nearest_distance,
- "color_frame": color_frame,
- "depth_data": depth_data,
- "bounds": bounds,
- "center_distance": center_distance,
- }
- def _save_current_sample_images(self, sample: Dict[str, Any]) -> Dict[str, Optional[str]]:
- """保存当前样本的彩色图和深度标注图,并返回可访问路径。"""
- save_image_dir = os.path.join(os.getcwd(), "sample_images")
- os.makedirs(save_image_dir, exist_ok=True)
- # 生成包含日期时间和毫秒的文件名,降低重名概率。
- now = time.localtime()
- time_str = time.strftime("%Y%m%d_%H%M%S", now)
- millis = int((time.time() % 1) * 1000)
- timestamp = f"{time_str}_{millis:03d}"
- color_relative_path = None
- color_frame = sample.get("color_frame")
- if color_frame is not None:
- color_image = frame_to_bgr_image(color_frame)
- if color_image is not None:
- color_height, color_width = color_image.shape[:2]
- color_name = f"color_{color_width}x{color_height}_{timestamp}.png"
- color_file = os.path.join(
- save_image_dir,
- color_name,
- )
- cv2.imwrite(color_file, color_image)
- color_relative_path = f"/sample_images/{color_name}"
- # 下面构建带标注的深度可视化图。
- depth_data = sample["depth_data"]
- x_start, x_end, y_start, y_end, center_distance = sample["bounds"]
- nearest_distance = sample["nearest_distance"]
- roi = depth_data[y_start:y_end, x_start:x_end]
- nearest_point = find_nearest_point(roi, x_start, y_start, self.config.settings, nearest_distance)
- depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
- depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET)
- cv2.rectangle(
- depth_image,
- (x_start, y_start),
- (x_end - 1, y_end - 1),
- (0, 255, 0),
- 2,
- )
- if nearest_point is not None:
- cv2.circle(depth_image, nearest_point, 4, (0, 0, 0), -1)
- cv2.circle(depth_image, nearest_point, 6, (0, 255, 255), 2)
- cv2.putText(
- depth_image,
- f"nearest: {nearest_distance} mm",
- (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX,
- 0.8,
- (255, 255, 255),
- 2,
- cv2.LINE_AA,
- )
- cv2.putText(
- depth_image,
- f"center: {int(center_distance)} mm",
- (10, 60),
- cv2.FONT_HERSHEY_SIMPLEX,
- 0.8,
- (255, 255, 255),
- 2,
- cv2.LINE_AA,
- )
- depth_h, depth_w = depth_image.shape[:2]
- depth_name = f"depth_annotated_{depth_w}x{depth_h}_{timestamp}.png"
- depth_file = os.path.join(
- save_image_dir,
- depth_name,
- )
- cv2.imwrite(depth_file, depth_image)
- # 控制目录体积,超上限时删除最旧图片。
- self._prune_saved_images(save_image_dir, self.config.max_saved_images)
- return {
- "color": color_relative_path,
- "depth": f"/sample_images/{depth_name}",
- }
- @staticmethod
- def _prune_saved_images(save_dir: str, max_images: int) -> None:
- """删除超出上限的旧图片文件。"""
- png_files = [
- os.path.join(save_dir, name)
- for name in os.listdir(save_dir)
- if name.lower().endswith(".png")
- ]
- if len(png_files) <= max_images:
- return
- # 按修改时间升序排序,优先删除最旧文件。
- png_files.sort(key=os.path.getmtime)
- for file_path in png_files[: len(png_files) - max_images]:
- try:
- os.remove(file_path)
- except OSError:
- # 删除失败时忽略,避免影响主流程。
- pass
|