#!/usr/bin/env python3 """ 每日交接班日志数据库模块 基于新的数据库基类重构 """ from typing import List, Dict, Optional, Any from datetime import datetime from src.database.base import DatabaseBase from src.logging_config import get_logger logger = get_logger(__name__) class DailyLogsDatabase(DatabaseBase): """每日交接班日志数据库""" def __init__(self, db_path: Optional[str] = None): """ 初始化数据库 参数: db_path: 数据库文件路径,如果为None则使用默认配置 """ super().__init__(db_path) self._init_schema() def _init_schema(self): """初始化表结构""" with self.get_connection() as conn: cursor = conn.cursor() # 创建每日交接班日志表 cursor.execute(''' CREATE TABLE IF NOT EXISTS daily_handover_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, shift TEXT NOT NULL, ship_name TEXT NOT NULL, teu INTEGER, efficiency REAL, vehicles INTEGER, twenty_feet INTEGER, -- 20尺箱量 forty_feet INTEGER, -- 40尺箱量 created_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE ) ''') # 检查是否需要迁移旧表结构 cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='daily_handover_logs'") table_sql = cursor.fetchone()[0] if 'twenty_feet' not in table_sql or 'forty_feet' not in table_sql: logger.warning("检测到旧表结构,正在迁移以添加尺寸箱量字段...") # 重命名旧表 cursor.execute('ALTER TABLE daily_handover_logs RENAME TO daily_handover_logs_old') # 创建新表 cursor.execute(''' CREATE TABLE daily_handover_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, shift TEXT NOT NULL, ship_name TEXT NOT NULL, teu INTEGER, efficiency REAL, vehicles INTEGER, twenty_feet INTEGER, -- 20尺箱量 forty_feet INTEGER, -- 40尺箱量 created_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE ) ''') # 复制数据(忽略重复) cursor.execute(''' INSERT OR IGNORE INTO daily_handover_logs (date, shift, ship_name, teu, efficiency, vehicles, created_at) SELECT date, shift, ship_name, teu, efficiency, vehicles, created_at FROM daily_handover_logs_old ''') # 删除旧表 cursor.execute('DROP TABLE daily_handover_logs_old') logger.info("迁移完成!已添加尺寸箱量字段") # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON daily_handover_logs(date)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_ship ON daily_handover_logs(ship_name)') # 创建未统计月报数据表 cursor.execute(''' CREATE TABLE IF NOT EXISTS monthly_unaccounted ( id INTEGER PRIMARY KEY AUTOINCREMENT, year_month TEXT NOT NULL UNIQUE, teu INTEGER NOT NULL, note TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # 创建手动调整数据表 cursor.execute(''' CREATE TABLE IF NOT EXISTS manual_adjustments ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, -- 调整适用的日期 ship_name TEXT NOT NULL, -- 船名 teu INTEGER NOT NULL, -- TEU数量 twenty_feet INTEGER DEFAULT 0, -- 20尺箱量 forty_feet INTEGER DEFAULT 0, -- 40尺箱量 adjustment_type TEXT NOT NULL, -- 'add' 或 'exclude' note TEXT, -- 备注 created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_manual_date ON manual_adjustments(date)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_manual_type ON manual_adjustments(adjustment_type)') # 创建Confluence页面ID映射表 cursor.execute(''' CREATE TABLE IF NOT EXISTS confluence_pages ( id INTEGER PRIMARY KEY AUTOINCREMENT, month_key TEXT NOT NULL UNIQUE, -- 月份键,格式:'2025-12', '2026-01' page_id TEXT NOT NULL, -- Confluence页面ID page_title TEXT, -- 页面标题(可选) created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_confluence_month ON confluence_pages(month_key)') conn.commit() logger.debug("数据库表结构初始化完成") def insert(self, log: Dict[str, Any]) -> bool: """ 插入记录(存在则替换,不存在则插入) 参数: log: 日志记录字典 返回: 是否成功 """ try: query = ''' INSERT OR REPLACE INTO daily_handover_logs (date, shift, ship_name, teu, efficiency, vehicles, twenty_feet, forty_feet, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''' params = ( log['date'], log['shift'], log['ship_name'], log.get('teu'), log.get('efficiency'), log.get('vehicles'), log.get('twenty_feet'), log.get('forty_feet') ) self.execute_update(query, params) logger.debug(f"插入记录: {log['date']} {log['shift']} {log['ship_name']}") return True except Exception as e: logger.error(f"插入记录失败: {e}, 记录: {log}") return False def insert_many(self, logs: List[Dict[str, Any]]) -> int: """ 批量插入 参数: logs: 日志记录列表 返回: 成功插入的数量 """ count = 0 for log in logs: if self.insert(log): count += 1 logger.info(f"批量插入完成,成功 {count}/{len(logs)} 条记录") return count def query_by_date(self, date: str) -> List[Dict[str, Any]]: """ 按日期查询 参数: date: 日期字符串 返回: 日志记录列表 """ query = ''' SELECT * FROM daily_handover_logs WHERE date = ? ORDER BY shift, ship_name ''' return self.execute_query(query, (date,)) def query_by_ship(self, ship_name: str) -> List[Dict[str, Any]]: """ 按船名查询 参数: ship_name: 船名 返回: 日志记录列表 """ query = ''' SELECT * FROM daily_handover_logs WHERE ship_name LIKE ? ORDER BY date DESC ''' return self.execute_query(query, (f'%{ship_name}%',)) def query_all(self, limit: int = 1000) -> List[Dict[str, Any]]: """ 查询所有记录 参数: limit: 限制返回数量 返回: 日志记录列表 """ query = ''' SELECT * FROM daily_handover_logs ORDER BY date DESC, shift LIMIT ? ''' return self.execute_query(query, (limit,)) def get_stats(self) -> Dict[str, Any]: """ 获取统计信息 返回: 统计信息字典 """ with self.get_connection() as conn: cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM daily_handover_logs') total = cursor.fetchone()[0] cursor.execute('SELECT DISTINCT ship_name FROM daily_handover_logs') ships = [row[0] for row in cursor.fetchall()] cursor.execute('SELECT MIN(date), MAX(date) FROM daily_handover_logs') date_range = cursor.fetchone() return { 'total': total, 'ships': ships, 'date_range': {'start': date_range[0], 'end': date_range[1]} } def get_ships_with_monthly_teu(self, year_month: Optional[str] = None) -> List[Dict[str, Any]]: """ 获取所有船只及其当月TEU总量 参数: year_month: 年月字符串,格式 "2025-12",如果为None则统计所有 返回: 船只统计列表 """ if year_month: query = ''' SELECT ship_name, SUM(teu) as monthly_teu FROM daily_handover_logs WHERE date LIKE ? GROUP BY ship_name ORDER BY monthly_teu DESC ''' return self.execute_query(query, (f'{year_month}%',)) else: query = ''' SELECT ship_name, SUM(teu) as monthly_teu FROM daily_handover_logs GROUP BY ship_name ORDER BY monthly_teu DESC ''' return self.execute_query(query) def insert_unaccounted(self, year_month: str, teu: int, note: str = '') -> bool: """ 插入未统计数据 参数: year_month: 年月字符串,格式 "2025-12" teu: 未统计TEU数量 note: 备注 返回: 是否成功 """ try: query = ''' INSERT OR REPLACE INTO monthly_unaccounted (year_month, teu, note, created_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ''' self.execute_update(query, (year_month, teu, note)) logger.info(f"插入未统计数据: {year_month} {teu}TEU") return True except Exception as e: logger.error(f"插入未统计数据失败: {e}") return False def get_unaccounted(self, year_month: str) -> int: """ 获取指定月份的未统计数据 参数: year_month: 年月字符串,格式 "2025-12" 返回: 未统计TEU数量 """ query = 'SELECT teu FROM monthly_unaccounted WHERE year_month = ?' result = self.execute_query(query, (year_month,)) return result[0]['teu'] if result else 0 def reduce_unaccounted(self, year_month: str, teu_to_reduce: int) -> bool: """ 减少指定月份的未统计数据 参数: year_month: 年月字符串,格式 "2025-12" teu_to_reduce: 要减少的TEU数量 返回: 是否成功 """ try: # 先获取当前值 current_teu = self.get_unaccounted(year_month) # 计算新值(允许负数) new_teu = current_teu - teu_to_reduce if new_teu == 0: # 如果减少后等于0,则删除记录 query = 'DELETE FROM monthly_unaccounted WHERE year_month = ?' self.execute_update(query, (year_month,)) logger.info(f"减少未统计数据后删除记录: {year_month},原值: {current_teu}TEU,减少: {teu_to_reduce}TEU,新值: 0TEU") else: # 更新记录(允许负数) query = ''' INSERT OR REPLACE INTO monthly_unaccounted (year_month, teu, note, created_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ''' # 保留原有备注 note_query = 'SELECT note FROM monthly_unaccounted WHERE year_month = ?' note_result = self.execute_query(note_query, (year_month,)) note = note_result[0]['note'] if note_result else '' self.execute_update(query, (year_month, new_teu, note)) logger.info(f"减少未统计数据: {year_month},原值: {current_teu}TEU,减少: {teu_to_reduce}TEU,新值: {new_teu}TEU") return True except Exception as e: logger.error(f"减少未统计数据失败: {e}") return False def delete_unaccounted(self, year_month: str) -> bool: """ 删除指定月份的未统计数据 参数: year_month: 年月字符串,格式 "2025-12" 返回: 是否成功删除(如果记录不存在也返回True) """ try: query = 'DELETE FROM monthly_unaccounted WHERE year_month = ?' result = self.execute_update(query, (year_month,)) if result > 0: logger.info(f"删除未统计数据: {year_month}") return True else: logger.warning(f"未找到 {year_month} 月的未统计数据") return True # 记录不存在也算成功 except Exception as e: logger.error(f"删除未统计数据失败: {e}") return False def delete_by_date(self, date: str) -> int: """ 删除指定日期的记录 参数: date: 日期字符串 返回: 删除的记录数 """ query = 'DELETE FROM daily_handover_logs WHERE date = ?' return self.execute_update(query, (date,)) def insert_manual_adjustment(self, date: str, ship_name: str, teu: int, twenty_feet: int = 0, forty_feet: int = 0, adjustment_type: str = 'add', note: str = '') -> bool: """ 插入手动调整数据 参数: date: 日期字符串 ship_name: 船名 teu: TEU数量 twenty_feet: 20尺箱量 forty_feet: 40尺箱量 adjustment_type: 调整类型 'add' 或 'exclude' note: 备注 返回: 是否成功 """ try: query = ''' INSERT INTO manual_adjustments (date, ship_name, teu, twenty_feet, forty_feet, adjustment_type, note, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''' params = (date, ship_name, teu, twenty_feet, forty_feet, adjustment_type, note) self.execute_update(query, params) logger.info(f"插入手动调整数据: {date} {ship_name} {teu}TEU ({adjustment_type})") return True except Exception as e: logger.error(f"插入手动调整数据失败: {e}") return False def get_manual_adjustments(self, date: str) -> List[Dict[str, Any]]: """ 获取指定日期的所有手动调整数据 参数: date: 日期字符串 返回: 手动调整数据列表 """ query = ''' SELECT * FROM manual_adjustments WHERE date = ? ORDER BY created_at DESC ''' return self.execute_query(query, (date,)) def get_manual_adjustments_by_type(self, date: str, adjustment_type: str) -> List[Dict[str, Any]]: """ 获取指定日期和类型的调整数据 参数: date: 日期字符串 adjustment_type: 调整类型 'add' 或 'exclude' 返回: 手动调整数据列表 """ query = ''' SELECT * FROM manual_adjustments WHERE date = ? AND adjustment_type = ? ORDER BY created_at DESC ''' return self.execute_query(query, (date, adjustment_type)) def delete_manual_adjustment(self, adjustment_id: int) -> bool: """ 删除指定ID的手动调整数据 参数: adjustment_id: 调整记录ID 返回: 是否成功删除 """ try: query = 'DELETE FROM manual_adjustments WHERE id = ?' result = self.execute_update(query, (adjustment_id,)) if result > 0: logger.info(f"删除手动调整数据: ID={adjustment_id}") return True else: logger.warning(f"未找到手动调整数据: ID={adjustment_id}") return False except Exception as e: logger.error(f"删除手动调整数据失败: {e}") return False def clear_manual_adjustments(self, date: str) -> int: """ 清除指定日期的所有手动调整数据 参数: date: 日期字符串 返回: 删除的记录数 """ query = 'DELETE FROM manual_adjustments WHERE date = ?' result = self.execute_update(query, (date,)) logger.info(f"清除 {date} 的手动调整数据: {result} 条记录") return result def get_daily_data_with_adjustments(self, date: str) -> Dict[str, Any]: """ 获取指定日期的数据(包含手动调整) 参数: date: 日期字符串 返回: 包含调整的每日数据字典 """ try: # 获取原始数据 logs = self.query_by_date(date) # 获取手动调整数据 adjustments = self.get_manual_adjustments(date) # 按船名汇总原始数据 ships: Dict[str, Dict[str, Any]] = {} for log in logs: ship = log['ship_name'] if ship not in ships: ships[ship] = { 'teu': 0, 'twenty_feet': 0, 'forty_feet': 0, 'adjustments': [] } if log.get('teu'): ships[ship]['teu'] += log['teu'] if log.get('twenty_feet'): ships[ship]['twenty_feet'] += log['twenty_feet'] if log.get('forty_feet'): ships[ship]['forty_feet'] += log['forty_feet'] # 应用调整数据 total_adjustment_teu = 0 for adj in adjustments: ship = adj['ship_name'] if ship not in ships: ships[ship] = { 'teu': 0, 'twenty_feet': 0, 'forty_feet': 0, 'adjustments': [] } # 记录调整 ships[ship]['adjustments'].append(adj) # 根据调整类型计算 if adj['adjustment_type'] == 'add': ships[ship]['teu'] += adj['teu'] ships[ship]['twenty_feet'] += adj['twenty_feet'] ships[ship]['forty_feet'] += adj['forty_feet'] total_adjustment_teu += adj['teu'] elif adj['adjustment_type'] == 'exclude': ships[ship]['teu'] -= adj['teu'] ships[ship]['twenty_feet'] -= adj['twenty_feet'] ships[ship]['forty_feet'] -= adj['forty_feet'] total_adjustment_teu -= adj['teu'] # 计算总TEU total_teu = sum(ship_data['teu'] for ship_data in ships.values()) return { 'date': date, 'ships': ships, 'total_teu': total_teu, 'ship_count': len(ships), 'adjustments': adjustments, 'total_adjustment_teu': total_adjustment_teu } except Exception as e: logger.error(f"获取包含调整的每日数据失败: {date}, 错误: {e}") return { 'date': date, 'ships': {}, 'total_teu': 0, 'ship_count': 0, 'adjustments': [], 'total_adjustment_teu': 0 } def insert_confluence_page(self, month_key: str, page_id: str, page_title: str = '') -> bool: """ 插入或更新Confluence页面ID映射 参数: month_key: 月份键,格式:'2025-12', '2026-01' page_id: Confluence页面ID page_title: 页面标题(可选) 返回: 是否成功 """ try: query = ''' INSERT OR REPLACE INTO confluence_pages (month_key, page_id, page_title, updated_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ''' params = (month_key, page_id, page_title) self.execute_update(query, params) logger.info(f"插入Confluence页面映射: {month_key} -> {page_id}") return True except Exception as e: logger.error(f"插入Confluence页面映射失败: {e}") return False def get_confluence_page(self, month_key: str) -> Optional[Dict[str, Any]]: """ 获取指定月份的Confluence页面ID映射 参数: month_key: 月份键,格式:'2025-12', '2026-01' 返回: 页面映射字典,如果不存在则返回None """ query = 'SELECT * FROM confluence_pages WHERE month_key = ?' result = self.execute_query(query, (month_key,)) return result[0] if result else None def get_all_confluence_pages(self) -> List[Dict[str, Any]]: """ 获取所有Confluence页面ID映射 返回: 页面映射列表 """ query = 'SELECT * FROM confluence_pages ORDER BY month_key DESC' return self.execute_query(query) def delete_confluence_page(self, month_key: str) -> bool: """ 删除指定月份的Confluence页面ID映射 参数: month_key: 月份键,格式:'2025-12', '2026-01' 返回: 是否成功删除 """ try: query = 'DELETE FROM confluence_pages WHERE month_key = ?' result = self.execute_update(query, (month_key,)) if result > 0: logger.info(f"删除Confluence页面映射: {month_key}") return True else: logger.warning(f"未找到Confluence页面映射: {month_key}") return False except Exception as e: logger.error(f"删除Confluence页面映射失败: {e}") return False def get_confluence_page_for_date(self, date: str) -> Optional[str]: """ 根据日期获取对应的Confluence页面ID 参数: date: 日期字符串,格式:'2025-12-31' 返回: Confluence页面ID,如果不存在则返回None """ try: # 从日期中提取年月 year_month = date[:7] # '2025-12-31' -> '2025-12' # 查询数据库 page_info = self.get_confluence_page(year_month) if page_info: return page_info['page_id'] # 如果没有找到,尝试从环境变量中获取 import os from src.config import config # 检查环境变量中的配置 env_key = f"CONFLUENCE_PAGE_{year_month.replace('-', '_')}" page_id = os.getenv(env_key) if page_id: # 保存到数据库以便下次使用 self.insert_confluence_page(year_month, page_id, f"从环境变量获取: {env_key}") return page_id # 使用默认配置 default_page_id = config.CONFLUENCE_CONTENT_ID if default_page_id: logger.warning(f"未找到 {year_month} 的Confluence页面映射,使用默认页面ID: {default_page_id}") return default_page_id return None except Exception as e: logger.error(f"获取Confluence页面ID失败: {date}, 错误: {e}") return None if __name__ == '__main__': # 测试代码 db = DailyLogsDatabase() # 测试插入 test_log = { 'date': '2025-12-30', 'shift': '白班', 'ship_name': '测试船', 'teu': 100, 'efficiency': 3.5, 'vehicles': 5 } success = db.insert(test_log) print(f"插入测试: {'成功' if success else '失败'}") # 测试查询 logs = db.query_by_date('2025-12-30') print(f"查询结果: {len(logs)} 条记录") # 测试统计 stats = db.get_stats() print(f"统计信息: {stats}") # 测试未统计数据 db.insert_unaccounted('2025-12', 118, '测试备注') unaccounted = db.get_unaccounted('2025-12') print(f"未统计数据: {unaccounted}TEU") # 清理测试数据 db.delete_by_date('2025-12-30') print("测试数据已清理")