Files
multi_camera/app/camera_manager.py

54 lines
1.8 KiB
Python

"""
摄像头管理器
处理摄像头配置和URL生成
"""
import requests
import logging
from .config import BASE_URL, CAMERA_URL, CAMERAS
# Module-level logger named after this module; CameraManager logs through it.
logger = logging.getLogger(__name__)
class CameraManager:
    """Hold the camera configuration and build per-camera view URLs.

    The manager reads ``BASE_URL``, ``CAMERA_URL`` and ``CAMERAS`` from the
    package configuration and keeps one shared ``requests.Session`` with
    browser-like default headers.

    Each entry of ``self.cameras`` is indexed with the keys ``'id'`` and
    ``'room'`` by :meth:`get_camera_url`.
    """

    def __init__(self) -> None:
        """Load the configuration and prepare the shared HTTP session."""
        self.base_url = BASE_URL
        self.camera_url = CAMERA_URL
        self.session = requests.Session()
        self.cameras = CAMERAS
        # Default headers sent with every request made through the session.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Content-Type': 'application/json',
            'Connection': 'keep-alive',
        })

    def get_camera_url(self, camera_id, camera_number='mixed') -> str:
        """Build the view URL for a configured camera.

        Args:
            camera_id: Value compared against each camera entry's ``'id'``.
            camera_number: ``'mixed'`` (default) selects the mixed view; any
                other value is rendered as ``camera-<camera_number>``.

        Returns:
            ``<camera_url>?room=<room>&camera=<selector>``.

        Raises:
            ValueError: If no configured camera has the given ``camera_id``.
        """
        camera = next((c for c in self.cameras if c['id'] == camera_id), None)
        if camera is None:
            raise ValueError(f"摄像头ID {camera_id} 不存在")

        room = camera['room']
        if camera_number == 'mixed':
            selector = 'mixed'
        else:
            selector = f"camera-{camera_number}"
        return f"{self.camera_url}?room={room}&camera={selector}"

    def refresh_camera(self, camera_id) -> bool:
        """Refresh the given camera (simulated: only logs, always True)."""
        logger.info(f"刷新摄像头 {camera_id}")
        return True

    def get_all_cameras(self):
        """Return the configured cameras (the same object, not a copy)."""
        return self.cameras

    def check_connection(self) -> bool:
        """Report whether ``base_url`` answers a GET with HTTP 200.

        Returns:
            True if the response status code is 200. False for any other
            status code, or when the request fails with a
            ``requests.RequestException`` (connection error, timeout,
            invalid URL, ...).
        """
        try:
            response = self.session.get(self.base_url, timeout=5)
        except requests.RequestException as exc:
            # Only request-level failures mean "not connected"; anything else
            # (KeyboardInterrupt, programming errors) must not be hidden.
            logger.warning(
                "Connection check to %s failed: %s", self.base_url, exc
            )
            return False
        return response.status_code == 200