Files
Orbitin/src/database/daily_logs.py

618 lines
21 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
每日交接班日志数据库模块
基于新的数据库基类重构
"""
from typing import List, Dict, Optional, Any
from datetime import datetime
from src.database.base import DatabaseBase
from src.logging_config import get_logger
logger = get_logger(__name__)
class DailyLogsDatabase(DatabaseBase):
"""每日交接班日志数据库"""
def __init__(self, db_path: Optional[str] = None):
"""
初始化数据库
参数:
db_path: 数据库文件路径如果为None则使用默认配置
"""
super().__init__(db_path)
self._init_schema()
def _init_schema(self):
"""初始化表结构"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 创建每日交接班日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_handover_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
shift TEXT NOT NULL,
ship_name TEXT NOT NULL,
teu INTEGER,
efficiency REAL,
vehicles INTEGER,
twenty_feet INTEGER, -- 20尺箱量
forty_feet INTEGER, -- 40尺箱量
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 检查是否需要迁移旧表结构
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='daily_handover_logs'")
table_sql = cursor.fetchone()[0]
if 'twenty_feet' not in table_sql or 'forty_feet' not in table_sql:
logger.warning("检测到旧表结构,正在迁移以添加尺寸箱量字段...")
# 重命名旧表
cursor.execute('ALTER TABLE daily_handover_logs RENAME TO daily_handover_logs_old')
# 创建新表
cursor.execute('''
CREATE TABLE daily_handover_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
shift TEXT NOT NULL,
ship_name TEXT NOT NULL,
teu INTEGER,
efficiency REAL,
vehicles INTEGER,
twenty_feet INTEGER, -- 20尺箱量
forty_feet INTEGER, -- 40尺箱量
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 复制数据(忽略重复)
cursor.execute('''
INSERT OR IGNORE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles, created_at)
SELECT date, shift, ship_name, teu, efficiency, vehicles, created_at
FROM daily_handover_logs_old
''')
# 删除旧表
cursor.execute('DROP TABLE daily_handover_logs_old')
logger.info("迁移完成!已添加尺寸箱量字段")
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON daily_handover_logs(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ship ON daily_handover_logs(ship_name)')
# 创建未统计月报数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS monthly_unaccounted (
id INTEGER PRIMARY KEY AUTOINCREMENT,
year_month TEXT NOT NULL UNIQUE,
teu INTEGER NOT NULL,
note TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建手动调整数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS manual_adjustments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL, -- 调整适用的日期
ship_name TEXT NOT NULL, -- 船名
teu INTEGER NOT NULL, -- TEU数量
twenty_feet INTEGER DEFAULT 0, -- 20尺箱量
forty_feet INTEGER DEFAULT 0, -- 40尺箱量
adjustment_type TEXT NOT NULL, -- 'add' 'exclude'
note TEXT, -- 备注
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_manual_date ON manual_adjustments(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_manual_type ON manual_adjustments(adjustment_type)')
conn.commit()
logger.debug("数据库表结构初始化完成")
def insert(self, log: Dict[str, Any]) -> bool:
"""
插入记录存在则替换不存在则插入
参数:
log: 日志记录字典
返回:
是否成功
"""
try:
query = '''
INSERT OR REPLACE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles, twenty_feet, forty_feet, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
'''
params = (
log['date'], log['shift'], log['ship_name'],
log.get('teu'), log.get('efficiency'), log.get('vehicles'),
log.get('twenty_feet'), log.get('forty_feet')
)
self.execute_update(query, params)
logger.debug(f"插入记录: {log['date']} {log['shift']} {log['ship_name']}")
return True
except Exception as e:
logger.error(f"插入记录失败: {e}, 记录: {log}")
return False
def insert_many(self, logs: List[Dict[str, Any]]) -> int:
"""
批量插入
参数:
logs: 日志记录列表
返回:
成功插入的数量
"""
count = 0
for log in logs:
if self.insert(log):
count += 1
logger.info(f"批量插入完成,成功 {count}/{len(logs)} 条记录")
return count
def query_by_date(self, date: str) -> List[Dict[str, Any]]:
"""
按日期查询
参数:
date: 日期字符串
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
WHERE date = ? ORDER BY shift, ship_name
'''
return self.execute_query(query, (date,))
def query_by_ship(self, ship_name: str) -> List[Dict[str, Any]]:
"""
按船名查询
参数:
ship_name: 船名
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
WHERE ship_name LIKE ? ORDER BY date DESC
'''
return self.execute_query(query, (f'%{ship_name}%',))
def query_all(self, limit: int = 1000) -> List[Dict[str, Any]]:
"""
查询所有记录
参数:
limit: 限制返回数量
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
ORDER BY date DESC, shift LIMIT ?
'''
return self.execute_query(query, (limit,))
def get_stats(self) -> Dict[str, Any]:
"""
获取统计信息
返回:
统计信息字典
"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM daily_handover_logs')
total = cursor.fetchone()[0]
cursor.execute('SELECT DISTINCT ship_name FROM daily_handover_logs')
ships = [row[0] for row in cursor.fetchall()]
cursor.execute('SELECT MIN(date), MAX(date) FROM daily_handover_logs')
date_range = cursor.fetchone()
return {
'total': total,
'ships': ships,
'date_range': {'start': date_range[0], 'end': date_range[1]}
}
def get_ships_with_monthly_teu(self, year_month: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取所有船只及其当月TEU总量
参数:
year_month: 年月字符串格式 "2025-12"如果为None则统计所有
返回:
船只统计列表
"""
if year_month:
query = '''
SELECT ship_name, SUM(teu) as monthly_teu
FROM daily_handover_logs
WHERE date LIKE ?
GROUP BY ship_name
ORDER BY monthly_teu DESC
'''
return self.execute_query(query, (f'{year_month}%',))
else:
query = '''
SELECT ship_name, SUM(teu) as monthly_teu
FROM daily_handover_logs
GROUP BY ship_name
ORDER BY monthly_teu DESC
'''
return self.execute_query(query)
def insert_unaccounted(self, year_month: str, teu: int, note: str = '') -> bool:
"""
插入未统计数据
参数:
year_month: 年月字符串格式 "2025-12"
teu: 未统计TEU数量
note: 备注
返回:
是否成功
"""
try:
query = '''
INSERT OR REPLACE INTO monthly_unaccounted
(year_month, teu, note, created_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
'''
self.execute_update(query, (year_month, teu, note))
logger.info(f"插入未统计数据: {year_month} {teu}TEU")
return True
except Exception as e:
logger.error(f"插入未统计数据失败: {e}")
return False
def get_unaccounted(self, year_month: str) -> int:
"""
获取指定月份的未统计数据
参数:
year_month: 年月字符串格式 "2025-12"
返回:
未统计TEU数量
"""
query = 'SELECT teu FROM monthly_unaccounted WHERE year_month = ?'
result = self.execute_query(query, (year_month,))
return result[0]['teu'] if result else 0
def reduce_unaccounted(self, year_month: str, teu_to_reduce: int) -> bool:
"""
减少指定月份的未统计数据
参数:
year_month: 年月字符串格式 "2025-12"
teu_to_reduce: 要减少的TEU数量
返回:
是否成功
"""
try:
# 先获取当前值
current_teu = self.get_unaccounted(year_month)
# 计算新值(允许负数)
new_teu = current_teu - teu_to_reduce
if new_teu == 0:
# 如果减少后等于0则删除记录
query = 'DELETE FROM monthly_unaccounted WHERE year_month = ?'
self.execute_update(query, (year_month,))
logger.info(f"减少未统计数据后删除记录: {year_month},原值: {current_teu}TEU减少: {teu_to_reduce}TEU新值: 0TEU")
else:
# 更新记录(允许负数)
query = '''
INSERT OR REPLACE INTO monthly_unaccounted
(year_month, teu, note, created_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
'''
# 保留原有备注
note_query = 'SELECT note FROM monthly_unaccounted WHERE year_month = ?'
note_result = self.execute_query(note_query, (year_month,))
note = note_result[0]['note'] if note_result else ''
self.execute_update(query, (year_month, new_teu, note))
logger.info(f"减少未统计数据: {year_month},原值: {current_teu}TEU减少: {teu_to_reduce}TEU新值: {new_teu}TEU")
return True
except Exception as e:
logger.error(f"减少未统计数据失败: {e}")
return False
def delete_unaccounted(self, year_month: str) -> bool:
"""
删除指定月份的未统计数据
参数:
year_month: 年月字符串格式 "2025-12"
返回:
是否成功删除如果记录不存在也返回True
"""
try:
query = 'DELETE FROM monthly_unaccounted WHERE year_month = ?'
result = self.execute_update(query, (year_month,))
if result > 0:
logger.info(f"删除未统计数据: {year_month}")
return True
else:
logger.warning(f"未找到 {year_month} 月的未统计数据")
return True # 记录不存在也算成功
except Exception as e:
logger.error(f"删除未统计数据失败: {e}")
return False
def delete_by_date(self, date: str) -> int:
"""
删除指定日期的记录
参数:
date: 日期字符串
返回:
删除的记录数
"""
query = 'DELETE FROM daily_handover_logs WHERE date = ?'
return self.execute_update(query, (date,))
def insert_manual_adjustment(self, date: str, ship_name: str, teu: int,
twenty_feet: int = 0, forty_feet: int = 0,
adjustment_type: str = 'add', note: str = '') -> bool:
"""
插入手动调整数据
参数:
date: 日期字符串
ship_name: 船名
teu: TEU数量
twenty_feet: 20尺箱量
forty_feet: 40尺箱量
adjustment_type: 调整类型 'add' 'exclude'
note: 备注
返回:
是否成功
"""
try:
query = '''
INSERT INTO manual_adjustments
(date, ship_name, teu, twenty_feet, forty_feet, adjustment_type, note, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
'''
params = (date, ship_name, teu, twenty_feet, forty_feet, adjustment_type, note)
self.execute_update(query, params)
logger.info(f"插入手动调整数据: {date} {ship_name} {teu}TEU ({adjustment_type})")
return True
except Exception as e:
logger.error(f"插入手动调整数据失败: {e}")
return False
def get_manual_adjustments(self, date: str) -> List[Dict[str, Any]]:
"""
获取指定日期的所有手动调整数据
参数:
date: 日期字符串
返回:
手动调整数据列表
"""
query = '''
SELECT * FROM manual_adjustments
WHERE date = ? ORDER BY created_at DESC
'''
return self.execute_query(query, (date,))
def get_manual_adjustments_by_type(self, date: str, adjustment_type: str) -> List[Dict[str, Any]]:
"""
获取指定日期和类型的调整数据
参数:
date: 日期字符串
adjustment_type: 调整类型 'add' 'exclude'
返回:
手动调整数据列表
"""
query = '''
SELECT * FROM manual_adjustments
WHERE date = ? AND adjustment_type = ? ORDER BY created_at DESC
'''
return self.execute_query(query, (date, adjustment_type))
def delete_manual_adjustment(self, adjustment_id: int) -> bool:
"""
删除指定ID的手动调整数据
参数:
adjustment_id: 调整记录ID
返回:
是否成功删除
"""
try:
query = 'DELETE FROM manual_adjustments WHERE id = ?'
result = self.execute_update(query, (adjustment_id,))
if result > 0:
logger.info(f"删除手动调整数据: ID={adjustment_id}")
return True
else:
logger.warning(f"未找到手动调整数据: ID={adjustment_id}")
return False
except Exception as e:
logger.error(f"删除手动调整数据失败: {e}")
return False
def clear_manual_adjustments(self, date: str) -> int:
"""
清除指定日期的所有手动调整数据
参数:
date: 日期字符串
返回:
删除的记录数
"""
query = 'DELETE FROM manual_adjustments WHERE date = ?'
result = self.execute_update(query, (date,))
logger.info(f"清除 {date} 的手动调整数据: {result} 条记录")
return result
def get_daily_data_with_adjustments(self, date: str) -> Dict[str, Any]:
"""
获取指定日期的数据包含手动调整
参数:
date: 日期字符串
返回:
包含调整的每日数据字典
"""
try:
# 获取原始数据
logs = self.query_by_date(date)
# 获取手动调整数据
adjustments = self.get_manual_adjustments(date)
# 按船名汇总原始数据
ships: Dict[str, Dict[str, Any]] = {}
for log in logs:
ship = log['ship_name']
if ship not in ships:
ships[ship] = {
'teu': 0,
'twenty_feet': 0,
'forty_feet': 0,
'adjustments': []
}
if log.get('teu'):
ships[ship]['teu'] += log['teu']
if log.get('twenty_feet'):
ships[ship]['twenty_feet'] += log['twenty_feet']
if log.get('forty_feet'):
ships[ship]['forty_feet'] += log['forty_feet']
# 应用调整数据
total_adjustment_teu = 0
for adj in adjustments:
ship = adj['ship_name']
if ship not in ships:
ships[ship] = {
'teu': 0,
'twenty_feet': 0,
'forty_feet': 0,
'adjustments': []
}
# 记录调整
ships[ship]['adjustments'].append(adj)
# 根据调整类型计算
if adj['adjustment_type'] == 'add':
ships[ship]['teu'] += adj['teu']
ships[ship]['twenty_feet'] += adj['twenty_feet']
ships[ship]['forty_feet'] += adj['forty_feet']
total_adjustment_teu += adj['teu']
elif adj['adjustment_type'] == 'exclude':
ships[ship]['teu'] -= adj['teu']
ships[ship]['twenty_feet'] -= adj['twenty_feet']
ships[ship]['forty_feet'] -= adj['forty_feet']
total_adjustment_teu -= adj['teu']
# 计算总TEU
total_teu = sum(ship_data['teu'] for ship_data in ships.values())
return {
'date': date,
'ships': ships,
'total_teu': total_teu,
'ship_count': len(ships),
'adjustments': adjustments,
'total_adjustment_teu': total_adjustment_teu
}
except Exception as e:
logger.error(f"获取包含调整的每日数据失败: {date}, 错误: {e}")
return {
'date': date,
'ships': {},
'total_teu': 0,
'ship_count': 0,
'adjustments': [],
'total_adjustment_teu': 0
}
if __name__ == '__main__':
# 测试代码
db = DailyLogsDatabase()
# 测试插入
test_log = {
'date': '2025-12-30',
'shift': '白班',
'ship_name': '测试船',
'teu': 100,
'efficiency': 3.5,
'vehicles': 5
}
success = db.insert(test_log)
print(f"插入测试: {'成功' if success else '失败'}")
# 测试查询
logs = db.query_by_date('2025-12-30')
print(f"查询结果: {len(logs)} 条记录")
# 测试统计
stats = db.get_stats()
print(f"统计信息: {stats}")
# 测试未统计数据
db.insert_unaccounted('2025-12', 118, '测试备注')
unaccounted = db.get_unaccounted('2025-12')
print(f"未统计数据: {unaccounted}TEU")
# 清理测试数据
db.delete_by_date('2025-12-30')
print("测试数据已清理")