#!/usr/bin/env python3 """ 日报生成模块 """ from datetime import datetime, timedelta from typing import Dict, List, Optional import sys import os import logging # 添加项目根目录到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.database import DailyLogsDatabase from src.feishu_v2 import FeishuScheduleManagerV2 as FeishuScheduleManager logger = logging.getLogger(__name__) class DailyReportGenerator: """每日作业报告生成器""" DAILY_TARGET = 300 # 每日目标作业量 def __init__(self, db_path: str = 'data/daily_logs.db'): """初始化""" self.db = DailyLogsDatabase(db_path) def get_latest_date(self) -> str: """获取数据库中最新的日期""" logs = self.db.query_all(limit=1) if logs: return logs[0]['date'] return datetime.now().strftime('%Y-%m-%d') def get_daily_data(self, date: str) -> Dict: """获取指定日期的数据""" logs = self.db.query_by_date(date) # 按船名汇总 ships = {} for log in logs: ship = log['ship_name'] if ship not in ships: ships[ship] = 0 if log.get('teu'): ships[ship] += log['teu'] return { 'date': date, 'ships': ships, 'total_teu': sum(ships.values()), 'ship_count': len(ships) } def get_monthly_stats(self, date: str) -> Dict: """获取月度统计(截止到指定日期)""" year_month = date[:7] # YYYY-MM target_date = datetime.strptime(date, '%Y-%m-%d').date() logs = self.db.query_all(limit=10000) # 只统计当月且在指定日期之前的数据 monthly_logs = [ log for log in logs if log['date'].startswith(year_month) and datetime.strptime(log['date'], '%Y-%m-%d').date() <= target_date ] # 按日期汇总 daily_totals = {} for log in monthly_logs: d = log['date'] if d not in daily_totals: daily_totals[d] = 0 if log.get('teu'): daily_totals[d] += log['teu'] # 计算当月天数(已过的天数) current_date = datetime.strptime(date, '%Y-%m-%d') if current_date.day == 1: days_passed = 1 else: days_passed = current_date.day # 获取未统计数据 unaccounted = self.db.get_unaccounted(year_month) planned = days_passed * self.DAILY_TARGET actual = sum(daily_totals.values()) + unaccounted return { 'year_month': year_month, 'days_passed': days_passed, 'planned': planned, 'actual': actual, 'unaccounted': unaccounted, 'completion': round(actual / planned * 100, 2) if planned > 0 else 0, 'daily_totals': daily_totals } def get_shift_personnel(self, date: str) -> Dict: """ 获取班次人员(从飞书排班表获取) 注意:日报中显示的是次日的班次人员,所以需要获取 date+1 的排班 例如:生成 12/29 的日报,显示的是 12/30 的人员 """ try: # 初始化飞书排班管理器 manager = FeishuScheduleManager() # 计算次日日期(日报中显示的是次日班次) parsed_date = datetime.strptime(date, '%Y-%m-%d') tomorrow = (parsed_date + timedelta(days=1)).strftime('%Y-%m-%d') logger.info(f"获取 {date} 日报的班次人员,对应排班表日期: {tomorrow}") # 获取次日的排班信息(使用缓存) schedule = manager.get_schedule_for_date(tomorrow) # 如果从飞书获取到数据,使用飞书数据 if schedule.get('day_shift') or schedule.get('night_shift'): return { 'day_shift': schedule.get('day_shift', ''), 'night_shift': schedule.get('night_shift', ''), 'duty_phone': '13107662315' } # 如果飞书数据为空,返回空值 logger.warning(f"无法从飞书获取 {tomorrow} 的排班信息") return { 'day_shift': '', 'night_shift': '', 'duty_phone': '13107662315' } except Exception as e: logger.error(f"获取排班信息失败: {e}") # 降级处理:返回空值 return { 'day_shift': '', 'night_shift': '', 'duty_phone': '13107662315' } def generate_report(self, date: str = None) -> str: """生成日报""" if not date: date = self.get_latest_date() # 转换日期格式 2025-12-28 -> 12/28,同时确保查询格式正确 try: # 尝试解析各种日期格式 parsed = datetime.strptime(date, '%Y-%m-%d') display_date = parsed.strftime('%m/%d') query_date = parsed.strftime('%Y-%m-%d') # 标准化为双数字格式 except ValueError: # 如果已经是标准格式,直接使用 display_date = datetime.strptime(date, '%Y-%m-%d').strftime('%m/%d') query_date = date daily_data = self.get_daily_data(query_date) monthly_data = self.get_monthly_stats(query_date) personnel = self.get_shift_personnel(query_date) # 月度统计 month_display = date[5:7] + '/' + date[:4] # MM/YYYY lines = [] lines.append(f"日期:{display_date}") lines.append("") # 船次信息(紧凑格式,不留空行) ship_lines = [] for ship, teu in sorted(daily_data['ships'].items(), key=lambda x: -x[1]): ship_lines.append(f"船名:{ship}") ship_lines.append(f"作业量:{teu}TEU") lines.extend(ship_lines) lines.append("") # 当日实际作业量 lines.append(f"当日实际作业量:{daily_data['total_teu']}TEU") # 月度统计 lines.append(f"当月计划作业量:{monthly_data['planned']}TEU (用天数*{self.DAILY_TARGET}TEU)") lines.append(f"当月实际作业量:{monthly_data['actual']}TEU") lines.append(f"当月完成比例:{monthly_data['completion']}%") lines.append("") # 人员信息(需要配合 Confluence 日志中的班次人员信息) day_personnel = personnel['day_shift'] night_personnel = personnel['night_shift'] duty_phone = personnel['duty_phone'] # 班次日期使用次日 next_day = (parsed + timedelta(days=1)).strftime('%m/%d') lines.append(f"{next_day} 白班人员:{day_personnel}") lines.append(f"{next_day} 夜班人员:{night_personnel}") lines.append(f"24小时值班手机:{duty_phone}") return "\n".join(lines) def print_report(self, date: str = None): """打印日报""" report = self.generate_report(date) print(report) return report def close(self): """关闭数据库连接""" self.db.close() if __name__ == '__main__': generator = DailyReportGenerator() generator.print_report() generator.close()