Files
gloria/test_confluence.py
Developer 5d0cafac32 feat: 交接班报告支持 Confluence/Jira 集成,添加 N/A 记录时间归属功能
- 集成 Confluence API 获取船舶报告数据
- 集成 Jira API 查询故障数量
- 支持船号显示 (462#、463# 等)
- 支持故障次数/故障率、人工介入次数/介入率显示
- 跨班作业使用 Card 69 按时间查询效率
- 不跨班作业使用整船效率(剔除异常)
- N/A 记录根据作业时间归属到对应船舶
- 更新 AGENTS.md 和 README.md 文档
- 删除 daily_report_gui.py
2026-03-14 02:52:23 +08:00

124 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
测试 Confluence 数据提取功能
"""
import sys
import os
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from confluence import VesselReportManager
def test_confluence_connection():
"""测试 Confluence 连接"""
print("=" * 60)
print("测试 Confluence 连接")
print("=" * 60)
try:
# 设置环境变量(用于测试)
os.environ.setdefault("CONFLUENCE_URL", "https://confluence.westwell-lab.com")
# 注意:实际运行时需要设置 CONFLUENCE_TOKEN
manager = VesselReportManager()
print("✓ Confluence 管理器初始化成功")
return manager
except Exception as e:
print(f"✗ Confluence 连接失败: {e}")
return None
def test_get_monthly_reports(manager):
"""测试获取月度报告"""
print("\n" + "=" * 60)
print("测试获取月度报告 (2026.03)")
print("=" * 60)
try:
reports = manager.get_vessel_reports_by_month("2026.03")
print(f"✓ 成功获取 {len(reports)} 个船舶报告")
# 显示前5个报告
for i, report in enumerate(reports[:5]):
print(f"\n报告 {i+1}:")
print(f" 船次: {report.get('vessel_number')}#")
print(f" 船名: {report.get('vessel_name')}")
print(f" 日期: {report.get('operation_date')}")
print(f" TEU: {report.get('teu')}")
print(f" 故障次数: {report.get('failures')}")
print(f" 故障率: {report.get('failure_rate')}%")
print(f" 人工介入次数: {report.get('interventions')}")
print(f" 人工介入率: {report.get('intervention_rate')}%")
print(f" 效率: {report.get('efficiency')}")
return True
except Exception as e:
print(f"✗ 获取月度报告失败: {e}")
import traceback
traceback.print_exc()
return False
def test_get_daily_reports(manager):
"""测试获取指定日期报告"""
print("\n" + "=" * 60)
print("测试获取指定日期报告 (2026-03-05)")
print("=" * 60)
try:
reports = manager.get_vessel_reports_by_date("2026-03-05")
print(f"✓ 成功获取 {len(reports)} 个船舶报告")
for report in reports:
print(f"\n {report.get('vessel_number')}# {report.get('vessel_name')}")
print(f" TEU: {report.get('teu')}, "
f"故障: {report.get('failures')}次 ({report.get('failure_rate')}%), "
f"介入: {report.get('interventions')}次 ({report.get('intervention_rate')}%)")
return True
except Exception as e:
print(f"✗ 获取每日报告失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主函数"""
print("Confluence 数据提取测试")
print("=" * 60)
# 检查环境变量
if not os.getenv("CONFLUENCE_TOKEN"):
print("\n⚠ 警告: 未设置 CONFLUENCE_TOKEN 环境变量")
print("请设置环境变量后再运行测试:")
print(" export CONFLUENCE_TOKEN='your_token_here'")
print("\n使用内置 token 进行测试...")
# 使用用户提供的 token
os.environ["CONFLUENCE_TOKEN"] = "ODE4NjI1Nzk4NTIzOmqzD4f8ifNmo2PcaMluS23djMzu"
# 测试连接
manager = test_confluence_connection()
if not manager:
print("\n✗ 测试失败: 无法连接到 Confluence")
return 1
# 测试获取月度报告
if not test_get_monthly_reports(manager):
print("\n✗ 测试失败: 无法获取月度报告")
return 1
# 测试获取每日报告
if not test_get_daily_reports(manager):
print("\n✗ 测试失败: 无法获取每日报告")
return 1
print("\n" + "=" * 60)
print("✓ 所有测试通过!")
print("=" * 60)
return 0
if __name__ == "__main__":
sys.exit(main())