Files
gloria/metabase/time_operations.py
Developer 5d0cafac32 feat: 交接班报告支持 Confluence/Jira 集成,添加 N/A 记录时间归属功能
- 集成 Confluence API 获取船舶报告数据
- 集成 Jira API 查询故障数量
- 支持船号显示 (462#、463# 等)
- 支持故障次数/故障率、人工介入次数/介入率显示
- 跨班作业使用 Card 69 按时间查询效率
- 不跨班作业使用整船效率(剔除异常)
- N/A 记录根据作业时间归属到对应船舶
- 更新 AGENTS.md 和 README.md 文档
- 删除 daily_report_gui.py
2026-03-14 02:52:23 +08:00

510 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
时间范围作业数据查询模块
用于从 Metabase 查询特定时间段内的作业情况数据。
以时间范围为单位获取作业统计数据。
安装依赖:
pip install requests python-dotenv
环境变量配置(推荐):
MATEBASE_USERNAME=your_username
MATEBASE_PASSWORD=your_password
基本用法:
>>> from time_operations import TimeOperationsClient
>>> client = TimeOperationsClient()
>>> data = client.get_operations_by_time("2026-02-01", "2026-03-01")
>>> print(data['cnt20']) # 2359
"""
import requests
from typing import Optional, Dict, Any, List
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class MetabaseAPIError(Exception):
"""Metabase API 调用异常"""
pass
class MetabaseAuthError(MetabaseAPIError):
"""Metabase 认证异常"""
pass
class MetabaseQueryError(MetabaseAPIError):
"""Metabase 查询异常"""
pass
class TimeOperationsClient:
"""
基于时间的作业数据查询客户端
用于从 Metabase 查询特定时间段内的作业情况数据,包括:
- 箱量统计20尺、40尺、总箱数、TEU
- 装船、卸船、转堆数据
- 无人集卡效率指标cycle/h
Args:
base_url: Metabase 服务地址
username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取)
password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取)
Example:
>>> client = TimeBasedOperationsClient()
>>> data = client.get_operations_by_time("2026-02-01", "2026-03-01")
>>> print(f"20尺箱量: {data['cnt20']}")
"""
# Metabase Card ID 映射表(基础指标-时间筛选)
_CARD_IDS = {
"overview": 50, # 总览 - 箱量统计
"load": 51, # 装船
"discharge": 52, # 卸船
"yardmove": 53, # 转堆
"efficiency": 65, # 圈效率-剔除异常
"efficiency_filtered": 69, # 无人集卡效率指标-剔除异常(时间筛选)
}
def __init__(
self,
base_url: str = "http://10.80.0.11:30001",
username: Optional[str] = None,
password: Optional[str] = None,
):
self.base_url = base_url.rstrip("/")
self.session_token: Optional[str] = None
# 优先使用传入的参数,否则从环境变量读取
self.username = username or os.getenv("MATEBASE_USERNAME")
self.password = password or os.getenv("MATEBASE_PASSWORD")
if not self.username or not self.password:
raise MetabaseAuthError(
"未提供 Metabase 用户名或密码。"
"请通过参数传入,或设置环境变量 "
"MATEBASE_USERNAME 和 MATEBASE_PASSWORD"
)
def _authenticate(self) -> None:
"""认证并获取 session token"""
auth_url = f"{self.base_url}/api/session"
payload = {"username": self.username, "password": self.password}
try:
response = requests.post(auth_url, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
self.session_token = data.get("id")
except requests.exceptions.RequestException as e:
raise MetabaseAuthError(f"Metabase 认证失败: {e}")
def _ensure_authenticated(self) -> None:
"""确保已认证"""
if not self.session_token:
self._authenticate()
def _query_card(self, card_id: int, parameters: List[Dict]) -> Dict:
"""
查询指定 Card
Args:
card_id: Metabase Card ID
parameters: 查询参数列表
Returns:
API 响应数据(字典)
Raises:
MetabaseQueryError: 查询失败
"""
self._ensure_authenticated()
url = f"{self.base_url}/api/card/{card_id}/query"
headers = {
"X-Metabase-Session": self.session_token,
"Content-Type": "application/json",
}
payload = {"parameters": parameters}
try:
response = requests.post(url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise MetabaseQueryError(f"查询 Metabase Card {card_id} 失败: {e}")
def _extract_row_data(self, response: Dict) -> Optional[Dict]:
"""
从 API 响应中提取第一行数据
Args:
response: API 响应字典
Returns:
列名到值的映射字典,如果没有数据则返回 None
"""
rows = response.get("data", {}).get("rows", [])
cols = response.get("data", {}).get("cols", [])
if not rows:
return None
row_data = rows[0]
result = {}
for i, col in enumerate(cols):
col_name = col.get("name", f"col_{i}")
result[col_name] = row_data[i] if i < len(row_data) else None
return result
def _extract_summed_data(
self, response: Dict, sum_columns: List[str]
) -> Optional[Dict]:
"""
从 API 响应中提取多行数据并汇总指定列
Args:
response: API 响应字典
sum_columns: 需要求和的列名列表
Returns:
列名到汇总值的映射字典,如果没有数据则返回 None
"""
rows = response.get("data", {}).get("rows", [])
cols = response.get("data", {}).get("cols", [])
if not rows:
return None
# 构建列名到索引的映射
col_to_idx = {}
for i, col in enumerate(cols):
col_name = col.get("name", f"col_{i}")
col_to_idx[col_name] = i
# 汇总数据
result = {}
for col_name in sum_columns:
if col_name in col_to_idx:
idx = col_to_idx[col_name]
total = 0
for row in rows:
value = row[idx] if idx < len(row) else 0
if value is not None:
try:
total += float(value)
except (ValueError, TypeError):
pass
result[col_name] = int(total) if total == int(total) else total
else:
result[col_name] = None
return result
def _build_time_parameters(self, start_time: str, end_time: str) -> List[Dict]:
"""
构建时间范围查询参数
Args:
start_time: 开始时间,格式 "YYYY-MM-DD""YYYY-MM-DD HH:MM:SS"
end_time: 结束时间,格式 "YYYY-MM-DD""YYYY-MM-DD HH:MM:SS"
Returns:
参数列表
"""
# 处理日期格式(如果不包含时间,则添加 00:00:00
if len(start_time) == 10: # YYYY-MM-DD
start_value = f"{start_time} 00:00:00"
else:
start_value = start_time
if len(end_time) == 10: # YYYY-MM-DD
end_value = f"{end_time} 23:59:59"
else:
end_value = end_time
return [
{
"type": "date/single",
"target": ["variable", ["template-tag", "time_start"]],
"value": start_value,
},
{
"type": "date/single",
"target": ["variable", ["template-tag", "time_end"]],
"value": end_value,
},
]
def get_operations_by_time(self, start_time: str, end_time: str) -> Dict[str, Any]:
"""
获取指定时间段内的作业统计数据
查询指定时间范围内的箱量统计数据。
Args:
start_time: 开始时间,格式:
- "YYYY-MM-DD" (自动补充为 00:00:00)
- "YYYY-MM-DD HH:MM:SS" (精确时间)
end_time: 结束时间,格式:
- "YYYY-MM-DD" (自动补充为 23:59:59)
- "YYYY-MM-DD HH:MM:SS" (精确时间)
Returns:
包含以下字段的字典:
- cnt20: 20尺箱量
- cnt40: 40尺箱量
- cntAll: 总箱数
- teu: TEU数
Raises:
MetabaseAPIError: API 调用失败
MetabaseAuthError: 认证失败
MetabaseQueryError: 查询失败
Example:
>>> client = TimeOperationsClient()
>>> # 按日期查询
>>> data = client.get_operations_by_time("2026-02-01", "2026-03-01")
>>> # 按精确时间查询
>>> data = client.get_operations_by_time("2026-02-01 08:00:00", "2026-02-01 18:00:00")
>>> print(f"20尺箱量: {data['cnt20']}") # 2359
"""
# 构建时间参数
time_params = self._build_time_parameters(start_time, end_time)
# 查询总览数据(箱量统计)- 需要汇总所有车辆
overview_response = self._query_card(self._CARD_IDS["overview"], time_params)
overview = (
self._extract_summed_data(
overview_response, ["cnt20", "cnt40", "cntAll", "teu"]
)
or {}
)
# 查询装船数据
load_response = self._query_card(self._CARD_IDS["load"], time_params)
load_data = self._extract_summed_data(load_response, ["cnt20", "cnt40"]) or {}
# 查询卸船数据
discharge_response = self._query_card(self._CARD_IDS["discharge"], time_params)
discharge_data = (
self._extract_summed_data(discharge_response, ["cnt20", "cnt40"]) or {}
)
# 查询转堆数据
yardmove_response = self._query_card(self._CARD_IDS["yardmove"], time_params)
yardmove_data = (
self._extract_summed_data(yardmove_response, ["cnt20", "cnt40"]) or {}
)
# 查询效率指标数据
efficiency_response = self._query_card(
self._CARD_IDS["efficiency"], time_params
)
efficiency_data = self._extract_row_data(efficiency_response) or {}
cycle_h_normal = efficiency_data.get("cycle/h")
# 查询剔除异常后的效率指标
efficiency_filtered_response = self._query_card(
self._CARD_IDS["efficiency_filtered"], time_params
)
efficiency_filtered_data = (
self._extract_row_data(efficiency_filtered_response) or {}
)
cycle_h_filtered = efficiency_filtered_data.get("cycle/h")
# 四舍五入到2位小数
cycle_h_normal_rounded = (
round(cycle_h_normal, 2) if cycle_h_normal is not None else None
)
cycle_h_filtered_rounded = (
round(cycle_h_filtered, 2) if cycle_h_filtered is not None else None
)
return {
"cnt20": overview.get("cnt20"),
"cnt40": overview.get("cnt40"),
"cntAll": overview.get("cntAll"),
"teu": overview.get("teu"),
}
def get_efficiency_by_time(self, start_time: str, end_time: str) -> Optional[float]:
"""
获取指定时间段内的效率(剔除异常)
Args:
start_time: 开始时间,格式 "YYYY-MM-DD HH:MM:SS"
end_time: 结束时间,格式 "YYYY-MM-DD HH:MM:SS"
Returns:
效率值cycle/h如果没有数据则返回 None
"""
time_params = self._build_time_parameters(start_time, end_time)
# 查询剔除异常后的效率指标Card 69
efficiency_response = self._query_card(
self._CARD_IDS["efficiency_filtered"], time_params
)
efficiency_data = self._extract_row_data(efficiency_response) or {}
cycle_h_filtered = efficiency_data.get("cycle/h")
if cycle_h_filtered is not None:
return round(cycle_h_filtered, 2)
return None
def get_vessel_efficiency_by_time(
self, vessel_visit_id: str, start_time: str, end_time: str
) -> Optional[float]:
"""
获取指定船舶在指定时间段内的效率(剔除异常)
Args:
vessel_visit_id: 船舶访问ID格式如 "260313-信荣海"
start_time: 开始时间,格式 "YYYY-MM-DD HH:MM:SS"
end_time: 结束时间,格式 "YYYY-MM-DD HH:MM:SS"
Returns:
效率值cycle/h如果没有数据则返回 None
Example:
>>> client = TimeOperationsClient()
>>> efficiency = client.get_vessel_efficiency_by_time(
... "260313-信荣海",
... "2026-03-13 08:00:00",
... "2026-03-13 20:00:00"
... )
>>> print(f"效率: {efficiency}") # 1.66
"""
# 构建时间参数
time_params = self._build_time_parameters(start_time, end_time)
# 添加船舶参数(使用驼峰命名)
vessel_param = {
"type": "string/=",
"target": ["variable", ["template-tag", "vesselVisitId"]],
"value": vessel_visit_id,
}
# 合并参数
params = time_params + [vessel_param]
# 查询剔除异常后的效率指标Card 69
efficiency_response = self._query_card(
self._CARD_IDS["efficiency_filtered"], params
)
efficiency_data = self._extract_row_data(efficiency_response) or {}
cycle_h_filtered = efficiency_data.get("cycle/h")
if cycle_h_filtered is not None:
return round(cycle_h_filtered, 2)
return None
# 工厂函数(推荐用于简单场景)
def create_time_operations_client(
base_url: str = "http://10.80.0.11:30001",
username: Optional[str] = None,
password: Optional[str] = None,
) -> "TimeOperationsClient":
"""
创建时间范围作业数据客户端
这是创建 TimeOperationsClient 实例的便捷工厂函数。
Args:
base_url: Metabase 服务地址
username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取)
password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取)
Returns:
TimeOperationsClient 实例
Example:
>>> from time_operations import create_time_operations_client
>>> client = create_time_operations_client()
>>> data = client.get_operations_by_time("2026-02-01", "2026-03-01")
"""
return TimeOperationsClient(
base_url=base_url,
username=username,
password=password,
)
# 便捷函数
def get_operations_by_time(start_date: str, end_date: str) -> Optional[Dict[str, Any]]:
"""
获取指定时间段内的作业统计数据(便捷函数)
注意:此函数使用默认配置(从环境变量读取账号密码),
在新项目中建议使用 TimeBasedOperationsClient 类或 create_time_operations_client() 函数。
Args:
start_date: 开始日期,格式 "YYYY-MM-DD"
end_date: 结束日期,格式 "YYYY-MM-DD"
Returns:
作业统计数据字典,失败时返回 None
"""
client = create_time_operations_client()
try:
return client.get_operations_by_time(start_date, end_date)
except Exception as e:
print(f"查询失败: {e}")
return None
# 导出公共接口
__all__ = [
"TimeOperationsClient",
"create_time_operations_client",
"get_operations_by_time",
"MetabaseAPIError",
"MetabaseAuthError",
"MetabaseQueryError",
]
# 示例用法
if __name__ == "__main__":
import argparse
import json
# 命令行参数解析
parser = argparse.ArgumentParser(description="获取指定时间段的作业数据")
parser.add_argument(
"--start",
"-s",
type=str,
default="2026-02-01",
help="开始日期,格式 YYYY-MM-DD (默认: 2026-02-01)",
)
parser.add_argument(
"--end",
"-e",
type=str,
default="2026-03-01",
help="结束日期,格式 YYYY-MM-DD (默认: 2026-03-01)",
)
args = parser.parse_args()
# 获取数据
result = get_operations_by_time(args.start, args.end)
if result:
print(f"成功获取 {args.start}{args.end} 的作业数据")
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(f"获取数据失败")