Files
gloria/feishu/parser.py

357 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
排班数据解析器模块
支持月度表格和年度表格解析
"""
import re
from typing import Dict, List, Optional, Tuple
import logging
# 配置日志
logger = logging.getLogger(__name__)
class ScheduleDataParser:
"""排班数据解析器(支持月度表格和年度表格)"""
@staticmethod
def _parse_chinese_date(date_str: str) -> Optional[str]:
"""
解析中文日期格式
参数:
date_str: 中文日期 "12月30日" "12/30" "12月1日" "1月1日"
返回:
标准化日期字符串 "M月D日" (不补零)
异常:
ValueError: 日期格式无效
"""
if not date_str or not isinstance(date_str, str):
return None
date_str = date_str.strip()
try:
# 如果是 "12/30" 格式
if "/" in date_str:
month, day = date_str.split("/")
# 移除可能的空格和前导零
month = month.strip().lstrip("0")
day = day.strip().lstrip("0")
if not month.isdigit() or not day.isdigit():
raise ValueError(f"日期格式无效: {date_str}")
return f"{int(month)}{int(day)}"
# 如果是 "12月30日" 或 "1月1日" 格式
if "" in date_str and "" in date_str:
# 移除前导零,如 "01月01日" -> "1月1日"
parts = date_str.split("")
if len(parts) == 2:
month_part = parts[0].lstrip("0")
day_part = parts[1].rstrip("").lstrip("0")
if not month_part or not day_part:
raise ValueError(f"日期格式无效: {date_str}")
return f"{month_part}{day_part}"
return date_str
# 如果是 "12月1日" 格式(已经包含"日"字)
if "" in date_str:
# 检查是否已经有"日"字
if "" not in date_str:
return f"{date_str}"
return date_str
# 如果是纯数字,尝试解析
if date_str.isdigit() and len(date_str) == 4:
# 假设是 "1230" 格式
month = date_str[:2].lstrip("0")
day = date_str[2:].lstrip("0")
return f"{month}{day}"
return None
except Exception as e:
logger.warning(f"解析日期失败: {date_str}, 错误: {e}")
return None
@staticmethod
def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]:
"""
在表头中查找目标日期对应的列索引
参数:
headers: 表头行 ["姓名", "12月1日", "12月2日", ...]
target_date: 目标日期 "12月30日"
返回:
列索引从0开始未找到返回None
"""
if not headers or not target_date:
return None
# 标准化目标日期
target_std = ScheduleDataParser._parse_chinese_date(target_date)
if not target_std:
logger.warning(f"无法标准化目标日期: {target_date}")
return None
# 遍历表头查找匹配的日期
for i, header in enumerate(headers):
if not header:
continue
header_std = ScheduleDataParser._parse_chinese_date(header)
if header_std == target_std:
logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})")
return i
logger.warning(f"未找到日期列: {target_date}, 表头: {headers}")
return None
def parse_monthly_sheet(
self, values: List[List[str]], target_date: str
) -> Dict[str, any]:
"""
解析月度表格数据如12月表格
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
返回:
排班信息字典
"""
if not values or len(values) < 2:
logger.warning("表格数据为空或不足")
return self._empty_result()
# 第一行是表头
headers = values[0]
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"未找到日期列: {target_date}")
return self._empty_result()
# 收集白班和夜班人员
day_shift_names = []
night_shift_names = []
# 从第二行开始是人员数据
for row_idx, row in enumerate(values[1:], start=2):
if len(row) <= date_column_index:
continue
name = row[0] if row else ""
shift = row[date_column_index] if date_column_index < len(row) else ""
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == "":
day_shift_names.append(name.strip())
elif shift == "":
night_shift_names.append(name.strip())
elif shift: # 其他班次类型
logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})")
return self._format_result(day_shift_names, night_shift_names)
def parse_yearly_sheet(
self, values: List[List[str]], target_date: str
) -> Dict[str, any]:
"""
解析年度表格数据如2026年排班表
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
返回:
排班信息字典
"""
if not values:
logger.warning("年度表格数据为空")
return self._empty_result()
# 查找目标月份的数据块
target_month = target_date.split("")[0] if "" in target_date else ""
if not target_month:
logger.warning(f"无法从 {target_date} 提取月份")
return self._empty_result()
# 在年度表格中查找对应的月份块
current_block_start = -1
current_month = ""
for i, row in enumerate(values):
if not row:
continue
first_cell = str(row[0]) if row else ""
# 检查是否是月份标题行,如 "福州港1月排班表"
if "排班表" in first_cell and "" in first_cell:
# 提取月份数字
month_match = re.search(r"(\d+)月", first_cell)
if month_match:
current_month = month_match.group(1).lstrip("0")
current_block_start = i
logger.debug(f"找到月份块: {current_month}月 (行: {i + 1})")
# 如果找到目标月份,检查下一行是否是表头行
if current_month == target_month and i == current_block_start + 1:
# 当前行是表头行
headers = row
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"在年度表格中未找到日期列: {target_date}")
return self._empty_result()
# 收集人员数据(从表头行的下一行开始)
day_shift_names = []
night_shift_names = []
for j in range(i + 1, len(values)):
person_row = values[j]
if not person_row:
# 遇到空行,继续检查下一行
continue
# 检查是否是下一个月份块的开始
if (
person_row[0]
and isinstance(person_row[0], str)
and "排班表" in person_row[0]
and "" in person_row[0]
):
break
# 跳过星期行(第一列为空的行)
if not person_row[0]:
continue
if len(person_row) <= date_column_index:
continue
name = person_row[0] if person_row else ""
shift = (
person_row[date_column_index]
if date_column_index < len(person_row)
else ""
)
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == "":
day_shift_names.append(name.strip())
elif shift == "":
night_shift_names.append(name.strip())
return self._format_result(day_shift_names, night_shift_names)
logger.warning(f"在年度表格中未找到 {target_month}月 的数据块")
return self._empty_result()
def parse(
self, values: List[List[str]], target_date: str, sheet_title: str = ""
) -> Dict[str, any]:
"""
解析排班数据自动判断表格类型
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
sheet_title: 表格标题用于判断表格类型
返回:
排班信息字典
"""
# 根据表格标题判断表格类型
if "" in sheet_title and "排班表" in sheet_title:
# 年度表格
logger.info(f"使用年度表格解析器: {sheet_title}")
return self.parse_yearly_sheet(values, target_date)
else:
# 月度表格
logger.info(f"使用月度表格解析器: {sheet_title}")
return self.parse_monthly_sheet(values, target_date)
def _empty_result(self) -> Dict[str, any]:
"""返回空结果"""
return {
"day_shift": "",
"night_shift": "",
"day_shift_list": [],
"night_shift_list": [],
}
def _format_result(
self, day_shift_names: List[str], night_shift_names: List[str]
) -> Dict[str, any]:
"""格式化结果"""
# 去重并排序
day_shift_names = sorted(set(day_shift_names))
night_shift_names = sorted(set(night_shift_names))
# 格式化输出
day_shift_str = "".join(day_shift_names) if day_shift_names else ""
night_shift_str = "".join(night_shift_names) if night_shift_names else ""
return {
"day_shift": day_shift_str,
"night_shift": night_shift_str,
"day_shift_list": day_shift_names,
"night_shift_list": night_shift_names,
}
if __name__ == "__main__":
# 测试代码
import sys
# 设置日志
logging.basicConfig(level=logging.DEBUG)
parser = ScheduleDataParser()
# 测试日期解析
test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"]
for date in test_dates:
parsed = parser._parse_chinese_date(date)
print(f"解析 '{date}' -> '{parsed}'")
# 测试月度表格解析
monthly_values = [
["姓名", "12月1日", "12月2日", "12月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["王五", "", "", ""],
]
result = parser.parse_monthly_sheet(monthly_values, "12月2日")
print(f"\n月度表格解析结果: {result}")
# 测试年度表格解析
yearly_values = [
["福州港2026年排班表"],
["姓名", "1月1日", "1月2日", "1月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["福州港2月排班表"],
["姓名", "2月1日", "2月2日"],
["王五", "", ""],
]
result = parser.parse_yearly_sheet(yearly_values, "1月2日")
print(f"年度表格解析结果: {result}")