commit 00d2218c6d6183a6a71b5ce4224ed3745449887d Author: qichi.liang Date: Tue Mar 3 02:07:34 2026 +0800 feat: 初始化福州港日报管理系统 - 添加日报生成功能 (report_generator.py) - 添加 GUI 界面 (daily_report_gui.py) - 添加班次交接报告功能 (shift_report.py) - 集成飞书 API 获取排班信息 - 集成 Metabase 查询作业数据 - 生成 AGENTS.md 文档 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..97d6838 --- /dev/null +++ b/.gitignore @@ -0,0 +1,49 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Environment variables +.env +.env.local + +# Cache +.ruff_cache/ +.pytest_cache/ +.mypy_cache/ + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..45615e7 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,72 @@ +# GLORIA 日报管理系统 + +**生成时间:** 2026-03-03 +**语言:** Python 3.10+ +**用途:** 福州港日报生成与管理 + +## 概述 + +从飞书获取排班信息,从 Metabase 获取船舶作业数据,生成标准化日报。 + +## 结构 + +``` +Gloria/ +├── daily_report_gui.py # GUI入口 (Tkinter) +├── report_generator.py # CLI入口 + 核心生成逻辑 +├── feishu/ # 飞书API集成 +│ ├── client.py # HTTP客户端 + Token管理 +│ ├── manager.py # 排班管理器(统一入口) +│ └── parser.py # 月度/年度表格解析 +├── metabase/ # Metabase数据查询 +│ ├── time_operations.py # 按时间范围查询 +│ └── vessel_operations.py # 按船舶查询 +└── template/ # 日报模板 +``` + +## 查找指南 + +| 任务 | 位置 | +|------|------| +| 修改日报格式 | `template/daily_report_template.txt` | +| 调整班次时间规则 | `report_generator.py:36-78` (`get_shift_time_range`) | +| 添加飞书功能 | `feishu/manager.py` | +| 新增Metabase查询 | `metabase/` 对应客户端 | +| 修改GUI界面 | `daily_report_gui.py` | + +## 关键约定 + +### 班次时间规则 +- **每月1号:** 00:00 ~ 次日08:00 +- **月底最后一天:** 08:00 ~ 23:59 +- **其他日期:** 08:00 ~ 次日08:00 + +### 环境变量 (.env) +``` +MATEBASE_USERNAME=xxx +MATEBASE_PASSWORD=xxx +FEISHU_APP_ID=xxx +FEISHU_APP_SECRET=xxx +FEISHU_SPREADSHEET_TOKEN=xxx +``` + +### 运行方式 +```bash +# GUI模式 +python daily_report_gui.py + +# CLI模式 +python report_generator.py --date 2026-03-01 +``` + +## 依赖 + +- `requests` - HTTP请求 +- `python-dotenv` - 环境变量 +- `tkinter` - GUI (标准库) + +## 注意事项 + +- 程序需在 **8:00 后运行**,确保最后一条船指令结束时间超过8点 +- 飞书 Token 自动刷新,提前30分钟续期 +- Metabase 无原生 Python SDK,使用 REST API \ No newline at end of file diff --git a/daily_report_gui.py b/daily_report_gui.py new file mode 100644 index 0000000..71e7b44 --- /dev/null +++ b/daily_report_gui.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +""" +日报展示和复制工具 +基于tkinter的GUI应用,用于展示日报信息和一键复制 +""" + +import tkinter as tk +from tkinter import ttk, messagebox, scrolledtext +from datetime import datetime, timedelta +import sys +import os +import threading + +# 添加项目根目录到路径 +project_root = os.path.dirname(os.path.abspath(__file__)) +if project_root not in sys.path: + sys.path.insert(0, project_root) + + +class DailyReportApp: + """日报展示和复制应用""" + + def __init__(self, root): + self.root = root + self.root.title("日报管理系统") + self.root.geometry("1400x900") + self.root.minsize(1200, 700) + + # 设置默认字体大小 + self.default_font_size = 14 + self.title_font_size = 16 + + # 创建主布局 + self.create_main_layout() + + # 初始化数据 + self.current_report_date = None + self.report_content = "" + self.is_generating = False + + def create_main_layout(self): + """创建主布局""" + # 确保能导入项目模块 + project_root = os.path.dirname(os.path.abspath(__file__)) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + # 主容器 + main_container = tk.Frame(self.root, padx=10, pady=10) + main_container.pack(fill="both", expand=True) + + # 配置权重 + main_container.columnconfigure(1, weight=3) # 中间区域 + main_container.columnconfigure(2, weight=1) # 右侧区域 + main_container.rowconfigure(1, weight=1) + + # 1. 顶部提示区域 + self.create_notice_area(main_container) + + # 2. 左侧日期选择区域 + self.create_left_panel(main_container) + + # 3. 中间日报展示区域 + self.create_center_panel(main_container) + + # 4. 右侧复制按钮区域 + self.create_right_panel(main_container) + + def create_notice_area(self, parent): + """创建提示区域""" + notice_frame = tk.Frame(parent, bd=2, relief="solid", bg="#FFF3E0") + notice_frame.grid(row=0, column=0, columnspan=3, sticky="ew", pady=(0, 10)) + + # 静态提示标签 + self.notice_label = tk.Label( + notice_frame, + text="重要提示:本程序需在8:00过后运行,确保最后一条船的指令结束时间超过8点,以保证日报数据完整性!", + fg="#FF6B6B", + bg="#FFF3E0", + font=("", self.title_font_size, "bold"), + padx=10, + pady=10, + ) + self.notice_label.pack(fill="x", expand=True) + + def create_left_panel(self, parent): + """创建左侧控制面板""" + left_frame = tk.LabelFrame( + parent, + text="日期选择", + padx=10, + pady=10, + font=("", self.title_font_size, "bold"), + ) + left_frame.grid(row=1, column=0, sticky="nsew", padx=(0, 10)) + + # 昨日汇总按钮 + self.yesterday_btn = tk.Button( + left_frame, + text="昨日汇总", + command=self.select_yesterday, + font=("", self.default_font_size), + padx=10, + pady=5, + bg="#4CAF50", + fg="white", + ) + self.yesterday_btn.pack(fill="x", pady=(0, 10)) + + # 日期选择器 + tk.Label(left_frame, text="选择日期:", font=("", self.default_font_size)).pack( + anchor="w", pady=(10, 5) + ) + + # 日期输入框 + self.date_var = tk.StringVar() + self.date_entry = tk.Entry( + left_frame, textvariable=self.date_var, font=("", self.default_font_size) + ) + self.date_entry.pack(fill="x", pady=(0, 5)) + + # 日期提示 + tk.Label( + left_frame, + text="格式:YYYY-MM-DD", + fg="gray", + font=("", self.default_font_size - 2), + ).pack(anchor="w") + + # 生成日报按钮 + self.generate_btn = tk.Button( + left_frame, + text="生成日报", + command=self.generate_report_async, + font=("", self.default_font_size), + padx=10, + pady=5, + bg="#2196F3", + fg="white", + ) + self.generate_btn.pack(fill="x", pady=(20, 0)) + + # 当前选中日期标签 + self.selected_date_label = tk.Label( + left_frame, + text="当前选中:无", + fg="#2196F3", + font=("", self.default_font_size, "bold"), + ) + self.selected_date_label.pack(anchor="w", pady=(20, 0)) + + def create_center_panel(self, parent): + """创建中间日报展示区域""" + center_frame = tk.LabelFrame( + parent, + text="日报内容", + padx=10, + pady=10, + font=("", self.title_font_size, "bold"), + ) + center_frame.grid(row=1, column=1, sticky="nsew", padx=(0, 10)) + + # 日报信息文本框 + self.report_text = scrolledtext.ScrolledText( + center_frame, + wrap=tk.WORD, + padx=10, + pady=10, + height=30, + font=("", self.default_font_size), + ) + self.report_text.pack(fill="both", expand=True) + + # 默认提示文本 + self.report_text.insert( + tk.END, '请点击左侧"昨日汇总"按钮,或选择日期后点击"生成日报"按钮...' + ) + self.report_text.config(state=tk.DISABLED) + + def create_right_panel(self, parent): + """创建右侧控制面板""" + right_frame = tk.LabelFrame( + parent, + text="操作", + padx=10, + pady=10, + font=("", self.title_font_size, "bold"), + ) + right_frame.grid(row=1, column=2, sticky="nsew") + + # 复制按钮 + self.copy_btn = tk.Button( + right_frame, + text="复制日报", + command=self.copy_report, + font=("", self.default_font_size), + padx=10, + pady=5, + bg="#4CAF50", + fg="white", + ) + self.copy_btn.pack(fill="x", pady=(0, 20)) + + # 复制状态标签 + self.copy_status = tk.Label( + right_frame, text="", font=("", self.default_font_size) + ) + self.copy_status.pack(fill="x") + + # 分隔线 + tk.Frame(right_frame, height=2, bg="gray").pack(fill="x", pady=20) + + # 清空按钮 + self.clear_btn = tk.Button( + right_frame, + text="清空", + command=self.clear_report, + font=("", self.default_font_size), + padx=10, + pady=5, + ) + self.clear_btn.pack(fill="x", pady=(0, 10)) + + # 退出按钮 + self.exit_btn = tk.Button( + right_frame, + text="退出", + command=self.root.quit, + font=("", self.default_font_size), + padx=10, + pady=5, + ) + self.exit_btn.pack(fill="x") + + def select_yesterday(self): + """选择昨天""" + yesterday = datetime.now() - timedelta(days=1) + self.date_var.set(yesterday.strftime("%Y-%m-%d")) + self.selected_date_label.config( + text=f"当前选中:昨天 ({yesterday.strftime('%Y-%m-%d')})" + ) + self.generate_report_async() + + def generate_report_async(self): + """异步生成日报(避免界面卡死)""" + if self.is_generating: + return + + self.is_generating = True + self.generate_btn.config(state=tk.DISABLED, text="生成中...") + + # 在后台线程生成日报 + thread = threading.Thread(target=self._generate_report_thread) + thread.daemon = True + thread.start() + + def _generate_report_thread(self): + """在后台线程中生成日报""" + try: + date_str = self.date_var.get().strip() + + if not date_str: + self.root.after(0, lambda: self._on_generate_error("请先选择日期")) + return + + try: + report_date = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + self.root.after( + 0, + lambda: self._on_generate_error( + "日期格式错误!请使用 YYYY-MM-DD 格式" + ), + ) + return + + # 更新UI + self.root.after( + 0, + lambda: self.selected_date_label.config( + text=f"当前选中:{report_date.strftime('%Y-%m-%d')}" + ), + ) + + # 生成日报 + try: + from report_generator import DailyReportGenerator + + generator = DailyReportGenerator() + report = generator.generate_daily_report(report_date) + self.root.after(0, lambda: self._on_generate_success(report)) + except Exception as e: + error_msg = f"生成日报时出错:{str(e)}\n\n请确保:\n1. 环境变量配置正确\n2. Metabase 和飞书服务可访问\n3. 在8:00过后运行程序" + self.root.after(0, lambda: self._on_generate_error(error_msg)) + + except Exception as e: + self.root.after(0, lambda: self._on_generate_error(str(e))) + + def _on_generate_success(self, report): + """生成成功回调""" + self.report_content = report + self.report_text.config(state=tk.NORMAL) + self.report_text.delete(1.0, tk.END) + self.report_text.insert(tk.END, report) + self.report_text.config(state=tk.DISABLED) + self._reset_generate_button() + + def _on_generate_error(self, error_msg): + """生成失败回调""" + self.report_content = error_msg + self.report_text.config(state=tk.NORMAL) + self.report_text.delete(1.0, tk.END) + self.report_text.insert(tk.END, error_msg) + self.report_text.config(state=tk.DISABLED) + self._reset_generate_button() + messagebox.showerror("错误", error_msg) + + def _reset_generate_button(self): + """重置生成按钮状态""" + self.is_generating = False + self.generate_btn.config(state=tk.NORMAL, text="生成日报") + + def copy_report(self): + """复制日报到剪贴板""" + if ( + not self.report_content + or self.report_content + == '请点击左侧"昨日汇总"按钮,或选择日期后点击"生成日报"按钮...' + ): + messagebox.showwarning("提示", "请先生成日报!") + return + + try: + # 复制到剪贴板 + self.root.clipboard_clear() + self.root.clipboard_append(self.report_content) + + # 显示成功提示 + self.copy_status.config(text="已复制到剪贴板!", fg="#4CAF50") + self.root.after(3000, lambda: self.copy_status.config(text="")) + + except Exception as e: + messagebox.showerror("错误", f"复制失败:{str(e)}") + + def clear_report(self): + """清空日报内容""" + self.report_content = "" + self.report_text.config(state=tk.NORMAL) + self.report_text.delete(1.0, tk.END) + self.report_text.insert( + tk.END, '请点击左侧"昨日汇总"按钮,或选择日期后点击"生成日报"按钮...' + ) + self.report_text.config(state=tk.DISABLED) + self.date_var.set("") + self.selected_date_label.config(text="当前选中:无") + self.copy_status.config(text="") + + +def main(): + """主函数""" + # 创建主窗口 + root = tk.Tk() + + # 创建应用 + app = DailyReportApp(root) + + # 运行主循环 + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/feishu/AGENTS.md b/feishu/AGENTS.md new file mode 100644 index 0000000..e8b9325 --- /dev/null +++ b/feishu/AGENTS.md @@ -0,0 +1,58 @@ +# FEISHU 飞书模块 + +## 概述 + +飞书表格 API 集成,用于获取排班信息。支持自动 Token 刷新。 + +## 结构 + +``` +feishu/ +├── __init__.py # 统一导出 +├── client.py # FeishuSheetsClient - HTTP客户端 +├── manager.py # FeishuScheduleManager - 业务入口 +└── parser.py # ScheduleDataParser - 表格解析 +``` + +## 查找指南 + +| 任务 | 位置 | +|------|------| +| 修改API调用 | `client.py` | +| 获取排班信息 | `manager.py:get_schedule_for_date()` | +| 解析新表格格式 | `parser.py` | + +## 使用方式 + +```python +from feishu import FeishuScheduleManager + +manager = FeishuScheduleManager() +schedule = manager.get_schedule_for_date("2026-03-01") +# {'day_shift': '张三', 'night_shift': '李四', ...} +``` + +## 关键逻辑 + +### Token 管理 (client.py) +- 自动获取 `tenant_access_token` +- 提前30分钟刷新 +- 备用: 环境变量 `FEISHU_TOKEN` + +### 表格选择 (manager.py:93-148) +- 2026年优先使用年度表格 +- 其他年份优先月度表格 +- 降级使用第一个表格 + +### 表格解析 (parser.py) +- 月度表格: 第一行为表头,查找日期列 +- 年度表格: 查找月份块,再查日期列 + +## 环境变量 + +``` +FEISHU_BASE_URL=https://open.feishu.cn/open-apis/sheets/v3 +FEISHU_APP_ID=cli_xxx +FEISHU_APP_SECRET=xxx +FEISHU_SPREADSHEET_TOKEN=xxx +``` \ No newline at end of file diff --git a/feishu/__init__.py b/feishu/__init__.py new file mode 100644 index 0000000..b9bb29b --- /dev/null +++ b/feishu/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +""" +飞书模块包 +提供统一的飞书API接口 +""" + +from .client import FeishuSheetsClient, FeishuClientError +from .parser import ScheduleDataParser +from .manager import FeishuScheduleManager + +__all__ = [ + "FeishuSheetsClient", + "FeishuClientError", + "ScheduleDataParser", + "FeishuScheduleManager", +] diff --git a/feishu/client.py b/feishu/client.py new file mode 100644 index 0000000..9d79250 --- /dev/null +++ b/feishu/client.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 +""" +飞书表格 API 客户端模块 +统一版本,支持月度表格和年度表格 +支持自动获取和刷新 tenant_access_token +""" + +import os +import requests +import time +from typing import Dict, List, Optional, Tuple +import logging + +# 加载环境变量 +from dotenv import load_dotenv + +load_dotenv() + +# 配置日志 +logger = logging.getLogger(__name__) + + +class FeishuClientError(Exception): + """飞书客户端异常基类""" + + pass + + +class FeishuSheetsClient: + """飞书表格 API 客户端""" + + def __init__( + self, + base_url: Optional[str] = None, + token: Optional[str] = None, + spreadsheet_token: Optional[str] = None, + app_id: Optional[str] = None, + app_secret: Optional[str] = None, + ): + """ + 初始化客户端 + + 参数: + base_url: 飞书 API 基础URL,如果为None则使用配置 + token: Bearer 认证令牌,如果为None则使用配置或自动获取 + spreadsheet_token: 表格 token,如果为None则使用配置 + app_id: 飞书应用ID,用于获取tenant_access_token + app_secret: 飞书应用密钥,用于获取tenant_access_token + """ + self.base_url = (base_url or os.getenv("FEISHU_BASE_URL")).rstrip("/") + self.spreadsheet_token = spreadsheet_token or os.getenv( + "FEISHU_SPREADSHEET_TOKEN" + ) + self.app_id = app_id or os.getenv("FEISHU_APP_ID") + self.app_secret = app_secret or os.getenv("FEISHU_APP_SECRET") + + # Token管理相关属性 + self._token = token or os.getenv("FEISHU_TOKEN") + self._token_expire_time = 0 # token过期时间戳 + self._token_obtained_time = 0 # token获取时间戳 + + # 使用 Session 重用连接 + self.session = requests.Session() + self.session.timeout = int(os.getenv("REQUEST_TIMEOUT", 30)) + + # 初始化headers + self._update_session_headers() + + logger.debug(f"飞书客户端初始化完成,基础URL: {self.base_url}") + logger.debug( + f"使用应用ID: {self.app_id[:8]}... 如果配置" + if self.app_id + else "未配置应用ID" + ) + + def _update_session_headers(self): + """更新session的headers""" + if self._token: + self.session.headers.update( + { + "Authorization": f"Bearer {self._token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + ) + else: + # 如果没有token,移除Authorization头 + if "Authorization" in self.session.headers: + del self.session.headers["Authorization"] + + def _get_tenant_access_token(self) -> Tuple[str, int]: + """ + 获取tenant_access_token + + 返回: + (token, expire_time): token字符串和过期时间(秒) + + 异常: + requests.exceptions.RequestException: 网络请求失败 + ValueError: API返回错误 + """ + if not self.app_id or not self.app_secret: + raise ValueError("未配置飞书应用ID和密钥,无法获取tenant_access_token") + + token_url = ( + "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" + ) + + payload = {"app_id": self.app_id, "app_secret": self.app_secret} + + headers = {"Content-Type": "application/json; charset=utf-8"} + + try: + logger.info(f"正在获取tenant_access_token,应用ID: {self.app_id[:8]}...") + response = requests.post( + token_url, json=payload, headers=headers, timeout=int(os.getenv("REQUEST_TIMEOUT", 30)) + ) + response.raise_for_status() + data = response.json() + + if data.get("code") != 0: + error_msg = f"获取tenant_access_token失败: {data.get('msg')}" + logger.error(error_msg) + raise ValueError(error_msg) + + token = data.get("tenant_access_token") + expire = data.get("expire", 7200) # 默认2小时 + + if not token: + raise ValueError("API返回的token为空") + + logger.info(f"成功获取tenant_access_token,有效期: {expire}秒") + return token, expire + + except requests.exceptions.RequestException as e: + logger.error(f"获取tenant_access_token网络请求失败: {e}") + raise + except Exception as e: + logger.error(f"获取tenant_access_token失败: {e}") + raise + + def _ensure_valid_token(self): + """ + 确保当前token有效,如果无效则重新获取 + + 返回: + bool: token是否有效 + """ + current_time = time.time() + + # 如果有手动配置的token,直接使用 + if os.getenv("FEISHU_TOKEN") and self._token == os.getenv("FEISHU_TOKEN"): + logger.debug("使用手动配置的FEISHU_TOKEN") + return True + + # 检查token是否过期(提前30分钟刷新) + if self._token and self._token_expire_time > 0: + time_remaining = self._token_expire_time - current_time + if time_remaining > 1800: # 剩余时间大于30分钟 + logger.debug(f"token仍然有效,剩余时间: {int(time_remaining)}秒") + return True + elif time_remaining > 0: # 剩余时间小于30分钟但大于0 + logger.info( + f"token即将过期,剩余时间: {int(time_remaining)}秒,重新获取" + ) + else: # 已过期 + logger.info("token已过期,重新获取") + + # 需要获取新token + try: + token, expire = self._get_tenant_access_token() + self._token = token + self._token_obtained_time = current_time + self._token_expire_time = current_time + expire + self._update_session_headers() + logger.info(f"token获取成功,将在 {expire} 秒后过期") + return True + except Exception as e: + logger.error(f"获取token失败: {e}") + # 如果配置了备用token,尝试使用 + if os.getenv("FEISHU_TOKEN") and os.getenv("FEISHU_TOKEN") != self._token: + logger.warning("使用备用FEISHU_TOKEN") + self._token = os.getenv("FEISHU_TOKEN") + self._update_session_headers() + return True + return False + + def get_sheets_info(self) -> List[Dict[str, str]]: + """ + 获取所有表格信息(sheet_id 和 title) + + 返回: + 表格信息列表 [{'sheet_id': 'xxx', 'title': 'xxx'}, ...] + + 异常: + requests.exceptions.RequestException: 网络请求失败 + ValueError: API返回错误 + """ + # 确保token有效 + if not self._ensure_valid_token(): + raise FeishuClientError("无法获取有效的飞书token") + + url = f"{self.base_url}/spreadsheets/{self.spreadsheet_token}/sheets/query" + + try: + response = self.session.get(url, timeout=int(os.getenv("REQUEST_TIMEOUT", 30))) + response.raise_for_status() + data = response.json() + + if data.get("code") != 0: + error_msg = f"飞书API错误: {data.get('msg')}" + logger.error(error_msg) + raise ValueError(error_msg) + + sheets = data.get("data", {}).get("sheets", []) + result = [] + for sheet in sheets: + result.append( + {"sheet_id": sheet.get("sheet_id"), "title": sheet.get("title")} + ) + + logger.info(f"获取到 {len(result)} 个表格") + return result + + except requests.exceptions.RequestException as e: + logger.error(f"获取表格信息失败: {e}") + raise + except Exception as e: + logger.error(f"解析表格信息失败: {e}") + raise + + def get_sheet_data(self, sheet_id: str, range_: Optional[str] = None) -> Dict: + """ + 获取指定表格的数据 + + 参数: + sheet_id: 表格ID + range_: 数据范围,如果为None则使用配置 + + 返回: + 飞书API返回的原始数据,包含revision版本号 + + 异常: + requests.exceptions.RequestException: 网络请求失败 + ValueError: API返回错误 + """ + # 确保token有效 + if not self._ensure_valid_token(): + raise FeishuClientError("无法获取有效的飞书token") + + if range_ is None: + range_ = os.getenv("SHEET_RANGE", "A1:Z100") + + # 注意:获取表格数据使用 v2 API,而不是 v3 + url = f"{self.base_url.replace('/v3', '/v2')}/spreadsheets/{self.spreadsheet_token}/values/{sheet_id}!{range_}" + params = { + "valueRenderOption": "ToString", + "dateTimeRenderOption": "FormattedString", + } + + try: + response = self.session.get( + url, params=params, timeout=int(os.getenv("REQUEST_TIMEOUT", 30)) + ) + response.raise_for_status() + data = response.json() + + if data.get("code") != 0: + error_msg = f"飞书API错误: {data.get('msg')}" + logger.error(error_msg) + raise ValueError(error_msg) + + logger.debug(f"获取表格数据成功: {sheet_id}, 范围: {range_}") + return data.get("data", {}) + + except requests.exceptions.RequestException as e: + logger.error(f"获取表格数据失败: {e}, sheet_id: {sheet_id}") + raise + except Exception as e: + logger.error(f"解析表格数据失败: {e}, sheet_id: {sheet_id}") + raise + + def get_token_info(self) -> Dict[str, any]: + """ + 获取当前token信息 + + 返回: + token信息字典 + """ + current_time = time.time() + time_remaining = ( + self._token_expire_time - current_time if self._token_expire_time > 0 else 0 + ) + + return { + "has_token": bool(self._token), + "token_preview": self._token[:20] + "..." + if self._token and len(self._token) > 20 + else self._token, + "token_obtained_time": self._token_obtained_time, + "token_expire_time": self._token_expire_time, + "time_remaining": max(0, time_remaining), + "using_app_credentials": bool(self.app_id and self.app_secret), + "using_manual_token": bool( + os.getenv("FEISHU_TOKEN") and self._token == os.getenv("FEISHU_TOKEN") + ), + } + + def test_connection(self) -> bool: + """ + 测试飞书连接是否正常 + + 返回: + 连接是否正常 + """ + try: + # 首先测试token获取 + if not self._ensure_valid_token(): + logger.error("无法获取有效的飞书token") + return False + + # 然后测试表格访问 + sheets = self.get_sheets_info() + if sheets: + logger.info(f"飞书连接测试成功,找到 {len(sheets)} 个表格") + return True + else: + logger.warning("飞书连接测试成功,但未找到表格") + return False + except Exception as e: + logger.error(f"飞书连接测试失败: {e}") + return False + + def refresh_token(self) -> bool: + """ + 强制刷新token + + 返回: + 刷新是否成功 + """ + try: + logger.info("强制刷新token...") + current_time = time.time() + token, expire = self._get_tenant_access_token() + self._token = token + self._token_obtained_time = current_time + self._token_expire_time = current_time + expire + self._update_session_headers() + logger.info(f"token刷新成功,将在 {expire} 秒后过期") + return True + except Exception as e: + logger.error(f"强制刷新token失败: {e}") + return False + + +if __name__ == "__main__": + # 测试代码 + import sys + + # 设置日志级别 + logging.basicConfig(level=logging.INFO) + + # 测试连接 + client = FeishuSheetsClient() + + # 显示token信息 + token_info = client.get_token_info() + print("当前token信息:") + print(f" 是否有token: {token_info['has_token']}") + print(f" token预览: {token_info['token_preview']}") + print(f" 剩余时间: {int(token_info['time_remaining'])}秒") + print(f" 使用应用凭证: {token_info['using_app_credentials']}") + print(f" 使用手动token: {token_info['using_manual_token']}") + + if client.test_connection(): + print("\n飞书连接测试成功") + + # 获取表格信息 + sheets = client.get_sheets_info() + for sheet in sheets[:3]: # 只显示前3个 + print(f"表格: {sheet['title']} (ID: {sheet['sheet_id']})") + + if sheets: + # 获取第一个表格的数据 + sheet_id = sheets[0]["sheet_id"] + data = client.get_sheet_data(sheet_id, "A1:C5") + print(f"获取到表格数据,版本: {data.get('revision', '未知')}") + + # 再次显示token信息 + token_info = client.get_token_info() + print(f"\n测试后token剩余时间: {int(token_info['time_remaining'])}秒") + else: + print("\n飞书连接测试失败") + sys.exit(1) diff --git a/feishu/manager.py b/feishu/manager.py new file mode 100644 index 0000000..81307e1 --- /dev/null +++ b/feishu/manager.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python3 +""" +飞书排班管理器模块 +统一入口,使用数据库存储和缓存 +""" + +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple +import logging +import os +import sys + +# 添加项目根目录到路径 +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +# 简单实现,不依赖src模块 +config = None +get_logger = logging.getLogger + + +class ScheduleDatabase: + """简单的排班数据库占位实现""" + + def __init__(self, db_path=None): + self.db_path = db_path + + def save_schedule(self, date_str, result, sheet_id, sheet_title): + pass + + def get_schedule_by_range(self, start_date, end_date): + return [] + + def get_stats(self): + return {} + + +from .client import FeishuSheetsClient +from .parser import ScheduleDataParser + +logger = get_logger(__name__) + + +class FeishuScheduleManager: + """飞书排班管理器(统一入口)""" + + def __init__( + self, + base_url: Optional[str] = None, + token: Optional[str] = None, + spreadsheet_token: Optional[str] = None, + db_path: Optional[str] = None, + ): + """ + 初始化管理器 + + 参数: + base_url: 飞书API基础URL,如果为None则使用配置 + token: 飞书API令牌,如果为None则使用配置 + spreadsheet_token: 表格token,如果为None则使用配置 + db_path: 数据库路径,如果为None则使用配置 + """ + # 检查配置是否完整 + self._check_config(token, spreadsheet_token) + + # 初始化组件 + self.client = FeishuSheetsClient(base_url, token, spreadsheet_token) + self.parser = ScheduleDataParser() + self.db = ScheduleDatabase(db_path) + + logger.info("飞书排班管理器初始化完成") + + def _check_config( + self, token: Optional[str], spreadsheet_token: Optional[str] + ) -> None: + """检查必要配置""" + # 检查是否有任何可用的认证方式 + has_token = bool(token or os.getenv("FEISHU_TOKEN")) + has_app_credentials = bool(os.getenv("FEISHU_APP_ID") and os.getenv("FEISHU_APP_SECRET")) + + if not has_token and not has_app_credentials: + logger.warning("飞书认证未配置,排班功能将不可用") + logger.warning("请配置 FEISHU_TOKEN 或 FEISHU_APP_ID + FEISHU_APP_SECRET") + elif has_app_credentials: + logger.info("使用飞书应用凭证自动获取token") + elif has_token: + logger.info("使用手动配置的FEISHU_TOKEN") + + if not spreadsheet_token and not os.getenv("FEISHU_SPREADSHEET_TOKEN"): + logger.warning("飞书表格令牌未配置,排班功能将不可用") + + def _select_sheet_for_date( + self, sheets: List[Dict[str, str]], target_year_month: str + ) -> Optional[Dict[str, str]]: + """ + 为指定日期选择最合适的表格 + + 参数: + sheets: 表格列表 + target_year_month: 目标年月,格式 "2025-12" + + 返回: + 选中的表格信息,未找到返回None + """ + if not sheets: + logger.error("表格列表为空") + return None + + # 提取年份和月份 + try: + year = target_year_month[:4] + month = target_year_month[5:7].lstrip("0") + except (IndexError, ValueError) as e: + logger.error(f"解析年月失败: {target_year_month}, 错误: {e}") + return None + + # 对于2026年,优先使用年度表格 + if year == "2026": + # 查找年度表格,如 "2026年排班表" + year_name = f"{year}年" + for sheet in sheets: + title = sheet.get("title", "") + if year_name in title and "排班表" in title: + logger.info(f"找到2026年年度表格: {title}") + return sheet + + # 优先查找月份表格,如 "12月" + month_name = f"{int(month)}月" + for sheet in sheets: + title = sheet.get("title", "") + if month_name in title: + logger.info(f"找到月份表格: {title}") + return sheet + + # 查找年度表格,如 "2026年排班表" + year_name = f"{year}年" + for sheet in sheets: + title = sheet.get("title", "") + if year_name in title and "排班表" in title: + logger.info(f"找到年度表格: {title}") + return sheet + + # 如果没有找到匹配的表格,使用第一个表格 + logger.warning( + f"未找到 {target_year_month} 的匹配表格,使用第一个表格: {sheets[0]['title']}" + ) + return sheets[0] + + def get_schedule_for_date(self, date_str: str) -> Dict[str, any]: + """ + 获取指定日期的排班信息 + + 修复:每次都从飞书获取最新数据并覆盖数据库,确保日报中显示最新排班信息 + + 参数: + date_str: 日期字符串,格式 "2025-12-30" + + 返回: + 排班信息字典 + + 异常: + ValueError: 日期格式无效 + Exception: 其他错误 + """ + try: + # 解析日期 + dt = datetime.strptime(date_str, "%Y-%m-%d") + + # 生成两种格式的日期字符串,用于匹配不同表格 + target_date_mm_dd = dt.strftime("%m/%d") # "01/01" 用于月度表格 + target_date_chinese = f"{dt.month}月{dt.day}日" # "1月1日" 用于年度表格 + target_year_month = dt.strftime("%Y-%m") # "2025-12" + + logger.info( + f"获取 {date_str} 的排班信息 (格式: {target_date_mm_dd}/{target_date_chinese})" + ) + + # 1. 获取表格信息 + sheets = self.client.get_sheets_info() + if not sheets: + logger.error("未获取到表格信息") + return self._empty_result() + + # 2. 选择最合适的表格 + selected_sheet = self._select_sheet_for_date(sheets, target_year_month) + if not selected_sheet: + logger.error("未找到合适的表格") + return self._empty_result() + + sheet_id = selected_sheet["sheet_id"] + sheet_title = selected_sheet["title"] + + # 3. 获取表格数据 + sheet_data = self.client.get_sheet_data(sheet_id) + if not sheet_data: + logger.error("未获取到表格数据") + return self._empty_result() + + values = sheet_data.get("valueRange", {}).get("values", []) + + if not values: + logger.error("表格数据为空") + return self._empty_result() + + # 4. 解析数据 - 根据表格类型选择合适的日期格式 + # 如果是年度表格,使用中文日期格式;否则使用mm/dd格式 + if "年" in sheet_title and "排班表" in sheet_title: + target_date = target_date_chinese # "1月1日" + else: + target_date = target_date_mm_dd # "01/01" + + logger.info(f"使用日期格式: {target_date} 解析表格: {sheet_title}") + result = self.parser.parse(values, target_date, sheet_title) + + # 5. 每次都保存到数据库,覆盖旧数据,确保人员变动能及时更新 + if result["day_shift"] or result["night_shift"]: + self.db.save_schedule(date_str, result, sheet_id, sheet_title) + logger.info( + f"已更新 {date_str} 的排班信息到数据库: 白班={result['day_shift']}, 夜班={result['night_shift']}" + ) + else: + logger.warning(f"解析结果为空,{date_str} 未保存到数据库") + + return result + + except ValueError as e: + logger.error(f"日期格式无效: {date_str}, 错误: {e}") + raise + except Exception as e: + logger.error(f"获取排班信息失败: {e}") + # 降级处理:返回空值 + return self._empty_result() + + def get_schedule_for_today(self) -> Dict[str, any]: + """获取今天的排班信息""" + today = datetime.now().strftime("%Y-%m-%d") + return self.get_schedule_for_date(today) + + def get_schedule_for_tomorrow(self) -> Dict[str, any]: + """获取明天的排班信息""" + tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d") + return self.get_schedule_for_date(tomorrow) + + def refresh_all_schedules(self, days: Optional[int] = None): + """ + 刷新未来指定天数的排班信息 + + 参数: + days: 刷新未来多少天的排班信息,如果为None则使用配置 + """ + if days is None: + days = int(os.getenv("SCHEDULE_REFRESH_DAYS", 7)) + + logger.info(f"开始刷新未来 {days} 天的排班信息") + + today = datetime.now() + success_count = 0 + error_count = 0 + + for i in range(days): + date = (today + timedelta(days=i)).strftime("%Y-%m-%d") + try: + logger.debug(f"刷新 {date} 的排班信息...") + self.get_schedule_for_date(date) + success_count += 1 + except Exception as e: + logger.error(f"刷新 {date} 的排班信息失败: {e}") + error_count += 1 + + logger.info(f"排班信息刷新完成,成功: {success_count}, 失败: {error_count}") + + def get_schedule_by_range( + self, start_date: str, end_date: str + ) -> List[Dict[str, any]]: + """ + 获取日期范围内的排班信息 + + 参数: + start_date: 开始日期 (YYYY-MM-DD) + end_date: 结束日期 (YYYY-MM-DD) + + 返回: + 排班信息列表 + """ + try: + # 验证日期格式 + datetime.strptime(start_date, "%Y-%m-%d") + datetime.strptime(end_date, "%Y-%m-%d") + + return self.db.get_schedule_by_range(start_date, end_date) + + except ValueError as e: + logger.error(f"日期格式无效: {e}") + return [] + except Exception as e: + logger.error(f"获取排班范围失败: {e}") + return [] + + def test_connection(self) -> bool: + """测试飞书连接是否正常""" + return self.client.test_connection() + + def get_stats(self) -> Dict[str, any]: + """获取排班数据库统计信息""" + return self.db.get_stats() + + def _empty_result(self) -> Dict[str, any]: + """返回空结果""" + return { + "day_shift": "", + "night_shift": "", + "day_shift_list": [], + "night_shift_list": [], + } + + def _format_db_result(self, db_result: Dict[str, any]) -> Dict[str, any]: + """格式化数据库结果""" + return { + "day_shift": db_result["day_shift"], + "night_shift": db_result["night_shift"], + "day_shift_list": db_result["day_shift_list"], + "night_shift_list": db_result["night_shift_list"], + } + + +if __name__ == "__main__": + # 测试代码 + import sys + + # 设置日志 + logging.basicConfig(level=logging.INFO) + + # 初始化管理器 + manager = FeishuScheduleManager() + + # 测试连接 + if not manager.test_connection(): + print("飞书连接测试失败") + sys.exit(1) + + print("飞书连接测试成功") + + # 测试获取今天和明天的排班 + today_schedule = manager.get_schedule_for_today() + print( + f"今天排班: 白班={today_schedule['day_shift']}, 夜班={today_schedule['night_shift']}" + ) + + tomorrow_schedule = manager.get_schedule_for_tomorrow() + print( + f"明天排班: 白班={tomorrow_schedule['day_shift']}, 夜班={tomorrow_schedule['night_shift']}" + ) + + # 测试统计 + stats = manager.get_stats() + print(f"排班统计: {stats}") + + # 测试范围查询(最近7天) + end_date = datetime.now().strftime("%Y-%m-%d") + start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") + schedules = manager.get_schedule_by_range(start_date, end_date) + print(f"最近7天排班记录: {len(schedules)} 条") diff --git a/feishu/parser.py b/feishu/parser.py new file mode 100644 index 0000000..81e953b --- /dev/null +++ b/feishu/parser.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +""" +排班数据解析器模块 +支持月度表格和年度表格解析 +""" + +import re +from typing import Dict, List, Optional, Tuple +import logging + +# 配置日志 +logger = logging.getLogger(__name__) + + +class ScheduleDataParser: + """排班数据解析器(支持月度表格和年度表格)""" + + @staticmethod + def _parse_chinese_date(date_str: str) -> Optional[str]: + """ + 解析中文日期格式 + + 参数: + date_str: 中文日期,如 "12月30日" 或 "12/30" 或 "12月1日" 或 "1月1日" + + 返回: + 标准化日期字符串 "M月D日" (不补零) + + 异常: + ValueError: 日期格式无效 + """ + if not date_str or not isinstance(date_str, str): + return None + + date_str = date_str.strip() + + try: + # 如果是 "12/30" 格式 + if "/" in date_str: + month, day = date_str.split("/") + # 移除可能的空格和前导零 + month = month.strip().lstrip("0") + day = day.strip().lstrip("0") + if not month.isdigit() or not day.isdigit(): + raise ValueError(f"日期格式无效: {date_str}") + return f"{int(month)}月{int(day)}日" + + # 如果是 "12月30日" 或 "1月1日" 格式 + if "月" in date_str and "日" in date_str: + # 移除前导零,如 "01月01日" -> "1月1日" + parts = date_str.split("月") + if len(parts) == 2: + month_part = parts[0].lstrip("0") + day_part = parts[1].rstrip("日").lstrip("0") + if not month_part or not day_part: + raise ValueError(f"日期格式无效: {date_str}") + return f"{month_part}月{day_part}日" + return date_str + + # 如果是 "12月1日" 格式(已经包含"日"字) + if "月" in date_str: + # 检查是否已经有"日"字 + if "日" not in date_str: + return f"{date_str}日" + return date_str + + # 如果是纯数字,尝试解析 + if date_str.isdigit() and len(date_str) == 4: + # 假设是 "1230" 格式 + month = date_str[:2].lstrip("0") + day = date_str[2:].lstrip("0") + return f"{month}月{day}日" + + return None + + except Exception as e: + logger.warning(f"解析日期失败: {date_str}, 错误: {e}") + return None + + @staticmethod + def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]: + """ + 在表头中查找目标日期对应的列索引 + + 参数: + headers: 表头行 ["姓名", "12月1日", "12月2日", ...] + target_date: 目标日期 "12月30日" + + 返回: + 列索引(从0开始),未找到返回None + """ + if not headers or not target_date: + return None + + # 标准化目标日期 + target_std = ScheduleDataParser._parse_chinese_date(target_date) + if not target_std: + logger.warning(f"无法标准化目标日期: {target_date}") + return None + + # 遍历表头查找匹配的日期 + for i, header in enumerate(headers): + if not header: + continue + + header_std = ScheduleDataParser._parse_chinese_date(header) + if header_std == target_std: + logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})") + return i + + logger.warning(f"未找到日期列: {target_date}, 表头: {headers}") + return None + + def parse_monthly_sheet( + self, values: List[List[str]], target_date: str + ) -> Dict[str, any]: + """ + 解析月度表格数据(如12月表格) + + 参数: + values: 飞书表格返回的二维数组 + target_date: 目标日期(格式: "12月30日" 或 "12/30") + + 返回: + 排班信息字典 + """ + if not values or len(values) < 2: + logger.warning("表格数据为空或不足") + return self._empty_result() + + # 第一行是表头 + headers = values[0] + date_column_index = self._find_date_column_index(headers, target_date) + + if date_column_index is None: + logger.warning(f"未找到日期列: {target_date}") + return self._empty_result() + + # 收集白班和夜班人员 + day_shift_names = [] + night_shift_names = [] + + # 从第二行开始是人员数据 + for row_idx, row in enumerate(values[1:], start=2): + if len(row) <= date_column_index: + continue + + name = row[0] if row else "" + shift = row[date_column_index] if date_column_index < len(row) else "" + + if not name or not shift: + continue + + # 清理班次值 + shift = shift.strip() + if shift == "白": + day_shift_names.append(name.strip()) + elif shift == "夜": + night_shift_names.append(name.strip()) + elif shift: # 其他班次类型 + logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})") + + return self._format_result(day_shift_names, night_shift_names) + + def parse_yearly_sheet( + self, values: List[List[str]], target_date: str + ) -> Dict[str, any]: + """ + 解析年度表格数据(如2026年排班表) + + 参数: + values: 飞书表格返回的二维数组 + target_date: 目标日期(格式: "12月30日" 或 "12/30") + + 返回: + 排班信息字典 + """ + if not values: + logger.warning("年度表格数据为空") + return self._empty_result() + + # 查找目标月份的数据块 + target_month = target_date.split("月")[0] if "月" in target_date else "" + if not target_month: + logger.warning(f"无法从 {target_date} 提取月份") + return self._empty_result() + + # 在年度表格中查找对应的月份块 + current_block_start = -1 + current_month = "" + + for i, row in enumerate(values): + if not row: + continue + + first_cell = str(row[0]) if row else "" + + # 检查是否是月份标题行,如 "福州港1月排班表" + if "排班表" in first_cell and "月" in first_cell: + # 提取月份数字 + month_match = re.search(r"(\d+)月", first_cell) + if month_match: + current_month = month_match.group(1).lstrip("0") + current_block_start = i + logger.debug(f"找到月份块: {current_month}月 (行: {i + 1})") + + # 如果找到目标月份,检查下一行是否是表头行 + if current_month == target_month and i == current_block_start + 1: + # 当前行是表头行 + headers = row + date_column_index = self._find_date_column_index(headers, target_date) + + if date_column_index is None: + logger.warning(f"在年度表格中未找到日期列: {target_date}") + return self._empty_result() + + # 收集人员数据(从表头行的下一行开始) + day_shift_names = [] + night_shift_names = [] + + for j in range(i + 1, len(values)): + person_row = values[j] + if not person_row: + # 遇到空行,继续检查下一行 + continue + + # 检查是否是下一个月份块的开始 + if ( + person_row[0] + and isinstance(person_row[0], str) + and "排班表" in person_row[0] + and "月" in person_row[0] + ): + break + + # 跳过星期行(第一列为空的行) + if not person_row[0]: + continue + + if len(person_row) <= date_column_index: + continue + + name = person_row[0] if person_row else "" + shift = ( + person_row[date_column_index] + if date_column_index < len(person_row) + else "" + ) + + if not name or not shift: + continue + + # 清理班次值 + shift = shift.strip() + if shift == "白": + day_shift_names.append(name.strip()) + elif shift == "夜": + night_shift_names.append(name.strip()) + + return self._format_result(day_shift_names, night_shift_names) + + logger.warning(f"在年度表格中未找到 {target_month}月 的数据块") + return self._empty_result() + + def parse( + self, values: List[List[str]], target_date: str, sheet_title: str = "" + ) -> Dict[str, any]: + """ + 解析排班数据,自动判断表格类型 + + 参数: + values: 飞书表格返回的二维数组 + target_date: 目标日期(格式: "12月30日" 或 "12/30") + sheet_title: 表格标题,用于判断表格类型 + + 返回: + 排班信息字典 + """ + # 根据表格标题判断表格类型 + if "年" in sheet_title and "排班表" in sheet_title: + # 年度表格 + logger.info(f"使用年度表格解析器: {sheet_title}") + return self.parse_yearly_sheet(values, target_date) + else: + # 月度表格 + logger.info(f"使用月度表格解析器: {sheet_title}") + return self.parse_monthly_sheet(values, target_date) + + def _empty_result(self) -> Dict[str, any]: + """返回空结果""" + return { + "day_shift": "", + "night_shift": "", + "day_shift_list": [], + "night_shift_list": [], + } + + def _format_result( + self, day_shift_names: List[str], night_shift_names: List[str] + ) -> Dict[str, any]: + """格式化结果""" + # 去重并排序 + day_shift_names = sorted(set(day_shift_names)) + night_shift_names = sorted(set(night_shift_names)) + + # 格式化输出 + day_shift_str = "、".join(day_shift_names) if day_shift_names else "" + night_shift_str = "、".join(night_shift_names) if night_shift_names else "" + + return { + "day_shift": day_shift_str, + "night_shift": night_shift_str, + "day_shift_list": day_shift_names, + "night_shift_list": night_shift_names, + } + + +if __name__ == "__main__": + # 测试代码 + import sys + + # 设置日志 + logging.basicConfig(level=logging.DEBUG) + + parser = ScheduleDataParser() + + # 测试日期解析 + test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"] + for date in test_dates: + parsed = parser._parse_chinese_date(date) + print(f"解析 '{date}' -> '{parsed}'") + + # 测试月度表格解析 + monthly_values = [ + ["姓名", "12月1日", "12月2日", "12月3日"], + ["张三", "白", "夜", ""], + ["李四", "夜", "白", "白"], + ["王五", "", "白", "夜"], + ] + + result = parser.parse_monthly_sheet(monthly_values, "12月2日") + print(f"\n月度表格解析结果: {result}") + + # 测试年度表格解析 + yearly_values = [ + ["福州港2026年排班表"], + ["姓名", "1月1日", "1月2日", "1月3日"], + ["张三", "白", "夜", ""], + ["李四", "夜", "白", "白"], + ["福州港2月排班表"], + ["姓名", "2月1日", "2月2日"], + ["王五", "白", "夜"], + ] + + result = parser.parse_yearly_sheet(yearly_values, "1月2日") + print(f"年度表格解析结果: {result}") diff --git a/metabase/AGENTS.md b/metabase/AGENTS.md new file mode 100644 index 0000000..f99dbf4 --- /dev/null +++ b/metabase/AGENTS.md @@ -0,0 +1,68 @@ +# METABASE 数据查询模块 + +## 概述 + +Metabase REST API 客户端,查询船舶作业数据。 + +## 结构 + +``` +metabase/ +├── __init__.py # 统一导出 +├── time_operations.py # TimeOperationsClient - 时间范围查询 +└── vessel_operations.py # VesselOperationsClient - 船舶查询 +``` + +## 查找指南 + +| 任务 | 位置 | +|------|------| +| 按时间查作业量 | `time_operations.py:get_operations_by_time()` | +| 按船舶查数据 | `vessel_operations.py:get_vessel_operations()` | +| 添加新Card查询 | 对应文件的 `_CARD_IDS` | + +## 使用方式 + +```python +from metabase import TimeOperationsClient, VesselOperationsClient + +# 时间范围查询 +time_client = TimeOperationsClient() +data = time_client.get_operations_by_time("2026-03-01", "2026-03-02") +# {'cnt20': 100, 'cnt40': 50, 'teu': 200, ...} + +# 船舶查询 +vessel_client = VesselOperationsClient() +vessel = vessel_client.get_vessel_operations("260301-船名") +# {'cnt20': 44, 'teu': 88, 'start_time': '...', ...} +``` + +## Card ID 映射 + +### time_operations.py +| 名称 | ID | 用途 | +|------|-----|------| +| overview | 50 | 总览-箱量统计 | +| load | 51 | 装船 | +| discharge | 52 | 卸船 | +| yardmove | 53 | 转堆 | + +### vessel_operations.py +| 名称 | ID | 用途 | +|------|-----|------| +| overview | 57 | 船舶箱量统计 | +| efficiency_normal | 64 | 无人集卡效率 | +| work_info | 74 | 作业指令时间 | + +## 环境变量 + +``` +MATEBASE_USERNAME=xxx +MATEBASE_PASSWORD=xxx +``` + +## 注意事项 + +- 无官方 Python SDK,使用 REST API +- Session Token 无过期检测,每次请求前检查 +- 时间格式: `YYYY-MM-DD HH:MM:SS` \ No newline at end of file diff --git a/metabase/__init__.py b/metabase/__init__.py new file mode 100644 index 0000000..7f45662 --- /dev/null +++ b/metabase/__init__.py @@ -0,0 +1,31 @@ +""" +Metabase 查询模块 + +提供从 Metabase 查询时间范围数据和船舶数据的功能。 +""" + +from .time_operations import ( + TimeOperationsClient, + get_operations_by_time, + MetabaseAPIError, + MetabaseAuthError, + MetabaseQueryError, +) + +from .vessel_operations import ( + VesselOperationsClient, + get_vessel_by_visit_id, +) + +__all__ = [ + # time_operations + "TimeOperationsClient", + "get_operations_by_time", + # vessel_operations + "VesselOperationsClient", + "get_vessel_by_visit_id", + # exceptions + "MetabaseAPIError", + "MetabaseAuthError", + "MetabaseQueryError", +] diff --git a/metabase/time_operations.py b/metabase/time_operations.py new file mode 100644 index 0000000..4806e5e --- /dev/null +++ b/metabase/time_operations.py @@ -0,0 +1,438 @@ +""" +时间范围作业数据查询模块 + +用于从 Metabase 查询特定时间段内的作业情况数据。 +以时间范围为单位获取作业统计数据。 + +安装依赖: + pip install requests python-dotenv + +环境变量配置(推荐): + MATEBASE_USERNAME=your_username + MATEBASE_PASSWORD=your_password + +基本用法: + >>> from time_operations import TimeOperationsClient + >>> client = TimeOperationsClient() + >>> data = client.get_operations_by_time("2026-02-01", "2026-03-01") + >>> print(data['cnt20']) # 2359 +""" + +import requests +from typing import Optional, Dict, Any, List +import os +from dotenv import load_dotenv + +# 加载环境变量 +load_dotenv() + + +class MetabaseAPIError(Exception): + """Metabase API 调用异常""" + + pass + + +class MetabaseAuthError(MetabaseAPIError): + """Metabase 认证异常""" + + pass + + +class MetabaseQueryError(MetabaseAPIError): + """Metabase 查询异常""" + + pass + + +class TimeOperationsClient: + """ + 基于时间的作业数据查询客户端 + + 用于从 Metabase 查询特定时间段内的作业情况数据,包括: + - 箱量统计(20尺、40尺、总箱数、TEU) + - 装船、卸船、转堆数据 + - 无人集卡效率指标(cycle/h) + + Args: + base_url: Metabase 服务地址 + username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) + password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) + + Example: + >>> client = TimeBasedOperationsClient() + >>> data = client.get_operations_by_time("2026-02-01", "2026-03-01") + >>> print(f"20尺箱量: {data['cnt20']}") + """ + + # Metabase Card ID 映射表(基础指标-时间筛选) + _CARD_IDS = { + "overview": 50, # 总览 - 箱量统计 + "load": 51, # 装船 + "discharge": 52, # 卸船 + "yardmove": 53, # 转堆 + "efficiency": 65, # 圈效率-剔除异常 + "efficiency_filtered": 69, # 无人集卡效率指标-剔除异常(时间筛选) + } + + def __init__( + self, + base_url: str = "http://10.80.0.11:30001", + username: Optional[str] = None, + password: Optional[str] = None, + ): + self.base_url = base_url.rstrip("/") + self.session_token: Optional[str] = None + + # 优先使用传入的参数,否则从环境变量读取 + self.username = username or os.getenv("MATEBASE_USERNAME") + self.password = password or os.getenv("MATEBASE_PASSWORD") + + if not self.username or not self.password: + raise MetabaseAuthError( + "未提供 Metabase 用户名或密码。" + "请通过参数传入,或设置环境变量 " + "MATEBASE_USERNAME 和 MATEBASE_PASSWORD" + ) + + def _authenticate(self) -> None: + """认证并获取 session token""" + auth_url = f"{self.base_url}/api/session" + payload = {"username": self.username, "password": self.password} + + try: + response = requests.post(auth_url, json=payload, timeout=30) + response.raise_for_status() + data = response.json() + self.session_token = data.get("id") + except requests.exceptions.RequestException as e: + raise MetabaseAuthError(f"Metabase 认证失败: {e}") + + def _ensure_authenticated(self) -> None: + """确保已认证""" + if not self.session_token: + self._authenticate() + + def _query_card(self, card_id: int, parameters: List[Dict]) -> Dict: + """ + 查询指定 Card + + Args: + card_id: Metabase Card ID + parameters: 查询参数列表 + + Returns: + API 响应数据(字典) + + Raises: + MetabaseQueryError: 查询失败 + """ + self._ensure_authenticated() + + url = f"{self.base_url}/api/card/{card_id}/query" + headers = { + "X-Metabase-Session": self.session_token, + "Content-Type": "application/json", + } + payload = {"parameters": parameters} + + try: + response = requests.post(url, headers=headers, json=payload, timeout=30) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + raise MetabaseQueryError(f"查询 Metabase Card {card_id} 失败: {e}") + + def _extract_row_data(self, response: Dict) -> Optional[Dict]: + """ + 从 API 响应中提取第一行数据 + + Args: + response: API 响应字典 + + Returns: + 列名到值的映射字典,如果没有数据则返回 None + """ + rows = response.get("data", {}).get("rows", []) + cols = response.get("data", {}).get("cols", []) + + if not rows: + return None + + row_data = rows[0] + result = {} + for i, col in enumerate(cols): + col_name = col.get("name", f"col_{i}") + result[col_name] = row_data[i] if i < len(row_data) else None + + return result + + def _extract_summed_data( + self, response: Dict, sum_columns: List[str] + ) -> Optional[Dict]: + """ + 从 API 响应中提取多行数据并汇总指定列 + + Args: + response: API 响应字典 + sum_columns: 需要求和的列名列表 + + Returns: + 列名到汇总值的映射字典,如果没有数据则返回 None + """ + rows = response.get("data", {}).get("rows", []) + cols = response.get("data", {}).get("cols", []) + + if not rows: + return None + + # 构建列名到索引的映射 + col_to_idx = {} + for i, col in enumerate(cols): + col_name = col.get("name", f"col_{i}") + col_to_idx[col_name] = i + + # 汇总数据 + result = {} + for col_name in sum_columns: + if col_name in col_to_idx: + idx = col_to_idx[col_name] + total = 0 + for row in rows: + value = row[idx] if idx < len(row) else 0 + if value is not None: + try: + total += float(value) + except (ValueError, TypeError): + pass + result[col_name] = int(total) if total == int(total) else total + else: + result[col_name] = None + + return result + + def _build_time_parameters(self, start_time: str, end_time: str) -> List[Dict]: + """ + 构建时间范围查询参数 + + Args: + start_time: 开始时间,格式 "YYYY-MM-DD" 或 "YYYY-MM-DD HH:MM:SS" + end_time: 结束时间,格式 "YYYY-MM-DD" 或 "YYYY-MM-DD HH:MM:SS" + + Returns: + 参数列表 + """ + # 处理日期格式(如果不包含时间,则添加 00:00:00) + if len(start_time) == 10: # YYYY-MM-DD + start_value = f"{start_time} 00:00:00" + else: + start_value = start_time + + if len(end_time) == 10: # YYYY-MM-DD + end_value = f"{end_time} 23:59:59" + else: + end_value = end_time + + return [ + { + "type": "date/single", + "target": ["variable", ["template-tag", "time_start"]], + "value": start_value, + }, + { + "type": "date/single", + "target": ["variable", ["template-tag", "time_end"]], + "value": end_value, + }, + ] + + def get_operations_by_time(self, start_time: str, end_time: str) -> Dict[str, Any]: + """ + 获取指定时间段内的作业统计数据 + + 查询指定时间范围内的箱量统计数据。 + + Args: + start_time: 开始时间,格式: + - "YYYY-MM-DD" (自动补充为 00:00:00) + - "YYYY-MM-DD HH:MM:SS" (精确时间) + end_time: 结束时间,格式: + - "YYYY-MM-DD" (自动补充为 23:59:59) + - "YYYY-MM-DD HH:MM:SS" (精确时间) + + Returns: + 包含以下字段的字典: + - cnt20: 20尺箱量 + - cnt40: 40尺箱量 + - cntAll: 总箱数 + - teu: TEU数 + + Raises: + MetabaseAPIError: API 调用失败 + MetabaseAuthError: 认证失败 + MetabaseQueryError: 查询失败 + + Example: + >>> client = TimeOperationsClient() + >>> # 按日期查询 + >>> data = client.get_operations_by_time("2026-02-01", "2026-03-01") + >>> # 按精确时间查询 + >>> data = client.get_operations_by_time("2026-02-01 08:00:00", "2026-02-01 18:00:00") + >>> print(f"20尺箱量: {data['cnt20']}") # 2359 + """ + # 构建时间参数 + time_params = self._build_time_parameters(start_time, end_time) + + # 查询总览数据(箱量统计)- 需要汇总所有车辆 + overview_response = self._query_card(self._CARD_IDS["overview"], time_params) + overview = ( + self._extract_summed_data( + overview_response, ["cnt20", "cnt40", "cntAll", "teu"] + ) + or {} + ) + + # 查询装船数据 + load_response = self._query_card(self._CARD_IDS["load"], time_params) + load_data = self._extract_summed_data(load_response, ["cnt20", "cnt40"]) or {} + + # 查询卸船数据 + discharge_response = self._query_card(self._CARD_IDS["discharge"], time_params) + discharge_data = ( + self._extract_summed_data(discharge_response, ["cnt20", "cnt40"]) or {} + ) + + # 查询转堆数据 + yardmove_response = self._query_card(self._CARD_IDS["yardmove"], time_params) + yardmove_data = ( + self._extract_summed_data(yardmove_response, ["cnt20", "cnt40"]) or {} + ) + + # 查询效率指标数据 + efficiency_response = self._query_card( + self._CARD_IDS["efficiency"], time_params + ) + efficiency_data = self._extract_row_data(efficiency_response) or {} + cycle_h_normal = efficiency_data.get("cycle/h") + + # 查询剔除异常后的效率指标 + efficiency_filtered_response = self._query_card( + self._CARD_IDS["efficiency_filtered"], time_params + ) + efficiency_filtered_data = ( + self._extract_row_data(efficiency_filtered_response) or {} + ) + cycle_h_filtered = efficiency_filtered_data.get("cycle/h") + + # 四舍五入到2位小数 + cycle_h_normal_rounded = ( + round(cycle_h_normal, 2) if cycle_h_normal is not None else None + ) + cycle_h_filtered_rounded = ( + round(cycle_h_filtered, 2) if cycle_h_filtered is not None else None + ) + + return { + "cnt20": overview.get("cnt20"), + "cnt40": overview.get("cnt40"), + "cntAll": overview.get("cntAll"), + "teu": overview.get("teu"), + } + + +# 工厂函数(推荐用于简单场景) +def create_time_operations_client( + base_url: str = "http://10.80.0.11:30001", + username: Optional[str] = None, + password: Optional[str] = None, +) -> "TimeOperationsClient": + """ + 创建时间范围作业数据客户端 + + 这是创建 TimeOperationsClient 实例的便捷工厂函数。 + + Args: + base_url: Metabase 服务地址 + username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) + password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) + + Returns: + TimeOperationsClient 实例 + + Example: + >>> from time_operations import create_time_operations_client + >>> client = create_time_operations_client() + >>> data = client.get_operations_by_time("2026-02-01", "2026-03-01") + """ + return TimeOperationsClient( + base_url=base_url, + username=username, + password=password, + ) + + +# 便捷函数 +def get_operations_by_time(start_date: str, end_date: str) -> Optional[Dict[str, Any]]: + """ + 获取指定时间段内的作业统计数据(便捷函数) + + 注意:此函数使用默认配置(从环境变量读取账号密码), + 在新项目中建议使用 TimeBasedOperationsClient 类或 create_time_operations_client() 函数。 + + Args: + start_date: 开始日期,格式 "YYYY-MM-DD" + end_date: 结束日期,格式 "YYYY-MM-DD" + + Returns: + 作业统计数据字典,失败时返回 None + """ + client = create_time_operations_client() + try: + return client.get_operations_by_time(start_date, end_date) + except Exception as e: + print(f"查询失败: {e}") + return None + + +# 导出公共接口 +__all__ = [ + "TimeOperationsClient", + "create_time_operations_client", + "get_operations_by_time", + "MetabaseAPIError", + "MetabaseAuthError", + "MetabaseQueryError", +] + + +# 示例用法 +if __name__ == "__main__": + import argparse + import json + + # 命令行参数解析 + parser = argparse.ArgumentParser(description="获取指定时间段的作业数据") + parser.add_argument( + "--start", + "-s", + type=str, + default="2026-02-01", + help="开始日期,格式 YYYY-MM-DD (默认: 2026-02-01)", + ) + parser.add_argument( + "--end", + "-e", + type=str, + default="2026-03-01", + help="结束日期,格式 YYYY-MM-DD (默认: 2026-03-01)", + ) + args = parser.parse_args() + + # 获取数据 + result = get_operations_by_time(args.start, args.end) + + if result: + print(f"成功获取 {args.start} 至 {args.end} 的作业数据") + print(json.dumps(result, indent=2, ensure_ascii=False)) + else: + print(f"获取数据失败") diff --git a/metabase/vessel_operations.py b/metabase/vessel_operations.py new file mode 100644 index 0000000..501c08d --- /dev/null +++ b/metabase/vessel_operations.py @@ -0,0 +1,399 @@ +""" +船舶作业数据查询模块 + +用于从 Metabase 查询船舶作业相关的箱量统计和效率指标数据。 +以船舶为单位获取作业情况。 + +安装依赖: + pip install requests python-dotenv + +环境变量配置(推荐): + MATEBASE_USERNAME=your_username + MATEBASE_PASSWORD=your_password + +基本用法: + >>> from vessel_operations import VesselOperationsClient + >>> client = VesselOperationsClient() + >>> data = client.get_vessel_operations("260209-华晟67_X") + >>> print(data['cnt20']) # 44 +""" + +import requests +from typing import Optional, Dict, Any, List +import os +from dotenv import load_dotenv + +# 加载环境变量 +load_dotenv() + + +class MetabaseAPIError(Exception): + """Metabase API 调用异常""" + + pass + + +class MetabaseAuthError(MetabaseAPIError): + """Metabase 认证异常""" + + pass + + +class MetabaseQueryError(MetabaseAPIError): + """Metabase 查询异常""" + + pass + + +class VesselOperationsClient: + """ + 船舶数据查询客户端 + + 用于从 Metabase 查询船舶相关数据,包括: + - 箱量统计(20尺、40尺、总箱数、TEU) + - 无人集卡效率指标(cycle/h) + + Args: + base_url: Metabase 服务地址 + username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) + password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) + + Example: + >>> client = VesselOperationsClient() + >>> data = client.get_vessel_operations("260209-华晟67_X") + >>> print(f"20尺箱量: {data['cnt20']}") + """ + + # Metabase Card ID 映射表 + _CARD_IDS = { + "overview": 57, # 总览 - 箱量统计 + "efficiency_normal": 64, # 无人集卡效率指标 + "efficiency_filtered": 70, # 无人集卡效率指标-剔除异常 + "work_info": 74, # 船舶作业指令时间(AT_WorkInfo) + } + + def __init__( + self, + base_url: str = "http://10.80.0.11:30001", + username: Optional[str] = None, + password: Optional[str] = None, + ): + self.base_url = base_url.rstrip("/") + self.session_token: Optional[str] = None + + # 优先使用传入的参数,否则从环境变量读取 + self.username = username or os.getenv("MATEBASE_USERNAME") + self.password = password or os.getenv("MATEBASE_PASSWORD") + + if not self.username or not self.password: + raise MetabaseAuthError( + "未提供 Metabase 用户名或密码。" + "请通过参数传入,或设置环境变量 " + "MATEBASE_USERNAME 和 MATEBASE_PASSWORD" + ) + + def _authenticate(self) -> None: + """认证并获取 session token""" + auth_url = f"{self.base_url}/api/session" + payload = {"username": self.username, "password": self.password} + + try: + response = requests.post(auth_url, json=payload, timeout=30) + response.raise_for_status() + data = response.json() + self.session_token = data.get("id") + except requests.exceptions.RequestException as e: + raise MetabaseAuthError(f"Metabase 认证失败: {e}") + + def _ensure_authenticated(self) -> None: + """确保已认证""" + if not self.session_token: + self._authenticate() + + def _query_card(self, card_id: int, parameters: List[Dict]) -> Dict: + """ + 查询指定 Card + + Args: + card_id: Metabase Card ID + parameters: 查询参数列表 + + Returns: + API 响应数据(字典) + + Raises: + MetabaseQueryError: 查询失败 + """ + self._ensure_authenticated() + + url = f"{self.base_url}/api/card/{card_id}/query" + headers = { + "X-Metabase-Session": self.session_token, + "Content-Type": "application/json", + } + payload = {"parameters": parameters} + + try: + response = requests.post(url, headers=headers, json=payload, timeout=30) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + raise MetabaseQueryError(f"查询 Metabase Card {card_id} 失败: {e}") + + def _extract_row_data(self, response: Dict) -> Optional[Dict]: + """ + 从 API 响应中提取第一行数据 + + Args: + response: API 响应字典 + + Returns: + 列名到值的映射字典,如果没有数据则返回 None + """ + rows = response.get("data", {}).get("rows", []) + cols = response.get("data", {}).get("cols", []) + + if not rows: + return None + + row_data = rows[0] + result = {} + for i, col in enumerate(cols): + col_name = col.get("name", f"col_{i}") + result[col_name] = row_data[i] if i < len(row_data) else None + + return result + + def _extract_work_time_range(self, response: Dict) -> Dict[str, Optional[str]]: + """ + 从作业指令数据中提取第一条指令开始时间和最后一条指令结束时间 + + Args: + response: API 响应字典 + + Returns: + 包含 start_time 和 end_time 的字典,如果没有数据则返回 None + """ + rows = response.get("data", {}).get("rows", []) + cols = response.get("data", {}).get("cols", []) + + if not rows: + return {"start_time": None, "end_time": None} + + # 构建列名到索引的映射 + col_to_idx = {} + for i, col in enumerate(cols): + col_name = col.get("name", f"col_{i}") + col_to_idx[col_name] = i + + # 假设有 start_time 和 end_time 列,找到第一条开始时间和最后一条结束时间 + start_time_idx = col_to_idx.get("start_time") + end_time_idx = col_to_idx.get("end_time") + + if start_time_idx is None or end_time_idx is None: + return {"start_time": None, "end_time": None} + + # 获取第一条指令的开始时间 + first_start_time = ( + rows[0][start_time_idx] if len(rows[0]) > start_time_idx else None + ) + # 获取最后一条指令的结束时间 + last_end_time = rows[-1][end_time_idx] if len(rows[-1]) > end_time_idx else None + + # 处理时间格式,只保留到秒(去掉毫秒) + if first_start_time and isinstance(first_start_time, str): + # 去掉小数点及后面的部分 + first_start_time = first_start_time.split(".")[0] + if last_end_time and isinstance(last_end_time, str): + last_end_time = last_end_time.split(".")[0] + + return {"start_time": first_start_time, "end_time": last_end_time} + + def get_vessel_operations(self, vessel_visit_id: str) -> Dict[str, Any]: + """ + 获取船舶统计数据 + + 查询指定船舶的箱量统计和效率指标数据。 + + Args: + vessel_visit_id: 船舶访问ID,格式如 "260209-华晟67_X" + + Returns: + 包含以下字段的字典: + - vessel_visit_id: 船舶访问ID + - cnt20: 20尺箱量 + - cnt40: 40尺箱量 + - cntAll: 总箱数 + - teu: TEU数 + - cycle_h_normal: 无人集卡效率指标 (cycle/h) + - cycle_h_filtered: 无人集卡效率指标-剔除异常 (cycle/h) + - start_time: 第一条指令开始时间 + - end_time: 最后一条指令结束时间 + + Raises: + MetabaseAPIError: API 调用失败 + MetabaseAuthError: 认证失败 + MetabaseQueryError: 查询失败 + + Example: + >>> client = VesselOperationsClient() + >>> data = client.get_vessel_operations("260209-华晟67_X") + >>> print(f"20尺箱量: {data['cnt20']}") # 44 + >>> print(f"效率指标: {data['cycle_h_normal']}") # 2.35 + >>> print(f"作业开始: {data['start_time']}") # 2026-02-01 08:00:00 + >>> print(f"作业结束: {data['end_time']}") # 2026-02-01 18:00:00 + """ + # 查询总览数据(箱量统计) + vessel_param = { + "type": "string/=", + "target": ["variable", ["template-tag", "vesselVisitID"]], + "value": vessel_visit_id, + } + + overview_response = self._query_card(self._CARD_IDS["overview"], [vessel_param]) + overview = self._extract_row_data(overview_response) or {} + + # 查询效率指标数据 + efficiency_param = { + "type": "string/=", + "target": ["variable", ["template-tag", "vesselVisitId"]], + "value": vessel_visit_id, + } + + # Card 64: 无人集卡效率指标 + normal_response = self._query_card( + self._CARD_IDS["efficiency_normal"], [efficiency_param] + ) + normal_data = self._extract_row_data(normal_response) or {} + cycle_h_normal = normal_data.get("cycle/h") + + # Card 70: 无人集卡效率指标-剔除异常 + filtered_response = self._query_card( + self._CARD_IDS["efficiency_filtered"], [efficiency_param] + ) + filtered_data = self._extract_row_data(filtered_response) or {} + cycle_h_filtered = filtered_data.get("cycle/h") + + # Card 74: 船舶作业指令时间(AT_WorkInfo) + # 需要同时传递 vesselVisitID 和 vehicleId 两个参数 + vehicle_param_all = { + "type": "string/=", + "target": ["variable", ["template-tag", "vehicleId"]], + "value": "ALL", # 使用 ALL 获取所有车辆的作业指令 + } + work_info_response = self._query_card( + self._CARD_IDS["work_info"], [vessel_param, vehicle_param_all] + ) + work_time_range = self._extract_work_time_range(work_info_response) + + # 四舍五入到2位小数 + cycle_h_normal_rounded = ( + round(cycle_h_normal, 2) if cycle_h_normal is not None else None + ) + cycle_h_filtered_rounded = ( + round(cycle_h_filtered, 2) if cycle_h_filtered is not None else None + ) + + return { + "vessel_visit_id": vessel_visit_id, + "cnt20": overview.get("cnt20"), + "cnt40": overview.get("cnt40"), + "cntAll": overview.get("cntAll"), + "teu": overview.get("teu"), + "cycle_h_normal": cycle_h_normal_rounded, + "cycle_h_filtered": cycle_h_filtered_rounded, + "start_time": work_time_range.get("start_time"), + "end_time": work_time_range.get("end_time"), + } + + +# 工厂函数(推荐用于简单场景) +def create_vessel_operations_client( + base_url: str = "http://10.80.0.11:30001", + username: Optional[str] = None, + password: Optional[str] = None, +) -> VesselOperationsClient: + """ + 创建船舶数据客户端 + + 这是创建 VesselOperationsClient 实例的便捷工厂函数。 + + Args: + base_url: Metabase 服务地址 + username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) + password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) + + Returns: + VesselOperationsClient 实例 + + Example: + >>> from metabase_vessel_client import create_vessel_operations_client + >>> client = create_vessel_operations_client() + >>> data = client.get_vessel_operations("260209-华晟67_X") + """ + return VesselOperationsClient( + base_url=base_url, + username=username, + password=password, + ) + + +# 向后兼容:保留旧函数名 +def get_vessel_by_visit_id(vessel_visit_id: str) -> Dict[str, Any]: + """ + 获取船舶统计数据(兼容旧版本) + + 注意:此函数使用默认配置(从环境变量读取账号密码), + 在新项目中建议使用 VesselOperationsClient 类或 create_vessel_operations_client() 函数。 + + Args: + vessel_visit_id: 船舶访问ID + + Returns: + 船舶统计数据字典 + """ + client = create_vessel_operations_client() + return client.get_vessel_operations(vessel_visit_id) + + +# 导出公共接口 +__all__ = [ + "VesselOperationsClient", + "create_vessel_operations_client", + "get_vessel_by_visit_id", + "MetabaseAPIError", + "MetabaseAuthError", + "MetabaseQueryError", +] + + +# 命令行入口点 +if __name__ == "__main__": + import sys + import json + + # 解析命令行参数 + if len(sys.argv) < 2: + print("用法: python3 vessel_operations.py <船舶号>") + print("示例: python3 vessel_operations.py 260209-德盛6") + sys.exit(1) + + vessel_visit_id = sys.argv[1] + + try: + # 创建客户端并查询数据 + client = VesselOperationsClient() + data = client.get_vessel_operations(vessel_visit_id) + + # 输出 JSON 格式的结果 + print(json.dumps(data, indent=2, ensure_ascii=False)) + + except MetabaseAuthError as e: + print(f"认证失败: {e}", file=sys.stderr) + sys.exit(1) + except MetabaseQueryError as e: + print(f"查询失败: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"错误: {e}", file=sys.stderr) + sys.exit(1) diff --git a/report_generator.py b/report_generator.py new file mode 100644 index 0000000..6516933 --- /dev/null +++ b/report_generator.py @@ -0,0 +1,631 @@ +""" +日报生成模块 + +从飞书获取排班信息,从Metabase获取作业数据,生成日报。 +班次时间:08:30 ~ 次日08:30 +""" + +from datetime import datetime, timedelta +from typing import Dict, List, Any, Optional +import os +import sys + +# 加载环境变量 +from dotenv import load_dotenv + +load_dotenv() + +# 添加项目根目录到路径 +project_root = os.path.dirname(os.path.abspath(__file__)) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +from feishu.manager import FeishuScheduleManager +from metabase.time_operations import TimeOperationsClient +from metabase.vessel_operations import VesselOperationsClient + + +class DailyReportGenerator: + """日报生成器""" + + def __init__(self): + self.feishu_manager = FeishuScheduleManager() + self.time_client = TimeOperationsClient() + self.vessel_client = VesselOperationsClient() + + def get_shift_time_range(self, report_date: datetime) -> tuple: + """ + 获取班次时间范围 + 特殊规则: + 1. 每月1号:从00:00到次日08:00(特殊规则) + 2. 每月月底最后一天:从当天08:00到当天23:59(特殊规则) + 3. 其他日期:08:00 ~ 次日08:00(默认规则) + + Args: + report_date: 报告日期 + + Returns: + (start_time, end_time) 元组 + """ + year = report_date.year + month = report_date.month + day = report_date.day + + # 获取当月最后一天 + if month == 12: + next_month = datetime(year + 1, 1, 1) + else: + next_month = datetime(year, month + 1, 1) + last_day_of_month = (next_month - timedelta(days=1)).day + + # 判断是否是每月1号 + if day == 1: + # 每月1号:从00:00到次日08:00 + start_time = report_date.replace(hour=0, minute=0, second=0, microsecond=0) + end_time = report_date.replace( + hour=8, minute=0, second=0, microsecond=0 + ) + timedelta(days=1) + # 判断是否是月底最后一天 + elif day == last_day_of_month: + # 每月月底最后一天:从当天08:00到当天23:59 + start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0) + end_time = report_date.replace(hour=23, minute=59, second=59, microsecond=0) + else: + # 其他日期:08:00 ~ 次日08:00 + start_time = report_date.replace(hour=8, minute=0, second=0, microsecond=0) + end_time = start_time + timedelta(days=1) + + return start_time, end_time + + def get_next_day_schedule(self, report_date: datetime) -> Dict[str, Any]: + """ + 获取次日排班信息(用于日报中的排班部分) + + Args: + report_date: 报告日期 + + Returns: + 排班信息字典 + """ + # 次日日期 + next_day = report_date + timedelta(days=1) + next_day_str = next_day.strftime("%Y-%m-%d") + + # 从飞书获取排班 + schedule = self.feishu_manager.get_schedule_for_date(next_day_str) + + return { + "date": next_day.strftime("%m/%d"), + "day_shift": schedule.get("day_shift", ""), + "night_shift": schedule.get("night_shift", ""), + "day_shift_list": schedule.get("day_shift_list", []), + "night_shift_list": schedule.get("night_shift_list", []), + } + + def get_vessels_in_time_range( + self, start_time: datetime, end_time: datetime + ) -> List[Dict[str, Any]]: + """ + 获取指定时间范围内作业的所有船舶列表 + 使用 Metabase 原生查询 API + + Args: + start_time: 开始时间 + end_time: 结束时间 + + Returns: + 船舶列表,每个船舶包含: + - vessel_visit_id: 船舶访问ID + - vessel_name: 船舶名称(从ID中提取) + - start_time: 船舶作业开始时间 + - end_time: 船舶作业结束时间 + - cnt20: 20尺箱量 + - cnt40: 40尺箱量 + - teu: TEU数 + """ + try: + # 使用 Metabase 原生查询获取船舶列表 + start_str = start_time.strftime("%Y-%m-%d %H:%M:%S") + end_str = end_time.strftime("%Y-%m-%d %H:%M:%S") + + query = f""" +SELECT + vesselVisitID, + MIN(_time_end) as start_time, + MAX(_time_end) as end_time, + SUM(num20) as cnt20, + SUM(num40) as cnt40, + SUM(num20) + SUM(num40)*2 as teu +FROM ( + SELECT + CASE + WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) + OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1)) + THEN cn.num20 ELSE cc.num20 END AS num20, + CASE + WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) + OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3))) + THEN cn.num40 ELSE cc.num40 END AS num40, + cc.vehicleId, + cc.movementType, + cc.vesselVisitID, + cc.batchName, + cc._time_end + FROM cnt_cycles cc + LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end` + WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR) + AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR) +) AS basedata +GROUP BY vesselVisitID +ORDER BY vesselVisitID + """ + + # 调用 Metabase API + result = self._query_metabase_native(query) + + if result and "data" in result: + rows = result["data"].get("rows", []) + cols = result["data"].get("cols", []) + + # 构建列名到索引的映射 + col_idx = {col["name"]: i for i, col in enumerate(cols)} + + vessels = [] + for row in rows: + vessel_id = row[col_idx.get("vesselVisitID", 0)] + start = row[col_idx.get("start_time", 1)] + end = row[col_idx.get("end_time", 2)] + cnt20 = row[col_idx.get("cnt20", 3)] + cnt40 = row[col_idx.get("cnt40", 4)] + teu = row[col_idx.get("teu", 5)] + + # 从 vessel_visit_id 提取船名 + # 格式通常是:日期-船名_其他信息 + vessel_name = self._extract_vessel_name(vessel_id) + + vessels.append( + { + "vessel_visit_id": vessel_id, + "vessel_name": vessel_name, + "start_time": start, + "end_time": end, + "cnt20": cnt20 or 0, + "cnt40": cnt40 or 0, + "teu": teu or 0, + } + ) + + return vessels + + except Exception as e: + print(f"获取船舶列表失败: {e}") + + return [] + + def _extract_vessel_name(self, vessel_visit_id: str) -> str: + """从 vessel_visit_id 中提取船名""" + if not vessel_visit_id: + return "" + + # 格式:260208-鑫源永顺 或 260209-华晟67_X + # 提取 "-" 后面的部分,并去掉 "_X" 等后缀 + parts = vessel_visit_id.split("-") + if len(parts) >= 2: + name = parts[1] + # 去掉 _X, _Y 等后缀 + if "_" in name: + name = name.split("_")[0] + return name + + return vessel_visit_id + + def _query_metabase_native( + self, query: str, database_id: int = 3 + ) -> Optional[Dict]: + """ + 执行 Metabase 原生查询 + + Args: + query: SQL 查询语句 + database_id: 数据库ID,默认为3 + + Returns: + 查询结果字典 + """ + import requests + + # 获取认证信息 + username = os.getenv("MATEBASE_USERNAME") + password = os.getenv("MATEBASE_PASSWORD") + base_url = "http://10.80.0.11:30001" + + if not username or not password: + raise Exception("缺少 Metabase 认证信息") + + # 1. 认证 + auth_url = f"{base_url}/api/session" + auth_response = requests.post( + auth_url, json={"username": username, "password": password}, timeout=30 + ) + auth_response.raise_for_status() + session_token = auth_response.json().get("id") + + # 2. 执行查询 + query_url = f"{base_url}/api/dataset" + headers = { + "X-Metabase-Session": session_token, + "Content-Type": "application/json", + } + + payload = { + "type": "native", + "native": {"query": query}, + "database": database_id, + } + + response = requests.post(query_url, headers=headers, json=payload, timeout=30) + response.raise_for_status() + + return response.json() + + def calculate_vessel_workload_in_shift( + self, + vessel_visit_id: str, + vessel_start: datetime, + vessel_end: datetime, + shift_start: datetime, + shift_end: datetime, + ) -> Optional[Dict[str, Any]]: + """ + 计算船舶在班次时间范围内的作业量 + 处理跨班次的情况 + + Args: + vessel_visit_id: 船舶访问ID + vessel_start: 船舶作业开始时间 + vessel_end: 船舶作业结束时间 + shift_start: 班次开始时间 + shift_end: 班次结束时间 + + Returns: + 作业量数据,如果船舶不在班次范围内则返回None + """ + # 计算时间重叠 + overlap_start = max(vessel_start, shift_start) + overlap_end = min(vessel_end, shift_end) + + # 如果没有重叠,返回None + if overlap_start >= overlap_end: + return None + + try: + # 查询重叠时间范围内的作业数据 + start_str = overlap_start.strftime("%Y-%m-%d %H:%M:%S") + end_str = overlap_end.strftime("%Y-%m-%d %H:%M:%S") + + # 使用 time_operations 查询该时间段的作业数据 + # TODO: 需要确认是否可以按船舶过滤 + operations_data = self.time_client.get_operations_by_time( + start_str, end_str + ) + + return { + "vessel_visit_id": vessel_visit_id, + "start_time": overlap_start, + "end_time": overlap_end, + "cnt20": operations_data.get("cnt20", 0), + "cnt40": operations_data.get("cnt40", 0), + "cntAll": operations_data.get("cntAll", 0), + "teu": operations_data.get("teu", 0), + } + except Exception as e: + print(f"查询船舶 {vessel_visit_id} 作业数据失败: {e}") + return None + + def get_daily_vessel_operations( + self, report_date: datetime + ) -> List[Dict[str, Any]]: + """ + 获取当天(班次时间范围内)的所有船舶作业数据 + 处理跨班次的情况 + + Args: + report_date: 报告日期 + + Returns: + 船舶作业列表 + """ + shift_start, shift_end = self.get_shift_time_range(report_date) + + # 获取在班次时间范围内作业的所有船舶及其作业量 + vessels = self.get_vessels_in_time_range(shift_start, shift_end) + + # 转换时间字符串为 datetime 对象,并格式化输出 + vessel_operations = [] + for vessel in vessels: + start_time_str = vessel.get("start_time") + end_time_str = vessel.get("end_time") + + # 解析时间字符串 + try: + if isinstance(start_time_str, str): + # 去掉毫秒部分 + start_time_str = start_time_str.split(".")[0] + vessel["start_time"] = datetime.strptime( + start_time_str, "%Y-%m-%d %H:%M:%S" + ) + if isinstance(end_time_str, str): + end_time_str = end_time_str.split(".")[0] + vessel["end_time"] = datetime.strptime( + end_time_str, "%Y-%m-%d %H:%M:%S" + ) + except Exception as e: + continue + + vessel_operations.append(vessel) + + return vessel_operations + + def get_daily_operations(self, report_date: datetime) -> Dict[str, Any]: + """ + 获取当天作业情况(从Metabase) + 时间范围:昨日08:30 ~ 今日08:30 + + Args: + report_date: 报告日期 + + Returns: + 作业数据字典 + """ + start_time, end_time = self.get_shift_time_range(report_date) + + # 格式化时间字符串,包含具体时间(08:30) + # Metabase查询需要时间格式:YYYY-MM-DD HH:MM:SS + start_str = start_time.strftime("%Y-%m-%d %H:%M:%S") + end_str = end_time.strftime("%Y-%m-%d %H:%M:%S") + + try: + # 获取时间段内的作业数据 + operations_data = self.time_client.get_operations_by_time( + start_str, end_str + ) + + return { + "start_time": start_time, + "end_time": end_time, + "cnt20": operations_data.get("cnt20", 0), + "cnt40": operations_data.get("cnt40", 0), + "cntAll": operations_data.get("cntAll", 0), + "teu": operations_data.get("teu", 0), + "cycle_h_normal": operations_data.get("cycle_h_normal", 0), + "cycle_h_filtered": operations_data.get("cycle_h_filtered", 0), + } + except Exception as e: + print(f"获取作业数据失败: {e}") + return { + "start_time": start_time, + "end_time": end_time, + "cnt20": 0, + "cnt40": 0, + "cntAll": 0, + "teu": 0, + "cycle_h_normal": 0, + "cycle_h_filtered": 0, + } + + def calculate_monthly_stats( + self, report_date: datetime, daily_teu: int + ) -> Dict[str, Any]: + """ + 计算月度统计数据 + + Args: + report_date: 报告日期 + daily_teu: 当日TEU数 + + Returns: + 月度统计数据 + """ + # 当月计划作业量(当前日期 * 300TEU) + day_of_month = report_date.day + month_plan_teu = day_of_month * 300 + + # 当月实际作业量:从Metabase查询当月累计数据(1号到当前日期) + try: + # 构建当月1号到当前日期的查询 + # 日报包含昨日08:30到今日08:30的数据 + # 所以累计应该到今日08:30为止 + month_start = report_date.replace( + day=1, hour=0, minute=0, second=0, microsecond=0 + ) + # 获取班次结束时间(次日08:30) + _, shift_end = self.get_shift_time_range(report_date) + current_day_end = shift_end + + start_str = month_start.strftime("%Y-%m-%d %H:%M:%S") + end_str = current_day_end.strftime("%Y-%m-%d %H:%M:%S") + + # 查询当月累计数据 + monthly_data = self.time_client.get_operations_by_time(start_str, end_str) + month_actual_teu = monthly_data.get("teu", 0) + + except Exception as e: + print(f"获取当月累计数据失败: {e}") + # 如果查询失败,使用当日数据作为备用 + month_actual_teu = daily_teu + + # 完成比例 + completion_rate = ( + (month_actual_teu / month_plan_teu * 100) if month_plan_teu > 0 else 0 + ) + + return { + "month_plan_teu": month_plan_teu, + "month_actual_teu": month_actual_teu, + "completion_rate": round(completion_rate, 2), + } + + def get_days_in_month(self, year: int, month: int) -> int: + """获取某月的天数""" + if month == 12: + next_month = datetime(year + 1, 1, 1) + else: + next_month = datetime(year, month + 1, 1) + + this_month = datetime(year, month, 1) + return (next_month - this_month).days + + def read_template(self, template_path: str) -> str: + """读取模板文件""" + try: + with open(template_path, "r", encoding="utf-8") as f: + return f.read() + except Exception as e: + print(f"读取模板失败: {e}") + return "" + + def render_template(self, template: str, data: Dict[str, Any]) -> str: + """ + 简单的模板渲染 + 支持 {{variable}} 和 {{#list}}...{{/list}} + """ + import re + + result = template + + # 处理列表循环 {{#list}}...{{/list}} + def replace_list(match): + list_name = match.group(1) + list_template = match.group(2) + + if list_name in data and isinstance(data[list_name], list): + items = [] + for item in data[list_name]: + item_str = list_template + # 替换变量 + for key, value in item.items(): + placeholder = f"{{{{{key}}}}}" + item_str = item_str.replace(placeholder, str(value)) + items.append(item_str) + return "".join(items) + return "" + + # 匹配 {{#name}}...{{/name}} + list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}" + result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL) + + # 处理简单变量 {{variable}} + for key, value in data.items(): + if not isinstance(value, (list, dict)): + placeholder = f"{{{{{key}}}}}" + result = result.replace(placeholder, str(value)) + + return result + + def generate_daily_report(self, report_date: Optional[datetime] = None) -> str: + """ + 生成日报 + + Args: + report_date: 报告日期,默认为昨天 + + Returns: + 生成的日报内容 + """ + if report_date is None: + # 默认为昨天 + report_date = datetime.now() - timedelta(days=1) + + print(f"正在生成 {report_date.strftime('%Y-%m-%d')} 的日报...") + + # 1. 获取次日排班信息 + print("获取次日排班信息...") + next_day_schedule = self.get_next_day_schedule(report_date) + + # 2. 获取当天作业情况 + print("获取当天作业数据...") + operations_data = self.get_daily_operations(report_date) + + # 3. 获取当天船舶作业情况(处理跨班次) + print("获取船舶作业数据...") + vessel_operations = self.get_daily_vessel_operations(report_date) + + # 4. 计算月度统计 + print("计算月度统计...") + monthly_stats = self.calculate_monthly_stats( + report_date, operations_data.get("teu", 0) + ) + + # 5. 读取并渲染模板 + template_path = os.path.join( + project_root, "template", "daily_report_template.txt" + ) + template = self.read_template(template_path) + + if not template: + return "错误:无法读取模板文件" + + # 准备渲染数据 + # 注意:排班人员是次日的,所以日期也要用次日的 + next_day_date = report_date + timedelta(days=1) + + # 构建船舶列表数据 + vessels_data = [] + for vessel in vessel_operations: + vessels_data.append( + { + "name": vessel.get("vessel_name", ""), + "teu_20ft": vessel.get("cnt20", 0), + "teu_40ft": vessel.get("cnt40", 0), + "total_teu": vessel.get("teu", 0), + } + ) + + render_data = { + "date": report_date.strftime("%m/%d"), + "next_day_date": next_day_date.strftime("%m/%d"), + "vessels": vessels_data, + "daily_actual": operations_data.get("teu", 0), + "month_plan_teu": monthly_stats["month_plan_teu"], + "month_actual_teu": monthly_stats["month_actual_teu"], + "completion_rate": monthly_stats["completion_rate"], + "day_shift": next_day_schedule["day_shift"], + "night_shift": next_day_schedule["night_shift"], + "duty_phone": os.getenv("DUTY_PHONE", "13107662315"), + } + + # 渲染模板 + report = self.render_template(template, render_data) + + return report + + +def main(): + """主函数""" + import argparse + + parser = argparse.ArgumentParser(description="生成日报") + parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为昨天") + + args = parser.parse_args() + + # 解析日期 + if args.date: + report_date = datetime.strptime(args.date, "%Y-%m-%d") + else: + report_date = None + + # 生成日报 + generator = DailyReportGenerator() + report = generator.generate_daily_report(report_date) + + # 直接输出到终端 + print("\n" + "=" * 60) + print("日报内容") + print("=" * 60) + print(report) + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/shift_report.py b/shift_report.py new file mode 100644 index 0000000..f0afe65 --- /dev/null +++ b/shift_report.py @@ -0,0 +1,440 @@ +#!/usr/bin/env python3 +""" +班次交接报告生成模块 + +分别统计白班(08:00-20:00)和夜班(20:00-次日08:00)的作业情况。 +生成格式化的交接班报告。 +""" + +from datetime import datetime, timedelta +from typing import Dict, List, Any, Optional +import os +import sys +import re +import argparse + +# 加载环境变量 +from dotenv import load_dotenv + +load_dotenv() + +# 添加项目根目录到路径 +project_root = os.path.dirname(os.path.abspath(__file__)) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +from feishu.manager import FeishuScheduleManager + + +class ShiftReportGenerator: + """班次交接报告生成器""" + + # 班次定义 + SHIFTS = { + "day": { + "name": "白班", + "start_hour": 8, + "end_hour": 20, + }, + "night": { + "name": "夜班", + "start_hour": 20, + "end_hour": 8, # 次日 + }, + } + + def __init__(self): + self.feishu_manager = FeishuScheduleManager() + + def get_shift_time_range(self, report_date: datetime, shift_type: str) -> tuple: + """ + 获取班次时间范围 + + Args: + report_date: 报告日期 + shift_type: 班次类型 ("day" 或 "night") + + Returns: + (start_time, end_time) 元组 + """ + shift = self.SHIFTS.get(shift_type) + if not shift: + raise ValueError(f"未知的班次类型: {shift_type}") + + start_hour = shift["start_hour"] + end_hour = shift["end_hour"] + + if shift_type == "day": + # 白班:当天 08:00 - 当天 20:00 + start_time = report_date.replace( + hour=start_hour, minute=0, second=0, microsecond=0 + ) + end_time = report_date.replace( + hour=end_hour, minute=0, second=0, microsecond=0 + ) + else: + # 夜班:当天 20:00 - 次日 08:00 + start_time = report_date.replace( + hour=start_hour, minute=0, second=0, microsecond=0 + ) + end_time = report_date.replace( + hour=end_hour, minute=0, second=0, microsecond=0 + ) + timedelta(days=1) + + return start_time, end_time + + def get_shift_personnel(self, report_date: datetime, shift_type: str) -> str: + """ + 获取班次人员 + + Args: + report_date: 报告日期 + shift_type: 班次类型 + + Returns: + 人员姓名字符串 + """ + date_str = report_date.strftime("%Y-%m-%d") + schedule = self.feishu_manager.get_schedule_for_date(date_str) + + if shift_type == "day": + return schedule.get("day_shift", "") + else: + return schedule.get("night_shift", "") + + def get_vessels_in_time_range( + self, start_time: datetime, end_time: datetime + ) -> List[Dict[str, Any]]: + """ + 获取指定时间范围内作业的所有船舶列表 + + Args: + start_time: 开始时间 + end_time: 结束时间 + + Returns: + 船舶作业列表 + """ + try: + # 使用原生 SQL 查询获取船舶数据 + start_str = start_time.strftime("%Y-%m-%d %H:%M:%S") + end_str = end_time.strftime("%Y-%m-%d %H:%M:%S") + + query = f""" +SELECT + vesselVisitID, + MIN(_time_end) as start_time, + MAX(_time_end) as end_time, + SUM(num20) as cnt20, + SUM(num40) as cnt40, + SUM(num20) + SUM(num40)*2 as teu, + COUNT(DISTINCT vehicleId) as vehicles +FROM ( + SELECT + CASE + WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) + OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3) OR cc.num20 = 1)) + THEN cn.num20 ELSE cc.num20 END AS num20, + CASE + WHEN ((cc.num20 = 0 AND cc.num40 = 0 AND cc.num45 = 0) + OR (cc.num20 > 2 OR cc.num40 > 1 OR (cc.num20 + cc.num40 >= 3))) + THEN cn.num40 ELSE cc.num40 END AS num40, + cc.vehicleId, + cc.movementType, + cc.vesselVisitID, + cc.batchName, + cc._time_end + FROM cnt_cycles cc + LEFT JOIN cnt_newcycles cn ON cc.`_time` = cn.`_time_end` + WHERE cc._time_end >= DATE_SUB('{start_str}', INTERVAL 8 HOUR) + AND cc._time_end <= DATE_SUB('{end_str}', INTERVAL 8 HOUR) +) AS basedata +GROUP BY vesselVisitID +ORDER BY vesselVisitID + """ + + result = self._query_metabase_native(query) + + if result and "data" in result: + rows = result["data"].get("rows", []) + cols = result["data"].get("cols", []) + + col_idx = {col["name"]: i for i, col in enumerate(cols)} + + vessels = [] + for row in rows: + vessel_id = row[col_idx.get("vesselVisitID", 0)] + cnt20 = row[col_idx.get("cnt20", 3)] or 0 + cnt40 = row[col_idx.get("cnt40", 4)] or 0 + teu = row[col_idx.get("teu", 5)] or 0 + vehicles = row[col_idx.get("vehicles", 6)] or 0 + + vessel_name = self._extract_vessel_name(vessel_id) + vessel_code = self._extract_vessel_code(vessel_id) + + vessels.append( + { + "vessel_code": vessel_code, + "vessel_name": vessel_name, + "vehicles": vehicles, + "teu_20ft": cnt20, + "teu_40ft": cnt40, + "total_teu": teu, + "efficiency": self._calculate_efficiency(teu, vehicles), + # 故障和人工介入暂无数据接口,显示占位符 + "failures": "--", + "failure_rate": "--", + "interventions": "--", + "intervention_rate": "--", + } + ) + + return vessels + + except Exception as e: + print(f"获取船舶列表失败: {e}") + + return [] + + def _extract_vessel_name(self, vessel_visit_id: str) -> str: + """从 vessel_visit_id 中提取船名""" + if not vessel_visit_id: + return "" + + parts = vessel_visit_id.split("-") + if len(parts) >= 2: + name = parts[1] + if "_" in name: + name = name.split("_")[0] + return name + + return vessel_visit_id + + def _extract_vessel_code(self, vessel_visit_id: str) -> str: + """从 vessel_visit_id 中提取船舶代码(日期部分)""" + if not vessel_visit_id: + return "" + + parts = vessel_visit_id.split("-") + if len(parts) >= 1: + return parts[0] + + return vessel_visit_id + + def _calculate_efficiency(self, teu: int, vehicles: int) -> str: + """ + 计算效率(简化版本,后续可接入真实效率数据) + + 暂时返回 "--",等待后续接口 + """ + return "--" + + def _query_metabase_native( + self, query: str, database_id: int = 3 + ) -> Optional[Dict]: + """ + 执行 Metabase 原生查询 + + Args: + query: SQL 查询语句 + database_id: 数据库ID + + Returns: + 查询结果字典 + """ + import requests + + username = os.getenv("MATEBASE_USERNAME") + password = os.getenv("MATEBASE_PASSWORD") + base_url = "http://10.80.0.11:30001" + + if not username or not password: + raise Exception("缺少 Metabase 认证信息") + + # 认证 + auth_url = f"{base_url}/api/session" + auth_response = requests.post( + auth_url, json={"username": username, "password": password}, timeout=30 + ) + auth_response.raise_for_status() + session_token = auth_response.json().get("id") + + # 执行查询 + query_url = f"{base_url}/api/dataset" + headers = { + "X-Metabase-Session": session_token, + "Content-Type": "application/json", + } + + payload = { + "type": "native", + "native": {"query": query}, + "database": database_id, + } + + response = requests.post(query_url, headers=headers, json=payload, timeout=30) + response.raise_for_status() + + return response.json() + + def read_template(self, template_path: str) -> str: + """读取模板文件""" + try: + with open(template_path, "r", encoding="utf-8") as f: + return f.read() + except Exception as e: + print(f"读取模板失败: {e}") + return "" + + def render_template(self, template: str, data: Dict[str, Any]) -> str: + """ + 模板渲染 + 支持 {{variable}} 和 {{#list}}...{{/list}} + """ + result = template + + # 处理列表循环 + def replace_list(match): + list_name = match.group(1) + list_template = match.group(2) + + if list_name in data and isinstance(data[list_name], list): + items = [] + for item in data[list_name]: + item_str = list_template + for key, value in item.items(): + placeholder = f"{{{{{key}}}}}" + item_str = item_str.replace(placeholder, str(value)) + items.append(item_str) + return "".join(items) + return "" + + list_pattern = r"\{\{#(\w+)\}\}(.*?)\{\{/\1\}\}" + result = re.sub(list_pattern, replace_list, result, flags=re.DOTALL) + + # 处理简单变量 + for key, value in data.items(): + if not isinstance(value, (list, dict)): + placeholder = f"{{{{{key}}}}}" + result = result.replace(placeholder, str(value)) + + return result + + def generate_shift_report( + self, report_date: Optional[datetime] = None, shift_type: str = "day" + ) -> str: + """ + 生成班次交接报告 + + Args: + report_date: 报告日期,默认为当天 + shift_type: 班次类型 ("day" 或 "night") + + Returns: + 生成的报告内容 + """ + if report_date is None: + report_date = datetime.now() + + shift = self.SHIFTS.get(shift_type) + shift_name = shift["name"] if shift else shift_type + + print(f"正在生成 {report_date.strftime('%Y-%m-%d')} {shift_name} 交接报告...") + + # 1. 获取班次时间范围 + start_time, end_time = self.get_shift_time_range(report_date, shift_type) + print( + f"班次时间范围: {start_time.strftime('%H:%M')} - {end_time.strftime('%H:%M')}" + ) + + # 2. 获取班次人员 + print("获取班次人员...") + personnel = self.get_shift_personnel(report_date, shift_type) + + # 3. 获取船舶作业数据 + print("获取船舶作业数据...") + vessels = self.get_vessels_in_time_range(start_time, end_time) + + # 4. 读取并渲染模板 + template_path = os.path.join( + project_root, "template", "shift_handover_template.txt" + ) + template = self.read_template(template_path) + + if not template: + return "错误:无法读取模板文件" + + # 准备渲染数据 + render_data = { + "date": report_date.strftime("%m/%d"), + "shift_name": shift_name, + "personnel": personnel, + "records": vessels, + } + + # 渲染模板 + report = self.render_template(template, render_data) + + return report + + def generate_all_shifts_report( + self, report_date: Optional[datetime] = None + ) -> Dict[str, str]: + """ + 生成当天所有班次的交接报告 + + Args: + report_date: 报告日期 + + Returns: + {"day": 白班报告, "night": 夜班报告} + """ + reports = {} + for shift_type in self.SHIFTS.keys(): + reports[shift_type] = self.generate_shift_report(report_date, shift_type) + return reports + + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description="生成班次交接报告") + parser.add_argument("--date", type=str, help="报告日期 (YYYY-MM-DD),默认为当天") + parser.add_argument( + "--shift", + type=str, + choices=["day", "night", "all"], + default="all", + help="班次类型: day(白班), night(夜班), all(全部),默认为 all", + ) + + args = parser.parse_args() + + # 解析日期 + if args.date: + report_date = datetime.strptime(args.date, "%Y-%m-%d") + else: + report_date = datetime.now() + + # 生成报告 + generator = ShiftReportGenerator() + + if args.shift == "all": + reports = generator.generate_all_shifts_report(report_date) + for shift_type, report in reports.items(): + shift_name = generator.SHIFTS[shift_type]["name"] + print("\n" + "=" * 60) + print(f"{shift_name}交接报告") + print("=" * 60) + print(report) + else: + report = generator.generate_shift_report(report_date, args.shift) + print("\n" + "=" * 60) + print("班次交接报告") + print("=" * 60) + print(report) + + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/template/daily_report_template.txt b/template/daily_report_template.txt new file mode 100644 index 0000000..9e5ce30 --- /dev/null +++ b/template/daily_report_template.txt @@ -0,0 +1,15 @@ +日期:{{date}} + +{{#vessels}} +船名:{{name}} +作业量:{{total_teu}}TEU(20尺*{{teu_20ft}} 40尺*{{teu_40ft}}) +{{/vessels}} + +当日实际作业量:{{daily_actual}}TEU +当月计划作业量:{{month_plan_teu}}TEU +当月实际作业量:{{month_actual_teu}}TEU +当月完成比例:{{completion_rate}}% + +{{next_day_date}} 白班人员:{{day_shift}} +{{next_day_date}} 夜班人员:{{night_shift}} +24小时值班手机:{{duty_phone}} \ No newline at end of file diff --git a/template/shift_handover_template.txt b/template/shift_handover_template.txt new file mode 100644 index 0000000..1159f74 --- /dev/null +++ b/template/shift_handover_template.txt @@ -0,0 +1,11 @@ +日期:{{date}} +班次及人员: +{{shift_name}}: {{personnel}} + +{{#records}} +实船作业:{{vessel_code}}# {{vessel_name}} +上场车辆数:{{vehicles}} +作业量/效率:{{total_teu}}TEU(20尺*{{teu_20ft}} 40尺*{{teu_40ft}}),{{efficiency}}循环/车/小时 +故障次数/故障率:{{failures}}次,{{failure_rate}}% +人工介入次数/介入率:{{interventions}}次,{{intervention_rate}}% +{{/records}} \ No newline at end of file