fengyanglei 1 mesiac pred
rodič
commit
b812200708
4 zmenil súbory, kde vykonal 267 pridanie a 221 odobranie
  1. 41 98
      api.py
  2. 44 109
      cargo_height_measure.py
  3. 178 0
      depth_common.py
  4. 4 14
      main.py

+ 41 - 98
api.py

@@ -3,52 +3,29 @@ import time
 import threading
 
 import numpy as np
-import cv2
 from fastapi import FastAPI, HTTPException
-
-from pyorbbecsdk import *
-
-ESC_KEY = 27
-PRINT_INTERVAL = 1  # seconds
-MEDIAN_BLUR_KSIZE = 5  # odd number, 0 to disable
-MORPH_OPEN_KSIZE = 3  # odd number, 0 to disable
-NEAREST_PERCENTILE = 5  # use low percentile to suppress isolated noise (0 for raw min)
-
+import uvicorn
+
+from depth_common import (
+    Settings,
+    TemporalFilter,
+    compute_roi_bounds,
+    extract_depth_data,
+    init_depth_pipeline,
+    nearest_distance_in_roi,
+)
+
+# 采样参数
 SAMPLE_COUNT = 10
 FRAME_TIMEOUT_MS = 200
 SAMPLE_TIMEOUT_SEC = 8
-
-def _get_env_int(name, default):
-    value = os.getenv(name)
-    if value is None or value.strip() == "":
-        return default
-    try:
-        return int(value)
-    except ValueError:
-        return default
-
-MIN_DEPTH = _get_env_int("MIN_DEPTH", 500)  # mm
-MAX_DEPTH = _get_env_int("MAX_DEPTH", 4000)  # mm
-ROI_WIDTH_CM = _get_env_int("ROI_WIDTH_CM", 10)  # cm
-ROI_HEIGHT_CM = _get_env_int("ROI_HEIGHT_CM", 12)  # cm
+# 从环境变量加载测量配置
+SETTINGS = Settings.from_env()
 
 app = FastAPI(title="Cargo Height API")
 
 
-class TemporalFilter:
-    def __init__(self, alpha):
-        self.alpha = alpha
-        self.previous_frame = None
-
-    def process(self, frame):
-        if self.previous_frame is None:
-            result = frame
-        else:
-            result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
-        self.previous_frame = result
-        return result
-
-
+# 相机相关的全局状态(由锁保护)
 _pipeline = None
 _depth_intrinsics = None
 _temporal_filter = None
@@ -56,28 +33,21 @@ _lock = threading.Lock()
 
 
 def _init_camera():
+    # 延迟初始化相机,避免重复启动
     global _pipeline, _depth_intrinsics, _temporal_filter
     if _pipeline is not None:
         return
-    config = Config()
-    pipeline = Pipeline()
     try:
-        profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
-        if profile_list is None:
-            raise RuntimeError("depth profile list is empty")
-        depth_profile = profile_list.get_default_video_stream_profile()
-        if depth_profile is None:
-            raise RuntimeError("default depth profile is empty")
-        _depth_intrinsics = depth_profile.get_intrinsic()
-        config.enable_stream(depth_profile)
+        pipeline, depth_intrinsics, _ = init_depth_pipeline()
     except Exception as exc:
         raise RuntimeError(f"Failed to init depth camera: {exc}") from exc
-    pipeline.start(config)
     _pipeline = pipeline
+    _depth_intrinsics = depth_intrinsics
     _temporal_filter = TemporalFilter(alpha=0.5)
 
 
 def _shutdown_camera():
+    # 关闭相机资源
     global _pipeline
     if _pipeline is None:
         return
@@ -86,76 +56,37 @@ def _shutdown_camera():
 
 
 def _measure_once():
+    # 单次采样:获取一帧并在 ROI 内计算最近距离
     frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS)
     if frames is None:
         return None
     depth_frame = frames.get_depth_frame()
-    if depth_frame is None:
+    depth_data = extract_depth_data(depth_frame, SETTINGS, _temporal_filter)
+    if depth_data is None:
         return None
-    depth_format = depth_frame.get_format()
-    if depth_format != OBFormat.Y16:
+    bounds = compute_roi_bounds(depth_data, _depth_intrinsics, SETTINGS)
+    if bounds is None:
         return None
-
-    width = depth_frame.get_width()
-    height = depth_frame.get_height()
-    scale = depth_frame.get_depth_scale()
-
-    depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
-    depth_data = depth_data.reshape((height, width))
-
-    depth_data = depth_data.astype(np.float32) * scale
-    depth_data = np.where((depth_data > MIN_DEPTH) & (depth_data < MAX_DEPTH), depth_data, 0)
-    depth_data = depth_data.astype(np.uint16)
-    if MEDIAN_BLUR_KSIZE and MEDIAN_BLUR_KSIZE % 2 == 1:
-        depth_data = cv2.medianBlur(depth_data, MEDIAN_BLUR_KSIZE)
-    if MORPH_OPEN_KSIZE and MORPH_OPEN_KSIZE % 2 == 1:
-        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_OPEN_KSIZE, MORPH_OPEN_KSIZE))
-        valid_mask = (depth_data > 0).astype(np.uint8)
-        valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
-        depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
-    depth_data = _temporal_filter.process(depth_data)
-
-    center_y = height // 2
-    center_x = width // 2
-    center_distance = depth_data[center_y, center_x]
-    if center_distance == 0:
-        return None
-    center_distance_m = center_distance / 1000.0
-    half_width_m = (ROI_WIDTH_CM / 100.0) / 2.0
-    half_height_m = (ROI_HEIGHT_CM / 100.0) / 2.0
-    half_width_px = int(_depth_intrinsics.fx * half_width_m / center_distance_m)
-    half_height_px = int(_depth_intrinsics.fy * half_height_m / center_distance_m)
-    if half_width_px <= 0 or half_height_px <= 0:
-        return None
-    half_width_px = min(half_width_px, center_x, width - center_x - 1)
-    half_height_px = min(half_height_px, center_y, height - center_y - 1)
-    if half_width_px <= 0 or half_height_px <= 0:
-        return None
-    x_start = center_x - half_width_px
-    x_end = center_x + half_width_px + 1
-    y_start = center_y - half_height_px
-    y_end = center_y + half_height_px + 1
+    x_start, x_end, y_start, y_end, _ = bounds
     roi = depth_data[y_start:y_end, x_start:x_end]
-    valid_values = roi[(roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)]
-    if valid_values.size == 0:
-        return None
-    if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100:
-        return int(np.percentile(valid_values, NEAREST_PERCENTILE))
-    return int(valid_values.min())
+    return nearest_distance_in_roi(roi, SETTINGS)
 
 
 @app.on_event("startup")
 def on_startup():
+    # 服务启动时初始化相机
     _init_camera()
 
 
 @app.on_event("shutdown")
 def on_shutdown():
+    # 服务关闭时释放相机
     _shutdown_camera()
 
 
 @app.get("/height")
 def get_height():
+    # 采集多次样本并返回中位数高度
     start_time = time.time()
     samples = []
     with _lock:
@@ -176,4 +107,16 @@ def get_height():
 
 @app.get("/health")
 def health():
+    # 健康检查接口
     return {"status": "ok"}
+
+
+def main():
+    # 读取监听地址并启动 API 服务
+    host = os.getenv("API_HOST", "127.0.0.1")
+    port = int(os.getenv("API_PORT", "8080"))
+    uvicorn.run("api:app", host=host, port=port, log_level="info")
+
+
+if __name__ == "__main__":
+    main()

+ 44 - 109
cargo_height_measure.py

@@ -1,142 +1,75 @@
-import os
 import time
 
 import cv2
-import numpy as np
-
-from pyorbbecsdk import *
 
+from depth_common import (
+    Settings,
+    TemporalFilter,
+    compute_roi_bounds,
+    extract_depth_data,
+    find_nearest_point,
+    init_depth_pipeline,
+    nearest_distance_in_roi,
+)
+
+# 键盘退出键
 ESC_KEY = 27
+# 打印间隔(秒)
 PRINT_INTERVAL = 1  # seconds
-MEDIAN_BLUR_KSIZE = 5  # odd number, 0 to disable
-MORPH_OPEN_KSIZE = 3   # odd number, 0 to disable
-NEAREST_PERCENTILE = 5  # use low percentile to suppress isolated noise (0 for raw min)
-
-def _get_env_int(name, default):
-    value = os.getenv(name)
-    if value is None or value.strip() == "":
-        return default
-    try:
-        return int(value)
-    except ValueError:
-        return default
-
-MIN_DEPTH = _get_env_int("MIN_DEPTH", 500)  # mm
-MAX_DEPTH = _get_env_int("MAX_DEPTH", 4000)  # mm
-ROI_WIDTH_CM = _get_env_int("ROI_WIDTH_CM", 10)  # cm
-ROI_HEIGHT_CM = _get_env_int("ROI_HEIGHT_CM", 12)  # cm
-
-class TemporalFilter:
-    def __init__(self, alpha):
-        self.alpha = alpha
-        self.previous_frame = None
-
-    def process(self, frame):
-        if self.previous_frame is None:
-            result = frame
-        else:
-            result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
-        self.previous_frame = result
-        return result
+# 从环境变量加载测量配置
+SETTINGS = Settings.from_env()
 
 
 def main():
-    config = Config()
-    pipeline = Pipeline()
+    # 初始化时间滤波器,减少抖动
     temporal_filter = TemporalFilter(alpha=0.5)
     try:
-        profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
-        assert profile_list is not None
-        depth_profile = profile_list.get_default_video_stream_profile()
-        assert depth_profile is not None
+        # 启动深度相机管线
+        pipeline, depth_intrinsics, depth_profile = init_depth_pipeline()
         print("depth profile: ", depth_profile)
-        depth_intrinsics = depth_profile.get_intrinsic()
-        config.enable_stream(depth_profile)
     except Exception as e:
         print(e)
         return
-    pipeline.start(config)
     last_print_time = time.time()
     while True:
         try:
+            # 获取一帧深度数据
             frames = pipeline.wait_for_frames(100)
             if frames is None:
                 continue
             depth_frame = frames.get_depth_frame()
-            if depth_frame is None:
-                continue
-            depth_format = depth_frame.get_format()
-            if depth_format != OBFormat.Y16:
-                print("depth format is not Y16")
+            depth_data = extract_depth_data(depth_frame, SETTINGS, temporal_filter)
+            if depth_data is None:
                 continue
-            width = depth_frame.get_width()
-            height = depth_frame.get_height()
-            scale = depth_frame.get_depth_scale()
-
-            depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
-            depth_data = depth_data.reshape((height, width))
-
-            depth_data = depth_data.astype(np.float32) * scale
-            depth_data = np.where((depth_data > MIN_DEPTH) & (depth_data < MAX_DEPTH), depth_data, 0)
-            depth_data = depth_data.astype(np.uint16)
-            if MEDIAN_BLUR_KSIZE and MEDIAN_BLUR_KSIZE % 2 == 1:
-                depth_data = cv2.medianBlur(depth_data, MEDIAN_BLUR_KSIZE)
-            if MORPH_OPEN_KSIZE and MORPH_OPEN_KSIZE % 2 == 1:
-                kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (MORPH_OPEN_KSIZE, MORPH_OPEN_KSIZE))
-                valid_mask = (depth_data > 0).astype(np.uint8)
-                valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
-                depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
-            # Apply temporal filtering
-            depth_data = temporal_filter.process(depth_data)
-
-            center_y = height // 2
-            center_x = width // 2
-            center_distance = depth_data[center_y, center_x]
-            if center_distance == 0:
-                continue
-            center_distance_m = center_distance / 1000
-            half_width_m = (ROI_WIDTH_CM / 100) / 2
-            half_height_m = (ROI_HEIGHT_CM / 100) / 2
-            half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
-            half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
-            if half_width_px <= 0 or half_height_px <= 0:
+            # 计算中心 ROI 区域
+            bounds = compute_roi_bounds(depth_data, depth_intrinsics, SETTINGS)
+            if bounds is None:
                 continue
-            half_width_px = min(half_width_px, center_x, width - center_x - 1)
-            half_height_px = min(half_height_px, center_y, height - center_y - 1)
-            if half_width_px <= 0 or half_height_px <= 0:
-                continue
-            x_start = center_x - half_width_px
-            x_end = center_x + half_width_px + 1
-            y_start = center_y - half_height_px
-            y_end = center_y + half_height_px + 1
+            x_start, x_end, y_start, y_end, center_distance = bounds
             roi = depth_data[y_start:y_end, x_start:x_end]
-            valid_values = roi[(roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)]
-            if valid_values.size == 0:
-                nearest_distance = 0
-            else:
-                if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100:
-                    nearest_distance = int(np.percentile(valid_values, NEAREST_PERCENTILE))
-                else:
-                    nearest_distance = int(valid_values.min())
-
-            # Find nearest point in ROI for visualization
-            nearest_point = None
-            if nearest_distance > 0:
-                roi_mask = (roi >= MIN_DEPTH) & (roi <= MAX_DEPTH)
-                roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
-                if NEAREST_PERCENTILE and 0 < NEAREST_PERCENTILE < 100:
-                    roi_candidate = np.where(roi_candidate <= nearest_distance, roi_candidate, np.iinfo(np.uint16).max)
-                min_idx = np.argmin(roi_candidate)
-                min_val = roi_candidate.flat[min_idx]
-                if min_val != np.iinfo(np.uint16).max:
-                    min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
-                    nearest_point = (x_start + min_x, y_start + min_y)
+            # 计算 ROI 内最近距离
+            nearest_distance = nearest_distance_in_roi(roi, SETTINGS) or 0
+
+            # 找出 ROI 内最近点用于可视化
+            nearest_point = find_nearest_point(
+                roi,
+                x_start,
+                y_start,
+                SETTINGS,
+                nearest_distance,
+            )
 
             current_time = time.time()
             if current_time - last_print_time >= PRINT_INTERVAL:
-                print(f"nearest distance in {ROI_WIDTH_CM}cm x {ROI_HEIGHT_CM}cm area: ", nearest_distance)
+                # 定期输出最近距离
+                print(
+                    "nearest distance in "
+                    f"{SETTINGS.roi_width_cm}cm x {SETTINGS.roi_height_cm}cm area: ",
+                    nearest_distance,
+                )
                 last_print_time = current_time
 
+            # 生成彩色深度图并叠加标注
             depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
             depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET)
 
@@ -150,6 +83,7 @@ def main():
             if nearest_point is not None:
                 cv2.circle(depth_image, nearest_point, 4, (0, 0, 0), -1)
                 cv2.circle(depth_image, nearest_point, 6, (0, 255, 255), 2)
+            # 文字标注当前测量值
             label = f"nearest: {nearest_distance} mm"
             cv2.putText(
                 depth_image,
@@ -179,6 +113,7 @@ def main():
                 break
         except KeyboardInterrupt:
             break
+    # 清理窗口与相机资源
     cv2.destroyAllWindows()
     pipeline.stop()
 

+ 178 - 0
depth_common.py

@@ -0,0 +1,178 @@
+import os
+from dataclasses import dataclass
+
+import cv2
+import numpy as np
+
+from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat
+
+
+def _get_env_int(name, default):
+    # 从环境变量读取整数,失败时返回默认值
+    value = os.getenv(name)
+    if value is None or value.strip() == "":
+        return default
+    try:
+        return int(value)
+    except ValueError:
+        return default
+
+
+@dataclass(frozen=True)
+class Settings:
+    # 深度数据处理参数
+    min_depth: int
+    max_depth: int
+    roi_width_cm: int
+    roi_height_cm: int
+    median_blur_ksize: int
+    morph_open_ksize: int
+    nearest_percentile: int
+
+    @classmethod
+    def from_env(
+        cls,
+        *,
+        median_blur_ksize=5,
+        morph_open_ksize=3,
+        nearest_percentile=5,
+    ):
+        # 从环境变量构建配置
+        return cls(
+            min_depth=_get_env_int("MIN_DEPTH", 500),
+            max_depth=_get_env_int("MAX_DEPTH", 4000),
+            roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
+            roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12),
+            median_blur_ksize=median_blur_ksize,
+            morph_open_ksize=morph_open_ksize,
+            nearest_percentile=nearest_percentile,
+        )
+
+
+class TemporalFilter:
+    def __init__(self, alpha):
+        # alpha 越大越偏向当前帧
+        self.alpha = alpha
+        self.previous_frame = None
+
+    def process(self, frame):
+        # 对连续帧做指数加权平滑
+        if self.previous_frame is None:
+            result = frame
+        else:
+            result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
+        self.previous_frame = result
+        return result
+
+
+def init_depth_pipeline():
+    # 初始化相机并获取深度内参
+    config = Config()
+    pipeline = Pipeline()
+    profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
+    if profile_list is None:
+        raise RuntimeError("depth profile list is empty")
+    depth_profile = profile_list.get_default_video_stream_profile()
+    if depth_profile is None:
+        raise RuntimeError("default depth profile is empty")
+    depth_intrinsics = depth_profile.get_intrinsic()
+    config.enable_stream(depth_profile)
+    pipeline.start(config)
+    return pipeline, depth_intrinsics, depth_profile
+
+
+def extract_depth_data(depth_frame, settings, temporal_filter):
+    # 从原始帧中提取并滤波深度数据
+    if depth_frame is None:
+        return None
+    if depth_frame.get_format() != OBFormat.Y16:
+        return None
+    width = depth_frame.get_width()
+    height = depth_frame.get_height()
+    scale = depth_frame.get_depth_scale()
+
+    # 读取深度数据并转换单位(毫米)
+    depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
+    depth_data = depth_data.reshape((height, width))
+    depth_data = depth_data.astype(np.float32) * scale
+    depth_data = np.where(
+        (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
+        depth_data,
+        0,
+    ).astype(np.uint16)
+
+    # 中值滤波与开运算,去除噪点
+    if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
+        depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
+    if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
+        kernel = cv2.getStructuringElement(
+            cv2.MORPH_ELLIPSE,
+            (settings.morph_open_ksize, settings.morph_open_ksize),
+        )
+        valid_mask = (depth_data > 0).astype(np.uint8)
+        valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
+        depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
+
+    # 可选的时间滤波
+    if temporal_filter is not None:
+        depth_data = temporal_filter.process(depth_data)
+
+    return depth_data
+
+
+def compute_roi_bounds(depth_data, depth_intrinsics, settings):
+    # 根据中心点距离动态计算 ROI 的像素范围
+    height, width = depth_data.shape
+    center_y = height // 2
+    center_x = width // 2
+    center_distance = int(depth_data[center_y, center_x])
+    if center_distance <= 0:
+        return None
+    center_distance_m = center_distance / 1000.0
+    if center_distance_m <= 0:
+        return None
+    half_width_m = (settings.roi_width_cm / 100) / 2
+    half_height_m = (settings.roi_height_cm / 100) / 2
+    half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
+    half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
+    if half_width_px <= 0 or half_height_px <= 0:
+        return None
+    half_width_px = min(half_width_px, center_x, width - center_x - 1)
+    half_height_px = min(half_height_px, center_y, height - center_y - 1)
+    if half_width_px <= 0 or half_height_px <= 0:
+        return None
+    x_start = center_x - half_width_px
+    x_end = center_x + half_width_px + 1
+    y_start = center_y - half_height_px
+    y_end = center_y + half_height_px + 1
+    return x_start, x_end, y_start, y_end, center_distance
+
+
+def nearest_distance_in_roi(roi, settings):
+    # 计算 ROI 内的最近距离(或按百分位)
+    valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
+    if valid_values.size == 0:
+        return None
+    if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
+        return int(np.percentile(valid_values, settings.nearest_percentile))
+    return int(valid_values.min())
+
+
+def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
+    # 返回最近点在整图中的坐标
+    if nearest_distance is None or nearest_distance <= 0:
+        return None
+    roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
+    roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
+    if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
+        roi_candidate = np.where(
+            roi_candidate <= nearest_distance,
+            roi_candidate,
+            np.iinfo(np.uint16).max,
+        )
+    min_idx = np.argmin(roi_candidate)
+    min_val = roi_candidate.flat[min_idx]
+    if min_val == np.iinfo(np.uint16).max:
+        return None
+    min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
+    return x_start + min_x, y_start + min_y

+ 4 - 14
main.py

@@ -1,16 +1,6 @@
-# This is a sample Python script.
+from api import main
 
-# Press Shift+F10 to execute it or replace it with your code.
-# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
 
-
-def print_hi(name):
-    # Use a breakpoint in the code line below to debug your script.
-    print(f'Hi, {name}')  # Press Ctrl+F8 to toggle the breakpoint.
-
-
-# Press the green button in the gutter to run the script.
-if __name__ == '__main__':
-    print_hi('PyCharm')
-
-# See PyCharm help at https://www.jetbrains.com/help/pycharm/
+if __name__ == "__main__":
+    # 命令行入口
+    main()