Files
gloria/shift_report.py
qichi.liang 00d2218c6d feat: 初始化福州港日报管理系统
- 添加日报生成功能 (report_generator.py)
- 添加 GUI 界面 (daily_report_gui.py)
- 添加班次交接报告功能 (shift_report.py)
- 集成飞书 API 获取排班信息
- 集成 Metabase 查询作业数据
- 生成 AGENTS.md 文档
2026-03-03 02:07:34 +08:00

441 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
班次交接报告生成模块
分别统计白班08:00-20:00和夜班20:00-次日08:00的作业情况。
生成格式化的交接班报告。
"""
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import os
import sys
import re
import argparse
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from feishu.manager import FeishuScheduleManager
class ShiftReportGenerator:
"""班次交接报告生成器"""
# 班次定义
SHIFTS = {
"day": {
"name": "白班",
"start_hour": 8,
"end_hour": 20,
},
"night": {
"name": "夜班",
"start_hour": 20,
"end_hour": 8, # 次日
},
}
def __init__(self):
self.feishu_manager = FeishuScheduleManager()
def get_shift_time_range(self, report_date: datetime, shift_type: str) -> tuple:
"""
获取班次时间范围
Args:
report_date: 报告日期
shift_type: 班次类型 ("day""night")
Returns:
(start_time, end_time) 元组
"""
shift = self.SHIFTS.get(shift_type)
if not shift:
raise ValueError(f"未知的班次类型: {shift_type}")
start_hour = shift["start_hour"]
end_hour = shift["end_hour"]
if shift_type == "day":
# 白班:当天 08:00 - 当天 20:00
start_time = report_date.replace(
hour=start_hour, minute=0, second=0, microsecond=0
)
end_time = report_date.replace(
hour=end_hour, minute=0, second=0, microsecond=0
)
else:
# 夜班:当天 20:00 - 次日 08:00
start_time = report_date.replace(
hour=start_hour, minute=0, second=0, microsecond=0
)
end_time = report_date.replace(
hour=end_hour, minute=0, second=0, microsecond=0
) + timedelta(days=1)
return start_time, end_time
def get_shift_personnel(self, report_date: datetime, shift_type: str) -> str:
"""
获取班次人员
Args:
report_date: 报告日期
shift_type: 班次类型
Returns:
人员姓名字符串
"""
date_str = report_date.strftime("%Y-%m-%d")
schedule = self.feishu_manager.get_schedule_for_date(date_str)
if shift_type == "day":
return schedule.get("day_shift", "")
else:
return schedule.get("night_shift", "")
def get_vessels_in_time_range(
self, start_time: datetime, end_time: datetime
) -> List[Dict[str, Any]]:
"""
获取指定时间范围内作业的所有船舶列表
Args:
start_time: 开始时间
end_time: 结束时间
Returns:
船舶作业列表
"""
try:
# 使用原生 SQL 查询获取船舶数据
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
query = f"""
SELECT
vesselVisitID,
MIN(_time_end) as start_time,
MAX(_time_end) as end_time,
SUM(num20) as cnt20,
SUM(num40) as cnt40,
SUM(num20) + SUM(num40)*2 as teu,
COUNT(DISTINCT vehicleId) as vehicles
FROM (
SELECT
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1))
THEN cn.num20 ELSE cc.num20 END AS num20,
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3)))
THEN cn.num40 ELSE cc.num40 END AS num40,
cc.vehicleId,
cc.movementType,
cc.vesselVisitID,
cc.batchName,
cc._time_end
FROM cnt_cycles cc
LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
) AS basedata
GROUP BY vesselVisitID
ORDER BY vesselVisitID
"""
result = self._query_metabase_native(query)
if result and "data" in result:
rows = result["data"].get("rows", [])
cols = result["data"].get("cols", [])
col_idx = {col["name"]: i for i, col in enumerate(cols)}
vessels = []
for row in rows:
vessel_id = row[col_idx.get("vesselVisitID", 0)]
cnt20 = row[col_idx.get("cnt20", 3)] or 0
cnt40 = row[col_idx.get("cnt40", 4)] or 0
teu = row[col_idx.get("teu", 5)] or 0
vehicles = row[col_idx.get("vehicles", 6)] or 0
vessel_name = self._extract_vessel_name(vessel_id)
vessel_code = self._extract_vessel_code(vessel_id)
vessels.append(
{
"vessel_code": vessel_code,
"vessel_name": vessel_name,
"vehicles": vehicles,
"teu_20ft": cnt20,
"teu_40ft": cnt40,
"total_teu": teu,
"efficiency": self._calculate_efficiency(teu, vehicles),
# 故障和人工介入暂无数据接口,显示占位符
"failures": "--",
"failure_rate": "--",
"interventions": "--",
"intervention_rate": "--",
}
)
return vessels
except Exception as e:
print(f"获取船舶列表失败: {e}")
return []
def _extract_vessel_name(self, vessel_visit_id: str) -> str:
"""从 vessel_visit_id 中提取船名"""
if not vessel_visit_id:
return ""
parts = vessel_visit_id.split("-")
if len(parts) >= 2:
name = parts[1]
if "_" in name:
name = name.split("_")[0]
return name
return vessel_visit_id
def _extract_vessel_code(self, vessel_visit_id: str) -> str:
"""从 vessel_visit_id 中提取船舶代码(日期部分)"""
if not vessel_visit_id:
return ""
parts = vessel_visit_id.split("-")
if len(parts) >= 1:
return parts[0]
return vessel_visit_id
def _calculate_efficiency(self, teu: int, vehicles: int) -> str:
"""
计算效率(简化版本,后续可接入真实效率数据)
暂时返回 "--",等待后续接口
"""
return "--"
def _query_metabase_native(
self, query: str, database_id: int = 3
) -> Optional[Dict]:
"""
执行 Metabase 原生查询
Args:
query: SQL 查询语句
database_id: 数据库ID
Returns:
查询结果字典
"""
import requests
username = os.getenv("MATEBASE_USERNAME")
password = os.getenv("MATEBASE_PASSWORD")
base_url = "http://10.80.0.11:30001"
if not username or not password:
raise Exception("缺少 Metabase 认证信息")
# 认证
auth_url = f"{base_url}/api/session"
auth_response = requests.post(
auth_url, json={"username": username, "password": password}, timeout=30
)
auth_response.raise_for_status()
session_token = auth_response.json().get("id")
# 执行查询
query_url = f"{base_url}/api/dataset"
headers = {
"X-Metabase-Session": session_token,
"Content-Type": "application/json",
}
payload = {
"type": "native",
"native": {"query": query},
"database": database_id,
}
response = requests.post(query_url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
return response.json()
def read_template(self, template_path: str) -> str:
"""读取模板文件"""
try:
with open(template_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
print(f"读取模板失败: {e}")
return ""
def render_template(self, template: str, data: Dict[str, Any]) -> str:
"""
模板渲染
支持 {{variable}} 和 {{#list}}...{{/list}}
"""
result = template
# 处理列表循环
def replace_list(match):
list_name = match.group(1)
list_template = match.group(2)
if list_name in data and isinstance(data[list_name], list):
items = []
for item in data[list_name]:
item_str = list_template
for key, value in item.items():
placeholder = f"{{{{{key}}}}}"
item_str = item_str.replace(placeholder, str(value))
items.append(item_str)
return "".join(items)
return ""
list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL)
# 处理简单变量
for key, value in data.items():
if not isinstance(value, (list, dict)):
placeholder = f"{{{{{key}}}}}"
result = result.replace(placeholder, str(value))
return result
def generate_shift_report(
self, report_date: Optional[datetime] = None, shift_type: str = "day"
) -> str:
"""
生成班次交接报告
Args:
report_date: 报告日期,默认为当天
shift_type: 班次类型 ("day""night")
Returns:
生成的报告内容
"""
if report_date is None:
report_date = datetime.now()
shift = self.SHIFTS.get(shift_type)
shift_name = shift["name"] if shift else shift_type
print(f"正在生成 {report_date.strftime('%Y-%m-%d')} {shift_name} 交接报告...")
# 1. 获取班次时间范围
start_time, end_time = self.get_shift_time_range(report_date, shift_type)
print(
f"班次时间范围: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}"
)
# 2. 获取班次人员
print("获取班次人员...")
personnel = self.get_shift_personnel(report_date, shift_type)
# 3. 获取船舶作业数据
print("获取船舶作业数据...")
vessels = self.get_vessels_in_time_range(start_time, end_time)
# 4. 读取并渲染模板
template_path = os.path.join(
project_root, "template", "shift_handover_template.txt"
)
template = self.read_template(template_path)
if not template:
return "错误:无法读取模板文件"
# 准备渲染数据
render_data = {
"date": report_date.strftime("%m/%d"),
"shift_name": shift_name,
"personnel": personnel,
"records": vessels,
}
# 渲染模板
report = self.render_template(template, render_data)
return report
def generate_all_shifts_report(
self, report_date: Optional[datetime] = None
) -> Dict[str, str]:
"""
生成当天所有班次的交接报告
Args:
report_date: 报告日期
Returns:
{"day": 白班报告, "night": 夜班报告}
"""
reports = {}
for shift_type in self.SHIFTS.keys():
reports[shift_type] = self.generate_shift_report(report_date, shift_type)
return reports
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="生成班次交接报告")
parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为当天")
parser.add_argument(
"--shift",
type=str,
choices=["day", "night", "all"],
default="all",
help="班次类型: day(白班), night(夜班), all(全部),默认为 all",
)
args = parser.parse_args()
# 解析日期
if args.date:
report_date = datetime.strptime(args.date, "%Y-%m-%d")
else:
report_date = datetime.now()
# 生成报告
generator = ShiftReportGenerator()
if args.shift == "all":
reports = generator.generate_all_shifts_report(report_date)
for shift_type, report in reports.items():
shift_name = generator.SHIFTS[shift_type]["name"]
print("\n" + "=" * 60)
print(f"{shift_name}交接报告")
print("=" * 60)
print(report)
else:
report = generator.generate_shift_report(report_date, args.shift)
print("\n" + "=" * 60)
print("班次交接报告")
print("=" * 60)
print(report)
print("=" * 60)
if __name__ == "__main__":
main()