Files
multi_camera/app/camera_manager.py

103 lines
3.3 KiB
Python

"""
摄像头管理器
处理登录、会话管理和摄像头配置
"""
import requests
import logging
from datetime import datetime
from .config import BASE_URL, LOGIN_API, CAMERA_URL, USERNAME, PASSWORD, CAMERAS
logger = logging.getLogger(__name__)
class CameraManager:
def __init__(self):
self.base_url = BASE_URL
self.login_api = LOGIN_API
self.camera_url = CAMERA_URL
self.session = requests.Session()
self.token = None
self.last_login_time = None
self.is_logged_in = False
self.cameras = CAMERAS
# 配置请求头
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Content-Type': 'application/json',
'Connection': 'keep-alive',
})
# 不自动登录,按需登录
# self.auto_login()
def login(self):
"""登录系统"""
logger.info("正在登录系统...")
login_data = {
'username': USERNAME,
'password': PASSWORD,
'email': USERNAME,
'user': USERNAME,
'account': USERNAME
}
try:
response = self.session.post(
self.login_api,
json=login_data,
timeout=10
)
if response.status_code == 200:
response_data = response.json()
self.token = response_data.get('token')
self.last_login_time = datetime.now()
self.is_logged_in = True
# 更新认证头
if self.token:
self.session.headers.update({
'Authorization': f'Bearer {self.token}'
})
logger.info("登录成功!")
return True
else:
logger.error(f"登录失败,状态码: {response.status_code}")
return False
except Exception as e:
logger.error(f"登录请求失败: {e}")
return False
def get_camera_url(self, camera_id, camera_number='mixed'):
"""根据摄像头ID和编号生成URL"""
camera = next((c for c in self.cameras if c['id'] == camera_id), None)
if not camera:
raise ValueError(f"摄像头ID {camera_id} 不存在")
room = camera['room']
if camera_number == 'mixed':
return f"{self.camera_url}?room={room}&camera=mixed"
else:
return f"{self.camera_url}?room={room}&camera=camera-{camera_number}"
def refresh_camera(self, camera_id):
"""刷新指定摄像头(模拟操作)"""
logger.info(f"刷新摄像头 {camera_id}")
return True
def get_all_cameras(self):
"""返回所有摄像头配置"""
return self.cameras
def check_connection(self):
"""检查连接状态"""
try:
response = self.session.get(self.base_url, timeout=5)
return response.status_code == 200
except:
return False