Files
gloria/shift_report.py
Developer 5d0cafac32 feat: 交接班报告支持 Confluence/Jira 集成,添加 N/A 记录时间归属功能
- 集成 Confluence API 获取船舶报告数据
- 集成 Jira API 查询故障数量
- 支持船号显示 (462#、463# 等)
- 支持故障次数/故障率、人工介入次数/介入率显示
- 跨班作业使用 Card 69 按时间查询效率
- 不跨班作业使用整船效率(剔除异常)
- N/A 记录根据作业时间归属到对应船舶
- 更新 AGENTS.md 和 README.md 文档
- 删除 daily_report_gui.py
2026-03-14 02:52:23 +08:00

793 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
班次交接报告生成模块
分别统计白班08:00-20:00和夜班20:00-次日08:00的作业情况。
生成格式化的交接班报告。
"""
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import os
import sys
import re
import argparse
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from feishu.manager import FeishuScheduleManager
from confluence import VesselReportManager
from jira_client import JiraClient
from metabase.vessel_operations import VesselOperationsClient
from metabase.time_operations import TimeOperationsClient
class ShiftReportGenerator:
"""班次交接报告生成器"""
# 班次定义
SHIFTS = {
"day": {
"name": "白班",
"start_hour": 8,
"end_hour": 20,
},
"night": {
"name": "夜班",
"start_hour": 20,
"end_hour": 8, # 次日
},
}
def __init__(self):
self.feishu_manager = FeishuScheduleManager()
self.vessel_ops_client = VesselOperationsClient()
self.time_ops_client = TimeOperationsClient()
try:
self.confluence_manager = VesselReportManager()
self.confluence_enabled = True
jira_url = os.getenv("JIRA_URL")
jira_username = os.getenv("JIRA_USERNAME")
jira_token = os.getenv("JIRA_TOKEN")
if jira_url and jira_username and jira_token:
jira_client = JiraClient(jira_url, jira_username, jira_token)
self.confluence_manager.set_jira_client(jira_client)
print("Jira 客户端已启用")
except Exception as e:
print(f"Confluence 数据源初始化失败: {e}")
self.confluence_enabled = False
def get_shift_time_range(self, report_date: datetime, shift_type: str) -> tuple:
"""
获取班次时间范围
Args:
report_date: 报告日期
shift_type: 班次类型 ("day""night")
Returns:
(start_time, end_time) 元组
"""
shift = self.SHIFTS.get(shift_type)
if not shift:
raise ValueError(f"未知的班次类型: {shift_type}")
start_hour = shift["start_hour"]
end_hour = shift["end_hour"]
if shift_type == "day":
# 白班:当天 08:00 - 当天 20:00
start_time = report_date.replace(
hour=start_hour, minute=0, second=0, microsecond=0
)
end_time = report_date.replace(
hour=end_hour, minute=0, second=0, microsecond=0
)
else:
# 夜班:当天 20:00 - 次日 08:00
start_time = report_date.replace(
hour=start_hour, minute=0, second=0, microsecond=0
)
end_time = report_date.replace(
hour=end_hour, minute=0, second=0, microsecond=0
) + timedelta(days=1)
return start_time, end_time
def get_shift_personnel(self, report_date: datetime, shift_type: str) -> str:
"""
获取班次人员
Args:
report_date: 报告日期
shift_type: 班次类型
Returns:
人员姓名字符串
"""
date_str = report_date.strftime("%Y-%m-%d")
schedule = self.feishu_manager.get_schedule_for_date(date_str)
if shift_type == "day":
return schedule.get("day_shift", "")
else:
return schedule.get("night_shift", "")
def get_vessels_in_time_range(
self, start_time: datetime, end_time: datetime, shift_type: str
) -> List[Dict[str, Any]]:
"""
获取指定时间范围内作业的所有船舶列表
Args:
start_time: 开始时间
end_time: 结束时间
Returns:
船舶作业列表
"""
try:
# 使用原生 SQL 查询获取船舶数据
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
query = f"""
SELECT
vesselVisitID,
MIN(_time_end) as start_time,
MAX(_time_end) as end_time,
SUM(num20) as cnt20,
SUM(num40) as cnt40,
SUM(num20) + SUM(num40)*2 as teu,
COUNT(DISTINCT vehicleId) as vehicles
FROM (
SELECT
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1))
THEN cn.num20 ELSE cc.num20 END AS num20,
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3)))
THEN cn.num40 ELSE cc.num40 END AS num40,
cc.vehicleId,
cc.movementType,
cc.vesselVisitID,
cc.batchName,
cc._time_end
FROM cnt_cycles cc
LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
) AS basedata
GROUP BY vesselVisitID
ORDER BY vesselVisitID
"""
result = self._query_metabase_native(query)
if result and "data" in result:
rows = result["data"].get("rows", [])
cols = result["data"].get("cols", [])
col_idx = {col["name"]: i for i, col in enumerate(cols)}
confluence_data = {}
confluence_data_by_name = {}
confluence_reports_list = []
if self.confluence_enabled:
try:
query_start = (start_time - timedelta(days=1)).strftime(
"%Y-%m-%d"
)
query_end = end_time.strftime("%Y-%m-%d")
reports = self.confluence_manager.get_vessel_reports_in_range(
query_start, query_end
)
for report in reports:
vessel_num = report.get("vessel_number")
if vessel_num:
confluence_data[vessel_num] = report
vessel_name = report.get("vessel_name")
if vessel_name:
confluence_data_by_name[vessel_name] = report
confluence_reports_list.append(report)
except Exception as e:
print(f"从 Confluence 获取数据失败: {e}")
vessels = []
for row in rows:
vessel_id = row[col_idx.get("vesselVisitID", 0)]
cnt20 = row[col_idx.get("cnt20", 3)] or 0
cnt40 = row[col_idx.get("cnt40", 4)] or 0
teu = row[col_idx.get("teu", 5)] or 0
vehicles = row[col_idx.get("vehicles", 6)] or 0
# 获取船的作业开始和结束时间(用于判断是否跨班)
vessel_start_str = row[col_idx.get("start_time", 1)]
vessel_end_str = row[col_idx.get("end_time", 2)]
vessel_start = None
vessel_end = None
if vessel_start_str:
try:
vessel_start = datetime.strptime(
str(vessel_start_str).split(".")[0], "%Y-%m-%d %H:%M:%S"
)
except:
pass
if vessel_end_str:
try:
vessel_end = datetime.strptime(
str(vessel_end_str).split(".")[0], "%Y-%m-%d %H:%M:%S"
)
except:
pass
vessel_name = self._extract_vessel_name(vessel_id)
vessel_code = self._extract_vessel_code(vessel_id)
failures = "--"
failure_rate = "--"
interventions = "--"
intervention_rate = "--"
efficiency = "--"
vessel_number_display = vessel_code
report = None
if vessel_name and vessel_name in confluence_data_by_name:
report = confluence_data_by_name[vessel_name]
elif vessel_start and vessel_end:
report = self._find_vessel_report_by_time(
vessel_start, vessel_end, confluence_reports_list
)
if report:
failures = str(report.get("failures", 0))
interventions = str(report.get("interventions", 0))
vessel_num = report.get("vessel_number")
vessel_number_display = vessel_num if vessel_num else "--"
conf_vessel_name = report.get("vessel_name")
if conf_vessel_name:
vessel_name = conf_vessel_name
confluence_vehicles = report.get("vehicles", "")
if confluence_vehicles:
vehicles = len(confluence_vehicles.split(""))
# 使用 Metabase 的 TEU 重新计算故障率和介入率
failures_int = report.get("failures", 0)
interventions_int = report.get("interventions", 0)
if teu > 0:
failure_rate = f"{(failures_int / (teu / 2) * 100):.2f}"
intervention_rate = (
f"{(interventions_int / (teu / 2) * 100):.2f}"
)
else:
failure_rate = f"{report.get('failure_rate', 0):.2f}"
intervention_rate = (
f"{report.get('intervention_rate', 0):.2f}"
)
# 从 Metabase 获取效率数据(根据是否跨班决定查询方式)
try:
# 判断船舶是否跨班
is_cross_shift = False
if vessel_start and vessel_end:
# 白班08:00-20:00
# 夜班20:00-次日08:00
if shift_type == "day":
# 白班船在08:00前开始或20:00后结束都算跨班
shift_start = start_time.replace(
hour=8, minute=0, second=0
)
shift_end = start_time.replace(
hour=20, minute=0, second=0
)
if vessel_start < shift_start or vessel_end > shift_end:
is_cross_shift = True
else:
# 夜班船在20:00前开始或次日08:00后结束都算跨班
shift_start = start_time.replace(
hour=20, minute=0, second=0
)
shift_end = (start_time + timedelta(days=1)).replace(
hour=8, minute=0, second=0
)
if vessel_start < shift_start or vessel_end > shift_end:
is_cross_shift = True
if is_cross_shift and vessel_start and vessel_end:
# 跨班:使用 Card 69 按班次时间查询效率
try:
if shift_type == "day":
query_start = max(
vessel_start,
start_time.replace(hour=8, minute=0, second=0),
)
query_end = start_time.replace(
hour=20, minute=0, second=0
)
else:
query_start = start_time.replace(
hour=20, minute=0, second=0
)
query_end = min(
vessel_end,
(start_time + timedelta(days=1)).replace(
hour=8, minute=0, second=0
),
)
cycle_h = self.time_ops_client.get_efficiency_by_time(
query_start.strftime("%Y-%m-%d %H:%M:%S"),
query_end.strftime("%Y-%m-%d %H:%M:%S"),
)
if cycle_h is not None and cycle_h > 0:
efficiency = f"{cycle_h:.2f}"
except Exception as e:
print(f"获取班次效率失败: {e}")
# 如果不跨班或跨班查询失败,使用整船效率
if efficiency == "--":
vessel_ops_data = (
self.vessel_ops_client.get_vessel_operations(vessel_id)
)
cycle_h = vessel_ops_data.get("cycle_h_filtered")
if cycle_h is not None and cycle_h > 0:
efficiency = f"{cycle_h:.2f}"
except Exception as e:
print(f"获取船舶 {vessel_id} 效率失败: {e}")
vessels.append(
{
"vessel_code": vessel_number_display,
"vessel_name": vessel_name,
"vehicles": vehicles,
"teu_20ft": cnt20,
"teu_40ft": cnt40,
"total_teu": teu,
"efficiency": efficiency,
"failures": failures,
"failure_rate": failure_rate,
"interventions": interventions,
"intervention_rate": intervention_rate,
"vessel_start": vessel_start,
"vessel_end": vessel_end,
}
)
vessels = self._merge_vessels_by_name(vessels)
return vessels
except Exception as e:
print(f"获取船舶列表失败: {e}")
return []
def _extract_vessel_name(self, vessel_visit_id: str) -> str:
"""从 vessel_visit_id 中提取船名"""
if not vessel_visit_id:
return ""
parts = vessel_visit_id.split("-")
if len(parts) >= 2:
name = parts[1]
if "_" in name:
name = name.split("_")[0]
return name
return vessel_visit_id
def _extract_vessel_code(self, vessel_visit_id: str) -> str:
"""从 vessel_visit_id 中提取船舶代码(日期部分)"""
if not vessel_visit_id:
return ""
parts = vessel_visit_id.split("-")
if len(parts) >= 1:
return parts[0]
return vessel_visit_id
def _parse_confluence_operation_time(
self, operation_time: str
) -> tuple[Optional[datetime], Optional[datetime]]:
"""
解析 Confluence 报告中的作业时间字符串
Args:
operation_time: 作业时间字符串,如 "2026.03.13 08:00~2026.03.13 20:00"
Returns:
(start_time, end_time) 元组,解析失败返回 (None, None)
"""
if not operation_time:
return None, None
try:
# 格式: "2026.03.13 08:00~2026.03.13 20:00"
if "~" in operation_time:
parts = operation_time.split("~")
if len(parts) == 2:
start_str = parts[0].strip()
end_str = parts[1].strip()
# 解析开始时间
start_dt = datetime.strptime(start_str, "%Y.%m.%d %H:%M")
# 解析结束时间
end_dt = datetime.strptime(end_str, "%Y.%m.%d %H:%M")
return start_dt, end_dt
except Exception as e:
print(f"解析作业时间失败 '{operation_time}': {e}")
return None, None
def _find_vessel_report_by_time(
self,
vessel_start: Optional[datetime],
vessel_end: Optional[datetime],
confluence_reports: List[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
"""
根据作业时间范围查找匹配的 Confluence 报告
Args:
vessel_start: Metabase 中船舶作业开始时间
vessel_end: Metabase 中船舶作业结束时间
confluence_reports: Confluence 报告列表
Returns:
匹配的报告,未找到返回 None
"""
if not vessel_start or not vessel_end:
return None
for report in confluence_reports:
operation_time = report.get("operation_time", "")
conf_start, conf_end = self._parse_confluence_operation_time(operation_time)
if conf_start and conf_end:
# 检查时间是否有重叠
# 重叠条件: 一个区间的开始小于另一个区间的结束,且一个区间的结束大于另一个区间的开始
if vessel_start < conf_end and vessel_end > conf_start:
return report
return None
def _merge_vessels_by_name(
self, vessels: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
merged = {}
for vessel in vessels:
name = vessel.get("vessel_name", "")
if not name or name == "N/A":
continue
if name not in merged:
merged[name] = vessel.copy()
else:
existing = merged[name]
existing["teu_20ft"] = existing.get("teu_20ft", 0) + vessel.get(
"teu_20ft", 0
)
existing["teu_40ft"] = existing.get("teu_40ft", 0) + vessel.get(
"teu_40ft", 0
)
existing["total_teu"] = existing.get("total_teu", 0) + vessel.get(
"total_teu", 0
)
existing["vehicles"] = max(
existing.get("vehicles", 0), vessel.get("vehicles", 0)
)
if vessel.get("vessel_code") and vessel["vessel_code"] != "--":
existing["vessel_code"] = vessel["vessel_code"]
if vessel.get("failures") and vessel["failures"] != "--":
existing["failures"] = vessel["failures"]
existing["failure_rate"] = vessel.get("failure_rate", "--")
if vessel.get("interventions") and vessel["interventions"] != "--":
existing["interventions"] = vessel["interventions"]
existing["intervention_rate"] = vessel.get(
"intervention_rate", "--"
)
# Assign N/A vessels to existing vessels by time overlap
for vessel in vessels:
name = vessel.get("vessel_name", "")
if name != "N/A":
continue
vessel_start = vessel.get("vessel_start")
vessel_end = vessel.get("vessel_end")
if vessel_start and vessel_end:
for target_name, target_vessel in merged.items():
target_start = target_vessel.get("vessel_start")
target_end = target_vessel.get("vessel_end")
if target_start and target_end:
if vessel_start < target_end and vessel_end > target_start:
target_vessel["teu_20ft"] = target_vessel.get(
"teu_20ft", 0
) + vessel.get("teu_20ft", 0)
target_vessel["teu_40ft"] = target_vessel.get(
"teu_40ft", 0
) + vessel.get("teu_40ft", 0)
target_vessel["total_teu"] = target_vessel.get(
"total_teu", 0
) + vessel.get("total_teu", 0)
target_vessel["vehicles"] = max(
target_vessel.get("vehicles", 0),
vessel.get("vehicles", 0),
)
break
return list(merged.values())
def _calculate_efficiency(self, teu: int, vehicles: int) -> str:
"""
计算效率(简化版本,后续可接入真实效率数据)
暂时返回 "--",等待后续接口
"""
return "--"
def _query_metabase_native(
self, query: str, database_id: int = 3
) -> Optional[Dict]:
"""
执行 Metabase 原生查询
Args:
query: SQL 查询语句
database_id: 数据库ID
Returns:
查询结果字典
"""
import requests
username = os.getenv("MATEBASE_USERNAME")
password = os.getenv("MATEBASE_PASSWORD")
base_url = "http://10.80.0.11:30001"
if not username or not password:
raise Exception("缺少 Metabase 认证信息")
# 认证
auth_url = f"{base_url}/api/session"
auth_response = requests.post(
auth_url, json={"username": username, "password": password}, timeout=30
)
auth_response.raise_for_status()
session_token = auth_response.json().get("id")
# 执行查询
query_url = f"{base_url}/api/dataset"
headers = {
"X-Metabase-Session": session_token,
"Content-Type": "application/json",
}
payload = {
"type": "native",
"native": {"query": query},
"database": database_id,
}
response = requests.post(query_url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
return response.json()
def _get_vessel_efficiency_in_range(
self, vessel_visit_id: str, start_time: datetime, end_time: datetime
) -> Optional[float]:
"""计算船舶在指定时间范围内的效率(剔除异常)"""
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
# 查询该船在指定时间范围内的循环数和车辆数
query = f"""
SELECT
COUNT(*) as cycles,
COUNT(DISTINCT vehicleId) as vehicles
FROM cnt_cycles cc
WHERE cc.vesselVisitID = '{vessel_visit_id}'
AND cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
AND cc.movementType IN ('Load', 'Discharge', 'YardMove')
"""
result = self._query_metabase_native(query)
if result and "data" in result:
rows = result["data"].get("rows", [])
cols = result["data"].get("cols", [])
if rows:
col_idx = {col["name"]: i for i, col in enumerate(cols)}
cycles = rows[0][col_idx.get("cycles", 0)] or 0
vehicles = rows[0][col_idx.get("vehicles", 1)] or 1
# 计算时间差(小时)
hours = (end_time - start_time).total_seconds() / 3600
if hours > 0 and vehicles > 0:
efficiency = cycles / vehicles / hours
return round(efficiency, 2)
return None
def read_template(self, template_path: str) -> str:
"""读取模板文件"""
try:
with open(template_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
print(f"读取模板失败: {e}")
return ""
def render_template(self, template: str, data: Dict[str, Any]) -> str:
"""
模板渲染
支持 {{variable}} 和 {{#list}}...{{/list}}
"""
result = template
# 处理列表循环
def replace_list(match):
list_name = match.group(1)
list_template = match.group(2)
if list_name in data and isinstance(data[list_name], list):
items = []
for item in data[list_name]:
item_str = list_template
for key, value in item.items():
placeholder = f"{{{{{key}}}}}"
item_str = item_str.replace(placeholder, str(value))
items.append(item_str)
return "".join(items)
return ""
list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL)
# 处理简单变量
for key, value in data.items():
if not isinstance(value, (list, dict)):
placeholder = f"{{{{{key}}}}}"
result = result.replace(placeholder, str(value))
return result
def generate_shift_report(
self, report_date: Optional[datetime] = None, shift_type: str = "day"
) -> str:
"""
生成班次交接报告
Args:
report_date: 报告日期,默认为当天
shift_type: 班次类型 ("day""night")
Returns:
生成的报告内容
"""
if report_date is None:
report_date = datetime.now()
shift = self.SHIFTS.get(shift_type)
shift_name = shift["name"] if shift else shift_type
print(f"正在生成 {report_date.strftime('%Y-%m-%d')} {shift_name} 交接报告...")
# 1. 获取班次时间范围
start_time, end_time = self.get_shift_time_range(report_date, shift_type)
print(
f"班次时间范围: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
)
# 2. 获取班次人员
print("获取班次人员...")
personnel = self.get_shift_personnel(report_date, shift_type)
# 3. 获取船舶作业数据
print("获取船舶作业数据...")
vessels = self.get_vessels_in_time_range(start_time, end_time, shift_type)
# 4. 读取并渲染模板
template_path = os.path.join(
project_root, "template", "shift_handover_template.txt"
)
template = self.read_template(template_path)
if not template:
return "错误:无法读取模板文件"
# 准备渲染数据
render_data = {
"date": report_date.strftime("%m/%d"),
"shift_name": shift_name,
"personnel": personnel,
"records": vessels,
}
# 渲染模板
report = self.render_template(template, render_data)
return report
def generate_all_shifts_report(
self, report_date: Optional[datetime] = None
) -> Dict[str, str]:
"""
生成当天所有班次的交接报告
Args:
report_date: 报告日期
Returns:
{"day": 白班报告, "night": 夜班报告}
"""
reports = {}
for shift_type in self.SHIFTS.keys():
reports[shift_type] = self.generate_shift_report(report_date, shift_type)
return reports
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="生成班次交接报告")
parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为当天")
parser.add_argument(
"--shift",
type=str,
choices=["day", "night", "all"],
default="all",
help="班次类型: day(白班), night(夜班), all(全部),默认为 all",
)
args = parser.parse_args()
# 解析日期
if args.date:
report_date = datetime.strptime(args.date, "%Y-%m-%d")
else:
report_date = datetime.now()
# 生成报告
generator = ShiftReportGenerator()
if args.shift == "all":
reports = generator.generate_all_shifts_report(report_date)
for shift_type, report in reports.items():
shift_name = generator.SHIFTS[shift_type]["name"]
print("\n" + "=" * 60)
print(f"{shift_name}交接报告")
print("=" * 60)
print(report)
else:
report = generator.generate_shift_report(report_date, args.shift)
print("\n" + "=" * 60)
print("班次交接报告")
print("=" * 60)
print(report)
print("=" * 60)
if __name__ == "__main__":
main()