""" 船舶作业数据查询模块 用于从 Metabase 查询船舶作业相关的箱量统计和效率指标数据。 以船舶为单位获取作业情况。 安装依赖: pip install requests python-dotenv 环境变量配置(推荐): MATEBASE_USERNAME=your_username MATEBASE_PASSWORD=your_password 基本用法: >>> from vessel_operations import VesselOperationsClient >>> client = VesselOperationsClient() >>> data = client.get_vessel_operations("260209-华晟67_X") >>> print(data['cnt20']) # 44 """ import requests from typing import Optional, Dict, Any, List import os from dotenv import load_dotenv # 加载环境变量 load_dotenv() class MetabaseAPIError(Exception): """Metabase API 调用异常""" pass class MetabaseAuthError(MetabaseAPIError): """Metabase 认证异常""" pass class MetabaseQueryError(MetabaseAPIError): """Metabase 查询异常""" pass class VesselOperationsClient: """ 船舶数据查询客户端 用于从 Metabase 查询船舶相关数据,包括: - 箱量统计(20尺、40尺、总箱数、TEU) - 无人集卡效率指标(cycle/h) Args: base_url: Metabase 服务地址 username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) Example: >>> client = VesselOperationsClient() >>> data = client.get_vessel_operations("260209-华晟67_X") >>> print(f"20尺箱量: {data['cnt20']}") """ # Metabase Card ID 映射表 _CARD_IDS = { "overview": 57, # 总览 - 箱量统计 "efficiency_normal": 64, # 无人集卡效率指标 "efficiency_filtered": 70, # 无人集卡效率指标-剔除异常 "work_info": 74, # 船舶作业指令时间(AT_WorkInfo) } def __init__( self, base_url: str = "http://10.80.0.11:30001", username: Optional[str] = None, password: Optional[str] = None, ): self.base_url = base_url.rstrip("/") self.session_token: Optional[str] = None # 优先使用传入的参数,否则从环境变量读取 self.username = username or os.getenv("MATEBASE_USERNAME") self.password = password or os.getenv("MATEBASE_PASSWORD") if not self.username or not self.password: raise MetabaseAuthError( "未提供 Metabase 用户名或密码。" "请通过参数传入,或设置环境变量 " "MATEBASE_USERNAME 和 MATEBASE_PASSWORD" ) def _authenticate(self) -> None: """认证并获取 session token""" auth_url = f"{self.base_url}/api/session" payload = {"username": self.username, "password": self.password} try: response = requests.post(auth_url, json=payload, timeout=30) response.raise_for_status() data = response.json() self.session_token = data.get("id") except requests.exceptions.RequestException as e: raise MetabaseAuthError(f"Metabase 认证失败: {e}") def _ensure_authenticated(self) -> None: """确保已认证""" if not self.session_token: self._authenticate() def _query_card(self, card_id: int, parameters: List[Dict]) -> Dict: """ 查询指定 Card Args: card_id: Metabase Card ID parameters: 查询参数列表 Returns: API 响应数据(字典) Raises: MetabaseQueryError: 查询失败 """ self._ensure_authenticated() url = f"{self.base_url}/api/card/{card_id}/query" headers = { "X-Metabase-Session": self.session_token, "Content-Type": "application/json", } payload = {"parameters": parameters} try: response = requests.post(url, headers=headers, json=payload, timeout=30) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise MetabaseQueryError(f"查询 Metabase Card {card_id} 失败: {e}") def _extract_row_data(self, response: Dict) -> Optional[Dict]: """ 从 API 响应中提取第一行数据 Args: response: API 响应字典 Returns: 列名到值的映射字典,如果没有数据则返回 None """ rows = response.get("data", {}).get("rows", []) cols = response.get("data", {}).get("cols", []) if not rows: return None row_data = rows[0] result = {} for i, col in enumerate(cols): col_name = col.get("name", f"col_{i}") result[col_name] = row_data[i] if i < len(row_data) else None return result def _extract_work_time_range(self, response: Dict) -> Dict[str, Optional[str]]: """ 从作业指令数据中提取第一条指令开始时间和最后一条指令结束时间 Args: response: API 响应字典 Returns: 包含 start_time 和 end_time 的字典,如果没有数据则返回 None """ rows = response.get("data", {}).get("rows", []) cols = response.get("data", {}).get("cols", []) if not rows: return {"start_time": None, "end_time": None} # 构建列名到索引的映射 col_to_idx = {} for i, col in enumerate(cols): col_name = col.get("name", f"col_{i}") col_to_idx[col_name] = i # 假设有 start_time 和 end_time 列,找到第一条开始时间和最后一条结束时间 start_time_idx = col_to_idx.get("start_time") end_time_idx = col_to_idx.get("end_time") if start_time_idx is None or end_time_idx is None: return {"start_time": None, "end_time": None} # 获取第一条指令的开始时间 first_start_time = ( rows[0][start_time_idx] if len(rows[0]) > start_time_idx else None ) # 获取最后一条指令的结束时间 last_end_time = rows[-1][end_time_idx] if len(rows[-1]) > end_time_idx else None # 处理时间格式,只保留到秒(去掉毫秒) if first_start_time and isinstance(first_start_time, str): # 去掉小数点及后面的部分 first_start_time = first_start_time.split(".")[0] if last_end_time and isinstance(last_end_time, str): last_end_time = last_end_time.split(".")[0] return {"start_time": first_start_time, "end_time": last_end_time} def get_vessel_operations(self, vessel_visit_id: str) -> Dict[str, Any]: """ 获取船舶统计数据 查询指定船舶的箱量统计和效率指标数据。 Args: vessel_visit_id: 船舶访问ID,格式如 "260209-华晟67_X" Returns: 包含以下字段的字典: - vessel_visit_id: 船舶访问ID - cnt20: 20尺箱量 - cnt40: 40尺箱量 - cntAll: 总箱数 - teu: TEU数 - cycle_h_normal: 无人集卡效率指标 (cycle/h) - cycle_h_filtered: 无人集卡效率指标-剔除异常 (cycle/h) - start_time: 第一条指令开始时间 - end_time: 最后一条指令结束时间 Raises: MetabaseAPIError: API 调用失败 MetabaseAuthError: 认证失败 MetabaseQueryError: 查询失败 Example: >>> client = VesselOperationsClient() >>> data = client.get_vessel_operations("260209-华晟67_X") >>> print(f"20尺箱量: {data['cnt20']}") # 44 >>> print(f"效率指标: {data['cycle_h_normal']}") # 2.35 >>> print(f"作业开始: {data['start_time']}") # 2026-02-01 08:00:00 >>> print(f"作业结束: {data['end_time']}") # 2026-02-01 18:00:00 """ # 查询总览数据(箱量统计) vessel_param = { "type": "string/=", "target": ["variable", ["template-tag", "vesselVisitID"]], "value": vessel_visit_id, } overview_response = self._query_card(self._CARD_IDS["overview"], [vessel_param]) overview = self._extract_row_data(overview_response) or {} # 查询效率指标数据 efficiency_param = { "type": "string/=", "target": ["variable", ["template-tag", "vesselVisitId"]], "value": vessel_visit_id, } # Card 64: 无人集卡效率指标 normal_response = self._query_card( self._CARD_IDS["efficiency_normal"], [efficiency_param] ) normal_data = self._extract_row_data(normal_response) or {} cycle_h_normal = normal_data.get("cycle/h") # Card 70: 无人集卡效率指标-剔除异常 filtered_response = self._query_card( self._CARD_IDS["efficiency_filtered"], [efficiency_param] ) filtered_data = self._extract_row_data(filtered_response) or {} cycle_h_filtered = filtered_data.get("cycle/h") # Card 74: 船舶作业指令时间(AT_WorkInfo) # 需要同时传递 vesselVisitID 和 vehicleId 两个参数 vehicle_param_all = { "type": "string/=", "target": ["variable", ["template-tag", "vehicleId"]], "value": "ALL", # 使用 ALL 获取所有车辆的作业指令 } work_info_response = self._query_card( self._CARD_IDS["work_info"], [vessel_param, vehicle_param_all] ) work_time_range = self._extract_work_time_range(work_info_response) # 四舍五入到2位小数 cycle_h_normal_rounded = ( round(cycle_h_normal, 2) if cycle_h_normal is not None else None ) cycle_h_filtered_rounded = ( round(cycle_h_filtered, 2) if cycle_h_filtered is not None else None ) return { "vessel_visit_id": vessel_visit_id, "cnt20": overview.get("cnt20"), "cnt40": overview.get("cnt40"), "cntAll": overview.get("cntAll"), "teu": overview.get("teu"), "cycle_h_normal": cycle_h_normal_rounded, "cycle_h_filtered": cycle_h_filtered_rounded, "start_time": work_time_range.get("start_time"), "end_time": work_time_range.get("end_time"), } # 工厂函数(推荐用于简单场景) def create_vessel_operations_client( base_url: str = "http://10.80.0.11:30001", username: Optional[str] = None, password: Optional[str] = None, ) -> VesselOperationsClient: """ 创建船舶数据客户端 这是创建 VesselOperationsClient 实例的便捷工厂函数。 Args: base_url: Metabase 服务地址 username: 用户名(默认从环境变量 MATEBASE_USERNAME 读取) password: 密码(默认从环境变量 MATEBASE_PASSWORD 读取) Returns: VesselOperationsClient 实例 Example: >>> from metabase_vessel_client import create_vessel_operations_client >>> client = create_vessel_operations_client() >>> data = client.get_vessel_operations("260209-华晟67_X") """ return VesselOperationsClient( base_url=base_url, username=username, password=password, ) # 向后兼容:保留旧函数名 def get_vessel_by_visit_id(vessel_visit_id: str) -> Dict[str, Any]: """ 获取船舶统计数据(兼容旧版本) 注意:此函数使用默认配置(从环境变量读取账号密码), 在新项目中建议使用 VesselOperationsClient 类或 create_vessel_operations_client() 函数。 Args: vessel_visit_id: 船舶访问ID Returns: 船舶统计数据字典 """ client = create_vessel_operations_client() return client.get_vessel_operations(vessel_visit_id) # 导出公共接口 __all__ = [ "VesselOperationsClient", "create_vessel_operations_client", "get_vessel_by_visit_id", "MetabaseAPIError", "MetabaseAuthError", "MetabaseQueryError", ] # 命令行入口点 if __name__ == "__main__": import sys import json # 解析命令行参数 if len(sys.argv) < 2: print("用法: python3 vessel_operations.py <船舶号>") print("示例: python3 vessel_operations.py 260209-德盛6") sys.exit(1) vessel_visit_id = sys.argv[1] try: # 创建客户端并查询数据 client = VesselOperationsClient() data = client.get_vessel_operations(vessel_visit_id) # 输出 JSON 格式的结果 print(json.dumps(data, indent=2, ensure_ascii=False)) except MetabaseAuthError as e: print(f"认证失败: {e}", file=sys.stderr) sys.exit(1) except MetabaseQueryError as e: print(f"查询失败: {e}", file=sys.stderr) sys.exit(1) except Exception as e: print(f"错误: {e}", file=sys.stderr) sys.exit(1)