Files
Orbitin/src/parser.py

143 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
日志解析模块
"""
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class ShipLog:
    """Record for a single vessel entry parsed from a handover log."""

    # Date string of the log block the entry was found in.
    date: str
    # Shift label the entry was found under.
    shift: str
    # Vessel identifier string.
    ship_name: str
    # Integer TEU figure, or None when it was not available.
    teu: Optional[int] = None
    # Efficiency figure, or None when it was not available.
    efficiency: Optional[float] = None
    # Integer vehicle count, or None when it was not available.
    vehicles: Optional[int] = None

    def to_dict(self) -> Dict:
        """Return the record as a plain dictionary keyed by field name."""
        keys = ('date', 'shift', 'ship_name', 'teu', 'efficiency', 'vehicles')
        return {key: getattr(self, key) for key in keys}
class HandoverLogParser:
    """Parser for shift-handover log text.

    The text is cut into blocks on ``SEPARATOR``. Every block that carries a
    ``日期`` (date) label is scanned for the two shift markers ``白班`` and
    ``夜班``; each shift section is then split on the ``实船作业`` label and
    every piece containing a ``<digits>#<name>`` token yields one ``ShipLog``.

    Field labels are accepted with either an ASCII colon or a fullwidth
    colon (U+FF1A), optionally followed by whitespace.
    """

    SEPARATOR = '———————————————————————————————————————————————'

    # Shift markers, in the order their records are emitted for each block.
    _SHIFTS = ('白班', '夜班')

    # The fullwidth colon is spelled as an escape so it cannot be confused
    # with (or silently normalised to) its ASCII look-alike.
    _COLON = r'[:\uff1a]'

    # Patterns are compiled once here instead of on every parsed entry.
    _DATE_RE = re.compile(r'日期' + _COLON + r'\s*(\d{4}\.\d{2}\.\d{2})')
    _SHIP_SPLIT_RE = re.compile(r'实船作业' + _COLON)
    _SHIP_RE = re.compile(r'(\d+)#\s*(\S+)')
    _VEHICLES_RE = re.compile(r'上场车辆数' + _COLON + r'\s*(\d+)')
    # Only the TEU count is captured; whatever follows "TEU" is ignored.
    _TEU_RE = re.compile(r'作业量/效率' + _COLON + r'\s*(\d+)TEU')

    def __init__(self):
        """Create a parser; it holds no per-instance state."""

    @staticmethod
    def parse_date(date_str: str) -> str:
        """Convert a ``YYYY.MM.DD`` style date to ``YYYY-MM-DD``.

        Args:
            date_str: Date text with dot-separated components.

        Returns:
            The three components joined with ``-`` when ``date_str`` has
            exactly three dot-separated parts; otherwise ``date_str``
            unchanged. A value that cannot be split as text is also returned
            unchanged.
        """
        try:
            parts = date_str.split('.')
        except (AttributeError, TypeError):
            # Not text (e.g. None or bytes): hand the value back untouched.
            return date_str
        return '-'.join(parts) if len(parts) == 3 else date_str

    def parse(self, text: str) -> "List[ShipLog]":
        """Parse a complete log text.

        Args:
            text: Log text, with blocks delimited by ``SEPARATOR``.

        Returns:
            The ``ShipLog`` records found, in block order and, within a
            block, day shift before night shift. Blocks without a
            recognisable date are skipped.
        """
        logs = []
        for block in text.split(self.SEPARATOR):
            date_match = self._DATE_RE.search(block)
            if date_match is None:
                continue
            date = self.parse_date(date_match.group(1))
            self._parse_block(block, date, logs)
        return logs

    def _parse_block(self, block: str, date: str, logs: "List[ShipLog]"):
        """Append the records of every shift section in ``block`` to ``logs``."""
        for shift, shift_content in self._split_shifts(block):
            self._parse_ships(shift_content, date, shift, logs)

    def _parse_ships(self, content: str, date: str, shift: str,
                     logs: "List[ShipLog]"):
        """Append one ``ShipLog`` to ``logs`` per vessel entry in ``content``."""
        for part in self._SHIP_SPLIT_RE.split(content):
            fields = self._extract_ship_fields(part)
            if fields is None:
                continue
            ship_name, teu, vehicles = fields
            logs.append(ShipLog(
                date=date,
                shift=shift,
                ship_name=ship_name,
                teu=teu,
                # Efficiency is not extracted from the text.
                efficiency=None,
                vehicles=vehicles,
            ))

    @classmethod
    def _split_shifts(cls, block: str) -> List[tuple]:
        """Return ``(shift, content)`` pairs for each shift marker in ``block``.

        A section starts right after the first occurrence of its marker and
        runs to the next occurrence of the other marker, or to the end of the
        block when there is none.
        """
        sections = []
        for shift in cls._SHIFTS:
            marker_at = block.find(shift)
            if marker_at == -1:
                continue
            start = marker_at + len(shift)
            end = len(block)
            for other in cls._SHIFTS:
                if other == shift:
                    continue
                other_at = block.find(other, start)
                if other_at != -1 and other_at < end:
                    end = other_at
            sections.append((shift, block[start:end]))
        return sections

    @classmethod
    def _extract_ship_fields(cls, part: str) -> Optional[tuple]:
        """Return ``(ship_name, teu, vehicles)`` for one entry, or ``None``.

        ``None`` is returned when ``part`` holds no ``<digits>#<name>`` token.
        ``teu`` and ``vehicles`` are ``None`` when their label is missing.
        """
        # Non-breaking spaces are normalised before any matching.
        cleaned = part.replace('\xa0', ' ').strip()
        ship_match = cls._SHIP_RE.search(cleaned)
        if ship_match is None:
            return None
        teu_match = cls._TEU_RE.search(cleaned)
        vehicles_match = cls._VEHICLES_RE.search(cleaned)
        return (
            f"{ship_match.group(1)}#{ship_match.group(2)}",
            int(teu_match.group(1)) if teu_match else None,
            int(vehicles_match.group(1)) if vehicles_match else None,
        )
if __name__ == '__main__':
    # Manual smoke test: parse a local dump and show the first few records.
    with open('layout_output.txt', 'r', encoding='utf-8') as handle:
        raw_text = handle.read()

    records = HandoverLogParser().parse(raw_text)
    print(f'解析到 {len(records)} 条记录')
    for record in records[:5]:
        print(f'{record.date} {record.shift} {record.ship_name}: {record.teu}TEU')