api.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import os
  2. import time
  3. import threading
  4. import numpy as np
  5. from fastapi import FastAPI, HTTPException
  6. import uvicorn
  7. from depth_common import (
  8. Settings,
  9. TemporalFilter,
  10. compute_roi_bounds,
  11. extract_depth_data,
  12. init_depth_pipeline,
  13. nearest_distance_in_roi,
  14. )
  15. # 采样参数
  16. SAMPLE_COUNT = 10
  17. FRAME_TIMEOUT_MS = 200
  18. SAMPLE_TIMEOUT_SEC = 8
  19. # 从环境变量加载测量配置
  20. SETTINGS = Settings.from_env()
  21. app = FastAPI(title="Cargo Height API")
  22. # 相机相关的全局状态(由锁保护)
  23. _pipeline = None
  24. _depth_intrinsics = None
  25. _temporal_filter = None
  26. _lock = threading.Lock()
  27. def _init_camera():
  28. # 延迟初始化相机,避免重复启动
  29. global _pipeline, _depth_intrinsics, _temporal_filter
  30. if _pipeline is not None:
  31. return
  32. try:
  33. pipeline, depth_intrinsics, _ = init_depth_pipeline()
  34. except Exception as exc:
  35. raise RuntimeError(f"Failed to init depth camera: {exc}") from exc
  36. _pipeline = pipeline
  37. _depth_intrinsics = depth_intrinsics
  38. _temporal_filter = TemporalFilter(alpha=0.5)
  39. def _shutdown_camera():
  40. # 关闭相机资源
  41. global _pipeline
  42. if _pipeline is None:
  43. return
  44. _pipeline.stop()
  45. _pipeline = None
  46. def _measure_once():
  47. # 单次采样:获取一帧并在 ROI 内计算最近距离
  48. frames = _pipeline.wait_for_frames(FRAME_TIMEOUT_MS)
  49. if frames is None:
  50. return None
  51. depth_frame = frames.get_depth_frame()
  52. depth_data = extract_depth_data(depth_frame, SETTINGS, _temporal_filter)
  53. if depth_data is None:
  54. return None
  55. bounds = compute_roi_bounds(depth_data, _depth_intrinsics, SETTINGS)
  56. if bounds is None:
  57. return None
  58. x_start, x_end, y_start, y_end, _ = bounds
  59. roi = depth_data[y_start:y_end, x_start:x_end]
  60. return nearest_distance_in_roi(roi, SETTINGS)
  61. @app.on_event("startup")
  62. def on_startup():
  63. # 服务启动时初始化相机
  64. _init_camera()
  65. @app.on_event("shutdown")
  66. def on_shutdown():
  67. # 服务关闭时释放相机
  68. _shutdown_camera()
  69. @app.get("/height")
  70. def get_height():
  71. # 采集多次样本并返回中位数高度
  72. start_time = time.time()
  73. samples = []
  74. with _lock:
  75. while len(samples) < SAMPLE_COUNT and (time.time() - start_time) < SAMPLE_TIMEOUT_SEC:
  76. value = _measure_once()
  77. if value is not None:
  78. samples.append(value)
  79. if len(samples) < SAMPLE_COUNT:
  80. raise HTTPException(status_code=503, detail="Insufficient valid samples from depth camera")
  81. median_value = int(np.median(np.array(samples, dtype=np.int32)))
  82. return {
  83. "height_mm": median_value,
  84. "samples": samples,
  85. "unit": "mm",
  86. "sample_count": SAMPLE_COUNT,
  87. }
  88. @app.get("/health")
  89. def health():
  90. # 健康检查接口
  91. return {"status": "ok"}
  92. def main():
  93. # 读取监听地址并启动 API 服务
  94. host = os.getenv("API_HOST", "127.0.0.1")
  95. port = int(os.getenv("API_PORT", "8080"))
  96. uvicorn.run("api:app", host=host, port=port, log_level="info")
  97. if __name__ == "__main__":
  98. main()