Files
Gloria/main.py

795 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Any, Optional
import os
from contextlib import asynccontextmanager
from confluence.client import ConfluenceClient, get_confluence_client
from confluence.parser import DataParser
from models.schemas import (
PageNode, PageTreeResponse, MonthlyStats, SummaryStatistics, ShipData,
TableDataResponse, TableDataRow, DataAnalysisResponse,
AnalysisSummaryResponse, MonthlySummary,
DailyStats, DailyStatsResponse, DailyStatsShip
)
from services.cache import cache
from config import settings
# Application lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the application's startup and shutdown hooks.

    Startup calls ``cache.clear_expired()`` and, when the returned count is
    positive, prints how many expired cache files were removed. Nothing is
    done on shutdown.
    """
    # Startup: purge expired cache entries before the app starts serving.
    expired_count = cache.clear_expired()
    if expired_count > 0:
        print(f"已清理 {expired_count} 个过期缓存文件")

    yield
    # Shutdown: no cleanup is currently required.
# Create the FastAPI application; `lifespan` supplies the startup/shutdown hooks.
app = FastAPI(
    lifespan=lifespan,
    title="福州江阴实船作业统计可视化系统",
    description="Confluence 数据采集与可视化报表",
    version="1.0.0",
)

# CORS configuration: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployment environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Static file service: mounted only when a "static" path exists relative to
# the current working directory.
if os.path.exists("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def root():
    """Home page: return a small HTML document that redirects to the dashboard.

    The document uses a zero-delay meta refresh to ``/static/index.html`` and
    includes a manual link as a fallback.
    """
    # The literal is kept flush-left so the emitted markup stays unchanged.
    redirect_page = """
<!DOCTYPE html>
<html>
<head>
<title>福州江阴实船作业统计</title>
<meta http-equiv="refresh" content="0;url=/static/index.html">
</head>
<body>
<p>正在跳转到可视化页面...</p>
<p>如果没有自动跳转,请<a href="/static/index.html">点击这里</a></p>
</body>
</html>
"""
    return redirect_page
@app.get("/api/health")
async def health_check():
    """Health check: report service status, version and Confluence setup.

    ``confluence_configured`` is true when ``settings.CONFLUENCE_TOKEN`` is
    truthy.
    """
    token_present = bool(settings.CONFLUENCE_TOKEN)
    return dict(
        status="healthy",
        version="1.0.0",
        confluence_configured=token_present,
    )
@app.get("/api/pages/tree", response_model=PageTreeResponse)
async def get_page_tree():
    """Return the complete page tree rooted at ``settings.ROOT_PAGE_ID``.

    A cached tree (cache key ``"page_tree"``) is returned when present.
    Otherwise the tree is fetched from Confluence with a depth limit of 4,
    converted into ``PageNode`` objects, tallied, cached and returned.

    Raises:
        HTTPException: 404 when Confluence yields no tree data.
    """
    # Serve from cache when possible.
    cached_tree = cache.get("page_tree")
    if cached_tree:
        return PageTreeResponse(**cached_tree)

    async with get_confluence_client() as client:
        raw_tree = await client.get_page_tree(settings.ROOT_PAGE_ID, max_depth=4)
        if not raw_tree:
            raise HTTPException(status_code=404, detail="无法获取页面树")

        # Convert the raw dict tree into PageNode objects, loading page data.
        root_node = await _build_page_node_recursive(client, raw_tree)

    # Tally monthly pages and pages carrying parsed data using an explicit
    # stack; visiting order does not affect the counts.
    month_count = 0
    report_count = 0
    pending = [root_node]
    while pending:
        current = pending.pop()
        if current.is_monthly:
            month_count += 1
        if current.data:
            report_count += 1
        pending.extend(current.children)

    response = PageTreeResponse(
        root_page=root_node,
        total_months=month_count,
        total_ship_reports=report_count,
    )

    # Store the serialised tree for later requests.
    cache.set("page_tree", response.model_dump())
    return response
async def _build_page_node_recursive(
    client: ConfluenceClient,
    tree_data: Dict[str, Any]
) -> PageNode:
    """Recursively convert a raw page-tree dict into a ``PageNode``.

    Args:
        client: Confluence client used to download page bodies.
        tree_data: Raw tree entry; the keys ``id``, ``title``, ``url`` and
            ``children`` are read, each falling back to an empty default.

    Returns:
        The node built for ``tree_data`` with its children attached in order.
    """
    page_id = tree_data.get('id', '')
    title = tree_data.get('title', '')
    url = tree_data.get('url', '')

    # The page body is only downloaded when the title parses as a ship report.
    is_ship_report = DataParser.parse_ship_title(title)
    html_content = await client.get_page_content(page_id) if is_ship_report else None

    node = DataParser.build_page_node(
        page_id=page_id,
        title=title,
        url=url,
        html_content=html_content,
    )

    # Children are processed one after another, preserving their order.
    child_entries = tree_data.get('children', [])
    for child_entry in child_entries:
        node.children.append(await _build_page_node_recursive(client, child_entry))

    return node
@app.get("/api/pages/{page_id}")
async def get_page_detail(page_id: str):
    """Return the parsed detail of a single Confluence page.

    The result holds the serialised page node under ``"page"`` and the first
    1000 characters of the page HTML under ``"raw_html"`` (``None`` when no
    content was returned). Results are cached under ``page_<page_id>``.

    Raises:
        HTTPException: 404 when the page cannot be found.
    """
    cache_key = f"page_{page_id}"
    cached_detail = cache.get(cache_key)
    if cached_detail:
        return cached_detail

    async with get_confluence_client() as client:
        page_info = await client.get_page(page_id)
        if not page_info:
            raise HTTPException(status_code=404, detail=f"页面 {page_id} 不存在")

        title = page_info.get('title', '')
        webui_path = page_info.get('_links', {}).get('webui', '')
        url = f"{settings.CONFLUENCE_BASE_URL}{webui_path}"

        html_content = await client.get_page_content(page_id)

    node = DataParser.build_page_node(
        page_id=page_id,
        title=title,
        url=url,
        html_content=html_content,
    )

    # Only a short preview of the raw HTML is returned to bound response size.
    html_preview = html_content[:1000] if html_content else None
    result = {
        "page": node.model_dump(),
        "raw_html": html_preview,
    }

    cache.set(cache_key, result)
    return result
@app.get("/api/monthly/{month}")
async def get_monthly_stats(month: str):
    """Return the statistics of one month.

    Args:
        month: Month identifier compared against ``PageNode.month``; the
            expected format is ``YYYY.MM`` (for example ``2026.02``).

    Raises:
        HTTPException: 404 when no monthly node matches ``month``; 500 when
            the monthly statistics cannot be built.
    """
    cache_key = f"monthly_{month}"
    cached_stats = cache.get(cache_key)
    if cached_stats:
        return cached_stats

    # Start from the complete page tree.
    tree_response = await get_page_tree()

    def walk(node: PageNode):
        """Yield ``node`` followed by all of its descendants in pre-order."""
        yield node
        for child in node.children:
            yield from walk(child)

    # First monthly node (pre-order) whose month matches the request.
    monthly_node = next(
        (
            candidate
            for candidate in walk(tree_response.root_page)
            if candidate.is_monthly and candidate.month == month
        ),
        None,
    )
    if not monthly_node:
        raise HTTPException(status_code=404, detail=f"未找到月份 {month} 的数据")

    # Load details for child pages that carry no parsed data yet.
    # NOTE(review): the rebuilt node replaces the child outright, so any
    # grandchildren attached to the old child are not carried over — confirm
    # that build_monthly_stats does not need them.
    async with get_confluence_client() as client:
        for index, child in enumerate(monthly_node.children):
            if child.data:
                continue
            html_content = await client.get_page_content(child.id)
            if not html_content:
                continue
            monthly_node.children[index] = DataParser.build_page_node(
                page_id=child.id,
                title=child.title,
                url=child.url,
                html_content=html_content,
            )

    stats = DataParser.build_monthly_stats(monthly_node)
    if not stats:
        raise HTTPException(status_code=500, detail="构建月度统计数据失败")

    result = stats.model_dump()
    cache.set(cache_key, result)
    return result
@app.get("/api/statistics/summary")
async def get_summary_statistics():
    """Return the overall summary across every monthly page.

    The summary contains the per-month trend, how often each ship code
    appears, the range of report dates and the per-month statistics. It is
    cached under ``"summary_stats"``.
    """
    cache_key = "summary_stats"
    cached_summary = cache.get(cache_key)
    if cached_summary:
        return SummaryStatistics(**cached_summary)

    tree_response = await get_page_tree()

    monthly_stats_list = []
    ship_frequency = {}
    all_ships = []

    # Pre-order traversal with an explicit stack; children are pushed in
    # reverse so they are visited in their original order.
    pending = [tree_response.root_page]
    while pending:
        node = pending.pop()
        if node.is_monthly:
            stats = DataParser.build_monthly_stats(node)
            if stats:
                monthly_stats_list.append(stats)
                # Count how many times each ship code occurs.
                for ship in stats.ships:
                    code = ship.ship_code
                    ship_frequency[code] = ship_frequency.get(code, 0) + 1
                    all_ships.append(ship)
        pending.extend(reversed(node.children))

    # Order the months chronologically by their month string.
    monthly_stats_list.sort(key=lambda entry: entry.month)

    monthly_trend = [
        {
            "month": entry.month,
            "total_ships": entry.total_ships,
            "summary": entry.summary,
        }
        for entry in monthly_stats_list
    ]

    # Earliest and latest report dates, or None when no ship has a date.
    report_dates = [ship.report_date for ship in all_ships if ship.report_date]
    if report_dates:
        date_range = {
            "start": min(report_dates).isoformat(),
            "end": max(report_dates).isoformat(),
        }
    else:
        date_range = {"start": None, "end": None}

    summary = SummaryStatistics(
        total_months=len(monthly_stats_list),
        total_ship_reports=len(all_ships),
        date_range=date_range,
        monthly_trend=monthly_trend,
        ship_frequency=ship_frequency,
        monthly_data=monthly_stats_list,
    )

    cache.set(cache_key, summary.model_dump())
    return summary
@app.post("/api/refresh")
async def refresh_cache():
    """Clear the whole cache so later requests fetch fresh data from Confluence.

    Returns:
        A dict with a confirmation ``message`` and the ISO-8601 ``timestamp``
        (naive local time) taken right after the cache was cleared.
    """
    # Imported locally so this handler does not depend on the module-level
    # `datetime` import that sits further down the file, below this definition.
    from datetime import datetime

    cache.clear()
    return {"message": "缓存已清除", "timestamp": datetime.now().isoformat()}
@app.get("/api/analysis/table-data", response_model=TableDataResponse)
async def get_table_data():
    """Return one table row for every page that carries parsed ship data.

    Each row combines the ship's business fields with the page id/url and the
    month of the closest enclosing monthly page. Rows are sorted by report
    date (rows without a date sort first). The response is cached under
    ``"table_data"``.
    """
    cache_key = "table_data"
    cached_table = cache.get(cache_key)
    if cached_table:
        return TableDataResponse(**cached_table)

    tree_response = await get_page_tree()

    def iter_ship_nodes(node: PageNode, inherited_month: Optional[str] = None):
        """Yield ``(node, month)`` in pre-order for every node with data."""
        # A monthly page overrides the month inherited from its ancestors.
        current_month = node.month if node.is_monthly else inherited_month
        if node.data:
            yield node, current_month
        for child in node.children:
            yield from iter_ship_nodes(child, current_month)

    table_rows = []
    for node, month in iter_ship_nodes(tree_response.root_page):
        ship = node.data
        # Flag is true as soon as any one of the key metrics is present.
        key_metrics = (
            ship.teu,
            ship.moves,
            ship.gross_efficiency,
            ship.net_efficiency,
        )
        has_data = any(metric is not None for metric in key_metrics)

        table_rows.append(
            TableDataRow(
                ship_code=ship.ship_code,
                ship_name=ship.ship_name,
                report_date=ship.report_date.isoformat() if ship.report_date else None,
                month=month,
                operation_time=ship.operation_time,
                operation_type=ship.operation_type,
                teu=ship.teu,
                moves=ship.moves,
                gross_efficiency=ship.gross_efficiency,
                net_efficiency=ship.net_efficiency,
                fault_count=ship.fault_count,
                fault_rate=ship.fault_rate,
                manual_intervention_count=ship.manual_intervention_count,
                manual_intervention_rate=ship.manual_intervention_rate,
                page_id=node.id,
                page_url=node.url,
                has_complete_data=has_data,
            )
        )

    # Stable sort by report date; missing dates compare as the empty string.
    table_rows = sorted(table_rows, key=lambda entry: entry.report_date or "")

    records_with_data = len([entry for entry in table_rows if entry.has_complete_data])

    response = TableDataResponse(
        total_records=len(table_rows),
        fields=[
            "ship_code", "ship_name", "report_date", "month",
            "operation_time", "operation_type", "teu", "moves",
            "gross_efficiency", "net_efficiency",
            "fault_count", "fault_rate",
            "manual_intervention_count", "manual_intervention_rate",
            "page_id", "page_url", "has_complete_data"
        ],
        records_with_data=records_with_data,
        records_without_data=len(table_rows) - records_with_data,
        data=table_rows,
    )

    cache.set(cache_key, response.model_dump())
    return response
@app.get("/api/analysis/table-data/analysis", response_model=DataAnalysisResponse)
async def get_data_analysis():
    """Return coverage and distribution statistics for the table data.

    Reports how many rows have TEU, moves, efficiency, fault and manual
    intervention values, the row distribution per month and per operation
    type, and min/max/avg (plus total for TEU and moves) of the numeric
    columns. The response is cached under ``"data_analysis"``.
    """
    cache_key = "data_analysis"
    cached_analysis = cache.get(cache_key)
    if cached_analysis:
        return DataAnalysisResponse(**cached_analysis)

    table_data = await get_table_data()
    rows = table_data.data

    def present(field_name):
        """Collect the non-None values of ``field_name`` across all rows."""
        values = (getattr(entry, field_name) for entry in rows)
        return [value for value in values if value is not None]

    def summarize(values, include_total=True):
        """Build a min/max/avg (and optional total) dict; None when empty."""
        if values:
            stats = {
                "min": min(values),
                "max": max(values),
                "avg": sum(values) / len(values),
            }
            total = sum(values)
        else:
            stats = {"min": None, "max": None, "avg": None}
            total = None
        if include_total:
            stats["total"] = total
        return stats

    teu_values = present("teu")
    moves_values = present("moves")
    efficiency_values = present("gross_efficiency")

    # A row counts as having efficiency data when either figure is present.
    ships_with_efficiency = len([
        entry for entry in rows
        if not (entry.gross_efficiency is None and entry.net_efficiency is None)
    ])

    # Distributions; rows without a month / operation type go under "未知".
    monthly_distribution = {}
    operation_type_distribution = {}
    for entry in rows:
        month_key = entry.month or "未知"
        monthly_distribution[month_key] = monthly_distribution.get(month_key, 0) + 1
        type_key = entry.operation_type or "未知"
        operation_type_distribution[type_key] = operation_type_distribution.get(type_key, 0) + 1

    response = DataAnalysisResponse(
        total_ships=len(rows),
        ships_with_teu=len(teu_values),
        ships_with_moves=len(moves_values),
        ships_with_efficiency=ships_with_efficiency,
        ships_with_faults=len(present("fault_count")),
        ships_with_manual=len(present("manual_intervention_count")),
        monthly_distribution=monthly_distribution,
        operation_type_distribution=operation_type_distribution,
        teu_stats=summarize(teu_values),
        moves_stats=summarize(moves_values),
        # Efficiency statistics use gross efficiency and carry no total.
        efficiency_stats=summarize(efficiency_values, include_total=False),
    )

    cache.set(cache_key, response.model_dump())
    return response
@app.get("/api/analysis/table-data/export")
async def export_table_data(format: str = "json"):
    """Export the ship table as a downloadable CSV or JSON attachment.

    Args:
        format: ``"csv"`` (case-insensitive) selects CSV output; any other
            value falls back to JSON. The name shadows the builtin but is
            kept because it is the public query parameter.

    Returns:
        A ``PlainTextResponse`` carrying a ``Content-Disposition`` attachment
        header: ``ship_data.csv`` (``text/csv``) or ``ship_data.json``
        (``application/json``).
    """
    from fastapi.responses import PlainTextResponse
    import json
    import csv
    from io import StringIO

    table_data = await get_table_data()

    if format.lower() == "csv":
        headers = [
            "船舶代码", "船名", "报告日期", "月份", "作业时间", "作业类型",
            "TEU", "Moves", "毛效率", "净效率",
            "故障次数", "故障率", "人工介入次数", "人工介入率",
            "页面ID", "页面URL"
        ]
        # Row attributes, in the same order as the header columns above.
        columns = (
            "ship_code", "ship_name", "report_date", "month",
            "operation_time", "operation_type",
            "teu", "moves", "gross_efficiency", "net_efficiency",
            "fault_count", "fault_rate",
            "manual_intervention_count", "manual_intervention_rate",
            "page_id", "page_url",
        )

        with StringIO() as output:
            writer = csv.writer(output)
            writer.writerow(headers)
            for row in table_data.data:
                # Only None becomes an empty cell. The previous `value or ""`
                # also blanked legitimate zeros such as teu=0 or fault_rate=0.0.
                writer.writerow([
                    "" if value is None else value
                    for value in (getattr(row, column) for column in columns)
                ])
            csv_content = output.getvalue()

        return PlainTextResponse(
            content=csv_content,
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=ship_data.csv"}
        )

    # JSON output (also the fallback for any unrecognised format value).
    data_dict = {
        "total_records": table_data.total_records,
        "fields": table_data.fields,
        "records_with_data": table_data.records_with_data,
        "records_without_data": table_data.records_without_data,
        "data": [row.model_dump() for row in table_data.data]
    }
    json_content = json.dumps(data_dict, ensure_ascii=False, indent=2)
    return PlainTextResponse(
        content=json_content,
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=ship_data.json"}
    )
from datetime import datetime
@app.get("/api/analysis/summary", response_model=AnalysisSummaryResponse)
async def get_analysis_summary():
    """Return an analysis summary of the table data.

    Aggregates per-month ship counts, loading/unloading counts, TEU, moves
    and efficiency averages, together with the operation-type distribution,
    the report-date range and overall totals. The response is cached under
    ``"analysis_summary"``; the empty result returned when there are no rows
    is not cached.
    """
    cache_key = "analysis_summary"
    cached_summary = cache.get(cache_key)
    if cached_summary:
        return AnalysisSummaryResponse(**cached_summary)

    table_data = await get_table_data()
    rows = table_data.data

    if not rows:
        return AnalysisSummaryResponse(
            total_records=0,
            monthly_summary=[],
            operation_type_distribution={},
            date_range={},
            overall_avg_teu=None,
            overall_avg_efficiency=None,
            total_teu=None,
            total_moves=None,
        )

    def mean_or_none(values):
        """Arithmetic mean of ``values``, or None for an empty list."""
        return sum(values) / len(values) if values else None

    def sum_or_none(values):
        """Sum of ``values``, or None for an empty list."""
        return sum(values) if values else None

    # Per-month accumulators plus overall value lists.
    monthly_stats: Dict[str, Dict] = {}
    operation_type_count: Dict[str, int] = {}
    all_teu_values = []
    all_efficiency_values = []
    all_moves_values = []

    for row in rows:
        month_key = row.month or "未知"

        bucket = monthly_stats.get(month_key)
        if bucket is None:
            bucket = monthly_stats[month_key] = {
                "total_ships": 0,
                "loading_count": 0,
                "unloading_count": 0,
                "teu_values": [],
                "moves_values": [],
                "gross_efficiency_values": [],
                "net_efficiency_values": []
            }
        bucket["total_ships"] += 1

        # Operation type: overall distribution plus loading/unloading split.
        op_type = row.operation_type
        if op_type:
            operation_type_count[op_type] = operation_type_count.get(op_type, 0) + 1
            if "装船" in op_type:
                bucket["loading_count"] += 1
            elif "卸船" in op_type:
                bucket["unloading_count"] += 1

        # Numeric values; the overall efficiency average uses gross efficiency.
        if row.teu is not None:
            bucket["teu_values"].append(row.teu)
            all_teu_values.append(row.teu)
        if row.moves is not None:
            bucket["moves_values"].append(row.moves)
            all_moves_values.append(row.moves)
        if row.gross_efficiency is not None:
            bucket["gross_efficiency_values"].append(row.gross_efficiency)
            all_efficiency_values.append(row.gross_efficiency)
        if row.net_efficiency is not None:
            bucket["net_efficiency_values"].append(row.net_efficiency)

    # One summary entry per month, ordered by the month key.
    monthly_summary_list = [
        MonthlySummary(
            month=month_key,
            total_ships=monthly_stats[month_key]["total_ships"],
            loading_count=monthly_stats[month_key]["loading_count"],
            unloading_count=monthly_stats[month_key]["unloading_count"],
            avg_teu=mean_or_none(monthly_stats[month_key]["teu_values"]),
            total_teu=sum_or_none(monthly_stats[month_key]["teu_values"]),
            avg_moves=mean_or_none(monthly_stats[month_key]["moves_values"]),
            total_moves=sum_or_none(monthly_stats[month_key]["moves_values"]),
            avg_gross_efficiency=mean_or_none(monthly_stats[month_key]["gross_efficiency_values"]),
            avg_net_efficiency=mean_or_none(monthly_stats[month_key]["net_efficiency_values"]),
        )
        for month_key in sorted(monthly_stats)
    ]

    # Report dates are ISO strings here, so min/max compare lexicographically.
    dated = [row.report_date for row in rows if row.report_date]
    date_range = {
        "start": min(dated, default=None),
        "end": max(dated, default=None),
    }

    response = AnalysisSummaryResponse(
        total_records=len(rows),
        date_range=date_range,
        monthly_summary=monthly_summary_list,
        operation_type_distribution=operation_type_count,
        overall_avg_teu=mean_or_none(all_teu_values),
        overall_avg_efficiency=mean_or_none(all_efficiency_values),
        total_teu=sum_or_none(all_teu_values),
        total_moves=sum_or_none(all_moves_values),
    )

    cache.set(cache_key, response.model_dump())
    return response
@app.get("/api/analysis/daily-stats", response_model=DailyStatsResponse)
async def get_daily_stats():
    """Return statistics grouped by report date.

    Rows without a report date are skipped. For every date the ship count,
    TEU total, moves total and average gross efficiency are reported along
    with the per-ship details. The response is cached under
    ``"daily_stats"``.
    """
    cache_key = "daily_stats"
    cached_daily = cache.get(cache_key)
    if cached_daily:
        return DailyStatsResponse(**cached_daily)

    table_data = await get_table_data()
    rows = table_data.data

    # Group rows by their report-date string.
    daily_data: Dict[str, List[TableDataRow]] = {}
    for row in rows:
        if row.report_date:
            daily_data.setdefault(row.report_date, []).append(row)

    daily_stats_list = []
    for date_str in sorted(daily_data):
        ships = daily_data[date_str]

        # Totals are accumulated one value at a time, starting from 0.0.
        total_teu = 0.0
        total_moves = 0.0
        efficiency_values = []
        for row in ships:
            if row.teu is not None:
                total_teu += row.teu
            if row.moves is not None:
                total_moves += row.moves
            if row.gross_efficiency is not None:
                efficiency_values.append(row.gross_efficiency)

        ships_detail = [
            DailyStatsShip(
                ship_code=row.ship_code,
                ship_name=row.ship_name,
                operation_time=row.operation_time,
                operation_type=row.operation_type,
                teu=row.teu,
                moves=row.moves,
                gross_efficiency=row.gross_efficiency,
                net_efficiency=row.net_efficiency,
                page_id=row.page_id,
                page_url=row.page_url,
            )
            for row in ships
        ]

        if efficiency_values:
            avg_efficiency = sum(efficiency_values) / len(efficiency_values)
        else:
            avg_efficiency = None

        daily_stats_list.append(
            DailyStats(
                date=date_str,
                ship_count=len(ships),
                total_teu=total_teu,
                total_moves=total_moves,
                avg_efficiency=avg_efficiency,
                ships=ships_detail,
            )
        )

    # Summary figures: 0 ships / no TEU maximum when there are no days.
    response = DailyStatsResponse(
        daily_stats=daily_stats_list,
        total_days=len(daily_stats_list),
        max_ships_per_day=max((day.ship_count for day in daily_stats_list), default=0),
        max_teu_per_day=max((day.total_teu for day in daily_stats_list), default=None),
    )

    cache.set(cache_key, response.model_dump())
    return response
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the configured
    # host and port.
    from uvicorn import run as run_server

    run_server(app, host=settings.APP_HOST, port=settings.APP_PORT)