Add README and monthly unaccounted data feature

This commit is contained in:
2025-12-29 01:03:54 +08:00
parent 3b60ae9ecf
commit 283a035ab1
7 changed files with 1363 additions and 17 deletions

View File

@@ -49,31 +49,80 @@ class DailyLogsDatabase:
efficiency REAL,
vehicles INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name)
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 检查是否需要迁移旧表结构
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='daily_handover_logs'")
table_sql = cursor.fetchone()[0]
if 'UNIQUE' not in table_sql:
# 旧表结构,需要迁移
print("检测到旧表结构,正在迁移...")
# 重命名旧表
cursor.execute('ALTER TABLE daily_handover_logs RENAME TO daily_handover_logs_old')
# 创建新表
cursor.execute('''
CREATE TABLE daily_handover_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
shift TEXT NOT NULL,
ship_name TEXT NOT NULL,
teu INTEGER,
efficiency REAL,
vehicles INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, shift, ship_name) ON CONFLICT REPLACE
)
''')
# 复制数据(忽略重复)
cursor.execute('''
INSERT OR IGNORE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles, created_at)
SELECT date, shift, ship_name, teu, efficiency, vehicles, created_at
FROM daily_handover_logs_old
''')
# 删除旧表
cursor.execute('DROP TABLE daily_handover_logs_old')
print("迁移完成!")
# 索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON daily_handover_logs(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ship ON daily_handover_logs(ship_name)')
# 创建未统计月报数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS monthly_unaccounted (
id INTEGER PRIMARY KEY AUTOINCREMENT,
year_month TEXT NOT NULL UNIQUE,
teu INTEGER NOT NULL,
note TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def insert(self, log: Dict) -> bool:
"""插入单条记录"""
"""插入记录(存在则更新,不存在则插入)"""
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO daily_handover_logs
(date, shift, ship_name, teu, efficiency, vehicles)
VALUES (?, ?, ?, ?, ?, ?)
(date, shift, ship_name, teu, efficiency, vehicles, created_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
log['date'], log['shift'], log['ship_name'],
log.get('teu'), log.get('efficiency'), log.get('vehicles')
))
self.conn.commit()
return True
except sqlite3.Error:
except sqlite3.Error as e:
print(f"数据库错误: {e}")
return False
def insert_many(self, logs: List[Dict]) -> int:
@@ -130,6 +179,31 @@ class DailyLogsDatabase:
'date_range': {'start': date_range[0], 'end': date_range[1]}
}
def insert_unaccounted(self, year_month: str, teu: int, note: str = '') -> bool:
"""插入未统计数据"""
try:
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO monthly_unaccounted
(year_month, teu, note, created_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
''', (year_month, teu, note))
self.conn.commit()
return True
except sqlite3.Error as e:
print(f"数据库错误: {e}")
return False
def get_unaccounted(self, year_month: str) -> int:
"""获取指定月份的未统计数据"""
cursor = self.conn.cursor()
cursor.execute(
'SELECT teu FROM monthly_unaccounted WHERE year_month = ?',
(year_month,)
)
result = cursor.fetchone()
return result[0] if result else 0
def close(self):
"""关闭连接"""
if self.conn:

View File

@@ -85,7 +85,7 @@ class HandoverLogParser:
shift_start = block.find(shift_pattern) + len(shift_pattern)
# 找到下一个班次注意事项
# 找到下一个班次作为边界,不限制"注意事项"
next_pos = len(block)
for next_shift in ['白班', '夜班']:
if next_shift != shift:
@@ -93,10 +93,6 @@ class HandoverLogParser:
if pos != -1 and pos < next_pos:
next_pos = pos
notes_pos = block.find('注意事项:', shift_start)
if notes_pos != -1 and notes_pos < next_pos:
next_pos = notes_pos
shift_content = block[shift_start:next_pos]
self._parse_ships(shift_content, date, shift, logs)
@@ -109,15 +105,17 @@ class HandoverLogParser:
continue
cleaned = part.replace('\xa0', ' ').strip()
ship_match = re.search(r'#\s+(\S+)', cleaned)
# 匹配 "xxx# 船名" 格式(船号和船名分开)
ship_match = re.search(r'(\d+)#\s*(\S+)', cleaned)
if not ship_match:
continue
ship_name = ship_match.group(1)
ship_name = f"{ship_match.group(1)}#{ship_match.group(2)}"
vehicles_match = re.search(r'上场车辆数:(\d+)', cleaned)
teu_eff_match = re.search(
r'作业量/效率:(\d+)TEU[,\s]+([\d.]+)循环/车/小时', cleaned
r'作业量/效率:(\d+)TEU[,\s]*', cleaned
)
log = ShipLog(
@@ -125,7 +123,7 @@ class HandoverLogParser:
shift=shift,
ship_name=ship_name,
teu=int(teu_eff_match.group(1)) if teu_eff_match else None,
efficiency=float(teu_eff_match.group(2)) if teu_eff_match else None,
efficiency=None,
vehicles=int(vehicles_match.group(1)) if vehicles_match else None
)
logs.append(log)

182
src/report.py Normal file
View File

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
日报生成模块
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.database import DailyLogsDatabase
class DailyReportGenerator:
"""每日作业报告生成器"""
DAILY_TARGET = 300 # 每日目标作业量
def __init__(self, db_path: str = 'data/daily_logs.db'):
"""初始化"""
self.db = DailyLogsDatabase(db_path)
def get_latest_date(self) -> str:
"""获取数据库中最新的日期"""
logs = self.db.query_all(limit=1)
if logs:
return logs[0]['date']
return datetime.now().strftime('%Y-%m-%d')
def get_daily_data(self, date: str) -> Dict:
"""获取指定日期的数据"""
logs = self.db.query_by_date(date)
# 按船名汇总
ships = {}
for log in logs:
ship = log['ship_name']
if ship not in ships:
ships[ship] = 0
if log.get('teu'):
ships[ship] += log['teu']
return {
'date': date,
'ships': ships,
'total_teu': sum(ships.values()),
'ship_count': len(ships)
}
def get_monthly_stats(self, date: str) -> Dict:
"""获取月度统计(截止到指定日期)"""
year_month = date[:7] # YYYY-MM
target_date = datetime.strptime(date, '%Y-%m-%d').date()
logs = self.db.query_all(limit=10000)
# 只统计当月且在指定日期之前的数据
monthly_logs = [
log for log in logs
if log['date'].startswith(year_month)
and datetime.strptime(log['date'], '%Y-%m-%d').date() <= target_date
]
# 按日期汇总
daily_totals = {}
for log in monthly_logs:
d = log['date']
if d not in daily_totals:
daily_totals[d] = 0
if log.get('teu'):
daily_totals[d] += log['teu']
# 计算当月天数(已过的天数)
current_date = datetime.strptime(date, '%Y-%m-%d')
if current_date.day == 1:
days_passed = 1
else:
days_passed = current_date.day
# 获取未统计数据
unaccounted = self.db.get_unaccounted(year_month)
planned = days_passed * self.DAILY_TARGET
actual = sum(daily_totals.values()) + unaccounted
return {
'year_month': year_month,
'days_passed': days_passed,
'planned': planned,
'actual': actual,
'unaccounted': unaccounted,
'completion': round(actual / planned * 100, 2) if planned > 0 else 0,
'daily_totals': daily_totals
}
def get_shift_personnel(self, date: str) -> Dict:
"""获取班次人员(从日志文本中解析,需要配合 parser 使用)"""
# 目前数据库中没有人员信息,返回空
# 可以后续扩展添加人员追踪功能
return {
'day_shift': '',
'night_shift': '',
'duty_phone': '13107662315'
}
def generate_report(self, date: str = None) -> str:
"""生成日报"""
if not date:
date = self.get_latest_date()
# 转换日期格式 2025-12-28 -> 12/28同时确保查询格式正确
try:
# 尝试解析各种日期格式
parsed = datetime.strptime(date, '%Y-%m-%d')
display_date = parsed.strftime('%m/%d')
query_date = parsed.strftime('%Y-%m-%d') # 标准化为双数字格式
except ValueError:
# 如果已经是标准格式,直接使用
display_date = datetime.strptime(date, '%Y-%m-%d').strftime('%m/%d')
query_date = date
daily_data = self.get_daily_data(query_date)
monthly_data = self.get_monthly_stats(query_date)
personnel = self.get_shift_personnel(query_date)
# 月度统计
month_display = date[5:7] + '/' + date[:4] # MM/YYYY
lines = []
lines.append(f"日期:{display_date}")
lines.append("")
# 船次信息(紧凑格式,不留空行)
ship_lines = []
for ship, teu in sorted(daily_data['ships'].items(), key=lambda x: -x[1]):
ship_lines.append(f"船名:{ship}")
ship_lines.append(f"作业量:{teu}TEU")
lines.extend(ship_lines)
lines.append("")
# 当日实际作业量
lines.append(f"当日实际作业量:{daily_data['total_teu']}TEU")
lines.append("")
# 月度统计
lines.append(f"当月计划作业量:{monthly_data['planned']}TEU (用天数*{self.DAILY_TARGET}TEU)")
if monthly_data.get('unaccounted', 0) > 0:
lines.append(f"当月未统计数据:{monthly_data['unaccounted']}TEU")
lines.append(f"当月实际作业量:{monthly_data['actual']}TEU")
lines.append(f"当月完成比例:{monthly_data['completion']}%")
lines.append("")
# 人员信息(需要配合 Confluence 日志中的班次人员信息)
day_personnel = personnel['day_shift']
night_personnel = personnel['night_shift']
duty_phone = personnel['duty_phone']
# 班次日期使用次日
next_day = (parsed + timedelta(days=1)).strftime('%m/%d')
lines.append(f"{next_day} 白班人员:{day_personnel}")
lines.append(f"{next_day} 夜班人员:{night_personnel}")
lines.append(f"24小时值班手机{duty_phone}")
return "\n".join(lines)
def print_report(self, date: str = None):
"""打印日报"""
report = self.generate_report(date)
print(report)
return report
def close(self):
"""关闭数据库连接"""
self.db.close()
if __name__ == '__main__':
generator = DailyReportGenerator()
generator.print_report()
generator.close()