refactor: 优化配置管理和异常处理

- 添加YAML配置文件支持
- 改进camera_manager异常处理
- 添加类型提示和URL验证
- 完善依赖注入支持测试
- 新增健康检查API端点
This commit is contained in:
qichi.liang
2026-01-02 06:25:36 +08:00
parent 3e9a840576
commit 6903ee6f0b
9 changed files with 503 additions and 132 deletions

View File

@@ -3,46 +3,91 @@
"""
import logging
from datetime import datetime
from flask import render_template, jsonify, request
from typing import Dict, Any, List
from flask import Flask, render_template, jsonify, request
from app.camera_manager import CameraManager
logger = logging.getLogger(__name__)
# 创建摄像头管理器实例(全局
camera_manager = CameraManager()
# 创建摄像头管理器实例(可通过依赖注入覆盖
_camera_manager: CameraManager = None
def register_main_routes(app):
def get_camera_manager() -> CameraManager:
"""获取摄像头管理器实例(支持依赖注入)"""
global _camera_manager
if _camera_manager is None:
_camera_manager = CameraManager()
return _camera_manager
def set_camera_manager(manager: CameraManager) -> None:
"""设置摄像头管理器实例(用于测试)"""
global _camera_manager
_camera_manager = manager
def register_main_routes(app: Flask) -> None:
"""注册主路由到Flask应用"""
@app.route('/')
def index():
def index() -> str:
"""主页面 - 显示6个摄像头的网格布局"""
camera_manager = get_camera_manager()
cameras = camera_manager.get_all_cameras()
return render_template('index.html',
cameras=cameras,
current_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
return render_template(
'index.html',
cameras=cameras,
current_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
@app.route('/api/cameras')
def get_cameras():
def get_cameras() -> Dict[str, List[Dict[str, Any]]]:
"""获取摄像头列表API"""
camera_manager = get_camera_manager()
cameras = camera_manager.get_all_cameras()
return jsonify(cameras)
@app.route('/api/refresh/<int:camera_id>', methods=['POST'])
def refresh_camera(camera_id):
def refresh_camera(camera_id: int) -> Dict[str, Any]:
"""刷新指定摄像头"""
camera_manager = get_camera_manager()
success = camera_manager.refresh_camera(camera_id)
return jsonify({'success': success, 'camera_id': camera_id})
@app.route('/api/switch', methods=['POST'])
def switch_camera():
def switch_camera() -> Dict[str, Any]:
"""切换摄像头编号"""
camera_manager = get_camera_manager()
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': '无效的请求数据'}), 400
camera_id = data.get('camera_id')
camera_number = data.get('camera_number', 'mixed')
if camera_id is None:
return jsonify({'success': False, 'error': '缺少camera_id参数'}), 400
try:
url = camera_manager.get_camera_url(camera_id, camera_number)
return jsonify({'success': True, 'url': url})
except Exception as e:
except ValueError as e:
return jsonify({'success': False, 'error': str(e)}), 400
except Exception as e:
logger.error(f"切换摄像头失败: {e}")
return jsonify({'success': False, 'error': '内部服务器错误'}), 500
@app.route('/api/health')
def health_check() -> Dict[str, Any]:
"""健康检查API"""
camera_manager = get_camera_manager()
is_connected = camera_manager.check_connection()
return jsonify({
'status': 'healthy' if is_connected else 'degraded',
'connection': is_connected,
'timestamp': datetime.now().isoformat()
})