mirror of
https://devops.liangqichi.top/qichi.liang/Orbitin.git
synced 2026-02-10 07:41:29 +08:00
217 lines
7.2 KiB
Python
217 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
HTML 文本提取模块
|
|
"""
|
|
import re
|
|
from bs4 import BeautifulSoup, Tag, NavigableString
|
|
from typing import List
|
|
|
|
|
|
class HTMLTextExtractor:
    """Extract plain text from HTML while keeping the visible layout.

    Headings are rendered as ``#``-prefixed lines, list items get a bullet
    or a running number, tables become space-padded columns, and links,
    bold and italic runs are rendered inline as ``text (href)``,
    ``**text**`` and ``*text*``.

    The extractor accumulates output fragments in ``self.output_lines``
    while walking the tree, so one instance must not run two extractions
    at the same time.
    """

    # Tags whose content starts and ends on its own output line.
    BLOCK_TAGS = {
        'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'section',
        'table', 'tr', 'td', 'th', 'li', 'ul', 'ol', 'blockquote',
        'pre', 'hr', 'br', 'tbody', 'thead', 'tfoot'
    }

    # Tags rendered in the middle of a line with lightweight markup.
    INLINE_TAGS = {'a', 'strong', 'b', 'em', 'i'}

    def __init__(self):
        """Create an extractor with an empty output buffer."""
        # Output fragments; a fragment ending in '\n' terminates a line.
        self.output_lines: List[str] = []

    def extract(self, html: str) -> str:
        """Return the layout-preserving plain text of *html*.

        Args:
            html: HTML markup. A falsy value (``''`` or ``None``) yields
                an empty string without parsing anything.

        Returns:
            The formatted text, with runs of blank lines collapsed to a
            single blank line, trailing whitespace removed from every
            line, and leading/trailing whitespace removed overall.
        """
        if not html:
            return ''

        soup = BeautifulSoup(html, 'html.parser')

        # Drop elements whose content must never reach the output.
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()

        # Drop every element carrying an ``ac:name`` attribute
        # (Confluence macros).
        for macro in soup.find_all(attrs={"ac:name": True}):
            macro.decompose()

        self.output_lines = []

        # Walk <body> when present, otherwise the whole document.
        body = soup.body if soup.body else soup
        for child in body.children:
            self._process_node(child)

        # Tidy up: collapse blank-line runs, strip trailing whitespace.
        result = ''.join(self.output_lines)
        result = re.sub(r'\n\s*\n\s*\n', '\n\n', result)
        result = '\n'.join(line.rstrip() for line in result.split('\n'))
        return result.strip()

    @staticmethod
    def _clean_text(text: str) -> str:
        """Collapse every whitespace run in *text* to one space and strip it."""
        return re.sub(r'\s+', ' ', text).strip()

    def _at_line_start(self) -> bool:
        """Return True when the next fragment would begin a new output line."""
        return not self.output_lines or self.output_lines[-1].endswith('\n')

    def _ensure_newline(self):
        """Terminate the current output line if it is still open."""
        if not self._at_line_start():
            self.output_lines.append('\n')

    def _emit_inline(self, text: str, indent: int):
        """Append an inline fragment, indenting it only when it starts a line."""
        if self._at_line_start():
            self.output_lines.append(' ' * indent + text)
        else:
            self.output_lines.append(text)

    def _render_inline(self, node: "Tag") -> str:
        """Render an inline tag as text; return '' when it has no text.

        ``a`` becomes ``text (href)`` (just ``text`` without an href),
        ``strong``/``b`` become ``**text**`` and ``em``/``i`` ``*text*``.
        Any other tag is rendered as its plain text.
        """
        text = self._clean_text(node.get_text())
        if not text:
            return ''
        name = node.name.lower()
        if name == 'a':
            href = node.get('href', '')
            return f'{text} ({href})' if href else text
        if name in ('strong', 'b'):
            return f'**{text}**'
        if name in ('em', 'i'):
            return f'*{text}*'
        return text

    def _append_text(self, raw: str, indent: int):
        """Append a text node, keeping single spaces at its edges.

        Whitespace runs are collapsed to one space. A leading or trailing
        space is kept only as a separator inside a line, so text next to
        inline elements is not glued to them; nothing is written for a
        whitespace-only node at the start of a line. Trailing spaces left
        at the end of a line are removed by the final clean-up in
        ``extract``.
        """
        collapsed = re.sub(r'\s+', ' ', raw)
        text = collapsed.strip()
        mid_line = not self._at_line_start()

        if not text:
            # Whitespace between two inline pieces: keep one separator.
            if collapsed and mid_line and not self.output_lines[-1].endswith(' '):
                self.output_lines[-1] += ' '
            return

        if mid_line:
            if collapsed.startswith(' ') and not self.output_lines[-1].endswith(' '):
                text = ' ' + text
            self.output_lines[-1] += text
        else:
            self.output_lines.append(' ' * indent + text)

        if collapsed.endswith(' '):
            self.output_lines[-1] += ' '

    def _process_node(self, node, indent: int = 0, list_context=None):
        """Recursively render *node* into ``self.output_lines``.

        Args:
            node: A BeautifulSoup node. Text nodes and tags are rendered;
                anything else is ignored.
            indent: Number of spaces to put in front of lines started here.
            list_context: ``(list_type, number)`` for a list item, where
                ``list_type`` is ``'ul'`` or ``'ol'``; ``None`` otherwise.
        """
        if isinstance(node, NavigableString):
            self._append_text(str(node), indent)
            return

        if not isinstance(node, Tag):
            return

        tag_name = node.name.lower()
        is_block = tag_name in self.BLOCK_TAGS

        # A block element always starts on a fresh line.
        if is_block:
            self._ensure_newline()

        if tag_name in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
            text = self._clean_text(node.get_text())
            if text:
                prefix = '#' * int(tag_name[1]) + ' '
                self.output_lines.append(' ' * indent + prefix + text + '\n')
            return

        if tag_name == 'p':
            text = self._clean_text(node.get_text())
            if text:
                self.output_lines.append(' ' * indent + text + '\n')
            return

        if tag_name == 'hr':
            self.output_lines.append(' ' * indent + '─' * 50 + '\n')
            return

        if tag_name == 'br':
            self.output_lines.append('\n')
            return

        if tag_name == 'table':
            self._process_table(node, indent)
            return

        if tag_name in ('ul', 'ol'):
            self._process_list(node, indent, tag_name)
            return

        if tag_name == 'li':
            self._process_list_item(node, indent, list_context)
            return

        if tag_name in self.INLINE_TAGS:
            rendered = self._render_inline(node)
            if rendered:
                self._emit_inline(rendered, indent)
            return

        # Any other tag: render its children in place.
        for child in node.children:
            self._process_node(child, indent, list_context)

        # ...and close the line a block container may have left open.
        if is_block:
            self._ensure_newline()

    def _process_table(self, table: "Tag", indent: int):
        """Render *table* as left-aligned, space-padded columns.

        Only rows and cells that belong to *table* itself are used; the
        content of a nested table appears as text of the cell holding it
        instead of being repeated as extra rows of the outer table.
        A blank line is appended after a non-empty table.
        """
        rows = []
        for tr in table.find_all('tr'):
            if tr.find_parent('table') is not table:
                continue  # row of a nested table
            cells = [
                self._clean_text(cell.get_text())
                for cell in tr.find_all(['td', 'th'])
                if cell.find_parent('tr') is tr
            ]
            if cells:
                rows.append(cells)

        if not rows:
            return

        # Width of each column = longest cell in it (short rows count as 0).
        column_count = max(len(row) for row in rows)
        col_widths = [
            max((len(row[i]) if i < len(row) else 0) for row in rows)
            for i in range(column_count)
        ]

        for row in rows:
            padded = ''.join(
                cell.ljust(col_widths[i]) + ' ' for i, cell in enumerate(row)
            )
            self.output_lines.append((' ' * indent + padded).rstrip() + '\n')
        self.output_lines.append('\n')

    def _process_list(self, ul: "Tag", indent: int, list_type: str):
        """Render the items of a ``ul``/``ol`` element.

        Args:
            ul: The list element.
            indent: Indentation, in spaces, of the item lines.
            list_type: ``'ol'`` numbers the items from 1; any other value
                is passed on unchanged with the number fixed at 1.
        """
        ordered = list_type == 'ol'
        number = 1
        for child in ul.children:
            if isinstance(child, Tag) and child.name == 'li':
                self._process_list_item(child, indent, (list_type, number))
                if ordered:
                    number += 1
            else:
                # Stray content between items is rendered in place.
                context = None if ordered else (list_type, 1)
                self._process_node(child, indent, context)

    def _process_list_item(self, li: "Tag", indent: int, list_context):
        """Render one ``li`` element.

        The item's own text and inline children (see ``INLINE_TAGS``) form
        the item line, prefixed with ``• `` or ``N. `` when *list_context*
        is given. When the item has no such content but starts with a
        ``<p>``, that paragraph's text is used as the item line so the
        marker is not lost. All remaining child elements are rendered
        below the item line, indented by two more spaces.
        """
        prefix = ''
        if list_context:
            list_type, num = list_context
            prefix = '• ' if list_type == 'ul' else f'{num}. '

        # Split the children into item-line text and nested elements.
        inline_parts = []
        nested = []
        for child in li.children:
            if isinstance(child, NavigableString):
                text = self._clean_text(str(child))
                if text:
                    inline_parts.append(text)
            elif isinstance(child, Tag):
                if child.name.lower() in self.INLINE_TAGS:
                    rendered = self._render_inline(child)
                    if rendered:
                        inline_parts.append(rendered)
                else:
                    nested.append(child)

        # <li><p>text</p>...</li>: the first paragraph is the item line.
        if not inline_parts and nested and nested[0].name.lower() == 'p':
            lead = self._clean_text(nested[0].get_text())
            if lead:
                inline_parts.append(lead)
            nested = nested[1:]

        # Never glue the item onto a line left open by earlier content.
        self._ensure_newline()
        if inline_parts:
            self.output_lines.append(
                ' ' * indent + prefix + ' '.join(inline_parts) + '\n'
            )

        for child in nested:
            self._process_node(child, indent + 2, None)

        # Close the line so the next item starts on its own line.
        self._ensure_newline()
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: render a heading, a paragraph and a two-item list.
    sample_markup = "<h1>标题</h1><p>段落</p><ul><li>项目1</li><li>项目2</li></ul>"
    print(HTMLTextExtractor().extract(sample_markup))
|