Files
Orbitin/src/feishu/parser.py

339 lines
13 KiB
Python
Raw Normal View History

2026-02-01 20:56:37 +08:00
#!/usr/bin/env python3
"""
排班数据解析器模块
支持月度表格和年度表格解析
"""
import re
from typing import Dict, List, Optional, Tuple
import logging
from src.logging_config import get_logger
logger = get_logger(__name__)
class ScheduleDataParser:
"""排班数据解析器(支持月度表格和年度表格)"""
@staticmethod
def _parse_chinese_date(date_str: str) -> Optional[str]:
"""
解析中文日期格式
参数:
date_str: 中文日期 "12月30日" "12/30" "12月1日" "1月1日"
返回:
标准化日期字符串 "M月D日" (不补零)
异常:
ValueError: 日期格式无效
"""
if not date_str or not isinstance(date_str, str):
return None
date_str = date_str.strip()
try:
# 如果是 "12/30" 格式
if '/' in date_str:
month, day = date_str.split('/')
# 移除可能的空格和前导零
month = month.strip().lstrip('0')
day = day.strip().lstrip('0')
if not month.isdigit() or not day.isdigit():
raise ValueError(f"日期格式无效: {date_str}")
return f"{int(month)}{int(day)}"
# 如果是 "12月30日" 或 "1月1日" 格式
if '' in date_str and '' in date_str:
# 移除前导零,如 "01月01日" -> "1月1日"
parts = date_str.split('')
if len(parts) == 2:
month_part = parts[0].lstrip('0')
day_part = parts[1].rstrip('').lstrip('0')
if not month_part or not day_part:
raise ValueError(f"日期格式无效: {date_str}")
return f"{month_part}{day_part}"
return date_str
# 如果是 "12月1日" 格式(已经包含"日"字)
if '' in date_str:
# 检查是否已经有"日"字
if '' not in date_str:
return f"{date_str}"
return date_str
# 如果是纯数字,尝试解析
if date_str.isdigit() and len(date_str) == 4:
# 假设是 "1230" 格式
month = date_str[:2].lstrip('0')
day = date_str[2:].lstrip('0')
return f"{month}{day}"
return None
except Exception as e:
logger.warning(f"解析日期失败: {date_str}, 错误: {e}")
return None
@staticmethod
def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]:
"""
在表头中查找目标日期对应的列索引
参数:
headers: 表头行 ["姓名", "12月1日", "12月2日", ...]
target_date: 目标日期 "12月30日"
返回:
列索引从0开始未找到返回None
"""
if not headers or not target_date:
return None
# 标准化目标日期
target_std = ScheduleDataParser._parse_chinese_date(target_date)
if not target_std:
logger.warning(f"无法标准化目标日期: {target_date}")
return None
# 遍历表头查找匹配的日期
for i, header in enumerate(headers):
if not header:
continue
header_std = ScheduleDataParser._parse_chinese_date(header)
if header_std == target_std:
logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})")
return i
logger.warning(f"未找到日期列: {target_date}, 表头: {headers}")
return None
def parse_monthly_sheet(self, values: List[List[str]], target_date: str) -> Dict[str, any]:
"""
解析月度表格数据如12月表格
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
返回:
排班信息字典
"""
if not values or len(values) < 2:
logger.warning("表格数据为空或不足")
return self._empty_result()
# 第一行是表头
headers = values[0]
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"未找到日期列: {target_date}")
return self._empty_result()
# 收集白班和夜班人员
day_shift_names = []
night_shift_names = []
# 从第二行开始是人员数据
for row_idx, row in enumerate(values[1:], start=2):
if len(row) <= date_column_index:
continue
name = row[0] if row else ''
shift = row[date_column_index] if date_column_index < len(row) else ''
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == '':
day_shift_names.append(name.strip())
elif shift == '':
night_shift_names.append(name.strip())
elif shift: # 其他班次类型
logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})")
return self._format_result(day_shift_names, night_shift_names)
def parse_yearly_sheet(self, values: List[List[str]], target_date: str) -> Dict[str, any]:
"""
解析年度表格数据如2026年排班表
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
返回:
排班信息字典
"""
if not values:
logger.warning("年度表格数据为空")
return self._empty_result()
# 查找目标月份的数据块
target_month = target_date.split('')[0] if '' in target_date else ''
if not target_month:
logger.warning(f"无法从 {target_date} 提取月份")
return self._empty_result()
# 在年度表格中查找对应的月份块
current_block_start = -1
current_month = ''
for i, row in enumerate(values):
if not row:
continue
first_cell = str(row[0]) if row else ''
# 检查是否是月份标题行,如 "福州港1月排班表"
if '排班表' in first_cell and '' in first_cell:
# 提取月份数字
month_match = re.search(r'(\d+)月', first_cell)
if month_match:
current_month = month_match.group(1).lstrip('0')
current_block_start = i
logger.debug(f"找到月份块: {current_month}月 (行: {i+1})")
# 如果找到目标月份,检查下一行是否是表头行
if current_month == target_month and i == current_block_start + 1:
# 当前行是表头行
headers = row
date_column_index = self._find_date_column_index(headers, target_date)
if date_column_index is None:
logger.warning(f"在年度表格中未找到日期列: {target_date}")
return self._empty_result()
# 收集人员数据(从表头行的下一行开始)
day_shift_names = []
night_shift_names = []
for j in range(i + 1, len(values)):
person_row = values[j]
if not person_row:
# 遇到空行,继续检查下一行
continue
# 检查是否是下一个月份块的开始
if person_row[0] and isinstance(person_row[0], str) and '排班表' in person_row[0] and '' in person_row[0]:
break
# 跳过星期行(第一列为空的行)
if not person_row[0]:
continue
if len(person_row) <= date_column_index:
continue
name = person_row[0] if person_row else ''
shift = person_row[date_column_index] if date_column_index < len(person_row) else ''
if not name or not shift:
continue
# 清理班次值
shift = shift.strip()
if shift == '':
day_shift_names.append(name.strip())
elif shift == '':
night_shift_names.append(name.strip())
return self._format_result(day_shift_names, night_shift_names)
logger.warning(f"在年度表格中未找到 {target_month}月 的数据块")
return self._empty_result()
def parse(self, values: List[List[str]], target_date: str, sheet_title: str = '') -> Dict[str, any]:
"""
解析排班数据自动判断表格类型
参数:
values: 飞书表格返回的二维数组
target_date: 目标日期格式: "12月30日" "12/30"
sheet_title: 表格标题用于判断表格类型
返回:
排班信息字典
"""
# 根据表格标题判断表格类型
if '' in sheet_title and '排班表' in sheet_title:
# 年度表格
logger.info(f"使用年度表格解析器: {sheet_title}")
return self.parse_yearly_sheet(values, target_date)
else:
# 月度表格
logger.info(f"使用月度表格解析器: {sheet_title}")
return self.parse_monthly_sheet(values, target_date)
def _empty_result(self) -> Dict[str, any]:
"""返回空结果"""
return {
'day_shift': '',
'night_shift': '',
'day_shift_list': [],
'night_shift_list': []
}
def _format_result(self, day_shift_names: List[str], night_shift_names: List[str]) -> Dict[str, any]:
"""格式化结果"""
# 去重并排序
day_shift_names = sorted(set(day_shift_names))
night_shift_names = sorted(set(night_shift_names))
# 格式化输出
day_shift_str = ''.join(day_shift_names) if day_shift_names else ''
night_shift_str = ''.join(night_shift_names) if night_shift_names else ''
return {
'day_shift': day_shift_str,
'night_shift': night_shift_str,
'day_shift_list': day_shift_names,
'night_shift_list': night_shift_names
}
if __name__ == '__main__':
# 测试代码
import sys
# 设置日志
logging.basicConfig(level=logging.DEBUG)
parser = ScheduleDataParser()
# 测试日期解析
test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"]
for date in test_dates:
parsed = parser._parse_chinese_date(date)
print(f"解析 '{date}' -> '{parsed}'")
# 测试月度表格解析
monthly_values = [
["姓名", "12月1日", "12月2日", "12月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["王五", "", "", ""]
]
result = parser.parse_monthly_sheet(monthly_values, "12月2日")
print(f"\n月度表格解析结果: {result}")
# 测试年度表格解析
yearly_values = [
["福州港2026年排班表"],
["姓名", "1月1日", "1月2日", "1月3日"],
["张三", "", "", ""],
["李四", "", "", ""],
["福州港2月排班表"],
["姓名", "2月1日", "2月2日"],
["王五", "", ""]
]
result = parser.parse_yearly_sheet(yearly_values, "1月2日")
print(f"年度表格解析结果: {result}")