#!/usr/bin/env python3
"""
Shift handover report generator.

Collects operation statistics separately for the day shift (08:00-20:00) and
the night shift (20:00-08:00 of the next day) and renders a formatted
handover report.
"""

from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import os
import sys
import re
import argparse

# Load environment variables
from dotenv import load_dotenv

load_dotenv()

# Make the directory containing this file importable
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from feishu.manager import FeishuScheduleManager


class ShiftReportGenerator:
    """Generate shift handover reports."""

    # Shift definitions. A shift whose end_hour is not after its start_hour
    # ends on the following calendar day.
    SHIFTS = {
        "day": {
            "name": "白班",
            "start_hour": 8,
            "end_hour": 20,
        },
        "night": {
            "name": "夜班",
            "start_hour": 20,
            "end_hour": 8,  # next day
        },
    }

    def __init__(self):
        self.feishu_manager = FeishuScheduleManager()

    def get_shift_time_range(
        self, report_date: datetime, shift_type: str
    ) -> Tuple[datetime, datetime]:
        """
        Return the time window covered by a shift.

        Args:
            report_date: Date of the report; only its date part is used.
            shift_type: Shift key ("day" or "night").

        Returns:
            A (start_time, end_time) tuple.

        Raises:
            ValueError: If shift_type is not a key of SHIFTS.
        """
        shift = self.SHIFTS.get(shift_type)
        if not shift:
            raise ValueError(f"未知的班次类型: {shift_type}")

        start_hour = shift["start_hour"]
        end_hour = shift["end_hour"]

        start_time = report_date.replace(
            hour=start_hour, minute=0, second=0, microsecond=0
        )
        end_time = report_date.replace(
            hour=end_hour, minute=0, second=0, microsecond=0
        )
        # A shift that wraps past midnight (e.g. 20:00 -> 08:00) ends on the
        # next day; a same-day shift (08:00 -> 20:00) is left untouched.
        if end_hour <= start_hour:
            end_time += timedelta(days=1)

        return start_time, end_time

    def get_shift_personnel(self, report_date: datetime, shift_type: str) -> str:
        """
        Look up who is on duty for a shift.

        Args:
            report_date: Date of the report.
            shift_type: Shift key; "day" reads the "day_shift" entry, any
                other value reads "night_shift".

        Returns:
            The personnel entry from the schedule, or "" when it is missing.
        """
        date_str = report_date.strftime("%Y-%m-%d")
        # Tolerate a falsy result (e.g. None) from the schedule lookup instead
        # of failing on `.get`.
        schedule = self.feishu_manager.get_schedule_for_date(date_str) or {}

        key = "day_shift" if shift_type == "day" else "night_shift"
        return schedule.get(key, "")

    def get_vessels_in_time_range(
        self, start_time: datetime, end_time: datetime
    ) -> List[Dict[str, Any]]:
        """
        Fetch per-vessel operation statistics for a time window.

        Args:
            start_time: Start of the window.
            end_time: End of the window.

        Returns:
            One dict per vessel visit. An empty list is returned when the
            query fails, the response carries no data, or a row cannot be
            parsed (the error is printed, not raised).
        """
        try:
            # Query the data with native SQL through Metabase.
            start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
            end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")

            # NOTE(review): both bounds are shifted back 8 hours, presumably
            # to convert UTC+8 local time to the stored timezone - confirm.
            # NOTE(review): the upper bound is inclusive, so a record stamped
            # exactly on a shift boundary may be counted in two adjacent
            # shifts - confirm whether `<` is intended.
            query = f"""
            SELECT
                vesselVisitID,
                MIN(_time_end) as start_time,
                MAX(_time_end) as end_time,
                SUM(num20) as cnt20,
                SUM(num40) as cnt40,
                SUM(num20) + SUM(num40)*2 as teu,
                COUNT(DISTINCT vehicleId) as vehicles
            FROM (
                SELECT
                    CASE WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) OR
                              (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1))
                         THEN cn.num20 ELSE cc.num20 END AS num20,
                    CASE WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) OR
                              (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3)))
                         THEN cn.num40 ELSE cc.num40 END AS num40,
                    cc.vehicleId,
                    cc.movementType,
                    cc.vesselVisitID,
                    cc.batchName,
                    cc._time_end
                FROM cnt_cycles cc
                LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
                WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
                  AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
            ) AS basedata
            GROUP BY vesselVisitID
            ORDER BY vesselVisitID
            """

            result = self._query_metabase_native(query)
            if not result or "data" not in result:
                # Always honour the declared list return type.
                return []

            data = result["data"]
            rows = data.get("rows", [])
            cols = data.get("cols", [])
            col_idx = {col["name"]: i for i, col in enumerate(cols)}

            # Resolve column positions once; fall back to the SELECT order
            # when the response does not name a column.
            id_i = col_idx.get("vesselVisitID", 0)
            cnt20_i = col_idx.get("cnt20", 3)
            cnt40_i = col_idx.get("cnt40", 4)
            teu_i = col_idx.get("teu", 5)
            vehicles_i = col_idx.get("vehicles", 6)

            vessels = []
            for row in rows:
                vessel_id = row[id_i]
                cnt20 = row[cnt20_i] or 0
                cnt40 = row[cnt40_i] or 0
                teu = row[teu_i] or 0
                vehicles = row[vehicles_i] or 0

                vessels.append(
                    {
                        "vessel_code": self._extract_vessel_code(vessel_id),
                        "vessel_name": self._extract_vessel_name(vessel_id),
                        "vehicles": vehicles,
                        "teu_20ft": cnt20,
                        "teu_40ft": cnt40,
                        "total_teu": teu,
                        "efficiency": self._calculate_efficiency(teu, vehicles),
                        # No data source yet for failures and manual
                        # interventions; show placeholders.
                        "failures": "--",
                        "failure_rate": "--",
                        "interventions": "--",
                        "intervention_rate": "--",
                    }
                )

            return vessels

        except Exception as e:
            # Best-effort: the report is still produced without vessel data.
            print(f"获取船舶列表失败: {e}")
            return []

    def _extract_vessel_name(self, vessel_visit_id: str) -> str:
        """
        Extract the vessel name from a vessel visit id.

        The name is the second "-"-separated field, truncated at its first
        "_". An id without "-" is returned unchanged; a falsy id gives "".
        """
        if not vessel_visit_id:
            return ""

        parts = vessel_visit_id.split("-")
        if len(parts) < 2:
            return vessel_visit_id
        return parts[1].split("_")[0]

    def _extract_vessel_code(self, vessel_visit_id: str) -> str:
        """
        Extract the vessel code from a vessel visit id.

        The code is the text before the first "-" (described by the original
        author as the date part). A falsy id gives "".
        """
        if not vessel_visit_id:
            return ""
        return vessel_visit_id.split("-")[0]

    def _calculate_efficiency(self, teu: int, vehicles: int) -> str:
        """
        Return the efficiency figure for a vessel.

        Simplified placeholder: always returns "--" until a real efficiency
        data source is wired in.
        """
        return "--"

    def _query_metabase_native(
        self, query: str, database_id: int = 3
    ) -> Optional[Dict]:
        """
        Run a native SQL query through the Metabase HTTP API.

        Args:
            query: SQL statement to execute.
            database_id: Metabase database id.

        Returns:
            The decoded JSON response of the dataset endpoint.

        Raises:
            Exception: If the credentials are not configured.
            RuntimeError: If the login response carries no session id.
            requests.HTTPError: If either HTTP call returns an error status.
        """
        import requests

        # NOTE(review): the variable names are spelled "MATEBASE" (not
        # "METABASE"); kept as-is because deployments may already define them.
        username = os.getenv("MATEBASE_USERNAME")
        password = os.getenv("MATEBASE_PASSWORD")
        base_url = "http://10.80.0.11:30001"

        if not username or not password:
            raise Exception("缺少 Metabase 认证信息")

        # Authenticate
        auth_url = f"{base_url}/api/session"
        auth_response = requests.post(
            auth_url, json={"username": username, "password": password}, timeout=30
        )
        auth_response.raise_for_status()
        session_token = auth_response.json().get("id")
        if not session_token:
            # Fail here with a clear message rather than sending a request
            # with an empty session header.
            raise RuntimeError("Metabase 认证失败: 未返回会话令牌")

        # Execute the query
        query_url = f"{base_url}/api/dataset"
        headers = {
            "X-Metabase-Session": session_token,
            "Content-Type": "application/json",
        }
        payload = {
            "type": "native",
            "native": {"query": query},
            "database": database_id,
        }

        response = requests.post(query_url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()

    def read_template(self, template_path: str) -> str:
        """
        Read a UTF-8 template file.

        Returns:
            The file content, or "" when the file cannot be read or decoded
            (the error is printed).
        """
        try:
            with open(template_path, "r", encoding="utf-8") as f:
                return f.read()
        except (OSError, ValueError) as e:
            # ValueError also covers UnicodeDecodeError and invalid paths.
            print(f"读取模板失败: {e}")
            return ""

    def render_template(self, template: str, data: Dict[str, Any]) -> str:
        """
        Render a template.

        Supports {{variable}} placeholders and {{#list}}...{{/list}} sections.
        A section is repeated once per item of data[list]; inside it, the
        item's own keys are substituted. A section whose name is missing from
        data, or is not a list, renders as nothing.

        Args:
            template: Template text.
            data: Values for placeholders and list sections.

        Returns:
            The rendered text.
        """

        def replace_list(match):
            """Expand one {{#name}}...{{/name}} section."""
            list_name = match.group(1)
            list_template = match.group(2)

            items = data.get(list_name)
            if not isinstance(items, list):
                return ""

            rendered = []
            for item in items:
                item_str = list_template
                for key, value in item.items():
                    item_str = item_str.replace(f"{{{{{key}}}}}", str(value))
                rendered.append(item_str)
            return "".join(rendered)

        # Expand list sections first so that scalar placeholders left inside
        # them are filled by the pass below.
        list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
        result = re.sub(list_pattern, replace_list, template, flags=re.DOTALL)

        # Fill simple variables
        for key, value in data.items():
            if not isinstance(value, (list, dict)):
                result = result.replace(f"{{{{{key}}}}}", str(value))

        return result

    def generate_shift_report(
        self, report_date: Optional[datetime] = None, shift_type: str = "day"
    ) -> str:
        """
        Generate the handover report for one shift.

        Args:
            report_date: Date of the report; defaults to now.
            shift_type: Shift key ("day" or "night").

        Returns:
            The rendered report, or an error message string when the template
            cannot be read.

        Raises:
            ValueError: If shift_type is not a key of SHIFTS.
        """
        if report_date is None:
            report_date = datetime.now()

        shift = self.SHIFTS.get(shift_type)
        shift_name = shift["name"] if shift else shift_type

        print(f"正在生成 {report_date.strftime('%Y-%m-%d')} {shift_name} 交接报告...")

        # 1. Shift time window
        start_time, end_time = self.get_shift_time_range(report_date, shift_type)
        print(
            f"班次时间范围: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
        )

        # 2. Shift personnel
        print("获取班次人员...")
        personnel = self.get_shift_personnel(report_date, shift_type)

        # 3. Vessel operation data
        print("获取船舶作业数据...")
        vessels = self.get_vessels_in_time_range(start_time, end_time)

        # 4. Read and render the template
        template_path = os.path.join(
            project_root, "template", "shift_handover_template.txt"
        )
        template = self.read_template(template_path)

        if not template:
            return "错误:无法读取模板文件"

        # Data handed to the template
        render_data = {
            "date": report_date.strftime("%m/%d"),
            "shift_name": shift_name,
            "personnel": personnel,
            "records": vessels,
        }

        return self.render_template(template, render_data)

    def generate_all_shifts_report(
        self, report_date: Optional[datetime] = None
    ) -> Dict[str, str]:
        """
        Generate the handover reports for every defined shift.

        Args:
            report_date: Date of the report; defaults to now.

        Returns:
            A dict mapping each shift key to its report,
            e.g. {"day": ..., "night": ...}.
        """
        # Resolve the default once so every shift is reported for the same
        # date even if the run crosses midnight.
        if report_date is None:
            report_date = datetime.now()

        return {
            shift_type: self.generate_shift_report(report_date, shift_type)
            for shift_type in self.SHIFTS
        }


def main():
    """Command-line entry point: parse arguments and print the report(s)."""
    parser = argparse.ArgumentParser(description="生成班次交接报告")
    parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为当天")
    parser.add_argument(
        "--shift",
        type=str,
        choices=["day", "night", "all"],
        default="all",
        help="班次类型: day(白班), night(夜班), all(全部),默认为 all",
    )

    args = parser.parse_args()

    # Parse the date
    if args.date:
        try:
            report_date = datetime.strptime(args.date, "%Y-%m-%d")
        except ValueError:
            # Report a usage error instead of a traceback.
            parser.error(f"无效的日期: {args.date} (格式应为 YYYY-MM-DD)")
    else:
        report_date = datetime.now()

    # Generate the report(s)
    generator = ShiftReportGenerator()

    if args.shift == "all":
        reports = generator.generate_all_shifts_report(report_date)
        for shift_type, report in reports.items():
            shift_name = generator.SHIFTS[shift_type]["name"]
            print("\n" + "=" * 60)
            print(f"{shift_name}交接报告")
            print("=" * 60)
            print(report)
    else:
        report = generator.generate_shift_report(report_date, args.shift)
        print("\n" + "=" * 60)
        print("班次交接报告")
        print("=" * 60)
        print(report)

    print("=" * 60)


if __name__ == "__main__":
    main()