#!/usr/bin/env python3
"""
Shift handover report generator.

Collects operation statistics separately for the day shift (08:00-20:00) and
the night shift (20:00-08:00 of the following day) and renders them into a
formatted handover report.
"""

from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import os
import sys
import re
import argparse

# Load environment variables
from dotenv import load_dotenv

load_dotenv()

# Make the directory containing this file importable
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from feishu.manager import FeishuScheduleManager
from confluence import VesselReportManager
from jira_client import JiraClient
from metabase.vessel_operations import VesselOperationsClient
from metabase.time_operations import TimeOperationsClient


class ShiftReportGenerator:
    """Builds the shift handover report from Feishu, Metabase and Confluence data."""

    # Shift definitions. An end_hour that is not later than start_hour means
    # the shift ends on the following calendar day.
    SHIFTS = {
        "day": {
            "name": "白班",
            "start_hour": 8,
            "end_hour": 20,
        },
        "night": {
            "name": "夜班",
            "start_hour": 20,
            "end_hour": 8,  # following day
        },
    }

    # Base URL of the Metabase instance used by _query_metabase_native.
    METABASE_BASE_URL = "http://10.80.0.11:30001"

    # Timestamp layout used both when formatting SQL bounds and when parsing
    # the start/end columns returned by the vessel summary query.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        self.feishu_manager = FeishuScheduleManager()
        self.vessel_ops_client = VesselOperationsClient()
        self.time_ops_client = TimeOperationsClient()

        # Confluence (and the optional Jira client attached to it) is a
        # best-effort data source: any failure here only disables it.
        try:
            self.confluence_manager = VesselReportManager()
            self.confluence_enabled = True

            jira_url = os.getenv("JIRA_URL")
            jira_username = os.getenv("JIRA_USERNAME")
            jira_token = os.getenv("JIRA_TOKEN")
            if jira_url and jira_username and jira_token:
                jira_client = JiraClient(jira_url, jira_username, jira_token)
                self.confluence_manager.set_jira_client(jira_client)
                print("Jira 客户端已启用")
        except Exception as e:
            print(f"Confluence 数据源初始化失败: {e}")
            self.confluence_enabled = False

    def get_shift_time_range(self, report_date: datetime, shift_type: str) -> tuple:
        """
        Return the time window covered by a shift.

        Args:
            report_date: Date of the report; only its date part is kept.
            shift_type: Key of SHIFTS ("day" or "night").

        Returns:
            (start_time, end_time) tuple of datetimes.

        Raises:
            ValueError: If shift_type is not a key of SHIFTS.
        """
        shift = self.SHIFTS.get(shift_type)
        if not shift:
            raise ValueError(f"未知的班次类型: {shift_type}")

        start_time = report_date.replace(
            hour=shift["start_hour"], minute=0, second=0, microsecond=0
        )
        end_time = report_date.replace(
            hour=shift["end_hour"], minute=0, second=0, microsecond=0
        )
        # A shift whose end hour is not after its start hour wraps past
        # midnight (night shift: 20:00 -> 08:00 of the next day).
        if end_time <= start_time:
            end_time += timedelta(days=1)

        return start_time, end_time

    def get_shift_personnel(self, report_date: datetime, shift_type: str) -> str:
        """
        Return the people on duty for a shift.

        Args:
            report_date: Date of the report.
            shift_type: "day" selects the "day_shift" entry of the schedule;
                any other value selects "night_shift".

        Returns:
            The schedule entry, or "" when the entry is missing.
        """
        date_str = report_date.strftime("%Y-%m-%d")
        # Treat a missing schedule as empty instead of failing on None.get.
        schedule = self.feishu_manager.get_schedule_for_date(date_str) or {}

        key = "day_shift" if shift_type == "day" else "night_shift"
        return schedule.get(key, "")

    def get_vessels_in_time_range(
        self, start_time: datetime, end_time: datetime, shift_type: str
    ) -> List[Dict[str, Any]]:
        """
        Return one record per vessel that had cycles inside the time window.

        Volumes come from a native Metabase query; failures, interventions and
        vessel numbers are taken from Confluence reports when one matches; the
        efficiency comes from the Metabase client objects.

        Args:
            start_time: Start of the window.
            end_time: End of the window.
            shift_type: "day" or "night"; used to decide whether a vessel's
                work extends beyond the shift.

        Returns:
            List of vessel records merged by vessel name. An empty list is
            returned when the query yields no data or any step fails.
        """
        try:
            query = self._build_vessel_summary_query(start_time, end_time)
            result = self._query_metabase_native(query)
            if not result or "data" not in result:
                # Nothing usable came back; report an empty shift rather
                # than falling through without a proper return value.
                return []

            rows = result["data"].get("rows", [])
            cols = result["data"].get("cols", [])
            col_idx = {col["name"]: i for i, col in enumerate(cols)}

            reports_by_name, reports_list = self._load_confluence_reports(
                start_time, end_time
            )

            vessels = [
                self._build_vessel_record(
                    row,
                    col_idx,
                    reports_by_name,
                    reports_list,
                    start_time,
                    shift_type,
                )
                for row in rows
            ]
            return self._merge_vessels_by_name(vessels)

        except Exception as e:
            print(f"获取船舶列表失败: {e}")
            return []

    def _build_vessel_summary_query(
        self, start_time: datetime, end_time: datetime
    ) -> str:
        """Build the native SQL that aggregates cycles per vesselVisitID."""
        start_str = start_time.strftime(self._TIMESTAMP_FORMAT)
        end_str = end_time.strftime(self._TIMESTAMP_FORMAT)

        # The bounds are shifted back by 8 hours inside the SQL.
        # NOTE(review): presumably _time_end is stored in UTC and the shift
        # times are UTC+8 local times - confirm against the schema.
        return f"""
            SELECT
                vesselVisitID,
                MIN(_time_end) as start_time,
                MAX(_time_end) as end_time,
                SUM(num20) as cnt20,
                SUM(num40) as cnt40,
                SUM(num20) + SUM(num40)*2 as teu,
                COUNT(DISTINCT vehicleId) as vehicles
            FROM (
                SELECT
                    CASE WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1)) THEN cn.num20 ELSE cc.num20 END AS num20,
                    CASE WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3))) THEN cn.num40 ELSE cc.num40 END AS num40,
                    cc.vehicleId,
                    cc.movementType,
                    cc.vesselVisitID,
                    cc.batchName,
                    cc._time_end
                FROM cnt_cycles cc
                LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
                WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
                  AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
            ) AS basedata
            GROUP BY vesselVisitID
            ORDER BY vesselVisitID
            """

    def _load_confluence_reports(
        self, start_time: datetime, end_time: datetime
    ) -> tuple:
        """
        Fetch Confluence vessel reports around the window (best effort).

        Returns:
            (reports_by_name, reports_list): reports indexed by their
            "vessel_name" and the same reports in retrieval order. Both are
            empty when Confluence is disabled; a failure is printed and
            whatever was collected so far is returned.
        """
        reports_by_name: Dict[str, Dict[str, Any]] = {}
        reports_list: List[Dict[str, Any]] = []
        if not self.confluence_enabled:
            return reports_by_name, reports_list

        try:
            # The lookup starts one day before the window.
            query_start = (start_time - timedelta(days=1)).strftime("%Y-%m-%d")
            query_end = end_time.strftime("%Y-%m-%d")
            reports = self.confluence_manager.get_vessel_reports_in_range(
                query_start, query_end
            )
            for report in reports:
                vessel_name = report.get("vessel_name")
                if vessel_name:
                    reports_by_name[vessel_name] = report
                reports_list.append(report)
        except Exception as e:
            print(f"从 Confluence 获取数据失败: {e}")

        return reports_by_name, reports_list

    def _parse_metabase_timestamp(self, value: Any) -> Optional[datetime]:
        """
        Parse a "YYYY-MM-DD HH:MM:SS[.fraction]" value from a query row.

        Returns None for an empty value or one that does not match the format.
        NOTE(review): values in any other layout (e.g. ISO 8601 with "T") are
        treated as missing - confirm what Metabase actually returns.
        """
        if not value:
            return None
        try:
            # Drop the fractional-second part before parsing.
            return datetime.strptime(
                str(value).split(".")[0], self._TIMESTAMP_FORMAT
            )
        except ValueError:
            return None

    def _build_vessel_record(
        self,
        row: List[Any],
        col_idx: Dict[str, int],
        reports_by_name: Dict[str, Dict[str, Any]],
        reports_list: List[Dict[str, Any]],
        start_time: datetime,
        shift_type: str,
    ) -> Dict[str, Any]:
        """Turn one row of the vessel summary query into a report record."""
        # Column positions fall back to the order of the SELECT list when a
        # name is missing from the returned column metadata.
        vessel_id = row[col_idx.get("vesselVisitID", 0)]
        cnt20 = row[col_idx.get("cnt20", 3)] or 0
        cnt40 = row[col_idx.get("cnt40", 4)] or 0
        teu = row[col_idx.get("teu", 5)] or 0
        vehicles = row[col_idx.get("vehicles", 6)] or 0

        # First and last cycle end time of the vessel inside the window;
        # used for report matching and the cross-shift check.
        vessel_start = self._parse_metabase_timestamp(
            row[col_idx.get("start_time", 1)]
        )
        vessel_end = self._parse_metabase_timestamp(row[col_idx.get("end_time", 2)])

        vessel_name = self._extract_vessel_name(vessel_id)
        vessel_code = self._extract_vessel_code(vessel_id)

        failures = "--"
        failure_rate = "--"
        interventions = "--"
        intervention_rate = "--"
        vessel_number_display = vessel_code

        # Match a Confluence report by name first, then by time overlap.
        report = None
        if vessel_name and vessel_name in reports_by_name:
            report = reports_by_name[vessel_name]
        elif vessel_start and vessel_end:
            report = self._find_vessel_report_by_time(
                vessel_start, vessel_end, reports_list
            )

        if report:
            failures = str(report.get("failures", 0))
            interventions = str(report.get("interventions", 0))

            vessel_num = report.get("vessel_number")
            vessel_number_display = vessel_num if vessel_num else "--"

            conf_vessel_name = report.get("vessel_name")
            if conf_vessel_name:
                vessel_name = conf_vessel_name

            # The report lists vehicles separated by "、"; its count
            # overrides the count from the query.
            confluence_vehicles = report.get("vehicles", "")
            if confluence_vehicles:
                vehicles = len(confluence_vehicles.split("、"))

            # Recompute the rates against the Metabase volume (teu / 2);
            # fall back to the rates stored in the report when teu is zero.
            failures_int = report.get("failures", 0)
            interventions_int = report.get("interventions", 0)
            if teu > 0:
                failure_rate = f"{(failures_int / (teu / 2) * 100):.2f}"
                intervention_rate = f"{(interventions_int / (teu / 2) * 100):.2f}"
            else:
                failure_rate = f"{report.get('failure_rate', 0):.2f}"
                intervention_rate = f"{report.get('intervention_rate', 0):.2f}"

        efficiency = self._resolve_efficiency(
            vessel_id, vessel_start, vessel_end, start_time, shift_type
        )

        return {
            "vessel_code": vessel_number_display,
            "vessel_name": vessel_name,
            "vehicles": vehicles,
            "teu_20ft": cnt20,
            "teu_40ft": cnt40,
            "total_teu": teu,
            "efficiency": efficiency,
            "failures": failures,
            "failure_rate": failure_rate,
            "interventions": interventions,
            "intervention_rate": intervention_rate,
            "vessel_start": vessel_start,
            "vessel_end": vessel_end,
        }

    def _shift_bounds(self, start_time: datetime, shift_type: str) -> tuple:
        """
        Return (shift_start, shift_end) anchored on the date of start_time.

        "day" yields 08:00-20:00 of that date; any other value yields
        20:00 of that date to 08:00 of the next day.
        """
        if shift_type == "day":
            shift = self.SHIFTS["day"]
            end_anchor = start_time
        else:
            shift = self.SHIFTS["night"]
            end_anchor = start_time + timedelta(days=1)

        shift_start = start_time.replace(
            hour=shift["start_hour"], minute=0, second=0
        )
        shift_end = end_anchor.replace(hour=shift["end_hour"], minute=0, second=0)
        return shift_start, shift_end

    def _resolve_efficiency(
        self,
        vessel_id: Any,
        vessel_start: Optional[datetime],
        vessel_end: Optional[datetime],
        start_time: datetime,
        shift_type: str,
    ) -> str:
        """
        Return the efficiency of a vessel formatted with two decimals.

        A vessel whose work extends beyond the shift is looked up by time
        range; otherwise, or when that lookup gives nothing, the whole-vessel
        figure is used. "--" is returned when no positive value is available
        or a lookup fails.
        """
        efficiency = "--"
        try:
            is_cross_shift = False
            if vessel_start and vessel_end:
                shift_start, shift_end = self._shift_bounds(start_time, shift_type)
                # NOTE(review): vessel_start/vessel_end come from _time_end,
                # which the query treats as 8 hours behind the shift times;
                # confirm both sides of this comparison share a time zone.
                is_cross_shift = vessel_start < shift_start or vessel_end > shift_end

            if is_cross_shift:
                # Cross-shift vessel: query the efficiency for the part of
                # the work that lies inside this shift.
                try:
                    # NOTE(review): the window is clamped on one side only
                    # (start for the day shift, end for the night shift) -
                    # confirm this asymmetry is intended.
                    if shift_type == "day":
                        query_start = max(vessel_start, shift_start)
                        query_end = shift_end
                    else:
                        query_start = shift_start
                        query_end = min(vessel_end, shift_end)

                    cycle_h = self.time_ops_client.get_efficiency_by_time(
                        query_start.strftime("%Y-%m-%d %H:%M:%S"),
                        query_end.strftime("%Y-%m-%d %H:%M:%S"),
                    )
                    if cycle_h is not None and cycle_h > 0:
                        efficiency = f"{cycle_h:.2f}"
                except Exception as e:
                    print(f"获取班次效率失败: {e}")

            # Not cross-shift, or the time-range lookup gave nothing:
            # use the whole-vessel efficiency.
            if efficiency == "--":
                vessel_ops_data = self.vessel_ops_client.get_vessel_operations(
                    vessel_id
                )
                cycle_h = vessel_ops_data.get("cycle_h_filtered")
                if cycle_h is not None and cycle_h > 0:
                    efficiency = f"{cycle_h:.2f}"
        except Exception as e:
            print(f"获取船舶 {vessel_id} 效率失败: {e}")

        return efficiency

    def _extract_vessel_name(self, vessel_visit_id: str) -> str:
        """
        Extract the vessel name from a vessel visit id.

        The name is the second "-"-separated field, cut at its first "_".
        An id without "-" is returned unchanged; an empty id gives "".
        """
        if not vessel_visit_id:
            return ""

        parts = vessel_visit_id.split("-")
        if len(parts) < 2:
            return vessel_visit_id
        # split("_")[0] returns the whole field when it contains no "_".
        return parts[1].split("_")[0]

    def _extract_vessel_code(self, vessel_visit_id: str) -> str:
        """
        Extract the vessel code (the part before the first "-") from a
        vessel visit id. An empty id gives "".
        """
        if not vessel_visit_id:
            return ""
        # str.split always yields at least one element, so [0] is safe.
        return vessel_visit_id.split("-")[0]

    def _parse_confluence_operation_time(
        self, operation_time: str
    ) -> tuple[Optional[datetime], Optional[datetime]]:
        """
        Parse the operation-time string of a Confluence report.

        Args:
            operation_time: Text such as "2026.03.13 08:00~2026.03.13 20:00".

        Returns:
            (start_time, end_time); (None, None) when the text is empty,
            does not contain exactly one "~", or cannot be parsed.
        """
        if not operation_time:
            return None, None

        try:
            if "~" in operation_time:
                parts = operation_time.split("~")
                if len(parts) == 2:
                    start_dt = datetime.strptime(parts[0].strip(), "%Y.%m.%d %H:%M")
                    end_dt = datetime.strptime(parts[1].strip(), "%Y.%m.%d %H:%M")
                    return start_dt, end_dt
        except Exception as e:
            # Report text is free-form; a parse failure only disables
            # time-based matching for this report.
            print(f"解析作业时间失败 '{operation_time}': {e}")

        return None, None

    def _find_vessel_report_by_time(
        self,
        vessel_start: Optional[datetime],
        vessel_end: Optional[datetime],
        confluence_reports: List[Dict[str, Any]],
    ) -> Optional[Dict[str, Any]]:
        """
        Find the first Confluence report whose operation time overlaps the
        vessel's working interval.

        Args:
            vessel_start: Start of the vessel's work according to Metabase.
            vessel_end: End of the vessel's work according to Metabase.
            confluence_reports: Candidate reports, searched in order.

        Returns:
            The first overlapping report, or None when there is none or
            either bound is missing.
        """
        if not vessel_start or not vessel_end:
            return None

        for report in confluence_reports:
            conf_start, conf_end = self._parse_confluence_operation_time(
                report.get("operation_time", "")
            )
            if not (conf_start and conf_end):
                continue
            # Two intervals overlap when each one starts before the other ends.
            if vessel_start < conf_end and vessel_end > conf_start:
                return report

        return None

    @staticmethod
    def _accumulate_volumes(target: Dict[str, Any], source: Dict[str, Any]) -> None:
        """Add source's container counts to target and keep the larger vehicle count."""
        for field in ("teu_20ft", "teu_40ft", "total_teu"):
            target[field] = target.get(field, 0) + source.get(field, 0)
        target["vehicles"] = max(target.get("vehicles", 0), source.get("vehicles", 0))

    def _merge_vessels_by_name(
        self, vessels: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Merge vessel records that share a vessel name.

        Volumes are summed and the larger vehicle count is kept; the vessel
        code, failure and intervention figures of a later record replace the
        earlier ones when they are not "--". Records named "N/A" are added to
        the first merged vessel whose working interval overlaps theirs.
        Records with an empty name, and "N/A" records without an overlapping
        vessel, are not part of the result.

        Args:
            vessels: Records as produced for each query row.

        Returns:
            One record per distinct vessel name, in first-seen order.
        """
        merged: Dict[str, Dict[str, Any]] = {}

        for vessel in vessels:
            name = vessel.get("vessel_name", "")
            if not name or name == "N/A":
                continue

            if name not in merged:
                merged[name] = vessel.copy()
                continue

            existing = merged[name]
            self._accumulate_volumes(existing, vessel)

            if vessel.get("vessel_code") and vessel["vessel_code"] != "--":
                existing["vessel_code"] = vessel["vessel_code"]
            if vessel.get("failures") and vessel["failures"] != "--":
                existing["failures"] = vessel["failures"]
                existing["failure_rate"] = vessel.get("failure_rate", "--")
            if vessel.get("interventions") and vessel["interventions"] != "--":
                existing["interventions"] = vessel["interventions"]
                existing["intervention_rate"] = vessel.get("intervention_rate", "--")

        # Assign N/A vessels to existing vessels by time overlap
        for vessel in vessels:
            if vessel.get("vessel_name", "") != "N/A":
                continue

            vessel_start = vessel.get("vessel_start")
            vessel_end = vessel.get("vessel_end")
            if not (vessel_start and vessel_end):
                continue

            for target_vessel in merged.values():
                target_start = target_vessel.get("vessel_start")
                target_end = target_vessel.get("vessel_end")
                if not (target_start and target_end):
                    continue
                if vessel_start < target_end and vessel_end > target_start:
                    self._accumulate_volumes(target_vessel, vessel)
                    break

        return list(merged.values())

    def _calculate_efficiency(self, teu: int, vehicles: int) -> str:
        """
        Placeholder efficiency calculation.

        Always returns "--"; the arguments are not used.
        """
        return "--"

    def _query_metabase_native(
        self, query: str, database_id: int = 3
    ) -> Optional[Dict]:
        """
        Run a native SQL query through the Metabase HTTP API.

        A new session is opened on every call.

        Args:
            query: SQL text to execute.
            database_id: Metabase database id.

        Returns:
            The decoded JSON body of the dataset response.

        Raises:
            Exception: If the credentials are not configured or the login
                response carries no session id.
            requests.HTTPError: If either HTTP call returns an error status.
        """
        import requests

        # NOTE(review): the variable names are spelled "MATEBASE"; they are
        # kept as-is because existing deployments may define them that way.
        username = os.getenv("MATEBASE_USERNAME")
        password = os.getenv("MATEBASE_PASSWORD")
        base_url = self.METABASE_BASE_URL

        if not username or not password:
            raise Exception("缺少 Metabase 认证信息")

        # Authenticate
        auth_response = requests.post(
            f"{base_url}/api/session",
            json={"username": username, "password": password},
            timeout=30,
        )
        auth_response.raise_for_status()
        session_token = auth_response.json().get("id")
        if not session_token:
            # Fail here with a clear message instead of sending an
            # unauthenticated query.
            raise Exception("Metabase 认证失败: 响应中缺少会话令牌")

        # Execute the query
        headers = {
            "X-Metabase-Session": session_token,
            "Content-Type": "application/json",
        }
        payload = {
            "type": "native",
            "native": {"query": query},
            "database": database_id,
        }
        response = requests.post(
            f"{base_url}/api/dataset", headers=headers, json=payload, timeout=30
        )
        response.raise_for_status()

        return response.json()

    def _get_vessel_efficiency_in_range(
        self, vessel_visit_id: str, start_time: datetime, end_time: datetime
    ) -> Optional[float]:
        """
        Compute cycles per vehicle per hour for a vessel inside a time range.

        Only cycles whose movementType is Load, Discharge or YardMove are
        counted.

        Args:
            vessel_visit_id: Vessel visit id to filter on.
            start_time: Start of the range.
            end_time: End of the range.

        Returns:
            The efficiency rounded to two decimals, or None when the query
            returns no rows or the range is not positive.
        """
        start_str = start_time.strftime(self._TIMESTAMP_FORMAT)
        end_str = end_time.strftime(self._TIMESTAMP_FORMAT)

        # NOTE(review): vessel_visit_id is interpolated into the SQL text
        # unescaped; only pass ids that originate from trusted data.
        query = f"""
            SELECT
                COUNT(*) as cycles,
                COUNT(DISTINCT vehicleId) as vehicles
            FROM cnt_cycles cc
            WHERE cc.vesselVisitID = '{vessel_visit_id}'
              AND cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
              AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
              AND cc.movementType IN ('Load', 'Discharge', 'YardMove')
            """

        result = self._query_metabase_native(query)
        if not result or "data" not in result:
            return None

        rows = result["data"].get("rows", [])
        cols = result["data"].get("cols", [])
        if not rows:
            return None

        col_idx = {col["name"]: i for i, col in enumerate(cols)}
        cycles = rows[0][col_idx.get("cycles", 0)] or 0
        # A missing or zero vehicle count is replaced by 1 so the division
        # below is defined.
        vehicles = rows[0][col_idx.get("vehicles", 1)] or 1

        # Length of the range in hours
        hours = (end_time - start_time).total_seconds() / 3600
        if hours > 0 and vehicles > 0:
            return round(cycles / vehicles / hours, 2)

        return None

    def read_template(self, template_path: str) -> str:
        """
        Read a UTF-8 template file.

        Returns the file content, or "" (after printing the error) when the
        file cannot be read.
        """
        try:
            with open(template_path, "r", encoding="utf-8") as f:
                return f.read()
        except Exception as e:
            print(f"读取模板失败: {e}")
            return ""

    def render_template(self, template: str, data: Dict[str, Any]) -> str:
        """
        Render a template.

        Supports {{variable}} placeholders and {{#list}}...{{/list}}
        sections. A section is repeated once per item of data[list], with the
        item's keys substituted; a section whose name is missing from data or
        is not a list is removed. List sections are expanded before simple
        variables, so placeholders left inside an expanded section are then
        filled from the top-level data.

        Args:
            template: Template text.
            data: Values to substitute; list items must be dicts.

        Returns:
            The rendered text.
        """

        def replace_list(match):
            list_name = match.group(1)
            list_template = match.group(2)
            items = data.get(list_name)
            if not isinstance(items, list):
                return ""

            rendered = []
            for item in items:
                item_str = list_template
                for key, value in item.items():
                    item_str = item_str.replace(f"{{{{{key}}}}}", str(value))
                rendered.append(item_str)
            return "".join(rendered)

        # Expand list sections
        list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
        result = re.sub(list_pattern, replace_list, template, flags=re.DOTALL)

        # Substitute simple variables
        for key, value in data.items():
            if not isinstance(value, (list, dict)):
                result = result.replace(f"{{{{{key}}}}}", str(value))

        return result

    def generate_shift_report(
        self, report_date: Optional[datetime] = None, shift_type: str = "day"
    ) -> str:
        """
        Generate the handover report for one shift.

        Args:
            report_date: Date of the report; defaults to the current time.
            shift_type: "day" or "night".

        Returns:
            The rendered report, or an error message when the template
            cannot be read.

        Raises:
            ValueError: If shift_type is not a key of SHIFTS.
        """
        if report_date is None:
            report_date = datetime.now()

        shift = self.SHIFTS.get(shift_type)
        shift_name = shift["name"] if shift else shift_type

        print(f"正在生成 {report_date.strftime('%Y-%m-%d')} {shift_name} 交接报告...")

        # 1. Shift time window
        start_time, end_time = self.get_shift_time_range(report_date, shift_type)
        print(
            f"班次时间范围: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
        )

        # 2. People on duty
        print("获取班次人员...")
        personnel = self.get_shift_personnel(report_date, shift_type)

        # 3. Vessel operation data
        print("获取船舶作业数据...")
        vessels = self.get_vessels_in_time_range(start_time, end_time, shift_type)

        # 4. Read and render the template
        template_path = os.path.join(
            project_root, "template", "shift_handover_template.txt"
        )
        template = self.read_template(template_path)
        if not template:
            return "错误:无法读取模板文件"

        render_data = {
            "date": report_date.strftime("%m/%d"),
            "shift_name": shift_name,
            "personnel": personnel,
            "records": vessels,
        }
        return self.render_template(template, render_data)

    def generate_all_shifts_report(
        self, report_date: Optional[datetime] = None
    ) -> Dict[str, str]:
        """
        Generate the handover report of every shift defined in SHIFTS.

        Args:
            report_date: Date of the report, passed through unchanged.

        Returns:
            Mapping of shift type to its rendered report.
        """
        return {
            shift_type: self.generate_shift_report(report_date, shift_type)
            for shift_type in self.SHIFTS
        }


def main():
    """Command-line entry point: parse the arguments and print the report(s)."""
    parser = argparse.ArgumentParser(description="生成班次交接报告")
    parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为当天")
    parser.add_argument(
        "--shift",
        type=str,
        choices=["day", "night", "all"],
        default="all",
        help="班次类型: day(白班), night(夜班), all(全部),默认为 all",
    )

    args = parser.parse_args()

    # Resolve the report date
    if args.date:
        report_date = datetime.strptime(args.date, "%Y-%m-%d")
    else:
        report_date = datetime.now()

    # Generate and print the report(s)
    generator = ShiftReportGenerator()
    separator = "=" * 60

    if args.shift == "all":
        reports = generator.generate_all_shifts_report(report_date)
        for shift_type, report in reports.items():
            shift_name = generator.SHIFTS[shift_type]["name"]
            print("\n" + separator)
            print(f"{shift_name}交接报告")
            print(separator)
            print(report)
    else:
        report = generator.generate_shift_report(report_date, args.shift)
        print("\n" + separator)
        print("班次交接报告")
        print(separator)
        print(report)

    print(separator)


if __name__ == "__main__":
    main()