Files
Orbitin/src/database/daily_logs.py
qichi.liang 929c4b836f feat: 添加尺寸箱量解析和显示功能
- 更新ShipLog数据类以支持20尺和40尺箱量字段
- 修改日志解析器提取尺寸箱量数据(支持格式如'95TEU(20尺*95)'和'90TEU(20尺*52 40尺*19)')
- 更新数据库表结构存储尺寸箱量
- 修改报告生成器在日报中显示尺寸箱量信息
- 修复解析器分隔符处理逻辑
- 确保二次靠泊记录尺寸箱量正确合并
2025-12-31 05:21:16 +08:00

341 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
每日交接班日志数据库模块
基于新的数据库基类重构
"""
from typing import List, Dict, Optional, Any
from datetime import datetime
from src.database.base import DatabaseBase
from src.logging_config import get_logger
logger = get_logger(__name__)
class DailyLogsDatabase(DatabaseBase):
"""每日交接班日志数据库"""
def __init__(self, db_path: Optional[str] = None):
"""
初始化数据库
参数:
db_path: 数据库文件路径如果为None则使用默认配置
"""
super().__init__(db_path)
self._init_schema()
def _init_schema(self):
"""初始化表结构"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 创建每日交接班日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_handover_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
shift TEXT NOT NULL,
ship_name TEXT NOT NULL,
teu INTEGER,
efficiency REAL,
vehicles INTEGER,
twenty_feet INTEGER, -- 20尺箱量
forty_feet INTEGER, -- 40尺箱量
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 检查是否需要迁移旧表结构
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='daily_handover_logs'")
table_sql = cursor.fetchone()[0]
if 'twenty_feet' not in table_sql or 'forty_feet' not in table_sql:
logger.warning("检测到旧表结构,正在迁移以添加尺寸箱量字段...")
# 重命名旧表
cursor.execute('ALTER TABLE daily_handover_logs RENAME TO daily_handover_logs_old')
# 创建新表
cursor.execute('''
CREATE TABLE daily_handover_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
shift TEXT NOT NULL,
ship_name TEXT NOT NULL,
teu INTEGER,
efficiency REAL,
vehicles INTEGER,
twenty_feet INTEGER, -- 20尺箱量
forty_feet INTEGER, -- 40尺箱量
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 复制数据(忽略重复)
cursor.execute('''
INSERT OR IGNORE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles, created_at)
SELECT date, shift, ship_name, teu, efficiency, vehicles, created_at
FROM daily_handover_logs_old
''')
# 删除旧表
cursor.execute('DROP TABLE daily_handover_logs_old')
logger.info("迁移完成!已添加尺寸箱量字段")
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON daily_handover_logs(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ship ON daily_handover_logs(ship_name)')
# 创建未统计月报数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS monthly_unaccounted (
id INTEGER PRIMARY KEY AUTOINCREMENT,
year_month TEXT NOT NULL UNIQUE,
teu INTEGER NOT NULL,
note TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
logger.debug("数据库表结构初始化完成")
def insert(self, log: Dict[str, Any]) -> bool:
"""
插入记录(存在则替换,不存在则插入)
参数:
log: 日志记录字典
返回:
是否成功
"""
try:
query = '''
INSERT OR REPLACE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles, twenty_feet, forty_feet, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
'''
params = (
log['date'], log['shift'], log['ship_name'],
log.get('teu'), log.get('efficiency'), log.get('vehicles'),
log.get('twenty_feet'), log.get('forty_feet')
)
self.execute_update(query, params)
logger.debug(f"插入记录: {log['date']} {log['shift']} {log['ship_name']}")
return True
except Exception as e:
logger.error(f"插入记录失败: {e}, 记录: {log}")
return False
def insert_many(self, logs: List[Dict[str, Any]]) -> int:
"""
批量插入
参数:
logs: 日志记录列表
返回:
成功插入的数量
"""
count = 0
for log in logs:
if self.insert(log):
count += 1
logger.info(f"批量插入完成,成功 {count}/{len(logs)} 条记录")
return count
def query_by_date(self, date: str) -> List[Dict[str, Any]]:
"""
按日期查询
参数:
date: 日期字符串
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
WHERE date = ? ORDER BY shift, ship_name
'''
return self.execute_query(query, (date,))
def query_by_ship(self, ship_name: str) -> List[Dict[str, Any]]:
"""
按船名查询
参数:
ship_name: 船名
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
WHERE ship_name LIKE ? ORDER BY date DESC
'''
return self.execute_query(query, (f'%{ship_name}%',))
def query_all(self, limit: int = 1000) -> List[Dict[str, Any]]:
"""
查询所有记录
参数:
limit: 限制返回数量
返回:
日志记录列表
"""
query = '''
SELECT * FROM daily_handover_logs
ORDER BY date DESC, shift LIMIT ?
'''
return self.execute_query(query, (limit,))
def get_stats(self) -> Dict[str, Any]:
"""
获取统计信息
返回:
统计信息字典
"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM daily_handover_logs')
total = cursor.fetchone()[0]
cursor.execute('SELECT DISTINCT ship_name FROM daily_handover_logs')
ships = [row[0] for row in cursor.fetchall()]
cursor.execute('SELECT MIN(date), MAX(date) FROM daily_handover_logs')
date_range = cursor.fetchone()
return {
'total': total,
'ships': ships,
'date_range': {'start': date_range[0], 'end': date_range[1]}
}
def get_ships_with_monthly_teu(self, year_month: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取所有船只及其当月TEU总量
参数:
year_month: 年月字符串,格式 "2025-12"如果为None则统计所有
返回:
船只统计列表
"""
if year_month:
query = '''
SELECT ship_name, SUM(teu) as monthly_teu
FROM daily_handover_logs
WHERE date LIKE ?
GROUP BY ship_name
ORDER BY monthly_teu DESC
'''
return self.execute_query(query, (f'{year_month}%',))
else:
query = '''
SELECT ship_name, SUM(teu) as monthly_teu
FROM daily_handover_logs
GROUP BY ship_name
ORDER BY monthly_teu DESC
'''
return self.execute_query(query)
def insert_unaccounted(self, year_month: str, teu: int, note: str = '') -> bool:
"""
插入未统计数据
参数:
year_month: 年月字符串,格式 "2025-12"
teu: 未统计TEU数量
note: 备注
返回:
是否成功
"""
try:
query = '''
INSERT OR REPLACE INTO monthly_unaccounted
(year_month, teu, note, created_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
'''
self.execute_update(query, (year_month, teu, note))
logger.info(f"插入未统计数据: {year_month} {teu}TEU")
return True
except Exception as e:
logger.error(f"插入未统计数据失败: {e}")
return False
def get_unaccounted(self, year_month: str) -> int:
"""
获取指定月份的未统计数据
参数:
year_month: 年月字符串,格式 "2025-12"
返回:
未统计TEU数量
"""
query = 'SELECT teu FROM monthly_unaccounted WHERE year_month = ?'
result = self.execute_query(query, (year_month,))
return result[0]['teu'] if result else 0
def delete_by_date(self, date: str) -> int:
"""
删除指定日期的记录
参数:
date: 日期字符串
返回:
删除的记录数
"""
query = 'DELETE FROM daily_handover_logs WHERE date = ?'
return self.execute_update(query, (date,))
if __name__ == '__main__':
# 测试代码
db = DailyLogsDatabase()
# 测试插入
test_log = {
'date': '2025-12-30',
'shift': '白班',
'ship_name': '测试船',
'teu': 100,
'efficiency': 3.5,
'vehicles': 5
}
success = db.insert(test_log)
print(f"插入测试: {'成功' if success else '失败'}")
# 测试查询
logs = db.query_by_date('2025-12-30')
print(f"查询结果: {len(logs)} 条记录")
# 测试统计
stats = db.get_stats()
print(f"统计信息: {stats}")
# 测试未统计数据
db.insert_unaccounted('2025-12', 118, '测试备注')
unaccounted = db.get_unaccounted('2025-12')
print(f"未统计数据: {unaccounted}TEU")
# 清理测试数据
db.delete_by_date('2025-12-30')
print("测试数据已清理")