#!/usr/bin/env python3 """ 排班数据解析器模块 支持月度表格和年度表格解析 """ import re from typing import Dict, List, Optional, Tuple import logging # 配置日志 logger = logging.getLogger(__name__) class ScheduleDataParser: """排班数据解析器(支持月度表格和年度表格)""" @staticmethod def _parse_chinese_date(date_str: str) -> Optional[str]: """ 解析中文日期格式 参数: date_str: 中文日期,如 "12月30日" 或 "12/30" 或 "12月1日" 或 "1月1日" 返回: 标准化日期字符串 "M月D日" (不补零) 异常: ValueError: 日期格式无效 """ if not date_str or not isinstance(date_str, str): return None date_str = date_str.strip() try: # 如果是 "12/30" 格式 if "/" in date_str: month, day = date_str.split("/") # 移除可能的空格和前导零 month = month.strip().lstrip("0") day = day.strip().lstrip("0") if not month.isdigit() or not day.isdigit(): raise ValueError(f"日期格式无效: {date_str}") return f"{int(month)}月{int(day)}日" # 如果是 "12月30日" 或 "1月1日" 格式 if "月" in date_str and "日" in date_str: # 移除前导零,如 "01月01日" -> "1月1日" parts = date_str.split("月") if len(parts) == 2: month_part = parts[0].lstrip("0") day_part = parts[1].rstrip("日").lstrip("0") if not month_part or not day_part: raise ValueError(f"日期格式无效: {date_str}") return f"{month_part}月{day_part}日" return date_str # 如果是 "12月1日" 格式(已经包含"日"字) if "月" in date_str: # 检查是否已经有"日"字 if "日" not in date_str: return f"{date_str}日" return date_str # 如果是纯数字,尝试解析 if date_str.isdigit() and len(date_str) == 4: # 假设是 "1230" 格式 month = date_str[:2].lstrip("0") day = date_str[2:].lstrip("0") return f"{month}月{day}日" return None except Exception as e: logger.warning(f"解析日期失败: {date_str}, 错误: {e}") return None @staticmethod def _find_date_column_index(headers: List[str], target_date: str) -> Optional[int]: """ 在表头中查找目标日期对应的列索引 参数: headers: 表头行 ["姓名", "12月1日", "12月2日", ...] target_date: 目标日期 "12月30日" 返回: 列索引(从0开始),未找到返回None """ if not headers or not target_date: return None # 标准化目标日期 target_std = ScheduleDataParser._parse_chinese_date(target_date) if not target_std: logger.warning(f"无法标准化目标日期: {target_date}") return None # 遍历表头查找匹配的日期 for i, header in enumerate(headers): if not header: continue header_std = ScheduleDataParser._parse_chinese_date(header) if header_std == target_std: logger.debug(f"找到日期列: {target_date} -> {header} (索引: {i})") return i logger.warning(f"未找到日期列: {target_date}, 表头: {headers}") return None def parse_monthly_sheet( self, values: List[List[str]], target_date: str ) -> Dict[str, any]: """ 解析月度表格数据(如12月表格) 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") 返回: 排班信息字典 """ if not values or len(values) < 2: logger.warning("表格数据为空或不足") return self._empty_result() # 第一行是表头 headers = values[0] date_column_index = self._find_date_column_index(headers, target_date) if date_column_index is None: logger.warning(f"未找到日期列: {target_date}") return self._empty_result() # 收集白班和夜班人员 day_shift_names = [] night_shift_names = [] # 从第二行开始是人员数据 for row_idx, row in enumerate(values[1:], start=2): if len(row) <= date_column_index: continue name = row[0] if row else "" shift = row[date_column_index] if date_column_index < len(row) else "" if not name or not shift: continue # 清理班次值 shift = shift.strip() if shift == "白": day_shift_names.append(name.strip()) elif shift == "夜": night_shift_names.append(name.strip()) elif shift: # 其他班次类型 logger.debug(f"忽略未知班次类型: {shift} (行: {row_idx})") return self._format_result(day_shift_names, night_shift_names) def parse_yearly_sheet( self, values: List[List[str]], target_date: str ) -> Dict[str, any]: """ 解析年度表格数据(如2026年排班表) 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") 返回: 排班信息字典 """ if not values: logger.warning("年度表格数据为空") return self._empty_result() # 查找目标月份的数据块 target_month = target_date.split("月")[0] if "月" in target_date else "" if not target_month: logger.warning(f"无法从 {target_date} 提取月份") return self._empty_result() # 在年度表格中查找对应的月份块 current_block_start = -1 current_month = "" for i, row in enumerate(values): if not row: continue first_cell = str(row[0]) if row else "" # 检查是否是月份标题行,如 "福州港1月排班表" if "排班表" in first_cell and "月" in first_cell: # 提取月份数字 month_match = re.search(r"(\d+)月", first_cell) if month_match: current_month = month_match.group(1).lstrip("0") current_block_start = i logger.debug(f"找到月份块: {current_month}月 (行: {i + 1})") # 如果找到目标月份,检查下一行是否是表头行 if current_month == target_month and i == current_block_start + 1: # 当前行是表头行 headers = row date_column_index = self._find_date_column_index(headers, target_date) if date_column_index is None: logger.warning(f"在年度表格中未找到日期列: {target_date}") return self._empty_result() # 收集人员数据(从表头行的下一行开始) day_shift_names = [] night_shift_names = [] for j in range(i + 1, len(values)): person_row = values[j] if not person_row: # 遇到空行,继续检查下一行 continue # 检查是否是下一个月份块的开始 if ( person_row[0] and isinstance(person_row[0], str) and "排班表" in person_row[0] and "月" in person_row[0] ): break # 跳过星期行(第一列为空的行) if not person_row[0]: continue if len(person_row) <= date_column_index: continue name = person_row[0] if person_row else "" shift = ( person_row[date_column_index] if date_column_index < len(person_row) else "" ) if not name or not shift: continue # 清理班次值 shift = shift.strip() if shift == "白": day_shift_names.append(name.strip()) elif shift == "夜": night_shift_names.append(name.strip()) return self._format_result(day_shift_names, night_shift_names) logger.warning(f"在年度表格中未找到 {target_month}月 的数据块") return self._empty_result() def parse( self, values: List[List[str]], target_date: str, sheet_title: str = "" ) -> Dict[str, any]: """ 解析排班数据,自动判断表格类型 参数: values: 飞书表格返回的二维数组 target_date: 目标日期(格式: "12月30日" 或 "12/30") sheet_title: 表格标题,用于判断表格类型 返回: 排班信息字典 """ # 根据表格标题判断表格类型 if "年" in sheet_title and "排班表" in sheet_title: # 年度表格 logger.info(f"使用年度表格解析器: {sheet_title}") return self.parse_yearly_sheet(values, target_date) else: # 月度表格 logger.info(f"使用月度表格解析器: {sheet_title}") return self.parse_monthly_sheet(values, target_date) def _empty_result(self) -> Dict[str, any]: """返回空结果""" return { "day_shift": "", "night_shift": "", "day_shift_list": [], "night_shift_list": [], } def _format_result( self, day_shift_names: List[str], night_shift_names: List[str] ) -> Dict[str, any]: """格式化结果""" # 去重并排序 day_shift_names = sorted(set(day_shift_names)) night_shift_names = sorted(set(night_shift_names)) # 格式化输出 day_shift_str = "、".join(day_shift_names) if day_shift_names else "" night_shift_str = "、".join(night_shift_names) if night_shift_names else "" return { "day_shift": day_shift_str, "night_shift": night_shift_str, "day_shift_list": day_shift_names, "night_shift_list": night_shift_names, } if __name__ == "__main__": # 测试代码 import sys # 设置日志 logging.basicConfig(level=logging.DEBUG) parser = ScheduleDataParser() # 测试日期解析 test_dates = ["12/30", "12月30日", "1月1日", "01/01", "1230", "无效日期"] for date in test_dates: parsed = parser._parse_chinese_date(date) print(f"解析 '{date}' -> '{parsed}'") # 测试月度表格解析 monthly_values = [ ["姓名", "12月1日", "12月2日", "12月3日"], ["张三", "白", "夜", ""], ["李四", "夜", "白", "白"], ["王五", "", "白", "夜"], ] result = parser.parse_monthly_sheet(monthly_values, "12月2日") print(f"\n月度表格解析结果: {result}") # 测试年度表格解析 yearly_values = [ ["福州港2026年排班表"], ["姓名", "1月1日", "1月2日", "1月3日"], ["张三", "白", "夜", ""], ["李四", "夜", "白", "白"], ["福州港2月排班表"], ["姓名", "2月1日", "2月2日"], ["王五", "白", "夜"], ] result = parser.parse_yearly_sheet(yearly_values, "1月2日") print(f"年度表格解析结果: {result}")