Files
Orbitin/src/parser.py

173 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
日志解析模块
"""
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class ShipLog:
    """One vessel entry taken from a shift-handover log.

    ``date``, ``shift`` and ``ship_name`` are required.  The three numeric
    fields are optional and default to ``None`` when no value was supplied.
    """

    # Required identifying fields.
    date: str
    shift: str
    ship_name: str
    # Optional figures; ``None`` means "no value supplied".
    teu: Optional[int] = None
    efficiency: Optional[float] = None
    vehicles: Optional[int] = None

    def to_dict(self) -> Dict:
        """Return the six fields as a plain dict, in declaration order."""
        field_names = (
            'date',
            'shift',
            'ship_name',
            'teu',
            'efficiency',
            'vehicles',
        )
        return {name: getattr(self, name) for name in field_names}
class HandoverLogParser:
    """Parser for shift-handover log text.

    The text is split into blocks on ``SEPARATOR``.  A block is used only if
    it contains a ``日期:YYYY.MM.DD`` header.  Inside a block, the markers
    ``白班`` and ``夜班`` introduce the shift sections, and inside a shift
    section every vessel entry is introduced by ``实船作业:``.
    """

    SEPARATOR = '———————————————————————————————————————————————'

    # Shift markers, in the order they are looked up inside a date block.
    _SHIFTS = ('白班', '夜班')
    # Marker that introduces one vessel entry inside a shift section.
    _SHIP_MARKER = '实船作业:'

    # Patterns are compiled once here instead of on every loop iteration.
    _DATE_RE = re.compile(r'日期:(\d{4}\.\d{2}\.\d{2})')
    # "<digits># <name>": the number before '#' is discarded, the name kept.
    _SHIP_RE = re.compile(r'(\d+)#\s*(\S+)')
    # Re-berthing annotations (full-width and ASCII parentheses) to drop
    # from the captured ship name.
    _BERTH_NOTE_RE = re.compile(r'(二次靠泊)|(再次靠泊)|\(二次靠泊\)|\(再次靠泊\)')
    _VEHICLES_RE = re.compile(r'上场车辆数:(\d+)')
    _TEU_RE = re.compile(r'作业量/效率:(\d+)TEU[,\s]*')

    @staticmethod
    def parse_date(date_str: str) -> str:
        """Convert a ``YYYY.MM.DD`` string into ``YYYY-MM-DD``.

        Any value that is not a string made of exactly three dot-separated
        parts is returned unchanged.
        """
        if not isinstance(date_str, str):
            return date_str
        parts = date_str.split('.')
        if len(parts) != 3:
            return date_str
        return '-'.join(parts)

    @staticmethod
    def _add_optional(total: Optional[int], value: Optional[int]) -> Optional[int]:
        """Add two optional counts, treating ``None`` as "nothing reported".

        Returns ``None`` only when both operands are ``None``, so the result
        does not depend on the order in which records are merged.
        """
        if value is None:
            return total
        if total is None:
            return value
        return total + value

    def parse(self, text: str) -> List["ShipLog"]:
        """Parse handover-log text into merged vessel records.

        Args:
            text: Full log text; empty or ``None`` yields an empty list.

        Returns:
            One ``ShipLog`` per distinct (date, shift, ship name), in order of
            first appearance.  TEU and vehicle counts of duplicate entries are
            summed; ``efficiency`` is taken from the first entry seen.
        """
        if not text:
            return []

        logs: List["ShipLog"] = []
        for block in text.split(self.SEPARATOR):
            # A block without a well-formed date header carries no records.
            date_match = self._DATE_RE.search(block)
            if date_match is None:
                continue
            self._parse_block(block, self.parse_date(date_match.group(1)), logs)
        return self._merge(logs)

    def _merge(self, logs: List["ShipLog"]) -> List["ShipLog"]:
        """Collapse records sharing date, shift and ship name into one.

        The input records are not modified; each merged record is a fresh
        ``ShipLog`` seeded from the first record with that key.
        """
        merged = {}
        for log in logs:
            key = (log.date, log.shift, log.ship_name)
            seen = merged.get(key)
            if seen is None:
                merged[key] = ShipLog(
                    date=log.date,
                    shift=log.shift,
                    ship_name=log.ship_name,
                    teu=log.teu,
                    efficiency=log.efficiency,
                    vehicles=log.vehicles,
                )
                continue
            seen.teu = self._add_optional(seen.teu, log.teu)
            seen.vehicles = self._add_optional(seen.vehicles, log.vehicles)
        return list(merged.values())

    def _parse_block(self, block: str, date: str, logs: List["ShipLog"]):
        """Locate each shift section of one date block and parse its vessels.

        Args:
            block: Text of a single date block.
            date: Already-normalised date to stamp on every record.
            logs: Output list; parsed records are appended to it.
        """
        for shift in self._SHIFTS:
            marker_pos = block.find(shift)
            if marker_pos == -1:
                continue
            start = marker_pos + len(shift)
            # Only the next shift marker bounds the section; a "注意事项:"
            # heading is deliberately not treated as a boundary.
            later_markers = [
                pos
                for pos in (
                    block.find(other, start)
                    for other in self._SHIFTS
                    if other != shift
                )
                if pos != -1
            ]
            end = min(later_markers, default=len(block))
            self._parse_ships(block[start:end], date, shift, logs)

    def _parse_ships(self, content: str, date: str, shift: str, logs: List["ShipLog"]):
        """Parse the vessel entries of one shift section.

        Args:
            content: Text of the shift section.
            date: Date to stamp on every record.
            shift: Shift marker to stamp on every record.
            logs: Output list; one ``ShipLog`` is appended per entry that
                contains a ``<digits>#<name>`` ship reference.
        """
        for part in content.split(self._SHIP_MARKER):
            # Non-breaking spaces are normalised to plain spaces first.
            cleaned = part.replace('\xa0', ' ').strip()
            if not cleaned:
                continue

            ship_match = self._SHIP_RE.search(cleaned)
            if ship_match is None:
                continue
            # Keep the bare ship name: drop re-berthing annotations.
            ship_name = self._BERTH_NOTE_RE.sub('', ship_match.group(2)).strip()

            vehicles_match = self._VEHICLES_RE.search(cleaned)
            teu_match = self._TEU_RE.search(cleaned)
            logs.append(
                ShipLog(
                    date=date,
                    shift=shift,
                    ship_name=ship_name,
                    teu=int(teu_match.group(1)) if teu_match else None,
                    # Only the TEU figure is extracted; efficiency is not parsed.
                    efficiency=None,
                    vehicles=int(vehicles_match.group(1)) if vehicles_match else None,
                )
            )
def main(path: str = 'layout_output.txt') -> None:
    """Smoke test: parse the log file at *path* and print a short summary.

    Prints the number of merged records followed by the first five of them.
    """
    with open(path, 'r', encoding='utf-8') as f:
        text = f.read()
    logs = HandoverLogParser().parse(text)
    print(f'解析到 {len(logs)} 条记录')
    for log in logs[:5]:
        print(f'{log.date} {log.shift} {log.ship_name}: {log.teu}TEU')


if __name__ == '__main__':
    import sys

    # An optional first command-line argument overrides the default input
    # file; with no argument the original hard-coded path is used.
    main(*sys.argv[1:2])