""" 摄像头管理器 处理摄像头配置和URL生成 """ import requests import logging from .config import BASE_URL, CAMERA_URL, CAMERAS logger = logging.getLogger(__name__) class CameraManager: def __init__(self): self.base_url = BASE_URL self.camera_url = CAMERA_URL self.session = requests.Session() self.cameras = CAMERAS # 配置请求头 self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Content-Type': 'application/json', 'Connection': 'keep-alive', }) def get_camera_url(self, camera_id, camera_number='mixed'): """根据摄像头ID和编号生成URL""" camera = next((c for c in self.cameras if c['id'] == camera_id), None) if not camera: raise ValueError(f"摄像头ID {camera_id} 不存在") room = camera['room'] if camera_number == 'mixed': return f"{self.camera_url}?room={room}&camera=mixed" else: return f"{self.camera_url}?room={room}&camera=camera-{camera_number}" def refresh_camera(self, camera_id): """刷新指定摄像头(模拟操作)""" logger.info(f"刷新摄像头 {camera_id}") return True def get_all_cameras(self): """返回所有摄像头配置""" return self.cameras def check_connection(self): """检查连接状态""" try: response = self.session.get(self.base_url, timeout=5) return response.status_code == 200 except: return False