refactor: 模块化重构项目结构

This commit is contained in:
2025-12-28 23:31:22 +08:00
commit 3b60ae9ecf
8 changed files with 755 additions and 0 deletions

216
src/extractor.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
HTML 文本提取模块
"""
import re
from typing import List

from bs4 import (
    BeautifulSoup,
    Comment,
    Declaration,
    Doctype,
    NavigableString,
    ProcessingInstruction,
    Tag,
)
class HTMLTextExtractor:
    """Convert HTML into plain text while keeping a readable layout.

    Headings become ``#``-prefixed lines, bold/italic text is wrapped in
    ``**``/``*``, links are rendered as ``text (href)``, lists get ``- `` or
    ``N. `` markers and tables are printed as space-padded columns.

    The extractor keeps its working buffer in ``output_lines``; every call to
    :meth:`extract` resets that buffer.
    """

    # Block-level tags: each one starts and ends on its own output line.
    BLOCK_TAGS = {
        'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'section',
        'table', 'tr', 'td', 'th', 'li', 'ul', 'ol', 'blockquote',
        'pre', 'hr', 'br', 'tbody', 'thead', 'tfoot'
    }

    _HEADING_TAGS = ('h1', 'h2', 'h3', 'h4', 'h5', 'h6')

    # Inline tags that get dedicated formatting (link, bold, italic).
    _INLINE_FORMAT_TAGS = ('a', 'strong', 'b', 'em', 'i')

    # NavigableString subclasses that carry markup, not visible text.
    _NON_TEXT_NODES = (Comment, Declaration, Doctype, ProcessingInstruction)

    # Width, in characters, of the line drawn for <hr>.
    _RULE_WIDTH = 50

    def __init__(self):
        """Create an extractor with an empty output buffer."""
        self.output_lines: List[str] = []

    def extract(self, html: str) -> str:
        """Extract layout-preserving plain text from an HTML string.

        Args:
            html: HTML markup; a full document or a fragment.

        Returns:
            The formatted plain text, or ``''`` when ``html`` is empty.
        """
        if not html:
            return ''

        soup = BeautifulSoup(html, 'html.parser')

        # Drop elements whose content is never meant to be read as text.
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()

        # Drop every element carrying an ``ac:name`` attribute (Confluence macros).
        for macro in soup.find_all(attrs={"ac:name": True}):
            macro.decompose()

        self.output_lines = []

        # html.parser does not synthesize <body>, so fragments are walked
        # from the soup root.
        body = soup.body if soup.body else soup
        for child in body.children:
            self._process_node(child)

        # Final cleanup: collapse runs of blank lines to a single blank line
        # and strip trailing whitespace from every line.
        result = ''.join(self.output_lines)
        result = re.sub(r'\n\s*\n\s*\n', '\n\n', result)
        result = '\n'.join(line.rstrip() for line in result.split('\n'))
        return result.strip()

    # ------------------------------------------------------------------
    # Small buffer / formatting helpers
    # ------------------------------------------------------------------
    def _mid_line(self) -> bool:
        """Return True when the last emitted fragment has not been newline-terminated."""
        return bool(self.output_lines) and not self.output_lines[-1].endswith('\n')

    def _end_line(self):
        """Terminate the current output line if it is still open."""
        if self._mid_line():
            self.output_lines.append('\n')

    @staticmethod
    def _inline_text(node: Tag) -> str:
        """Return the node's text with every whitespace run collapsed to one space."""
        return ' '.join(node.get_text().split())

    def _format_inline(self, node: Tag) -> str:
        """Render an inline tag as a single text fragment ('' when it has no text).

        Links become ``text (href)`` (or just ``text`` without an href),
        ``strong``/``b`` become ``**text**``, ``em``/``i`` become ``*text*``;
        any other tag contributes its plain text.
        """
        text = self._inline_text(node)
        if not text:
            return ''
        name = node.name.lower()
        if name == 'a':
            href = node.get('href', '')
            return f'{text} ({href})' if href else text
        if name in ('strong', 'b'):
            return f'**{text}**'
        if name in ('em', 'i'):
            return f'*{text}*'
        return text

    def _append_inline(self, fragment: str, indent: int):
        """Append an inline fragment, indenting it when it starts a new line."""
        if not self._mid_line():
            fragment = ' ' * indent + fragment
        self.output_lines.append(fragment)

    def _append_text(self, raw: str, indent: int):
        """Append a text node, keeping word boundaries around inline tags.

        Whitespace is collapsed as a browser would, but a single space is kept
        where the node starts or ends with whitespace, so that
        ``Hello <b>world</b> again`` does not become ``Hello**world**again``.
        Trailing spaces left at line ends are removed by the final cleanup in
        :meth:`extract`.
        """
        collapsed = re.sub(r'\s+', ' ', raw)
        text = collapsed.strip()
        mid_line = self._mid_line()

        if not text:
            # Whitespace-only node between two inline elements: word separator.
            if collapsed and mid_line and not self.output_lines[-1].endswith(' '):
                self.output_lines.append(' ')
            return

        if mid_line:
            needs_gap = collapsed[0] == ' ' and not self.output_lines[-1].endswith(' ')
            piece = (' ' if needs_gap else '') + text
        else:
            piece = ' ' * indent + text
        if collapsed[-1] == ' ':
            piece += ' '
        self.output_lines.append(piece)

    # ------------------------------------------------------------------
    # Tree walking
    # ------------------------------------------------------------------
    def _process_node(self, node, indent: int = 0, list_context=None):
        """Recursively render one parse-tree node into ``output_lines``.

        Args:
            node: A BeautifulSoup ``Tag`` or ``NavigableString``.
            indent: Number of leading spaces for lines started by this node.
            list_context: ``(list_type, number)`` for an enclosing list, or None.
        """
        if isinstance(node, NavigableString):
            # Comments, doctypes etc. are NavigableString subclasses but are
            # markup, not content.
            if not isinstance(node, self._NON_TEXT_NODES):
                self._append_text(str(node), indent)
            return

        if not isinstance(node, Tag):
            return

        tag_name = node.name.lower()

        if tag_name == 'br':
            # Handled before the generic block logic so that one <br> yields
            # exactly one line break instead of a blank line.
            self.output_lines.append('\n')
            return

        is_block = tag_name in self.BLOCK_TAGS

        # A block element always starts on a fresh line.
        if is_block:
            self._end_line()

        if tag_name in self._HEADING_TAGS:
            text = self._inline_text(node)
            if text:
                marker = '#' * int(tag_name[1])
                self.output_lines.append(' ' * indent + marker + ' ' + text + '\n')
            return

        if tag_name == 'p':
            text = self._inline_text(node)
            if text:
                self.output_lines.append(' ' * indent + text + '\n')
            return

        if tag_name == 'hr':
            # ASCII rule, in keeping with the Markdown-like markers used for
            # headings and emphasis.
            self.output_lines.append(' ' * indent + '-' * self._RULE_WIDTH + '\n')
            return

        if tag_name == 'table':
            self._process_table(node, indent)
            return

        if tag_name in ('ul', 'ol'):
            self._process_list(node, indent, tag_name)
            return

        if tag_name == 'li':
            self._process_list_item(node, indent, list_context)
            return

        if tag_name in self._INLINE_FORMAT_TAGS:
            fragment = self._format_inline(node)
            if fragment:
                self._append_inline(fragment, indent)
            return

        # Any other tag: render its children in place.
        for child in node.children:
            self._process_node(child, indent, list_context)
        if is_block:
            self._end_line()

    def _process_table(self, table: Tag, indent: int):
        """Render a table as left-aligned, space-padded columns.

        Args:
            table: The ``<table>`` tag.
            indent: Number of leading spaces for every table line.
        """
        rows = []
        for tr in table.find_all('tr'):
            # Whitespace inside a cell is collapsed so that a multi-line cell
            # cannot break the one-row-per-line layout.
            cells = [self._inline_text(cell) for cell in tr.find_all(['td', 'th'])]
            if cells:
                rows.append(cells)

        if not rows:
            return

        column_count = max(len(row) for row in rows)
        col_widths = [
            max((len(row[i]) for row in rows if i < len(row)), default=0)
            for i in range(column_count)
        ]

        margin = ' ' * indent
        for row in rows:
            padded = ' '.join(
                cell.ljust(width) for cell, width in zip(row, col_widths)
            )
            self.output_lines.append((margin + padded).rstrip() + '\n')
        # Blank line after the table.
        self.output_lines.append('\n')

    def _process_list(self, ul: Tag, indent: int, list_type: str):
        """Render the items of a ``<ul>`` or ``<ol>``.

        Args:
            ul: The list tag (either ``ul`` or ``ol``).
            indent: Number of leading spaces for the list's items.
            list_type: ``'ul'`` or ``'ol'``.
        """
        ordered = list_type == 'ol'
        number = 1
        for child in ul.children:
            if isinstance(child, Tag) and child.name == 'li':
                self._process_list_item(child, indent, (list_type, number))
                if ordered:
                    number += 1
            else:
                # Non-<li> children: unordered lists hand down a bullet
                # context, ordered lists hand down none.
                self._process_node(
                    child, indent, None if ordered else (list_type, 1)
                )

    def _process_list_item(self, li: Tag, indent: int, list_context):
        """Render one ``<li>``: marker + inline content, then nested blocks.

        Args:
            li: The list item tag.
            indent: Number of leading spaces for the item line.
            list_context: ``(list_type, number)`` of the enclosing list, or
                None when the item has no known list parent (no marker).
        """
        prefix = ''
        if list_context:
            list_type, num = list_context
            prefix = '- ' if list_type == 'ul' else f'{num}. '

        # Split children into inline content (joined on the item line, in
        # document order) and block content (rendered below, indented).
        inline_parts = []
        block_children = []
        for child in li.children:
            if isinstance(child, NavigableString):
                if isinstance(child, self._NON_TEXT_NODES):
                    continue
                text = ' '.join(str(child).split())
                if text:
                    inline_parts.append(text)
            elif isinstance(child, Tag):
                name = child.name.lower()
                if name == 'br':
                    # Inline parts are already space-separated.
                    continue
                if name in self.BLOCK_TAGS:
                    block_children.append(child)
                else:
                    fragment = self._format_inline(child)
                    if fragment:
                        inline_parts.append(fragment)

        # <li><p>text</p></li> is common; promote the leading paragraph so the
        # item still gets its bullet or number.
        if (not inline_parts and block_children
                and block_children[0].name.lower() == 'p'):
            lead_text = self._inline_text(block_children.pop(0))
            if lead_text:
                inline_parts.append(lead_text)

        self._end_line()
        if inline_parts:
            self.output_lines.append(
                ' ' * indent + prefix + ' '.join(inline_parts) + '\n'
            )

        for child in block_children:
            self._process_node(child, indent + 2, None)

        # Never leave the item's last line open, or the next item would be
        # glued onto it.
        self._end_line()
if __name__ == '__main__':
    # Smoke test: run the extractor on a tiny document containing a heading,
    # a paragraph and an unordered list, and print the result.
    sample_html = "<h1>标题</h1><p>段落</p><ul><li>项目1</li><li>项目2</li></ul>"
    print(HTMLTextExtractor().extract(sample_html))