Files
gloria/report_generator.py
qichi.liang 00d2218c6d feat: 初始化福州港日报管理系统
- 添加日报生成功能 (report_generator.py)
- 添加 GUI 界面 (daily_report_gui.py)
- 添加班次交接报告功能 (shift_report.py)
- 集成飞书 API 获取排班信息
- 集成 Metabase 查询作业数据
- 生成 AGENTS.md 文档
2026-03-03 02:07:34 +08:00

632 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
日报生成模块
从飞书获取排班信息从Metabase获取作业数据生成日报。
班次时间08:30 ~ 次日08:30
"""
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import os
import sys
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from feishu.manager import FeishuScheduleManager
from metabase.time_operations import TimeOperationsClient
from metabase.vessel_operations import VesselOperationsClient
class DailyReportGenerator:
"""日报生成器"""
def __init__(self):
self.feishu_manager = FeishuScheduleManager()
self.time_client = TimeOperationsClient()
self.vessel_client = VesselOperationsClient()
def get_shift_time_range(self, report_date: datetime) -> tuple:
"""
获取班次时间范围
特殊规则:
1. 每月1号从00:00到次日08:00特殊规则
2. 每月月底最后一天从当天08:00到当天23:59特殊规则
3. 其他日期08:00 ~ 次日08:00默认规则
Args:
report_date: 报告日期
Returns:
(start_time, end_time) 元组
"""
year = report_date.year
month = report_date.month
day = report_date.day
# 获取当月最后一天
if month == 12:
next_month = datetime(year + 1, 1, 1)
else:
next_month = datetime(year, month + 1, 1)
last_day_of_month = (next_month - timedelta(days=1)).day
# 判断是否是每月1号
if day == 1:
# 每月1号从00:00到次日08:00
start_time = report_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_time = report_date.replace(
hour=8, minute=0, second=0, microsecond=0
) + timedelta(days=1)
# 判断是否是月底最后一天
elif day == last_day_of_month:
# 每月月底最后一天从当天08:00到当天23:59
start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0)
end_time = report_date.replace(hour=23, minute=59, second=59, microsecond=0)
else:
# 其他日期08:00 ~ 次日08:00
start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0)
end_time = start_time + timedelta(days=1)
return start_time, end_time
def get_next_day_schedule(self, report_date: datetime) -> Dict[str, Any]:
"""
获取次日排班信息(用于日报中的排班部分)
Args:
report_date: 报告日期
Returns:
排班信息字典
"""
# 次日日期
next_day = report_date + timedelta(days=1)
next_day_str = next_day.strftime("%Y-%m-%d")
# 从飞书获取排班
schedule = self.feishu_manager.get_schedule_for_date(next_day_str)
return {
"date": next_day.strftime("%m/%d"),
"day_shift": schedule.get("day_shift", ""),
"night_shift": schedule.get("night_shift", ""),
"day_shift_list": schedule.get("day_shift_list", []),
"night_shift_list": schedule.get("night_shift_list", []),
}
def get_vessels_in_time_range(
self, start_time: datetime, end_time: datetime
) -> List[Dict[str, Any]]:
"""
获取指定时间范围内作业的所有船舶列表
使用 Metabase 原生查询 API
Args:
start_time: 开始时间
end_time: 结束时间
Returns:
船舶列表,每个船舶包含:
- vessel_visit_id: 船舶访问ID
- vessel_name: 船舶名称从ID中提取
- start_time: 船舶作业开始时间
- end_time: 船舶作业结束时间
- cnt20: 20尺箱量
- cnt40: 40尺箱量
- teu: TEU数
"""
try:
# 使用 Metabase 原生查询获取船舶列表
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
query = f"""
SELECT
vesselVisitID,
MIN(_time_end) as start_time,
MAX(_time_end) as end_time,
SUM(num20) as cnt20,
SUM(num40) as cnt40,
SUM(num20) + SUM(num40)*2 as teu
FROM (
SELECT
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1))
THEN cn.num20 ELSE cc.num20 END AS num20,
CASE
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3)))
THEN cn.num40 ELSE cc.num40 END AS num40,
cc.vehicleId,
cc.movementType,
cc.vesselVisitID,
cc.batchName,
cc._time_end
FROM cnt_cycles cc
LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
) AS basedata
GROUP BY vesselVisitID
ORDER BY vesselVisitID
"""
# 调用 Metabase API
result = self._query_metabase_native(query)
if result and "data" in result:
rows = result["data"].get("rows", [])
cols = result["data"].get("cols", [])
# 构建列名到索引的映射
col_idx = {col["name"]: i for i, col in enumerate(cols)}
vessels = []
for row in rows:
vessel_id = row[col_idx.get("vesselVisitID", 0)]
start = row[col_idx.get("start_time", 1)]
end = row[col_idx.get("end_time", 2)]
cnt20 = row[col_idx.get("cnt20", 3)]
cnt40 = row[col_idx.get("cnt40", 4)]
teu = row[col_idx.get("teu", 5)]
# 从 vessel_visit_id 提取船名
# 格式通常是:日期-船名_其他信息
vessel_name = self._extract_vessel_name(vessel_id)
vessels.append(
{
"vessel_visit_id": vessel_id,
"vessel_name": vessel_name,
"start_time": start,
"end_time": end,
"cnt20": cnt20 or 0,
"cnt40": cnt40 or 0,
"teu": teu or 0,
}
)
return vessels
except Exception as e:
print(f"获取船舶列表失败: {e}")
return []
def _extract_vessel_name(self, vessel_visit_id: str) -> str:
"""从 vessel_visit_id 中提取船名"""
if not vessel_visit_id:
return ""
# 格式260208-鑫源永顺 或 260209-华晟67_X
# 提取 "-" 后面的部分,并去掉 "_X" 等后缀
parts = vessel_visit_id.split("-")
if len(parts) >= 2:
name = parts[1]
# 去掉 _X, _Y 等后缀
if "_" in name:
name = name.split("_")[0]
return name
return vessel_visit_id
def _query_metabase_native(
self, query: str, database_id: int = 3
) -> Optional[Dict]:
"""
执行 Metabase 原生查询
Args:
query: SQL 查询语句
database_id: 数据库ID默认为3
Returns:
查询结果字典
"""
import requests
# 获取认证信息
username = os.getenv("MATEBASE_USERNAME")
password = os.getenv("MATEBASE_PASSWORD")
base_url = "http://10.80.0.11:30001"
if not username or not password:
raise Exception("缺少 Metabase 认证信息")
# 1. 认证
auth_url = f"{base_url}/api/session"
auth_response = requests.post(
auth_url, json={"username": username, "password": password}, timeout=30
)
auth_response.raise_for_status()
session_token = auth_response.json().get("id")
# 2. 执行查询
query_url = f"{base_url}/api/dataset"
headers = {
"X-Metabase-Session": session_token,
"Content-Type": "application/json",
}
payload = {
"type": "native",
"native": {"query": query},
"database": database_id,
}
response = requests.post(query_url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
return response.json()
def calculate_vessel_workload_in_shift(
self,
vessel_visit_id: str,
vessel_start: datetime,
vessel_end: datetime,
shift_start: datetime,
shift_end: datetime,
) -> Optional[Dict[str, Any]]:
"""
计算船舶在班次时间范围内的作业量
处理跨班次的情况
Args:
vessel_visit_id: 船舶访问ID
vessel_start: 船舶作业开始时间
vessel_end: 船舶作业结束时间
shift_start: 班次开始时间
shift_end: 班次结束时间
Returns:
作业量数据如果船舶不在班次范围内则返回None
"""
# 计算时间重叠
overlap_start = max(vessel_start, shift_start)
overlap_end = min(vessel_end, shift_end)
# 如果没有重叠返回None
if overlap_start >= overlap_end:
return None
try:
# 查询重叠时间范围内的作业数据
start_str = overlap_start.strftime("%Y-%m-%d %H:%M:%S")
end_str = overlap_end.strftime("%Y-%m-%d %H:%M:%S")
# 使用 time_operations 查询该时间段的作业数据
# TODO: 需要确认是否可以按船舶过滤
operations_data = self.time_client.get_operations_by_time(
start_str, end_str
)
return {
"vessel_visit_id": vessel_visit_id,
"start_time": overlap_start,
"end_time": overlap_end,
"cnt20": operations_data.get("cnt20", 0),
"cnt40": operations_data.get("cnt40", 0),
"cntAll": operations_data.get("cntAll", 0),
"teu": operations_data.get("teu", 0),
}
except Exception as e:
print(f"查询船舶 {vessel_visit_id} 作业数据失败: {e}")
return None
def get_daily_vessel_operations(
self, report_date: datetime
) -> List[Dict[str, Any]]:
"""
获取当天(班次时间范围内)的所有船舶作业数据
处理跨班次的情况
Args:
report_date: 报告日期
Returns:
船舶作业列表
"""
shift_start, shift_end = self.get_shift_time_range(report_date)
# 获取在班次时间范围内作业的所有船舶及其作业量
vessels = self.get_vessels_in_time_range(shift_start, shift_end)
# 转换时间字符串为 datetime 对象,并格式化输出
vessel_operations = []
for vessel in vessels:
start_time_str = vessel.get("start_time")
end_time_str = vessel.get("end_time")
# 解析时间字符串
try:
if isinstance(start_time_str, str):
# 去掉毫秒部分
start_time_str = start_time_str.split(".")[0]
vessel["start_time"] = datetime.strptime(
start_time_str, "%Y-%m-%d %H:%M:%S"
)
if isinstance(end_time_str, str):
end_time_str = end_time_str.split(".")[0]
vessel["end_time"] = datetime.strptime(
end_time_str, "%Y-%m-%d %H:%M:%S"
)
except Exception as e:
continue
vessel_operations.append(vessel)
return vessel_operations
def get_daily_operations(self, report_date: datetime) -> Dict[str, Any]:
"""
获取当天作业情况从Metabase
时间范围昨日08:30 ~ 今日08:30
Args:
report_date: 报告日期
Returns:
作业数据字典
"""
start_time, end_time = self.get_shift_time_range(report_date)
# 格式化时间字符串包含具体时间08:30
# Metabase查询需要时间格式YYYY-MM-DD HH:MM:SS
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
try:
# 获取时间段内的作业数据
operations_data = self.time_client.get_operations_by_time(
start_str, end_str
)
return {
"start_time": start_time,
"end_time": end_time,
"cnt20": operations_data.get("cnt20", 0),
"cnt40": operations_data.get("cnt40", 0),
"cntAll": operations_data.get("cntAll", 0),
"teu": operations_data.get("teu", 0),
"cycle_h_normal": operations_data.get("cycle_h_normal", 0),
"cycle_h_filtered": operations_data.get("cycle_h_filtered", 0),
}
except Exception as e:
print(f"获取作业数据失败: {e}")
return {
"start_time": start_time,
"end_time": end_time,
"cnt20": 0,
"cnt40": 0,
"cntAll": 0,
"teu": 0,
"cycle_h_normal": 0,
"cycle_h_filtered": 0,
}
def calculate_monthly_stats(
self, report_date: datetime, daily_teu: int
) -> Dict[str, Any]:
"""
计算月度统计数据
Args:
report_date: 报告日期
daily_teu: 当日TEU数
Returns:
月度统计数据
"""
# 当月计划作业量(当前日期 * 300TEU
day_of_month = report_date.day
month_plan_teu = day_of_month * 300
# 当月实际作业量从Metabase查询当月累计数据1号到当前日期
try:
# 构建当月1号到当前日期的查询
# 日报包含昨日08:30到今日08:30的数据
# 所以累计应该到今日08:30为止
month_start = report_date.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
# 获取班次结束时间次日08:30
_, shift_end = self.get_shift_time_range(report_date)
current_day_end = shift_end
start_str = month_start.strftime("%Y-%m-%d %H:%M:%S")
end_str = current_day_end.strftime("%Y-%m-%d %H:%M:%S")
# 查询当月累计数据
monthly_data = self.time_client.get_operations_by_time(start_str, end_str)
month_actual_teu = monthly_data.get("teu", 0)
except Exception as e:
print(f"获取当月累计数据失败: {e}")
# 如果查询失败,使用当日数据作为备用
month_actual_teu = daily_teu
# 完成比例
completion_rate = (
(month_actual_teu / month_plan_teu * 100) if month_plan_teu > 0 else 0
)
return {
"month_plan_teu": month_plan_teu,
"month_actual_teu": month_actual_teu,
"completion_rate": round(completion_rate, 2),
}
def get_days_in_month(self, year: int, month: int) -> int:
"""获取某月的天数"""
if month == 12:
next_month = datetime(year + 1, 1, 1)
else:
next_month = datetime(year, month + 1, 1)
this_month = datetime(year, month, 1)
return (next_month - this_month).days
def read_template(self, template_path: str) -> str:
"""读取模板文件"""
try:
with open(template_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
print(f"读取模板失败: {e}")
return ""
def render_template(self, template: str, data: Dict[str, Any]) -> str:
"""
简单的模板渲染
支持 {{variable}} 和 {{#list}}...{{/list}}
"""
import re
result = template
# 处理列表循环 {{#list}}...{{/list}}
def replace_list(match):
list_name = match.group(1)
list_template = match.group(2)
if list_name in data and isinstance(data[list_name], list):
items = []
for item in data[list_name]:
item_str = list_template
# 替换变量
for key, value in item.items():
placeholder = f"{{{{{key}}}}}"
item_str = item_str.replace(placeholder, str(value))
items.append(item_str)
return "".join(items)
return ""
# 匹配 {{#name}}...{{/name}}
list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL)
# 处理简单变量 {{variable}}
for key, value in data.items():
if not isinstance(value, (list, dict)):
placeholder = f"{{{{{key}}}}}"
result = result.replace(placeholder, str(value))
return result
def generate_daily_report(self, report_date: Optional[datetime] = None) -> str:
"""
生成日报
Args:
report_date: 报告日期,默认为昨天
Returns:
生成的日报内容
"""
if report_date is None:
# 默认为昨天
report_date = datetime.now() - timedelta(days=1)
print(f"正在生成 {report_date.strftime('%Y-%m-%d')} 的日报...")
# 1. 获取次日排班信息
print("获取次日排班信息...")
next_day_schedule = self.get_next_day_schedule(report_date)
# 2. 获取当天作业情况
print("获取当天作业数据...")
operations_data = self.get_daily_operations(report_date)
# 3. 获取当天船舶作业情况(处理跨班次)
print("获取船舶作业数据...")
vessel_operations = self.get_daily_vessel_operations(report_date)
# 4. 计算月度统计
print("计算月度统计...")
monthly_stats = self.calculate_monthly_stats(
report_date, operations_data.get("teu", 0)
)
# 5. 读取并渲染模板
template_path = os.path.join(
project_root, "template", "daily_report_template.txt"
)
template = self.read_template(template_path)
if not template:
return "错误:无法读取模板文件"
# 准备渲染数据
# 注意:排班人员是次日的,所以日期也要用次日的
next_day_date = report_date + timedelta(days=1)
# 构建船舶列表数据
vessels_data = []
for vessel in vessel_operations:
vessels_data.append(
{
"name": vessel.get("vessel_name", ""),
"teu_20ft": vessel.get("cnt20", 0),
"teu_40ft": vessel.get("cnt40", 0),
"total_teu": vessel.get("teu", 0),
}
)
render_data = {
"date": report_date.strftime("%m/%d"),
"next_day_date": next_day_date.strftime("%m/%d"),
"vessels": vessels_data,
"daily_actual": operations_data.get("teu", 0),
"month_plan_teu": monthly_stats["month_plan_teu"],
"month_actual_teu": monthly_stats["month_actual_teu"],
"completion_rate": monthly_stats["completion_rate"],
"day_shift": next_day_schedule["day_shift"],
"night_shift": next_day_schedule["night_shift"],
"duty_phone": os.getenv("DUTY_PHONE", "13107662315"),
}
# 渲染模板
report = self.render_template(template, render_data)
return report
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description="生成日报")
parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为昨天")
args = parser.parse_args()
# 解析日期
if args.date:
report_date = datetime.strptime(args.date, "%Y-%m-%d")
else:
report_date = None
# 生成日报
generator = DailyReportGenerator()
report = generator.generate_daily_report(report_date)
# 直接输出到终端
print("\n" + "=" * 60)
print("日报内容")
print("=" * 60)
print(report)
print("=" * 60)
if __name__ == "__main__":
main()