feat: 初始化福州港日报管理系统

- 添加日报生成功能 (report_generator.py)
- 添加 GUI 界面 (daily_report_gui.py)
- 添加班次交接报告功能 (shift_report.py)
- 集成飞书 API 获取排班信息
- 集成 Metabase 查询作业数据
- 生成 AGENTS.md 文档
This commit is contained in:
2026-03-03 02:07:34 +08:00
commit 00d2218c6d
16 changed files with 3713 additions and 0 deletions

58
feishu/AGENTS.md Normal file
View File

@@ -0,0 +1,58 @@
# FEISHU 飞书模块
## 概述
飞书表格 API 集成,用于获取排班信息。支持自动 Token 刷新。
## 结构
```
feishu/
├── __init__.py # 统一导出
├── client.py # FeishuSheetsClient - HTTP客户端
├── manager.py # FeishuScheduleManager - 业务入口
└── parser.py # ScheduleDataParser - 表格解析
```
## 查找指南
| 任务 | 位置 |
|------|------|
| 修改API调用 | `client.py` |
| 获取排班信息 | `manager.py:get_schedule_for_date()` |
| 解析新表格格式 | `parser.py` |
## 使用方式
```python
from feishu import FeishuScheduleManager
manager = FeishuScheduleManager()
schedule = manager.get_schedule_for_date("2026-03-01")
# {'day_shift': '张三', 'night_shift': '李四', ...}
```
## 关键逻辑
### Token 管理 (client.py)
- 自动获取 `tenant_access_token`
- 提前30分钟刷新
- 备用: 环境变量 `FEISHU_TOKEN`
### 表格选择 (manager.py:93-148)
- 2026年优先使用年度表格
- 其他年份优先月度表格
- 降级使用第一个表格
### 表格解析 (parser.py)
- 月度表格: 第一行为表头,查找日期列
- 年度表格: 查找月份块,再查日期列
## 环境变量
```
FEISHU_BASE_URL=https://open.feishu.cn/open-apis/sheets/v3
FEISHU_APP_ID=cli_xxx
FEISHU_APP_SECRET=xxx
FEISHU_SPREADSHEET_TOKEN=xxx
```

16
feishu/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
"""
飞书模块包
提供统一的飞书API接口
"""
from .client import FeishuSheetsClient, FeishuClientError
from .parser import ScheduleDataParser
from .manager import FeishuScheduleManager
__all__ = [
"FeishuSheetsClient",
"FeishuClientError",
"ScheduleDataParser",
"FeishuScheduleManager",
]

394
feishu/client.py Normal file
View File

@@ -0,0 +1,394 @@
#!/usr/bin/env python3
"""
飞书表格 API 客户端模块
统一版本,支持月度表格和年度表格
支持自动获取和刷新 tenant_access_token
"""
import os
import requests
import time
from typing import Dict, List, Optional, Tuple
import logging
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
# 配置日志
logger = logging.getLogger(__name__)
class FeishuClientError(Exception):
"""飞书客户端异常基类"""
pass
class FeishuSheetsClient:
"""飞书表格 API 客户端"""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
spreadsheet_token: Optional[str] = None,
app_id: Optional[str] = None,
app_secret: Optional[str] = None,
):
"""
初始化客户端
参数:
base_url: 飞书 API 基础URL如果为None则使用配置
token: Bearer 认证令牌如果为None则使用配置或自动获取
spreadsheet_token: 表格 token如果为None则使用配置
app_id: 飞书应用ID用于获取tenant_access_token
app_secret: 飞书应用密钥用于获取tenant_access_token
"""
self.base_url = (base_url or os.getenv("FEISHU_BASE_URL")).rstrip("/")
self.spreadsheet_token = spreadsheet_token or os.getenv(
"FEISHU_SPREADSHEET_TOKEN"
)
self.app_id = app_id or os.getenv("FEISHU_APP_ID")
self.app_secret = app_secret or os.getenv("FEISHU_APP_SECRET")
# Token管理相关属性
self._token = token or os.getenv("FEISHU_TOKEN")
self._token_expire_time = 0 # token过期时间戳
self._token_obtained_time = 0 # token获取时间戳
# 使用 Session 重用连接
self.session = requests.Session()
self.session.timeout = int(os.getenv("REQUEST_TIMEOUT", 30))
# 初始化headers
self._update_session_headers()
logger.debug(f"飞书客户端初始化完成基础URL: {self.base_url}")
logger.debug(
f"使用应用ID: {self.app_id[:8]}... 如果配置"
if self.app_id
else "未配置应用ID"
)
def _update_session_headers(self):
"""更新session的headers"""
if self._token:
self.session.headers.update(
{
"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
)
else:
# 如果没有token移除Authorization头
if "Authorization" in self.session.headers:
del self.session.headers["Authorization"]
def _get_tenant_access_token(self) -> Tuple[str, int]:
"""
获取tenant_access_token
返回:
(token, expire_time): token字符串和过期时间
异常:
requests.exceptions.RequestException: 网络请求失败
ValueError: API返回错误
"""
if not self.app_id or not self.app_secret:
raise ValueError("未配置飞书应用ID和密钥无法获取tenant_access_token")
token_url = (
"https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
)
payload = {"app_id": self.app_id, "app_secret": self.app_secret}
headers = {"Content-Type": "application/json; charset=utf-8"}
try:
logger.info(f"正在获取tenant_access_token应用ID: {self.app_id[:8]}...")
response = requests.post(
token_url, json=payload, headers=headers, timeout=int(os.getenv("REQUEST_TIMEOUT", 30))
)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
error_msg = f"获取tenant_access_token失败: {data.get('msg')}"
logger.error(error_msg)
raise ValueError(error_msg)
token = data.get("tenant_access_token")
expire = data.get("expire", 7200) # 默认2小时
if not token:
raise ValueError("API返回的token为空")
logger.info(f"成功获取tenant_access_token有效期: {expire}")
return token, expire
except requests.exceptions.RequestException as e:
logger.error(f"获取tenant_access_token网络请求失败: {e}")
raise
except Exception as e:
logger.error(f"获取tenant_access_token失败: {e}")
raise
def _ensure_valid_token(self):
"""
确保当前token有效如果无效则重新获取
返回:
bool: token是否有效
"""
current_time = time.time()
# 如果有手动配置的token直接使用
if os.getenv("FEISHU_TOKEN") and self._token == os.getenv("FEISHU_TOKEN"):
logger.debug("使用手动配置的FEISHU_TOKEN")
return True
# 检查token是否过期提前30分钟刷新
if self._token and self._token_expire_time > 0:
time_remaining = self._token_expire_time - current_time
if time_remaining > 1800: # 剩余时间大于30分钟
logger.debug(f"token仍然有效剩余时间: {int(time_remaining)}")
return True
elif time_remaining > 0: # 剩余时间小于30分钟但大于0
logger.info(
f"token即将过期剩余时间: {int(time_remaining)}秒,重新获取"
)
else: # 已过期
logger.info("token已过期重新获取")
# 需要获取新token
try:
token, expire = self._get_tenant_access_token()
self._token = token
self._token_obtained_time = current_time
self._token_expire_time = current_time + expire
self._update_session_headers()
logger.info(f"token获取成功将在 {expire} 秒后过期")
return True
except Exception as e:
logger.error(f"获取token失败: {e}")
# 如果配置了备用token尝试使用
if os.getenv("FEISHU_TOKEN") and os.getenv("FEISHU_TOKEN") != self._token:
logger.warning("使用备用FEISHU_TOKEN")
self._token = os.getenv("FEISHU_TOKEN")
self._update_session_headers()
return True
return False
def get_sheets_info(self) -> List[Dict[str, str]]:
"""
获取所有表格信息sheet_id 和 title
返回:
表格信息列表 [{'sheet_id': 'xxx', 'title': 'xxx'}, ...]
异常:
requests.exceptions.RequestException: 网络请求失败
ValueError: API返回错误
"""
# 确保token有效
if not self._ensure_valid_token():
raise FeishuClientError("无法获取有效的飞书token")
url = f"{self.base_url}/spreadsheets/{self.spreadsheet_token}/sheets/query"
try:
response = self.session.get(url, timeout=int(os.getenv("REQUEST_TIMEOUT", 30)))
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
error_msg = f"飞书API错误: {data.get('msg')}"
logger.error(error_msg)
raise ValueError(error_msg)
sheets = data.get("data", {}).get("sheets", [])
result = []
for sheet in sheets:
result.append(
{"sheet_id": sheet.get("sheet_id"), "title": sheet.get("title")}
)
logger.info(f"获取到 {len(result)} 个表格")
return result
except requests.exceptions.RequestException as e:
logger.error(f"获取表格信息失败: {e}")
raise
except Exception as e:
logger.error(f"解析表格信息失败: {e}")
raise
def get_sheet_data(self, sheet_id: str, range_: Optional[str] = None) -> Dict:
"""
获取指定表格的数据
参数:
sheet_id: 表格ID
range_: 数据范围如果为None则使用配置
返回:
飞书API返回的原始数据包含revision版本号
异常:
requests.exceptions.RequestException: 网络请求失败
ValueError: API返回错误
"""
# 确保token有效
if not self._ensure_valid_token():
raise FeishuClientError("无法获取有效的飞书token")
if range_ is None:
range_ = os.getenv("SHEET_RANGE", "A1:Z100")
# 注意:获取表格数据使用 v2 API而不是 v3
url = f"{self.base_url.replace('/v3', '/v2')}/spreadsheets/{self.spreadsheet_token}/values/{sheet_id}!{range_}"
params = {
"valueRenderOption": "ToString",
"dateTimeRenderOption": "FormattedString",
}
try:
response = self.session.get(
url, params=params, timeout=int(os.getenv("REQUEST_TIMEOUT", 30))
)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
error_msg = f"飞书API错误: {data.get('msg')}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.debug(f"获取表格数据成功: {sheet_id}, 范围: {range_}")
return data.get("data", {})
except requests.exceptions.RequestException as e:
logger.error(f"获取表格数据失败: {e}, sheet_id: {sheet_id}")
raise
except Exception as e:
logger.error(f"解析表格数据失败: {e}, sheet_id: {sheet_id}")
raise
def get_token_info(self) -> Dict[str, any]:
"""
获取当前token信息
返回:
token信息字典
"""
current_time = time.time()
time_remaining = (
self._token_expire_time - current_time if self._token_expire_time > 0 else 0
)
return {
"has_token": bool(self._token),
"token_preview": self._token[:20] + "..."
if self._token and len(self._token) > 20
else self._token,
"token_obtained_time": self._token_obtained_time,
"token_expire_time": self._token_expire_time,
"time_remaining": max(0, time_remaining),
"using_app_credentials": bool(self.app_id and self.app_secret),
"using_manual_token": bool(
os.getenv("FEISHU_TOKEN") and self._token == os.getenv("FEISHU_TOKEN")
),
}
def test_connection(self) -> bool:
"""
测试飞书连接是否正常
返回:
连接是否正常
"""
try:
# 首先测试token获取
if not self._ensure_valid_token():
logger.error("无法获取有效的飞书token")
return False
# 然后测试表格访问
sheets = self.get_sheets_info()
if sheets:
logger.info(f"飞书连接测试成功,找到 {len(sheets)} 个表格")
return True
else:
logger.warning("飞书连接测试成功,但未找到表格")
return False
except Exception as e:
logger.error(f"飞书连接测试失败: {e}")
return False
def refresh_token(self) -> bool:
"""
强制刷新token
返回:
刷新是否成功
"""
try:
logger.info("强制刷新token...")
current_time = time.time()
token, expire = self._get_tenant_access_token()
self._token = token
self._token_obtained_time = current_time
self._token_expire_time = current_time + expire
self._update_session_headers()
logger.info(f"token刷新成功将在 {expire} 秒后过期")
return True
except Exception as e:
logger.error(f"强制刷新token失败: {e}")
return False
if __name__ == "__main__":
# 测试代码
import sys
# 设置日志级别
logging.basicConfig(level=logging.INFO)
# 测试连接
client = FeishuSheetsClient()
# 显示token信息
token_info = client.get_token_info()
print("当前token信息:")
print(f" 是否有token: {token_info['has_token']}")
print(f" token预览: {token_info['token_preview']}")
print(f" 剩余时间: {int(token_info['time_remaining'])}")
print(f" 使用应用凭证: {token_info['using_app_credentials']}")
print(f" 使用手动token: {token_info['using_manual_token']}")
if client.test_connection():
print("\n飞书连接测试成功")
# 获取表格信息
sheets = client.get_sheets_info()
for sheet in sheets[:3]: # 只显示前3个
print(f"表格: {sheet['title']} (ID: {sheet['sheet_id']})")
if sheets:
# 获取第一个表格的数据
sheet_id = sheets[0]["sheet_id"]
data = client.get_sheet_data(sheet_id, "A1:C5")
print(f"获取到表格数据,版本: {data.get('revision', '未知')}")
# 再次显示token信息
token_info = client.get_token_info()
print(f"\n测试后token剩余时间: {int(token_info['time_remaining'])}")
else:
print("\n飞书连接测试失败")
sys.exit(1)

363
feishu/manager.py Normal file
View File

@@ -0,0 +1,363 @@
#!/usr/bin/env python3
"""
飞书排班管理器模块
统一入口,使用数据库存储和缓存
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
import os
import sys
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
sys.path.insert(0, project_root)
# 简单实现不依赖src模块
config = None
get_logger = logging.getLogger
class ScheduleDatabase:
"""简单的排班数据库占位实现"""
def __init__(self, db_path=None):
self.db_path = db_path
def save_schedule(self, date_str, result, sheet_id, sheet_title):
pass
def get_schedule_by_range(self, start_date, end_date):
return []
def get_stats(self):
return {}
from .client import FeishuSheetsClient
from .parser import ScheduleDataParser
logger = get_logger(__name__)
class FeishuScheduleManager:
"""飞书排班管理器(统一入口)"""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
spreadsheet_token: Optional[str] = None,
db_path: Optional[str] = None,
):
"""
初始化管理器
参数:
base_url: 飞书API基础URL如果为None则使用配置
token: 飞书API令牌如果为None则使用配置
spreadsheet_token: 表格token如果为None则使用配置
db_path: 数据库路径如果为None则使用配置
"""
# 检查配置是否完整
self._check_config(token, spreadsheet_token)
# 初始化组件
self.client = FeishuSheetsClient(base_url, token, spreadsheet_token)
self.parser = ScheduleDataParser()
self.db = ScheduleDatabase(db_path)
logger.info("飞书排班管理器初始化完成")
def _check_config(
self, token: Optional[str], spreadsheet_token: Optional[str]
) -> None:
"""检查必要配置"""
# 检查是否有任何可用的认证方式
has_token = bool(token or os.getenv("FEISHU_TOKEN"))
has_app_credentials = bool(os.getenv("FEISHU_APP_ID") and os.getenv("FEISHU_APP_SECRET"))
if not has_token and not has_app_credentials:
logger.warning("飞书认证未配置,排班功能将不可用")
logger.warning("请配置 FEISHU_TOKEN 或 FEISHU_APP_ID + FEISHU_APP_SECRET")
elif has_app_credentials:
logger.info("使用飞书应用凭证自动获取token")
elif has_token:
logger.info("使用手动配置的FEISHU_TOKEN")
if not spreadsheet_token and not os.getenv("FEISHU_SPREADSHEET_TOKEN"):
logger.warning("飞书表格令牌未配置,排班功能将不可用")
def _select_sheet_for_date(
self, sheets: List[Dict[str, str]], target_year_month: str
) -> Optional[Dict[str, str]]:
"""
为指定日期选择最合适的表格
参数:
sheets: 表格列表
target_year_month: 目标年月,格式 "2025-12"
返回:
选中的表格信息未找到返回None
"""
if not sheets:
logger.error("表格列表为空")
return None
# 提取年份和月份
try:
year = target_year_month[:4]
month = target_year_month[5:7].lstrip("0")
except (IndexError, ValueError) as e:
logger.error(f"解析年月失败: {target_year_month}, 错误: {e}")
return None
# 对于2026年优先使用年度表格
if year == "2026":
# 查找年度表格,如 "2026年排班表"
year_name = f"{year}"
for sheet in sheets:
title = sheet.get("title", "")
if year_name in title and "排班表" in title:
logger.info(f"找到2026年年度表格: {title}")
return sheet
# 优先查找月份表格,如 "12月"
month_name = f"{int(month)}"
for sheet in sheets:
title = sheet.get("title", "")
if month_name in title:
logger.info(f"找到月份表格: {title}")
return sheet
# 查找年度表格,如 "2026年排班表"
year_name = f"{year}"
for sheet in sheets:
title = sheet.get("title", "")
if year_name in title and "排班表" in title:
logger.info(f"找到年度表格: {title}")
return sheet
# 如果没有找到匹配的表格,使用第一个表格
logger.warning(
f"未找到 {target_year_month} 的匹配表格,使用第一个表格: {sheets[0]['title']}"
)
return sheets[0]
def get_schedule_for_date(self, date_str: str) -> Dict[str, any]:
"""
获取指定日期的排班信息
修复:每次都从飞书获取最新数据并覆盖数据库,确保日报中显示最新排班信息
参数:
date_str: 日期字符串,格式 "2025-12-30"
返回:
排班信息字典
异常:
ValueError: 日期格式无效
Exception: 其他错误
"""
try:
# 解析日期
dt = datetime.strptime(date_str, "%Y-%m-%d")
# 生成两种格式的日期字符串,用于匹配不同表格
target_date_mm_dd = dt.strftime("%m/%d") # "01/01" 用于月度表格
target_date_chinese = f"{dt.month}{dt.day}" # "1月1日" 用于年度表格
target_year_month = dt.strftime("%Y-%m") # "2025-12"
logger.info(
f"获取 {date_str} 的排班信息 (格式: {target_date_mm_dd}/{target_date_chinese})"
)
# 1. 获取表格信息
sheets = self.client.get_sheets_info()
if not sheets:
logger.error("未获取到表格信息")
return self._empty_result()
# 2. 选择最合适的表格
selected_sheet = self._select_sheet_for_date(sheets, target_year_month)
if not selected_sheet:
logger.error("未找到合适的表格")
return self._empty_result()
sheet_id = selected_sheet["sheet_id"]
sheet_title = selected_sheet["title"]
# 3. 获取表格数据
sheet_data = self.client.get_sheet_data(sheet_id)
if not sheet_data:
logger.error("未获取到表格数据")
return self._empty_result()
values = sheet_data.get("valueRange", {}).get("values", [])
if not values:
logger.error("表格数据为空")
return self._empty_result()
# 4. 解析数据 - 根据表格类型选择合适的日期格式
# 如果是年度表格使用中文日期格式否则使用mm/dd格式
if "" in sheet_title and "排班表" in sheet_title:
target_date = target_date_chinese # "1月1日"
else:
target_date = target_date_mm_dd # "01/01"
logger.info(f"使用日期格式: {target_date} 解析表格: {sheet_title}")
result = self.parser.parse(values, target_date, sheet_title)
# 5. 每次都保存到数据库,覆盖旧数据,确保人员变动能及时更新
if result["day_shift"] or result["night_shift"]:
self.db.save_schedule(date_str, result, sheet_id, sheet_title)
logger.info(
f"已更新 {date_str} 的排班信息到数据库: 白班={result['day_shift']}, 夜班={result['night_shift']}"
)
else:
logger.warning(f"解析结果为空,{date_str} 未保存到数据库")
return result
except ValueError as e:
logger.error(f"日期格式无效: {date_str}, 错误: {e}")
raise
except Exception as e:
logger.error(f"获取排班信息失败: {e}")
# 降级处理:返回空值
return self._empty_result()
def get_schedule_for_today(self) -> Dict[str, any]:
"""获取今天的排班信息"""
today = datetime.now().strftime("%Y-%m-%d")
return self.get_schedule_for_date(today)
def get_schedule_for_tomorrow(self) -> Dict[str, any]:
"""获取明天的排班信息"""
tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
return self.get_schedule_for_date(tomorrow)
def refresh_all_schedules(self, days: Optional[int] = None):
"""
刷新未来指定天数的排班信息
参数:
days: 刷新未来多少天的排班信息如果为None则使用配置
"""
if days is None:
days = int(os.getenv("SCHEDULE_REFRESH_DAYS", 7))
logger.info(f"开始刷新未来 {days} 天的排班信息")
today = datetime.now()
success_count = 0
error_count = 0
for i in range(days):
date = (today + timedelta(days=i)).strftime("%Y-%m-%d")
try:
logger.debug(f"刷新 {date} 的排班信息...")
self.get_schedule_for_date(date)
success_count += 1
except Exception as e:
logger.error(f"刷新 {date} 的排班信息失败: {e}")
error_count += 1
logger.info(f"排班信息刷新完成,成功: {success_count}, 失败: {error_count}")
def get_schedule_by_range(
self, start_date: str, end_date: str
) -> List[Dict[str, any]]:
"""
获取日期范围内的排班信息
参数:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
返回:
排班信息列表
"""
try:
# 验证日期格式
datetime.strptime(start_date, "%Y-%m-%d")
datetime.strptime(end_date, "%Y-%m-%d")
return self.db.get_schedule_by_range(start_date, end_date)
except ValueError as e:
logger.error(f"日期格式无效: {e}")
return []
except Exception as e:
logger.error(f"获取排班范围失败: {e}")
return []
def test_connection(self) -> bool:
"""测试飞书连接是否正常"""
return self.client.test_connection()
def get_stats(self) -> Dict[str, any]:
"""获取排班数据库统计信息"""
return self.db.get_stats()
def _empty_result(self) -> Dict[str, any]:
"""返回空结果"""
return {
"day_shift": "",
"night_shift": "",
"day_shift_list": [],
"night_shift_list": [],
}
def _format_db_result(self, db_result: Dict[str, any]) -> Dict[str, any]:
"""格式化数据库结果"""
return {
"day_shift": db_result["day_shift"],
"night_shift": db_result["night_shift"],
"day_shift_list": db_result["day_shift_list"],
"night_shift_list": db_result["night_shift_list"],
}
if __name__ == "__main__":
# 测试代码
import sys
# 设置日志
logging.basicConfig(level=logging.INFO)
# 初始化管理器
manager = FeishuScheduleManager()
# 测试连接
if not manager.test_connection():
print("飞书连接测试失败")
sys.exit(1)
print("飞书连接测试成功")
# 测试获取今天和明天的排班
today_schedule = manager.get_schedule_for_today()
print(
f"今天排班: 白班={today_schedule['day_shift']}, 夜班={today_schedule['night_shift']}"
)
tomorrow_schedule = manager.get_schedule_for_tomorrow()
print(
f"明天排班: 白班={tomorrow_schedule['day_shift']}, 夜班={tomorrow_schedule['night_shift']}"
)
# 测试统计
stats = manager.get_stats()
print(f"排班统计: {stats}")
# 测试范围查询最近7天
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
schedules = manager.get_schedule_by_range(start_date, end_date)
print(f"最近7天排班记录: {len(schedules)}")

356
feishu/parser.py Normal file
View File

@@ -0,0 +1,356 @@
#!/usr/bin/env python3
"""
排班数据解析器模块
支持月度表格和年度表格解析
"""
import re
from typing import Dict, List, Optional, Tuple
import logging
# 配置日志
logger = logging.getLogger(__name__)
class ScheduleDataParser:
"""排班数据解析器(支持月度表格和年度表格)"""
@staticmethod
def _parse_chinese_date(date_str: str) -> Optional[str]:
"""
解析中文日期格式
参数:
date_str: 中文日期,如 "12月30日""12/30""12月1日""1月1日"
返回:
标准化日期字符串 "M月D日" (不补零)
异常:
ValueError: 日期格式无效
"""
if not date_str or not isinstance(date_str, str):
return None
date_str = date_str.strip()
try:
# 如果是 "12/30" 格式
if "/" in date_str:
month, day = date_str.split("/")
# 移除可能的空格和前导零
month = month.strip().lstrip("0")
day = day.strip().lstrip("0")
if not month.isdigit() or not day.isdigit():
raise ValueError(f"日期格式无效: {date_str}")
return f"{int(month)}{int(day)}"
# 如果是 "12月30日" 或 "1月1日" 格式
if "" in date_str and "" in date_str:
# 移除前导零,如 "01月01日" -> "1月1日"
parts = date_str.split("")
if len(parts) == 2:
month_part = parts[0].lstrip("0")
day_part = parts[1].rstrip("").lstrip("0")
if not month_part or not day_part:
raise ValueError(f"日期格式无效: {date_str}")
return f"{month_part}{day_part}"
return date_str
# 如果是 "12月1日" 格式(已经包含"日"字)
if "" in date_str:
# 检查是否已经有"日"字
if "" not in date_str:
return f"{date_str}"
return date_str
# 如果是纯数字,尝试解析
if date_str.isdigit() and len(date_str) == 4:
# 假设是 "1230" 格式
month = date_str[:2].lstrip("0")
day = date_str[2:].lstrip("0")
return f"{month}{day}"
return None
except Exception as e:
logger.warning(f"解析日期失败: {date_str}, 错误: {e}")
return None
@staticmethod
def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]:
"""
在表头中查找目标日期对应的列索引
参数:
headers: 表头行 ["姓名", "12月1日", "12月2日", ...]
target_date: 目标日期 "12月30日"
返回:
列索引从0开始未找到返回None
"""
if not headers or not target_date:
return None
# 标准化目标日期
target_std = ScheduleDataParser._parse_chinese_date(target_date)
if not target_std:
logger.warning(f"无法标准化目标日期: {target_date}")
return None
# 遍历表头查找匹配的日期
for i, header in enumerate(headers):
if not header:
continue
header_std = ScheduleDataParser._parse_chinese_date(header)
if header_std == target_std:
logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})")
return i
logger.warning(f"未找到日期列: {target_date}, 表头: {headers}")
return None
def parse_monthly_sheet(
self, values: List[List[str]], target_date: str
) -> Dict[str, any]:
"""
解析月度表格数据如12月表格
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期(格式: "12月30日""12/30"
返回:
排班信息字典
"""
if not values or len(values) < 2:
logger.warning("表格数据为空或不足")
return self._empty_result()
# 第一行是表头
headers = values[0]
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"未找到日期列: {target_date}")
return self._empty_result()
# 收集白班和夜班人员
day_shift_names = []
night_shift_names = []
# 从第二行开始是人员数据
for row_idx, row in enumerate(values[1:], start=2):
if len(row) <= date_column_index:
continue
name = row[0] if row else ""
shift = row[date_column_index] if date_column_index < len(row) else ""
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == "":
day_shift_names.append(name.strip())
elif shift == "":
night_shift_names.append(name.strip())
elif shift: # 其他班次类型
logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})")
return self._format_result(day_shift_names, night_shift_names)
def parse_yearly_sheet(
self, values: List[List[str]], target_date: str
) -> Dict[str, any]:
"""
解析年度表格数据如2026年排班表
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期(格式: "12月30日""12/30"
返回:
排班信息字典
"""
if not values:
logger.warning("年度表格数据为空")
return self._empty_result()
# 查找目标月份的数据块
target_month = target_date.split("")[0] if "" in target_date else ""
if not target_month:
logger.warning(f"无法从 {target_date} 提取月份")
return self._empty_result()
# 在年度表格中查找对应的月份块
current_block_start = -1
current_month = ""
for i, row in enumerate(values):
if not row:
continue
first_cell = str(row[0]) if row else ""
# 检查是否是月份标题行,如 "福州港1月排班表"
if "排班表" in first_cell and "" in first_cell:
# 提取月份数字
month_match = re.search(r"(\d+)月", first_cell)
if month_match:
current_month = month_match.group(1).lstrip("0")
current_block_start = i
logger.debug(f"找到月份块: {current_month}月 (行: {i + 1})")
# 如果找到目标月份,检查下一行是否是表头行
if current_month == target_month and i == current_block_start + 1:
# 当前行是表头行
headers = row
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"在年度表格中未找到日期列: {target_date}")
return self._empty_result()
# 收集人员数据(从表头行的下一行开始)
day_shift_names = []
night_shift_names = []
for j in range(i + 1, len(values)):
person_row = values[j]
if not person_row:
# 遇到空行,继续检查下一行
continue
# 检查是否是下一个月份块的开始
if (
person_row[0]
and isinstance(person_row[0], str)
and "排班表" in person_row[0]
and "" in person_row[0]
):
break
# 跳过星期行(第一列为空的行)
if not person_row[0]:
continue
if len(person_row) <= date_column_index:
continue
name = person_row[0] if person_row else ""
shift = (
person_row[date_column_index]
if date_column_index < len(person_row)
else ""
)
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == "":
day_shift_names.append(name.strip())
elif shift == "":
night_shift_names.append(name.strip())
return self._format_result(day_shift_names, night_shift_names)
logger.warning(f"在年度表格中未找到 {target_month}月 的数据块")
return self._empty_result()
def parse(
self, values: List[List[str]], target_date: str, sheet_title: str = ""
) -> Dict[str, any]:
"""
解析排班数据,自动判断表格类型
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期(格式: "12月30日""12/30"
sheet_title: 表格标题,用于判断表格类型
返回:
排班信息字典
"""
# 根据表格标题判断表格类型
if "" in sheet_title and "排班表" in sheet_title:
# 年度表格
logger.info(f"使用年度表格解析器: {sheet_title}")
return self.parse_yearly_sheet(values, target_date)
else:
# 月度表格
logger.info(f"使用月度表格解析器: {sheet_title}")
return self.parse_monthly_sheet(values, target_date)
def _empty_result(self) -> Dict[str, any]:
"""返回空结果"""
return {
"day_shift": "",
"night_shift": "",
"day_shift_list": [],
"night_shift_list": [],
}
def _format_result(
self, day_shift_names: List[str], night_shift_names: List[str]
) -> Dict[str, any]:
"""格式化结果"""
# 去重并排序
day_shift_names = sorted(set(day_shift_names))
night_shift_names = sorted(set(night_shift_names))
# 格式化输出
day_shift_str = "".join(day_shift_names) if day_shift_names else ""
night_shift_str = "".join(night_shift_names) if night_shift_names else ""
return {
"day_shift": day_shift_str,
"night_shift": night_shift_str,
"day_shift_list": day_shift_names,
"night_shift_list": night_shift_names,
}
if __name__ == "__main__":
# 测试代码
import sys
# 设置日志
logging.basicConfig(level=logging.DEBUG)
parser = ScheduleDataParser()
# 测试日期解析
test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"]
for date in test_dates:
parsed = parser._parse_chinese_date(date)
print(f"解析 '{date}' -> '{parsed}'")
# 测试月度表格解析
monthly_values = [
["姓名", "12月1日", "12月2日", "12月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["王五", "", "", ""],
]
result = parser.parse_monthly_sheet(monthly_values, "12月2日")
print(f"\n月度表格解析结果: {result}")
# 测试年度表格解析
yearly_values = [
["福州港2026年排班表"],
["姓名", "1月1日", "1月2日", "1月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["福州港2月排班表"],
["姓名", "2月1日", "2月2日"],
["王五", "", ""],
]
result = parser.parse_yearly_sheet(yearly_values, "1月2日")
print(f"年度表格解析结果: {result}")