Files
gloria/jira_client.py
Developer 5d0cafac32 feat: 交接班报告支持 Confluence/Jira 集成,添加 N/A 记录时间归属功能
- 集成 Confluence API 获取船舶报告数据
- 集成 Jira API 查询故障数量
- 支持船号显示 (462#、463# 等)
- 支持故障次数/故障率、人工介入次数/介入率显示
- 跨班作业使用 Card 69 按时间查询效率
- 不跨班作业使用整船效率(剔除异常)
- N/A 记录根据作业时间归属到对应船舶
- 更新 AGENTS.md 和 README.md 文档
- 删除 daily_report_gui.py
2026-03-14 02:52:23 +08:00

110 lines
3.3 KiB
Python

"""
Jira API 客户端
用于查询 Jira issue 数量。
"""
import requests
import json
from typing import Optional, Dict, List, Any
import os
class JiraClient:
"""Jira API 客户端"""
def __init__(self, base_url: str, username: str, token: str):
"""
初始化 Jira 客户端
Args:
base_url: Jira 基础 URL (如 https://jira.example.com)
username: Jira 用户名
token: Jira API Token
"""
self.base_url = base_url.rstrip("/")
self.username = username
self.token = token
# 尝试 Bearer token 认证方式
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {token}",
}
# 同时保留基本认证作为备选
self.auth = (username, token)
def search_issues(self, jql: str, max_results: int = 100) -> List[Dict[str, Any]]:
"""
搜索 Jira issues
Args:
jql: JQL 查询语句
max_results: 最大返回数量
Returns:
Issue 列表
"""
url = f"{self.base_url}/rest/api/2/search"
headers = self.headers.copy()
params = {"jql": jql, "maxResults": max_results, "fields": "key"}
try:
# 先尝试 Bearer token 认证
response = requests.get(url, headers=headers, params=params, timeout=30)
# 如果失败,尝试基本认证
if response.status_code == 401:
headers.pop("Authorization", None)
response = requests.get(
url, headers=headers, auth=self.auth, params=params, timeout=30
)
response.raise_for_status()
data = response.json()
return data.get("issues", [])
except requests.exceptions.RequestException as e:
print(f"Jira 查询失败: {e}")
return []
def count_issues(self, jql: str) -> int:
"""
统计满足条件的 issue 数量
Args:
jql: JQL 查询语句
Returns:
Issue 数量
"""
url = f"{self.base_url}/rest/api/2/search"
headers = self.headers.copy()
params = {"jql": jql, "maxResults": 0}
try:
# 先尝试 Bearer token 认证
response = requests.get(url, headers=headers, params=params, timeout=30)
# 如果失败,尝试基本认证
if response.status_code == 401:
headers.pop("Authorization", None)
response = requests.get(
url, headers=headers, auth=self.auth, params=params, timeout=30
)
response.raise_for_status()
data = response.json()
return data.get("total", 0)
except requests.exceptions.RequestException as e:
print(f"Jira 查询失败: {e}")
return 0
def get_issue_keys(self, jql: str) -> List[str]:
"""
获取满足条件的 issue keys
Args:
jql: JQL 查询语句
Returns:
Issue key 列表 (如 ["FZ-2042", "FZ-2043"])
"""
issues = self.search_issues(jql)
return [str(issue.get("key")) for issue in issues if issue.get("key")]