- 添加日报生成功能 (report_generator.py) - 添加 GUI 界面 (daily_report_gui.py) - 添加班次交接报告功能 (shift_report.py) - 集成飞书 API 获取排班信息 - 集成 Metabase 查询作业数据 - 生成 AGENTS.md 文档
632 lines
21 KiB
Python
632 lines
21 KiB
Python
"""
|
||
日报生成模块
|
||
|
||
从飞书获取排班信息,从Metabase获取作业数据,生成日报。
|
||
班次时间:08:30 ~ 次日08:30
|
||
"""
|
||
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Any, Optional
|
||
import os
|
||
import sys
|
||
|
||
# 加载环境变量
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
|
||
# 添加项目根目录到路径
|
||
project_root = os.path.dirname(os.path.abspath(__file__))
|
||
if project_root not in sys.path:
|
||
sys.path.insert(0, project_root)
|
||
|
||
from feishu.manager import FeishuScheduleManager
|
||
from metabase.time_operations import TimeOperationsClient
|
||
from metabase.vessel_operations import VesselOperationsClient
|
||
|
||
|
||
class DailyReportGenerator:
|
||
"""日报生成器"""
|
||
|
||
def __init__(self):
|
||
self.feishu_manager = FeishuScheduleManager()
|
||
self.time_client = TimeOperationsClient()
|
||
self.vessel_client = VesselOperationsClient()
|
||
|
||
def get_shift_time_range(self, report_date: datetime) -> tuple:
|
||
"""
|
||
获取班次时间范围
|
||
特殊规则:
|
||
1. 每月1号:从00:00到次日08:00(特殊规则)
|
||
2. 每月月底最后一天:从当天08:00到当天23:59(特殊规则)
|
||
3. 其他日期:08:00 ~ 次日08:00(默认规则)
|
||
|
||
Args:
|
||
report_date: 报告日期
|
||
|
||
Returns:
|
||
(start_time, end_time) 元组
|
||
"""
|
||
year = report_date.year
|
||
month = report_date.month
|
||
day = report_date.day
|
||
|
||
# 获取当月最后一天
|
||
if month == 12:
|
||
next_month = datetime(year + 1, 1, 1)
|
||
else:
|
||
next_month = datetime(year, month + 1, 1)
|
||
last_day_of_month = (next_month - timedelta(days=1)).day
|
||
|
||
# 判断是否是每月1号
|
||
if day == 1:
|
||
# 每月1号:从00:00到次日08:00
|
||
start_time = report_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
end_time = report_date.replace(
|
||
hour=8, minute=0, second=0, microsecond=0
|
||
) + timedelta(days=1)
|
||
# 判断是否是月底最后一天
|
||
elif day == last_day_of_month:
|
||
# 每月月底最后一天:从当天08:00到当天23:59
|
||
start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0)
|
||
end_time = report_date.replace(hour=23, minute=59, second=59, microsecond=0)
|
||
else:
|
||
# 其他日期:08:00 ~ 次日08:00
|
||
start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0)
|
||
end_time = start_time + timedelta(days=1)
|
||
|
||
return start_time, end_time
|
||
|
||
def get_next_day_schedule(self, report_date: datetime) -> Dict[str, Any]:
|
||
"""
|
||
获取次日排班信息(用于日报中的排班部分)
|
||
|
||
Args:
|
||
report_date: 报告日期
|
||
|
||
Returns:
|
||
排班信息字典
|
||
"""
|
||
# 次日日期
|
||
next_day = report_date + timedelta(days=1)
|
||
next_day_str = next_day.strftime("%Y-%m-%d")
|
||
|
||
# 从飞书获取排班
|
||
schedule = self.feishu_manager.get_schedule_for_date(next_day_str)
|
||
|
||
return {
|
||
"date": next_day.strftime("%m/%d"),
|
||
"day_shift": schedule.get("day_shift", ""),
|
||
"night_shift": schedule.get("night_shift", ""),
|
||
"day_shift_list": schedule.get("day_shift_list", []),
|
||
"night_shift_list": schedule.get("night_shift_list", []),
|
||
}
|
||
|
||
def get_vessels_in_time_range(
|
||
self, start_time: datetime, end_time: datetime
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取指定时间范围内作业的所有船舶列表
|
||
使用 Metabase 原生查询 API
|
||
|
||
Args:
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
|
||
Returns:
|
||
船舶列表,每个船舶包含:
|
||
- vessel_visit_id: 船舶访问ID
|
||
- vessel_name: 船舶名称(从ID中提取)
|
||
- start_time: 船舶作业开始时间
|
||
- end_time: 船舶作业结束时间
|
||
- cnt20: 20尺箱量
|
||
- cnt40: 40尺箱量
|
||
- teu: TEU数
|
||
"""
|
||
try:
|
||
# 使用 Metabase 原生查询获取船舶列表
|
||
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
query = f"""
|
||
SELECT
|
||
vesselVisitID,
|
||
MIN(_time_end) as start_time,
|
||
MAX(_time_end) as end_time,
|
||
SUM(num20) as cnt20,
|
||
SUM(num40) as cnt40,
|
||
SUM(num20) + SUM(num40)*2 as teu
|
||
FROM (
|
||
SELECT
|
||
CASE
|
||
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
|
||
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1))
|
||
THEN cn.num20 ELSE cc.num20 END AS num20,
|
||
CASE
|
||
WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0)
|
||
OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3)))
|
||
THEN cn.num40 ELSE cc.num40 END AS num40,
|
||
cc.vehicleId,
|
||
cc.movementType,
|
||
cc.vesselVisitID,
|
||
cc.batchName,
|
||
cc._time_end
|
||
FROM cnt_cycles cc
|
||
LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end`
|
||
WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR)
|
||
AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR)
|
||
) AS basedata
|
||
GROUP BY vesselVisitID
|
||
ORDER BY vesselVisitID
|
||
"""
|
||
|
||
# 调用 Metabase API
|
||
result = self._query_metabase_native(query)
|
||
|
||
if result and "data" in result:
|
||
rows = result["data"].get("rows", [])
|
||
cols = result["data"].get("cols", [])
|
||
|
||
# 构建列名到索引的映射
|
||
col_idx = {col["name"]: i for i, col in enumerate(cols)}
|
||
|
||
vessels = []
|
||
for row in rows:
|
||
vessel_id = row[col_idx.get("vesselVisitID", 0)]
|
||
start = row[col_idx.get("start_time", 1)]
|
||
end = row[col_idx.get("end_time", 2)]
|
||
cnt20 = row[col_idx.get("cnt20", 3)]
|
||
cnt40 = row[col_idx.get("cnt40", 4)]
|
||
teu = row[col_idx.get("teu", 5)]
|
||
|
||
# 从 vessel_visit_id 提取船名
|
||
# 格式通常是:日期-船名_其他信息
|
||
vessel_name = self._extract_vessel_name(vessel_id)
|
||
|
||
vessels.append(
|
||
{
|
||
"vessel_visit_id": vessel_id,
|
||
"vessel_name": vessel_name,
|
||
"start_time": start,
|
||
"end_time": end,
|
||
"cnt20": cnt20 or 0,
|
||
"cnt40": cnt40 or 0,
|
||
"teu": teu or 0,
|
||
}
|
||
)
|
||
|
||
return vessels
|
||
|
||
except Exception as e:
|
||
print(f"获取船舶列表失败: {e}")
|
||
|
||
return []
|
||
|
||
def _extract_vessel_name(self, vessel_visit_id: str) -> str:
|
||
"""从 vessel_visit_id 中提取船名"""
|
||
if not vessel_visit_id:
|
||
return ""
|
||
|
||
# 格式:260208-鑫源永顺 或 260209-华晟67_X
|
||
# 提取 "-" 后面的部分,并去掉 "_X" 等后缀
|
||
parts = vessel_visit_id.split("-")
|
||
if len(parts) >= 2:
|
||
name = parts[1]
|
||
# 去掉 _X, _Y 等后缀
|
||
if "_" in name:
|
||
name = name.split("_")[0]
|
||
return name
|
||
|
||
return vessel_visit_id
|
||
|
||
def _query_metabase_native(
|
||
self, query: str, database_id: int = 3
|
||
) -> Optional[Dict]:
|
||
"""
|
||
执行 Metabase 原生查询
|
||
|
||
Args:
|
||
query: SQL 查询语句
|
||
database_id: 数据库ID,默认为3
|
||
|
||
Returns:
|
||
查询结果字典
|
||
"""
|
||
import requests
|
||
|
||
# 获取认证信息
|
||
username = os.getenv("MATEBASE_USERNAME")
|
||
password = os.getenv("MATEBASE_PASSWORD")
|
||
base_url = "http://10.80.0.11:30001"
|
||
|
||
if not username or not password:
|
||
raise Exception("缺少 Metabase 认证信息")
|
||
|
||
# 1. 认证
|
||
auth_url = f"{base_url}/api/session"
|
||
auth_response = requests.post(
|
||
auth_url, json={"username": username, "password": password}, timeout=30
|
||
)
|
||
auth_response.raise_for_status()
|
||
session_token = auth_response.json().get("id")
|
||
|
||
# 2. 执行查询
|
||
query_url = f"{base_url}/api/dataset"
|
||
headers = {
|
||
"X-Metabase-Session": session_token,
|
||
"Content-Type": "application/json",
|
||
}
|
||
|
||
payload = {
|
||
"type": "native",
|
||
"native": {"query": query},
|
||
"database": database_id,
|
||
}
|
||
|
||
response = requests.post(query_url, headers=headers, json=payload, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
return response.json()
|
||
|
||
def calculate_vessel_workload_in_shift(
|
||
self,
|
||
vessel_visit_id: str,
|
||
vessel_start: datetime,
|
||
vessel_end: datetime,
|
||
shift_start: datetime,
|
||
shift_end: datetime,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
计算船舶在班次时间范围内的作业量
|
||
处理跨班次的情况
|
||
|
||
Args:
|
||
vessel_visit_id: 船舶访问ID
|
||
vessel_start: 船舶作业开始时间
|
||
vessel_end: 船舶作业结束时间
|
||
shift_start: 班次开始时间
|
||
shift_end: 班次结束时间
|
||
|
||
Returns:
|
||
作业量数据,如果船舶不在班次范围内则返回None
|
||
"""
|
||
# 计算时间重叠
|
||
overlap_start = max(vessel_start, shift_start)
|
||
overlap_end = min(vessel_end, shift_end)
|
||
|
||
# 如果没有重叠,返回None
|
||
if overlap_start >= overlap_end:
|
||
return None
|
||
|
||
try:
|
||
# 查询重叠时间范围内的作业数据
|
||
start_str = overlap_start.strftime("%Y-%m-%d %H:%M:%S")
|
||
end_str = overlap_end.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# 使用 time_operations 查询该时间段的作业数据
|
||
# TODO: 需要确认是否可以按船舶过滤
|
||
operations_data = self.time_client.get_operations_by_time(
|
||
start_str, end_str
|
||
)
|
||
|
||
return {
|
||
"vessel_visit_id": vessel_visit_id,
|
||
"start_time": overlap_start,
|
||
"end_time": overlap_end,
|
||
"cnt20": operations_data.get("cnt20", 0),
|
||
"cnt40": operations_data.get("cnt40", 0),
|
||
"cntAll": operations_data.get("cntAll", 0),
|
||
"teu": operations_data.get("teu", 0),
|
||
}
|
||
except Exception as e:
|
||
print(f"查询船舶 {vessel_visit_id} 作业数据失败: {e}")
|
||
return None
|
||
|
||
def get_daily_vessel_operations(
|
||
self, report_date: datetime
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取当天(班次时间范围内)的所有船舶作业数据
|
||
处理跨班次的情况
|
||
|
||
Args:
|
||
report_date: 报告日期
|
||
|
||
Returns:
|
||
船舶作业列表
|
||
"""
|
||
shift_start, shift_end = self.get_shift_time_range(report_date)
|
||
|
||
# 获取在班次时间范围内作业的所有船舶及其作业量
|
||
vessels = self.get_vessels_in_time_range(shift_start, shift_end)
|
||
|
||
# 转换时间字符串为 datetime 对象,并格式化输出
|
||
vessel_operations = []
|
||
for vessel in vessels:
|
||
start_time_str = vessel.get("start_time")
|
||
end_time_str = vessel.get("end_time")
|
||
|
||
# 解析时间字符串
|
||
try:
|
||
if isinstance(start_time_str, str):
|
||
# 去掉毫秒部分
|
||
start_time_str = start_time_str.split(".")[0]
|
||
vessel["start_time"] = datetime.strptime(
|
||
start_time_str, "%Y-%m-%d %H:%M:%S"
|
||
)
|
||
if isinstance(end_time_str, str):
|
||
end_time_str = end_time_str.split(".")[0]
|
||
vessel["end_time"] = datetime.strptime(
|
||
end_time_str, "%Y-%m-%d %H:%M:%S"
|
||
)
|
||
except Exception as e:
|
||
continue
|
||
|
||
vessel_operations.append(vessel)
|
||
|
||
return vessel_operations
|
||
|
||
def get_daily_operations(self, report_date: datetime) -> Dict[str, Any]:
|
||
"""
|
||
获取当天作业情况(从Metabase)
|
||
时间范围:昨日08:30 ~ 今日08:30
|
||
|
||
Args:
|
||
report_date: 报告日期
|
||
|
||
Returns:
|
||
作业数据字典
|
||
"""
|
||
start_time, end_time = self.get_shift_time_range(report_date)
|
||
|
||
# 格式化时间字符串,包含具体时间(08:30)
|
||
# Metabase查询需要时间格式:YYYY-MM-DD HH:MM:SS
|
||
start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
try:
|
||
# 获取时间段内的作业数据
|
||
operations_data = self.time_client.get_operations_by_time(
|
||
start_str, end_str
|
||
)
|
||
|
||
return {
|
||
"start_time": start_time,
|
||
"end_time": end_time,
|
||
"cnt20": operations_data.get("cnt20", 0),
|
||
"cnt40": operations_data.get("cnt40", 0),
|
||
"cntAll": operations_data.get("cntAll", 0),
|
||
"teu": operations_data.get("teu", 0),
|
||
"cycle_h_normal": operations_data.get("cycle_h_normal", 0),
|
||
"cycle_h_filtered": operations_data.get("cycle_h_filtered", 0),
|
||
}
|
||
except Exception as e:
|
||
print(f"获取作业数据失败: {e}")
|
||
return {
|
||
"start_time": start_time,
|
||
"end_time": end_time,
|
||
"cnt20": 0,
|
||
"cnt40": 0,
|
||
"cntAll": 0,
|
||
"teu": 0,
|
||
"cycle_h_normal": 0,
|
||
"cycle_h_filtered": 0,
|
||
}
|
||
|
||
def calculate_monthly_stats(
|
||
self, report_date: datetime, daily_teu: int
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
计算月度统计数据
|
||
|
||
Args:
|
||
report_date: 报告日期
|
||
daily_teu: 当日TEU数
|
||
|
||
Returns:
|
||
月度统计数据
|
||
"""
|
||
# 当月计划作业量(当前日期 * 300TEU)
|
||
day_of_month = report_date.day
|
||
month_plan_teu = day_of_month * 300
|
||
|
||
# 当月实际作业量:从Metabase查询当月累计数据(1号到当前日期)
|
||
try:
|
||
# 构建当月1号到当前日期的查询
|
||
# 日报包含昨日08:30到今日08:30的数据
|
||
# 所以累计应该到今日08:30为止
|
||
month_start = report_date.replace(
|
||
day=1, hour=0, minute=0, second=0, microsecond=0
|
||
)
|
||
# 获取班次结束时间(次日08:30)
|
||
_, shift_end = self.get_shift_time_range(report_date)
|
||
current_day_end = shift_end
|
||
|
||
start_str = month_start.strftime("%Y-%m-%d %H:%M:%S")
|
||
end_str = current_day_end.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# 查询当月累计数据
|
||
monthly_data = self.time_client.get_operations_by_time(start_str, end_str)
|
||
month_actual_teu = monthly_data.get("teu", 0)
|
||
|
||
except Exception as e:
|
||
print(f"获取当月累计数据失败: {e}")
|
||
# 如果查询失败,使用当日数据作为备用
|
||
month_actual_teu = daily_teu
|
||
|
||
# 完成比例
|
||
completion_rate = (
|
||
(month_actual_teu / month_plan_teu * 100) if month_plan_teu > 0 else 0
|
||
)
|
||
|
||
return {
|
||
"month_plan_teu": month_plan_teu,
|
||
"month_actual_teu": month_actual_teu,
|
||
"completion_rate": round(completion_rate, 2),
|
||
}
|
||
|
||
def get_days_in_month(self, year: int, month: int) -> int:
|
||
"""获取某月的天数"""
|
||
if month == 12:
|
||
next_month = datetime(year + 1, 1, 1)
|
||
else:
|
||
next_month = datetime(year, month + 1, 1)
|
||
|
||
this_month = datetime(year, month, 1)
|
||
return (next_month - this_month).days
|
||
|
||
def read_template(self, template_path: str) -> str:
|
||
"""读取模板文件"""
|
||
try:
|
||
with open(template_path, "r", encoding="utf-8") as f:
|
||
return f.read()
|
||
except Exception as e:
|
||
print(f"读取模板失败: {e}")
|
||
return ""
|
||
|
||
def render_template(self, template: str, data: Dict[str, Any]) -> str:
|
||
"""
|
||
简单的模板渲染
|
||
支持 {{variable}} 和 {{#list}}...{{/list}}
|
||
"""
|
||
import re
|
||
|
||
result = template
|
||
|
||
# 处理列表循环 {{#list}}...{{/list}}
|
||
def replace_list(match):
|
||
list_name = match.group(1)
|
||
list_template = match.group(2)
|
||
|
||
if list_name in data and isinstance(data[list_name], list):
|
||
items = []
|
||
for item in data[list_name]:
|
||
item_str = list_template
|
||
# 替换变量
|
||
for key, value in item.items():
|
||
placeholder = f"{{{{{key}}}}}"
|
||
item_str = item_str.replace(placeholder, str(value))
|
||
items.append(item_str)
|
||
return "".join(items)
|
||
return ""
|
||
|
||
# 匹配 {{#name}}...{{/name}}
|
||
list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}"
|
||
result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL)
|
||
|
||
# 处理简单变量 {{variable}}
|
||
for key, value in data.items():
|
||
if not isinstance(value, (list, dict)):
|
||
placeholder = f"{{{{{key}}}}}"
|
||
result = result.replace(placeholder, str(value))
|
||
|
||
return result
|
||
|
||
def generate_daily_report(self, report_date: Optional[datetime] = None) -> str:
|
||
"""
|
||
生成日报
|
||
|
||
Args:
|
||
report_date: 报告日期,默认为昨天
|
||
|
||
Returns:
|
||
生成的日报内容
|
||
"""
|
||
if report_date is None:
|
||
# 默认为昨天
|
||
report_date = datetime.now() - timedelta(days=1)
|
||
|
||
print(f"正在生成 {report_date.strftime('%Y-%m-%d')} 的日报...")
|
||
|
||
# 1. 获取次日排班信息
|
||
print("获取次日排班信息...")
|
||
next_day_schedule = self.get_next_day_schedule(report_date)
|
||
|
||
# 2. 获取当天作业情况
|
||
print("获取当天作业数据...")
|
||
operations_data = self.get_daily_operations(report_date)
|
||
|
||
# 3. 获取当天船舶作业情况(处理跨班次)
|
||
print("获取船舶作业数据...")
|
||
vessel_operations = self.get_daily_vessel_operations(report_date)
|
||
|
||
# 4. 计算月度统计
|
||
print("计算月度统计...")
|
||
monthly_stats = self.calculate_monthly_stats(
|
||
report_date, operations_data.get("teu", 0)
|
||
)
|
||
|
||
# 5. 读取并渲染模板
|
||
template_path = os.path.join(
|
||
project_root, "template", "daily_report_template.txt"
|
||
)
|
||
template = self.read_template(template_path)
|
||
|
||
if not template:
|
||
return "错误:无法读取模板文件"
|
||
|
||
# 准备渲染数据
|
||
# 注意:排班人员是次日的,所以日期也要用次日的
|
||
next_day_date = report_date + timedelta(days=1)
|
||
|
||
# 构建船舶列表数据
|
||
vessels_data = []
|
||
for vessel in vessel_operations:
|
||
vessels_data.append(
|
||
{
|
||
"name": vessel.get("vessel_name", ""),
|
||
"teu_20ft": vessel.get("cnt20", 0),
|
||
"teu_40ft": vessel.get("cnt40", 0),
|
||
"total_teu": vessel.get("teu", 0),
|
||
}
|
||
)
|
||
|
||
render_data = {
|
||
"date": report_date.strftime("%m/%d"),
|
||
"next_day_date": next_day_date.strftime("%m/%d"),
|
||
"vessels": vessels_data,
|
||
"daily_actual": operations_data.get("teu", 0),
|
||
"month_plan_teu": monthly_stats["month_plan_teu"],
|
||
"month_actual_teu": monthly_stats["month_actual_teu"],
|
||
"completion_rate": monthly_stats["completion_rate"],
|
||
"day_shift": next_day_schedule["day_shift"],
|
||
"night_shift": next_day_schedule["night_shift"],
|
||
"duty_phone": os.getenv("DUTY_PHONE", "13107662315"),
|
||
}
|
||
|
||
# 渲染模板
|
||
report = self.render_template(template, render_data)
|
||
|
||
return report
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="生成日报")
|
||
parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为昨天")
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 解析日期
|
||
if args.date:
|
||
report_date = datetime.strptime(args.date, "%Y-%m-%d")
|
||
else:
|
||
report_date = None
|
||
|
||
# 生成日报
|
||
generator = DailyReportGenerator()
|
||
report = generator.generate_daily_report(report_date)
|
||
|
||
# 直接输出到终端
|
||
print("\n" + "=" * 60)
|
||
print("日报内容")
|
||
print("=" * 60)
|
||
print(report)
|
||
print("=" * 60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|