| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import os
- import time
- import threading
- import numpy as np
- from fastapi import FastAPI, HTTPException
- import uvicorn
- from depth_common import (
- Settings,
- TemporalFilter,
- compute_roi_bounds,
- extract_depth_data,
- init_depth_pipeline,
- nearest_distance_in_roi,
- )
- # 采样参数
- SAMPLE_COUNT = 10
- FRAME_TIMEOUT_MS = 200
- SAMPLE_TIMEOUT_SEC = 8
- # 从环境变量加载测量配置
- SETTINGS = Settings.from_env()
- app = FastAPI(title="Cargo Height API")
- # 相机相关的全局状态(由锁保护)
- _pipeline = None
- _depth_intrinsics = None
- _temporal_filter = None
- _lock = threading.Lock()
- def _init_camera():
- # 延迟初始化相机,避免重复启动
- global _pipeline, _depth_intrinsics, _temporal_filter
- if _pipeline is not None:
- return
- try:
- pipeline, depth_intrinsics, _ = init_depth_pipeline()
- except Exception as exc:
- raise RuntimeError(f"Failed to init depth camera: {exc}") from exc
- _pipeline = pipeline
- _depth_intrinsics = depth_intrinsics
- _temporal_filter = TemporalFilter(alpha=0.5)
- def _shutdown_camera():
- # 关闭相机资源
- global _pipeline
- if _pipeline is None:
- return
- _pipeline.stop()
- _pipeline = None
- def _measure_once():
- # 单次采样:获取一帧并在 ROI 内计算最近距离
- frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS)
- if frames is None:
- return None
- depth_frame = frames.get_depth_frame()
- depth_data = extract_depth_data(depth_frame, SETTINGS, _temporal_filter)
- if depth_data is None:
- return None
- bounds = compute_roi_bounds(depth_data, _depth_intrinsics, SETTINGS)
- if bounds is None:
- return None
- x_start, x_end, y_start, y_end, _ = bounds
- roi = depth_data[y_start:y_end, x_start:x_end]
- return nearest_distance_in_roi(roi, SETTINGS)
- @app.on_event("startup")
- def on_startup():
- # 服务启动时初始化相机
- _init_camera()
- @app.on_event("shutdown")
- def on_shutdown():
- # 服务关闭时释放相机
- _shutdown_camera()
- @app.get("/height")
- def get_height():
- # 采集多次样本并返回中位数高度
- start_time = time.time()
- samples = []
- with _lock:
- while len(samples) < SAMPLE_COUNT and (time.time() - start_time) < SAMPLE_TIMEOUT_SEC:
- value = _measure_once()
- if value is not None:
- samples.append(value)
- if len(samples) < SAMPLE_COUNT:
- raise HTTPException(status_code=503, detail="Insufficient valid samples from depth camera")
- median_value = int(np.median(np.array(samples, dtype=np.int32)))
- return {
- "height_mm": median_value,
- "samples": samples,
- "unit": "mm",
- "sample_count": SAMPLE_COUNT,
- }
- @app.get("/health")
- def health():
- # 健康检查接口
- return {"status": "ok"}
- def main():
- # 读取监听地址并启动 API 服务
- host = os.getenv("API_HOST", "127.0.0.1")
- port = int(os.getenv("API_PORT", "8080"))
- uvicorn.run("api:app", host=host, port=port, log_level="info")
- if __name__ == "__main__":
- main()
|