#!/usr/bin/env python3 """ 排班数据解析器模块 支持月度表格和年度表格解析 """ import re from typing import Dict, List, Optional, Tuple import logging from src.logging_config import get_logger logger = get_logger(__name__) class ScheduleDataParser: """排班数据解析器(支持月度表格和年度表格)""" @staticmethod def _parse_chinese_date(date_str: str) -> Optional[str]: """ 解析中文日期格式 参数: date_str: 中文日期,如 "12月30日" 或 "12/30" 或 "12月1日" 或 "1月1日" 返回: 标准化日期字符串 "M月D日" (不补零) 异常: ValueError: 日期格式无效 """ if not date_str or not isinstance(date_str, str): return None date_str = date_str.strip() try: # 如果是 "12/30" 格式 if '/' in date_str: month, day = date_str.split('/') # 移除可能的空格和前导零 month = month.strip().lstrip('0') day = day.strip().lstrip('0') if not month.isdigit() or not day.isdigit(): raise ValueError(f"日期格式无效: {date_str}") return f"{int(month)}月{int(day)}日" # 如果是 "12月30日" 或 "1月1日" 格式 if '月' in date_str and '日' in date_str: # 移除前导零,如 "01月01日" -> "1月1日" parts = date_str.split('月') if len(parts) == 2: month_part = parts[0].lstrip('0') day_part = parts[1].rstrip('日').lstrip('0') if not month_part or not day_part: raise ValueError(f"日期格式无效: {date_str}") return f"{month_part}月{day_part}日" return date_str # 如果是 "12月1日" 格式(已经包含"日"字) if '月' in date_str: # 检查是否已经有"日"字 if '日' not in date_str: return f"{date_str}日" return date_str # 如果是纯数字,尝试解析 if date_str.isdigit() and len(date_str) == 4: # 假设是 "1230" 格式 month = date_str[:2].lstrip('0') day = date_str[2:].lstrip('0') return f"{month}月{day}日" return None except Exception as e: logger.warning(f"解析日期失败: {date_str}, 错误: {e}") return None @staticmethod def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]: """ 在表头中查找目标日期对应的列索引 参数: headers: 表头行 ["姓名", "12月1日", "12月2日", ...] target_date: 目标日期 "12月30日" 返回: 列索引(从0开始),未找到返回None """ if not headers or not target_date: return None # 标准化目标日期 target_std = ScheduleDataParser._parse_chinese_date(target_date) if not target_std: logger.warning(f"无法标准化目标日期: {target_date}") return None # 遍历表头查找匹配的日期 for i, header in enumerate(headers): if not header: continue header_std = ScheduleDataParser._parse_chinese_date(header) if header_std == target_std: logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})") return i logger.warning(f"未找到日期列: {target_date}, 表头: {headers}") return None def parse_monthly_sheet(self, values: List[List[str]], target_date: str) -> Dict[str, any]: """ 解析月度表格数据(如12月表格) 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") 返回: 排班信息字典 """ if not values or len(values) < 2: logger.warning("表格数据为空或不足") return self._empty_result() # 第一行是表头 headers = values[0] date_column_index = self._find_date_column_index(headers, target_date) if date_column_index is None: logger.warning(f"未找到日期列: {target_date}") return self._empty_result() # 收集白班和夜班人员 day_shift_names = [] night_shift_names = [] # 从第二行开始是人员数据 for row_idx, row in enumerate(values[1:], start=2): if len(row) <= date_column_index: continue name = row[0] if row else '' shift = row[date_column_index] if date_column_index < len(row) else '' if not name or not shift: continue # 清理班次值 shift = shift.strip() if shift == '白': day_shift_names.append(name.strip()) elif shift == '夜': night_shift_names.append(name.strip()) elif shift: # 其他班次类型 logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})") return self._format_result(day_shift_names, night_shift_names) def parse_yearly_sheet(self, values: List[List[str]], target_date: str) -> Dict[str, any]: """ 解析年度表格数据(如2026年排班表) 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") 返回: 排班信息字典 """ if not values: logger.warning("年度表格数据为空") return self._empty_result() # 查找目标月份的数据块 target_month = target_date.split('月')[0] if '月' in target_date else '' if not target_month: logger.warning(f"无法从 {target_date} 提取月份") return self._empty_result() # 在年度表格中查找对应的月份块 current_block_start = -1 current_month = '' for i, row in enumerate(values): if not row: continue first_cell = str(row[0]) if row else '' # 检查是否是月份标题行,如 "福州港1月排班表" if '排班表' in first_cell and '月' in first_cell: # 提取月份数字 month_match = re.search(r'(\d+)月', first_cell) if month_match: current_month = month_match.group(1).lstrip('0') current_block_start = i logger.debug(f"找到月份块: {current_month}月 (行: {i+1})") # 如果找到目标月份,检查下一行是否是表头行 if current_month == target_month and i == current_block_start + 1: # 当前行是表头行 headers = row date_column_index = self._find_date_column_index(headers, target_date) if date_column_index is None: logger.warning(f"在年度表格中未找到日期列: {target_date}") return self._empty_result() # 收集人员数据(从表头行的下一行开始) day_shift_names = [] night_shift_names = [] for j in range(i + 1, len(values)): person_row = values[j] if not person_row: # 遇到空行,继续检查下一行 continue # 检查是否是下一个月份块的开始 if person_row[0] and isinstance(person_row[0], str) and '排班表' in person_row[0] and '月' in person_row[0]: break # 跳过星期行(第一列为空的行) if not person_row[0]: continue if len(person_row) <= date_column_index: continue name = person_row[0] if person_row else '' shift = person_row[date_column_index] if date_column_index < len(person_row) else '' if not name or not shift: continue # 清理班次值 shift = shift.strip() if shift == '白': day_shift_names.append(name.strip()) elif shift == '夜': night_shift_names.append(name.strip()) return self._format_result(day_shift_names, night_shift_names) logger.warning(f"在年度表格中未找到 {target_month}月 的数据块") return self._empty_result() def parse(self, values: List[List[str]], target_date: str, sheet_title: str = '') -> Dict[str, any]: """ 解析排班数据,自动判断表格类型 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") sheet_title: 表格标题,用于判断表格类型 返回: 排班信息字典 """ # 根据表格标题判断表格类型 if '年' in sheet_title and '排班表' in sheet_title: # 年度表格 logger.info(f"使用年度表格解析器: {sheet_title}") return self.parse_yearly_sheet(values, target_date) else: # 月度表格 logger.info(f"使用月度表格解析器: {sheet_title}") return self.parse_monthly_sheet(values, target_date) def _empty_result(self) -> Dict[str, any]: """返回空结果""" return { 'day_shift': '', 'night_shift': '', 'day_shift_list': [], 'night_shift_list': [] } def _format_result(self, day_shift_names: List[str], night_shift_names: List[str]) -> Dict[str, any]: """格式化结果""" # 去重并排序 day_shift_names = sorted(set(day_shift_names)) night_shift_names = sorted(set(night_shift_names)) # 格式化输出 day_shift_str = '、'.join(day_shift_names) if day_shift_names else '' night_shift_str = '、'.join(night_shift_names) if night_shift_names else '' return { 'day_shift': day_shift_str, 'night_shift': night_shift_str, 'day_shift_list': day_shift_names, 'night_shift_list': night_shift_names } if __name__ == '__main__': # 测试代码 import sys # 设置日志 logging.basicConfig(level=logging.DEBUG) parser = ScheduleDataParser() # 测试日期解析 test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"] for date in test_dates: parsed = parser._parse_chinese_date(date) print(f"解析 '{date}' -> '{parsed}'") # 测试月度表格解析 monthly_values = [ ["姓名", "12月1日", "12月2日", "12月3日"], ["张三", "白", "夜", ""], ["李四", "夜", "白", "白"], ["王五", "", "白", "夜"] ] result = parser.parse_monthly_sheet(monthly_values, "12月2日") print(f"\n月度表格解析结果: {result}") # 测试年度表格解析 yearly_values = [ ["福州港2026年排班表"], ["姓名", "1月1日", "1月2日", "1月3日"], ["张三", "白", "夜", ""], ["李四", "夜", "白", "白"], ["福州港2月排班表"], ["姓名", "2月1日", "2月2日"], ["王五", "白", "夜"] ] result = parser.parse_yearly_sheet(yearly_values, "1月2日") print(f"年度表格解析结果: {result}")