6.4 レポート機能
監視データの体系的集計・分析・配信を実現するZabbixレポート機能の包括的活用法
概要
レポート機能は、Zabbixで収集した監視データを定期的に集計・分析し、関係者に適切な形で配信する重要な機能です。効果的なレポート体制により、データドリブンな意思決定、SLA管理、コンプライアンス対応、そして継続的なシステム改善が可能になります。
レポート機能の価値
| 要素 | 効果 | 適用場面 |
|---|---|---|
| 定期配信 | 継続的な状況把握 | 管理層報告・定例会議 |
| SLA管理 | サービス品質の可視化 | 顧客報告・契約管理 |
| トレンド分析 | 長期的パターン把握 | 容量計画・予算策定 |
| コンプライアンス | 規制要件への対応 | 監査・証跡管理 |
| 自動化 | 運用工数の削減 | 効率的な情報配信 |
定期レポート設定
スケジュールレポート
基本的な定期レポート設定
yaml
# 週次レポート設定
レポート名: "Weekly Infrastructure Report"
レポートタイプ: "カスタムレポート"
スケジュール設定:
実行頻度: "週次"
実行曜日: "月曜日"
実行時刻: "09:00"
タイムゾーン: "Asia/Tokyo"
対象期間:
データ範囲: "過去7日間"
除外期間: "メンテナンス時間"
配信設定:
送信先:
- "[email protected]"
- "[email protected]"
件名: "週次インフラレポート - {REPORT.DATE}"
形式: "PDF + Excel"
複数レポートの統合管理
python
#!/usr/bin/env python3
"""Zabbix レポート自動化スクリプト"""
import json
import os
import smtplib
import time
from datetime import datetime, timedelta
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, Dict, List

import requests
import schedule
class ZabbixReportScheduler:
    """Generates scheduled Zabbix reports and distributes them by e-mail."""

    def __init__(self, zabbix_url: str, auth_token: str, smtp_config: Dict[str, str]):
        """Store connection settings, load report templates and register schedules.

        Args:
            zabbix_url: Zabbix JSON-RPC API endpoint URL.
            auth_token: Pre-obtained Zabbix API auth token.
            smtp_config: SMTP settings (smtp_server, smtp_port, from_addr,
                optionally username/password/use_tls).
        """
        self.zabbix_url = zabbix_url
        self.auth_token = auth_token
        self.smtp_config = smtp_config
        self.report_templates = {}
        # Templates must be loaded first; schedule registration reads them.
        self.load_report_templates()
        self.setup_schedules()
def load_report_templates(self):
"""レポートテンプレート読み込み"""
self.report_templates = {
"daily_executive": {
"name": "Daily Executive Summary",
"schedule": "daily",
"time": "08:00",
"recipients": ["[email protected]", "[email protected]"],
"sections": [
{"type": "sla_summary", "period": "24h"},
{"type": "critical_issues", "severity": "high"},
{"type": "capacity_overview", "metrics": ["cpu", "memory", "disk"]}
]
},
"weekly_technical": {
"name": "Weekly Technical Report",
"schedule": "weekly",
"time": "monday-09:00",
"recipients": ["[email protected]"],
"sections": [
{"type": "performance_trends", "period": "7d"},
{"type": "incident_analysis", "period": "7d"},
{"type": "capacity_forecast", "period": "30d"},
{"type": "maintenance_summary", "period": "7d"}
]
},
"monthly_management": {
"name": "Monthly Management Report",
"schedule": "monthly",
"time": "first-monday-10:00",
"recipients": ["[email protected]"],
"sections": [
{"type": "sla_detailed", "period": "30d"},
{"type": "cost_analysis", "period": "30d"},
{"type": "growth_metrics", "period": "30d"},
{"type": "improvement_recommendations", "period": "30d"}
]
}
}
def setup_schedules(self):
    """Register every report template with the ``schedule`` library.

    - "daily" templates run every day at their configured "time".
    - "weekly" templates encode "<weekday>-<HH:MM>" in their "time" field.
    - "monthly" templates are approximated by a weekly Monday check that
      defers to ``check_monthly_schedule`` for first-Monday logic.
    """
    for report_id, config in self.report_templates.items():
        if config["schedule"] == "daily":
            schedule.every().day.at(config["time"]).do(
                self.generate_and_send_report, report_id
            )
        elif config["schedule"] == "weekly":
            # FIX: the local variable was named `time`, shadowing the
            # imported `time` module inside this function.
            day, run_at = config["time"].split("-")
            getattr(schedule.every(), day.lower()).at(run_at).do(
                self.generate_and_send_report, report_id
            )
        elif config["schedule"] == "monthly":
            # Simplified first-of-month handling: check every Monday and
            # let the callee decide whether this is the first Monday.
            schedule.every().monday.at("10:00").do(
                self.check_monthly_schedule, report_id
            )
def generate_and_send_report(self, report_id: str):
    """Build one report end-to-end and e-mail it.

    Pipeline: collect section data -> render HTML -> export PDF/Excel ->
    send to the template's recipients.  Failures are routed to
    ``send_error_notification`` instead of being raised, so one broken
    report does not kill the scheduler loop.
    """
    try:
        config = self.report_templates[report_id]
        # Collect the raw data for every configured section.
        report_data = self.collect_report_data(config["sections"])
        # Render the HTML body.
        report_content = self.generate_report_content(config, report_data)
        # Export attachments (helper methods defined elsewhere — not shown here).
        pdf_file = self.generate_pdf_report(config["name"], report_content)
        excel_file = self.generate_excel_report(config["name"], report_data)
        # Deliver via SMTP.
        self.send_report_email(config, report_content, [pdf_file, excel_file])
        print(f"Report {report_id} generated and sent successfully")
    except Exception as e:
        print(f"Error generating report {report_id}: {e}")
        self.send_error_notification(report_id, str(e))
def collect_report_data(self, sections: List[Dict[str, Any]]) -> Dict[str, Any]:
"""レポートデータ収集"""
report_data = {}
for section in sections:
section_type = section["type"]
if section_type == "sla_summary":
report_data["sla"] = self.get_sla_data(section["period"])
elif section_type == "critical_issues":
report_data["issues"] = self.get_critical_issues(section["severity"])
elif section_type == "capacity_overview":
report_data["capacity"] = self.get_capacity_data(section["metrics"])
elif section_type == "performance_trends":
report_data["performance"] = self.get_performance_trends(section["period"])
elif section_type == "incident_analysis":
report_data["incidents"] = self.get_incident_analysis(section["period"])
return report_data
def get_sla_data(self, period: str) -> Dict[str, Any]:
    """Compute per-service availability over *period* (e.g. "24h", "7d").

    Queries ``service.get`` with problem events and derives downtime from
    the overlap of each problem event with the reporting window.
    """
    # Convert the period expression to a number of hours (helper defined elsewhere).
    hours = self.parse_period_to_hours(period)
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=hours)
    # Fetch IT services together with their problem events.
    services_data = {
        "jsonrpc": "2.0",
        "method": "service.get",
        "params": {
            "output": ["serviceid", "name", "algorithm", "goodsla"],
            "selectTrigger": ["triggerid", "description"],
            "selectProblemEvents": "extend"
        },
        "auth": self.auth_token,
        "id": 1
    }
    response = requests.post(self.zabbix_url, json=services_data)
    services = response.json().get("result", [])
    sla_results = []
    for service in services:
        total_time = hours * 3600  # window length in seconds
        downtime = 0
        # BUGFIX: the API returns related events under the "problem_events"
        # property, not under the request option name "selectProblemEvents".
        for event in service.get("problem_events", []):
            event_start = int(event["clock"])
            # r_clock == 0 means the problem is still open; treat "now" as its end.
            event_end = int(event.get("r_clock", 0)) or int(end_time.timestamp())
            # Only count the part of the outage inside the reporting window.
            if event_start < int(end_time.timestamp()) and event_end > int(start_time.timestamp()):
                overlap_start = max(event_start, int(start_time.timestamp()))
                overlap_end = min(event_end, int(end_time.timestamp()))
                downtime += max(0, overlap_end - overlap_start)
        uptime = total_time - downtime
        availability = (uptime / total_time) * 100 if total_time > 0 else 100
        sla_results.append({
            "service_name": service["name"],
            "target_sla": float(service.get("goodsla", 99.9)),
            "actual_sla": round(availability, 3),
            "status": "達成" if availability >= float(service.get("goodsla", 99.9)) else "未達成",
            "downtime_minutes": round(downtime / 60, 2)
        })
    return {
        "period": period,
        "services": sla_results,
        "overall_availability": round(sum(s["actual_sla"] for s in sla_results) / len(sla_results), 3) if sla_results else 100
    }
def get_critical_issues(self, severity: str) -> Dict[str, Any]:
    """Fetch recent problems filtered by a named severity bucket.

    *severity* is one of "high" / "medium" / "all"; unrecognised values
    fall back to High+Disaster.  Returns the newest problems with the
    top 20 formatted for display.
    """
    severity_map = {
        "high": ["4", "5"],  # High, Disaster
        "medium": ["3", "4", "5"],  # Warning, High, Disaster
        "all": ["1", "2", "3", "4", "5"]
    }
    problems_data = {
        "jsonrpc": "2.0",
        "method": "problem.get",
        "params": {
            "output": ["eventid", "objectid", "clock", "name", "severity"],
            "selectHosts": ["hostid", "host", "name"],
            "selectTriggers": ["triggerid", "description", "priority"],
            "severities": severity_map.get(severity, ["4", "5"]),
            "recent": True,
            "sortfield": ["clock"],
            "sortorder": "DESC"
        },
        "auth": self.auth_token,
        "id": 1
    }
    response = requests.post(self.zabbix_url, json=problems_data)
    problems = response.json().get("result", [])
    critical_issues = []
    for problem in problems[:20]:  # top 20 entries only
        # BUGFIX: related hosts are returned under the "hosts" property,
        # not under the request option name "selectHosts".
        hosts = problem.get("hosts", [])
        critical_issues.append({
            "time": datetime.fromtimestamp(int(problem["clock"])).strftime("%Y-%m-%d %H:%M:%S"),
            "host": hosts[0]["name"] if hosts else "Unknown",
            "description": problem["name"],
            "severity": self.get_severity_name(problem["severity"]),
            "duration": self.calculate_duration(int(problem["clock"]))
        })
    return {
        "count": len(problems),
        "issues": critical_issues,
        "by_severity": self.group_by_severity(problems)
    }
def get_capacity_data(self, metrics: List[str]) -> Dict[str, Any]:
"""容量データ取得"""
capacity_data = {}
for metric in metrics:
if metric == "cpu":
capacity_data["cpu"] = self.get_cpu_capacity()
elif metric == "memory":
capacity_data["memory"] = self.get_memory_capacity()
elif metric == "disk":
capacity_data["disk"] = self.get_disk_capacity()
return capacity_data
def generate_report_content(self, config: Dict[str, Any],
                            report_data: Dict[str, Any]) -> str:
    """Assemble the HTML report: styled header plus one fragment per
    collected section.  The template literal is emitted verbatim."""
    html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{config['name']}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
.section {{ margin: 20px 0; }}
.metric {{ margin: 10px 0; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.status-ok {{ color: green; }}
.status-warning {{ color: orange; }}
.status-critical {{ color: red; }}
</style>
</head>
<body>
<div class="header">
<h1>{config['name']}</h1>
<p>生成日時: {datetime.now().strftime("%Y年%m月%d日 %H:%M")}</p>
</div>
"""
    # Append only the sections whose data was actually collected.
    if "sla" in report_data:
        html_content += self.generate_sla_section(report_data["sla"])
    if "issues" in report_data:
        html_content += self.generate_issues_section(report_data["issues"])
    if "capacity" in report_data:
        html_content += self.generate_capacity_section(report_data["capacity"])
    html_content += """
</body>
</html>
"""
    return html_content
def generate_sla_section(self, sla_data: Dict[str, Any]) -> str:
    """Render the SLA overview table as an HTML fragment.

    Expects the dict shape produced by ``get_sla_data`` (keys: period,
    overall_availability, services).
    """
    content = f"""
<div class="section">
<h2>SLA状況</h2>
<p><strong>対象期間:</strong> {sla_data['period']}</p>
<p><strong>全体可用性:</strong> {sla_data['overall_availability']}%</p>
<table>
<tr>
<th>サービス名</th>
<th>目標SLA</th>
<th>実績SLA</th>
<th>状態</th>
<th>停止時間(分)</th>
</tr>
"""
    for service in sla_data["services"]:
        # Green row when the target was met, red otherwise.
        status_class = "status-ok" if service["status"] == "達成" else "status-critical"
        content += f"""
<tr>
<td>{service['service_name']}</td>
<td>{service['target_sla']}%</td>
<td>{service['actual_sla']}%</td>
<td class="{status_class}">{service['status']}</td>
<td>{service['downtime_minutes']}</td>
</tr>
"""
    content += """
</table>
</div>
"""
    return content
def send_report_email(self, config: Dict[str, Any],
                      html_content: str, attachments: List[str]):
    """E-mail the rendered report to every configured recipient.

    Sends one message per recipient over a single SMTP session.  Entries
    in *attachments* that are None or point to missing files are skipped.
    Requires `os` to be importable at module level.
    """
    msg = MIMEMultipart()
    msg['From'] = self.smtp_config['from_addr']
    msg['Subject'] = f"{config['name']} - {datetime.now().strftime('%Y/%m/%d')}"
    # HTML body
    msg.attach(MIMEText(html_content, 'html', 'utf-8'))
    # Attachments, base64-encoded as generic binary parts.
    for file_path in attachments:
        if file_path and os.path.exists(file_path):
            with open(file_path, "rb") as attachment:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition',
                    f'attachment; filename= {os.path.basename(file_path)}'
                )
                msg.attach(part)
    # Send: optional STARTTLS and login, then one message per recipient.
    with smtplib.SMTP(self.smtp_config['smtp_server'], self.smtp_config['smtp_port']) as server:
        if self.smtp_config.get('use_tls'):
            server.starttls()
        if self.smtp_config.get('username'):
            server.login(self.smtp_config['username'], self.smtp_config['password'])
        for recipient in config['recipients']:
            # Re-use one message object; swap the To header each iteration.
            msg['To'] = recipient
            server.send_message(msg)
            del msg['To']
def run_scheduler(self):
    """Block forever, firing any due report jobs; polls once per minute."""
    print("Zabbix Report Scheduler started")
    poll_seconds = 60  # check for pending jobs every minute
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)
# Usage example
def main():
    """Wire up SMTP settings and start the report scheduler (blocks forever)."""
    smtp_config = {
        'smtp_server': 'smtp.example.com',
        'smtp_port': 587,
        'username': '[email protected]',
        'password': 'password',  # NOTE: use a secret store in real deployments
        'from_addr': 'Zabbix Reports <[email protected]>',
        'use_tls': True
    }
    scheduler = ZabbixReportScheduler(
        zabbix_url="https://zabbix.example.com/api_jsonrpc.php",
        auth_token="auth_token_here",  # replace with a real API token
        smtp_config=smtp_config
    )
    scheduler.run_scheduler()

if __name__ == "__main__":
    main()
SLA レポート
サービスレベル分析
詳細SLAレポート生成
yaml
# SLAレポート設定
SLAレポート構成:
計算期間:
日次: "過去24時間"
週次: "過去7日間"
月次: "過去30日間"
四半期: "過去90日間"
年次: "過去365日間"
集計レベル:
サービス別:
- Webサイト
- APIサービス
- データベース
- バッチ処理
重要度別:
- 重要: 99.9%
- 標準: 99.5%
- 一般: 99.0%
地域別:
- 東京リージョン
- 大阪リージョン
- クラウドリージョン
# SLA計算方式
計算ロジック:
可用性計算:
式: "(総時間 - 停止時間) / 総時間 × 100"
除外条件:
- 計画メンテナンス
- 外部要因による停止
- 同意された緊急メンテナンス
平均応答時間:
測定間隔: "1分"
集計方法: "95パーセンタイル"
タイムアウト: "30秒"
エラー率:
計算: "エラー数 / 総リクエスト数 × 100"
閾値: "< 1%"
高度なSLA分析
python
#!/usr/bin/env python3
"""高度なSLA分析レポート"""
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd
class AdvancedSLAAnalyzer:
    """Builds in-depth SLA analyses: trends, breaches and financial impact."""

    def __init__(self, zabbix_api):
        """Keep the API client and the SLA targets per criticality tier.

        Args:
            zabbix_api: An authenticated Zabbix API client object.
        """
        self.api = zabbix_api
        # Availability targets (%) by service criticality.
        self.sla_thresholds = {
            "critical": 99.9,
            "important": 99.5,
            "standard": 99.0
        }
def generate_comprehensive_sla_report(self, period_days: int = 30) -> Dict[str, Any]:
    """Produce the full SLA analysis package for the last *period_days* days.

    Collects the per-service data once, then delegates each section to a
    dedicated analysis helper (defined elsewhere in this class).
    """
    end_time = datetime.now()
    start_time = end_time - timedelta(days=period_days)
    # Collect once; every analysis section reuses the same data.
    services_data = self.collect_services_data(start_time, end_time)
    # Run each analysis section.
    analysis_results = {
        "period": {
            "start": start_time.isoformat(),
            "end": end_time.isoformat(),
            "days": period_days
        },
        "executive_summary": self.generate_executive_summary(services_data),
        "service_details": self.analyze_service_details(services_data),
        "trend_analysis": self.perform_trend_analysis(services_data, period_days),
        "breach_analysis": self.analyze_sla_breaches(services_data),
        "recommendations": self.generate_recommendations(services_data),
        "financial_impact": self.calculate_financial_impact(services_data)
    }
    return analysis_results
def generate_executive_summary(self, services_data: List[Dict]) -> Dict[str, Any]:
    """Summarise fleet-wide SLA attainment for management consumption.

    Each entry in *services_data* must provide: name, availability,
    target_sla, criticality, downtime_hours, major_incidents.

    Returns totals, the compliance rate, per-criticality rollups and
    headline metrics.  An empty service list yields a neutral summary
    instead of raising.
    """
    total_services = len(services_data)
    # FIX: with no services, max()/min() below raise ValueError and
    # np.mean warns and returns NaN — return a neutral summary instead.
    if total_services == 0:
        return {
            "total_services": 0,
            "services_meeting_sla": 0,
            "sla_compliance_rate": 100,
            "overall_availability": 100,
            "by_criticality": {},
            "key_metrics": {
                "best_performing_service": None,
                "worst_performing_service": None,
                "total_downtime_hours": 0,
                "major_incidents": 0
            }
        }
    services_meeting_sla = sum(1 for s in services_data if s["availability"] >= s["target_sla"])
    # Fleet-wide availability.
    overall_availability = np.mean([s["availability"] for s in services_data])
    # Rollup per criticality tier (only tiers that have services).
    by_criticality = {}
    for criticality in ["critical", "important", "standard"]:
        filtered_services = [s for s in services_data if s["criticality"] == criticality]
        if filtered_services:
            by_criticality[criticality] = {
                "count": len(filtered_services),
                "avg_availability": np.mean([s["availability"] for s in filtered_services]),
                "meeting_sla": sum(1 for s in filtered_services if s["availability"] >= s["target_sla"])
            }
    return {
        "total_services": total_services,
        "services_meeting_sla": services_meeting_sla,
        "sla_compliance_rate": services_meeting_sla / total_services * 100,
        "overall_availability": round(overall_availability, 3),
        "by_criticality": by_criticality,
        "key_metrics": {
            "best_performing_service": max(services_data, key=lambda x: x["availability"])["name"],
            "worst_performing_service": min(services_data, key=lambda x: x["availability"])["name"],
            "total_downtime_hours": sum(s["downtime_hours"] for s in services_data),
            "major_incidents": sum(1 for s in services_data if s["major_incidents"] > 0)
        }
    }
def analyze_service_details(self, services_data: List[Dict]) -> List[Dict[str, Any]]:
    """Build a per-service breakdown: availability vs. target, trends, incidents.

    The performance/trend/incident helpers and the reliability score are
    defined elsewhere in this class.
    """
    detailed_analysis = []
    for service in services_data:
        # Per-service computations (helpers not shown here).
        performance_metrics = self.calculate_performance_metrics(service)
        trend_data = self.analyze_service_trend(service)
        incident_analysis = self.analyze_service_incidents(service)
        detailed_analysis.append({
            "service_name": service["name"],
            "availability": {
                "actual": service["availability"],
                "target": service["target_sla"],
                "status": "合格" if service["availability"] >= service["target_sla"] else "不合格",
                "gap": service["availability"] - service["target_sla"]
            },
            "performance": performance_metrics,
            "trends": trend_data,
            "incidents": incident_analysis,
            "reliability_score": self.calculate_reliability_score(service)
        })
    return detailed_analysis
def perform_trend_analysis(self, services_data: List[Dict], period_days: int) -> Dict[str, Any]:
    """Derive the daily availability series plus statistics and a forecast."""
    # Walk one day at a time from (now - period) to now.
    # NOTE(review): datetime.now() is re-evaluated in the loop condition,
    # so the window can drift by the loop's runtime — harmless at day
    # granularity, but worth pinning to a single timestamp.
    daily_availability = []
    current_date = datetime.now() - timedelta(days=period_days)
    while current_date < datetime.now():
        day_availability = self.calculate_daily_availability(services_data, current_date)
        daily_availability.append({
            "date": current_date.strftime("%Y-%m-%d"),
            "availability": day_availability
        })
        current_date += timedelta(days=1)
    # Summary statistics over the daily series.
    availabilities = [d["availability"] for d in daily_availability]
    trend_analysis = {
        "daily_availability": daily_availability,
        "statistics": {
            "mean": np.mean(availabilities),
            "median": np.median(availabilities),
            "std_dev": np.std(availabilities),
            "min": np.min(availabilities),
            "max": np.max(availabilities)
        },
        "trend_direction": self.calculate_trend_direction(availabilities),
        "seasonal_patterns": self.identify_seasonal_patterns(daily_availability),
        "forecast": self.generate_availability_forecast(availabilities)
    }
    return trend_analysis
def analyze_sla_breaches(self, services_data: List[Dict]) -> Dict[str, Any]:
    """Collect every service below its target and analyse breach patterns.

    Root-cause/impact/pattern helpers are defined elsewhere in this class.
    """
    breaches = []
    for service in services_data:
        if service["availability"] < service["target_sla"]:
            breach_analysis = {
                "service_name": service["name"],
                "target_sla": service["target_sla"],
                "actual_sla": service["availability"],
                "breach_magnitude": service["target_sla"] - service["availability"],
                "downtime_hours": service["downtime_hours"],
                "incident_count": len(service.get("incidents", [])),
                "root_causes": self.identify_root_causes(service),
                "business_impact": self.assess_business_impact(service)
            }
            breaches.append(breach_analysis)
    # Cross-breach pattern analysis.
    breach_patterns = self.analyze_breach_patterns(breaches)
    return {
        "total_breaches": len(breaches),
        "breach_details": breaches,
        "patterns": breach_patterns,
        "common_causes": self.identify_common_causes(breaches),
        "improvement_areas": self.identify_improvement_areas(breaches)
    }
def generate_recommendations(self, services_data: List[Dict]) -> List[Dict[str, Any]]:
    """Aggregate improvement proposals and return the ten highest ranked.

    Each proposal dict must carry "impact_score" and "effort_score";
    the analyze_* helpers are defined elsewhere in this class.
    """
    recommendations = []
    # Performance-related proposals.
    performance_recommendations = self.analyze_performance_improvements(services_data)
    recommendations.extend(performance_recommendations)
    # Availability-related proposals.
    availability_recommendations = self.analyze_availability_improvements(services_data)
    recommendations.extend(availability_recommendations)
    # Infrastructure-related proposals.
    infrastructure_recommendations = self.analyze_infrastructure_improvements(services_data)
    recommendations.extend(infrastructure_recommendations)
    # Rank by (impact, effort) descending.
    # NOTE(review): reverse=True also ranks HIGHER effort first among equal
    # impact — confirm whether lower effort should win the tie instead.
    prioritized_recommendations = sorted(
        recommendations,
        key=lambda x: (x["impact_score"], x["effort_score"]),
        reverse=True
    )
    return prioritized_recommendations[:10]  # top 10 entries
def calculate_financial_impact(self, services_data: List[Dict]) -> Dict[str, Any]:
    """Estimate SLA penalties, opportunity cost, and ROI of remediation.

    Penalty model: each 1% of SLA breach costs 0.1% of the service's
    "contract_value"; services without a contract value contribute 0.
    Opportunity-cost and improvement-cost helpers are defined elsewhere.
    """
    total_penalty = 0
    penalty_details = []
    for service in services_data:
        if service["availability"] < service["target_sla"]:
            # Penalty rate: 0.1% of the contract value per 1% of breach.
            breach_percentage = service["target_sla"] - service["availability"]
            penalty_rate = breach_percentage * 0.1  # 0.1% per 1% breach
            service_penalty = service.get("contract_value", 0) * penalty_rate / 100
            total_penalty += service_penalty
            penalty_details.append({
                "service": service["name"],
                "breach_percentage": breach_percentage,
                "penalty_amount": service_penalty
            })
    # Lost revenue attributable to outages (helper defined elsewhere).
    opportunity_cost = self.calculate_opportunity_cost(services_data)
    # ROI of fixing the problems: savings vs. estimated remediation cost.
    improvement_cost = self.estimate_improvement_cost(services_data)
    potential_savings = total_penalty + opportunity_cost
    roi = ((potential_savings - improvement_cost) / improvement_cost * 100) if improvement_cost > 0 else 0
    return {
        "sla_penalties": {
            "total": total_penalty,
            "details": penalty_details
        },
        "opportunity_cost": opportunity_cost,
        "improvement_investment": improvement_cost,
        "potential_savings": potential_savings,
        "roi_percentage": round(roi, 2),
        "payback_period_months": (improvement_cost / (potential_savings / 12)) if potential_savings > 0 else float('inf')
    }
def generate_sla_dashboard_data(self, services_data: List[Dict]) -> Dict[str, Any]:
    """Shape *services_data* into the payload a dashboard front-end expects.

    NOTE(review): np.mean over an empty list warns and yields NaN —
    callers should not pass an empty *services_data*.
    """
    return {
        "summary_cards": {
            "total_services": len(services_data),
            "services_meeting_sla": sum(1 for s in services_data if s["availability"] >= s["target_sla"]),
            "overall_availability": round(np.mean([s["availability"] for s in services_data]), 2),
            "total_incidents": sum(len(s.get("incidents", [])) for s in services_data)
        },
        "availability_trend": self.generate_availability_trend_data(services_data),
        "service_status": [
            {
                "name": s["name"],
                "availability": s["availability"],
                "target": s["target_sla"],
                "status": "OK" if s["availability"] >= s["target_sla"] else "BREACH"
            }
            for s in services_data
        ],
        "incident_timeline": self.generate_incident_timeline(services_data),
        "performance_heatmap": self.generate_performance_heatmap(services_data)
    }
# Usage example
def main():
    """Generate a monthly SLA report and dump it to a dated JSON file.

    NOTE(review): `zabbix_api` is assumed to be provided by the
    surrounding context — this snippet does not construct a client, and
    `json` must be importable at module level.
    """
    analyzer = AdvancedSLAAnalyzer(zabbix_api)
    # Monthly (30-day) SLA report.
    monthly_report = analyzer.generate_comprehensive_sla_report(period_days=30)
    # Persist; ensure_ascii=False keeps Japanese text readable in the file.
    with open(f"sla_report_{datetime.now().strftime('%Y%m')}.json", "w") as f:
        json.dump(monthly_report, f, indent=2, ensure_ascii=False)
    print("SLA analysis completed")

if __name__ == "__main__":
    main()
可用性レポート
アベイラビリティ分析
包括的可用性レポート
yaml
# 可用性レポート設定
アベイラビリティ分析:
計算基準:
測定間隔: "1分"
判定基準: "エージェント応答"
タイムアウト: "30秒"
リトライ回数: "3回"
除外条件:
計画メンテナンス: "事前承認済み"
緊急メンテナンス: "24時間以内承認"
外部要因: "ISP・電力等外部障害"
テスト環境: "開発・検証環境"
集計レベル:
ホスト別:
- 個別サーバー可用性
- ホストグループ別集計
- 地理的分散別集計
サービス別:
- アプリケーションサービス
- インフラサービス
- ネットワークサービス
時間軸別:
- 時間別(24時間)
- 日別(月間)
- 週別(四半期)
- 月別(年間)
# 可用性指標
KPI定義:
MTTF (Mean Time To Failure):
計算: "正常稼働時間の平均"
目標: "> 720時間 (30日)"
MTTR (Mean Time To Repair):
計算: "障害発生から復旧までの平均時間"
目標: "< 4時間"
MTBF (Mean Time Between Failures):
計算: "MTTF + MTTR"
目標: "> 730時間"
Availability:
計算: "MTTF / (MTTF + MTTR)"
目標: "> 99.5%"
パフォーマンスレポート
総合パフォーマンス分析
python
#!/usr/bin/env python3
"""パフォーマンスレポート生成"""
import json
import warnings
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy import stats

warnings.filterwarnings('ignore')
class PerformanceReporter:
    """Builds performance reports: utilisation, trends, forecasts, charts."""

    def __init__(self, zabbix_api):
        """Store the API client and the warning/critical thresholds.

        Args:
            zabbix_api: An authenticated Zabbix API client object.
        """
        self.api = zabbix_api
        # Thresholds: percentages for cpu/memory/disk, milliseconds for
        # response times.
        self.performance_thresholds = {
            "cpu_warning": 80,
            "cpu_critical": 90,
            "memory_warning": 85,
            "memory_critical": 95,
            "disk_warning": 80,
            "disk_critical": 90,
            "response_time_warning": 2000,  # ms
            "response_time_critical": 5000
        }
def generate_performance_report(self, hostgroup_ids: List[str],
                                period_days: int = 7) -> Dict[str, Any]:
    """Run the full performance-analysis pipeline for the given host groups.

    Collects history, runs each analysis section (helpers defined
    elsewhere in this class), renders charts to a PNG as a side effect,
    and returns the structured report data.
    """
    # Raw history per host/metric.
    performance_data = self.collect_performance_data(hostgroup_ids, period_days)
    # Analysis sections.
    report_data = {
        "summary": self.generate_performance_summary(performance_data),
        "resource_analysis": self.analyze_resource_utilization(performance_data),
        "trend_analysis": self.analyze_performance_trends(performance_data),
        "capacity_forecast": self.generate_capacity_forecast(performance_data),
        "bottleneck_analysis": self.identify_bottlenecks(performance_data),
        "optimization_recommendations": self.generate_optimization_recommendations(performance_data),
        "comparative_analysis": self.perform_comparative_analysis(performance_data)
    }
    # Chart rendering (saves a PNG to the working directory).
    self.generate_performance_charts(performance_data, report_data)
    return report_data
def collect_performance_data(self, hostgroup_ids: List[str],
period_days: int) -> Dict[str, Any]:
"""パフォーマンスデータ収集"""
end_time = datetime.now()
start_time = end_time - timedelta(days=period_days)
# ホスト取得
hosts = self.api.host.get(
groupids=hostgroup_ids,
output=["hostid", "host", "name"],
selectItems=["itemid", "key_", "name", "units"]
)
performance_data = {}
for host in hosts:
host_data = {
"host_info": {
"hostid": host["hostid"],
"hostname": host["host"],
"display_name": host["name"]
},
"metrics": {}
}
# メトリクス収集
for item in host["selectItems"]:
if any(key in item["key_"] for key in ["cpu.util", "memory.utilization", "vfs.fs.pused"]):
history_data = self.api.history.get(
itemids=[item["itemid"]],
time_from=int(start_time.timestamp()),
time_till=int(end_time.timestamp()),
sortfield="clock",
sortorder="ASC"
)
if history_data:
host_data["metrics"][item["key_"]] = {
"name": item["name"],
"units": item["units"],
"data": [(int(h["clock"]), float(h["value"])) for h in history_data]
}
performance_data[host["hostid"]] = host_data
return performance_data
def analyze_resource_utilization(self, performance_data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarise CPU/memory/disk usage per host and fleet-wide.

    For each resource: per-host statistics plus warning-threshold breach
    counts, then an "overall" rollup (fleet average/max, hosts over the
    warning threshold).  Statistic helpers are defined elsewhere; their
    results are assumed to include "average" and "maximum" keys.
    """
    resource_analysis = {
        "cpu": {"hosts": [], "overall": {}},
        "memory": {"hosts": [], "overall": {}},
        "disk": {"hosts": [], "overall": {}}
    }
    for host_id, host_data in performance_data.items():
        host_name = host_data["host_info"]["display_name"]
        # CPU utilisation.
        cpu_data = self.extract_metric_data(host_data, "system.cpu.util")
        if cpu_data:
            cpu_stats = self.calculate_metric_statistics(cpu_data)
            cpu_stats["host_name"] = host_name
            cpu_stats["threshold_breaches"] = self.count_threshold_breaches(
                cpu_data, self.performance_thresholds["cpu_warning"]
            )
            resource_analysis["cpu"]["hosts"].append(cpu_stats)
        # Memory utilisation.
        memory_data = self.extract_metric_data(host_data, "vm.memory.utilization")
        if memory_data:
            memory_stats = self.calculate_metric_statistics(memory_data)
            memory_stats["host_name"] = host_name
            memory_stats["threshold_breaches"] = self.count_threshold_breaches(
                memory_data, self.performance_thresholds["memory_warning"]
            )
            resource_analysis["memory"]["hosts"].append(memory_stats)
        # Disk usage.
        disk_data = self.extract_metric_data(host_data, "vfs.fs.pused")
        if disk_data:
            disk_stats = self.calculate_metric_statistics(disk_data)
            disk_stats["host_name"] = host_name
            disk_stats["threshold_breaches"] = self.count_threshold_breaches(
                disk_data, self.performance_thresholds["disk_warning"]
            )
            resource_analysis["disk"]["hosts"].append(disk_stats)
    # Fleet-wide rollups per resource.
    for resource in ["cpu", "memory", "disk"]:
        if resource_analysis[resource]["hosts"]:
            all_averages = [h["average"] for h in resource_analysis[resource]["hosts"]]
            all_maximums = [h["maximum"] for h in resource_analysis[resource]["hosts"]]
            resource_analysis[resource]["overall"] = {
                "fleet_average": np.mean(all_averages),
                "fleet_max": np.max(all_maximums),
                "hosts_over_threshold": sum(1 for h in resource_analysis[resource]["hosts"]
                                            if h["average"] > self.performance_thresholds[f"{resource}_warning"]),
                "total_hosts": len(resource_analysis[resource]["hosts"])
            }
    return resource_analysis
def generate_capacity_forecast(self, performance_data: Dict[str, Any]) -> Dict[str, Any]:
    """Project CPU/memory/disk usage 90 days ahead per host.

    A metric is forecast only when it has more than 10 samples; hosts
    with no forecastable metric are omitted from the result.
    """
    forecast_results = {}
    for host_id, host_data in performance_data.items():
        host_name = host_data["host_info"]["display_name"]
        host_forecasts = {}
        # CPU forecast.
        cpu_data = self.extract_metric_data(host_data, "system.cpu.util")
        if cpu_data and len(cpu_data) > 10:
            cpu_forecast = self.calculate_linear_forecast(cpu_data, days_ahead=90)
            host_forecasts["cpu"] = cpu_forecast
        # Memory forecast.
        memory_data = self.extract_metric_data(host_data, "vm.memory.utilization")
        if memory_data and len(memory_data) > 10:
            memory_forecast = self.calculate_linear_forecast(memory_data, days_ahead=90)
            host_forecasts["memory"] = memory_forecast
        # Disk forecast.
        disk_data = self.extract_metric_data(host_data, "vfs.fs.pused")
        if disk_data and len(disk_data) > 10:
            disk_forecast = self.calculate_linear_forecast(disk_data, days_ahead=90)
            host_forecasts["disk"] = disk_forecast
        if host_forecasts:
            forecast_results[host_name] = host_forecasts
    # Fleet-level summary (helpers defined elsewhere in this class).
    forecast_summary = self.generate_forecast_summary(forecast_results)
    return {
        "host_forecasts": forecast_results,
        "summary": forecast_summary,
        "recommendations": self.generate_capacity_recommendations(forecast_results)
    }
def calculate_linear_forecast(self, data_points: List[Tuple[int, float]],
                              days_ahead: int = 90) -> Dict[str, Any]:
    """Fit a least-squares line to (timestamp, value) points and extrapolate.

    Args:
        data_points: Time series as (unix_timestamp, value) pairs;
            assumed sorted ascending by time — the last element is
            treated as "now".
        days_ahead: Projection horizon in days.

    Returns the projection, trend direction, fit quality (r squared) and
    an approximate 95% band, or ``{"error": ...}`` when fewer than two
    points are supplied.
    """
    if len(data_points) < 2:
        return {"error": "Insufficient data points"}
    # Split the series into x (time) and y (value) arrays.
    timestamps = np.array([point[0] for point in data_points])
    values = np.array([point[1] for point in data_points])
    # Ordinary least-squares fit.
    slope, intercept, r_value, p_value, std_err = stats.linregress(timestamps, values)
    # Extrapolate days_ahead beyond the last sample.
    future_timestamp = timestamps[-1] + (days_ahead * 24 * 3600)  # seconds
    predicted_value = slope * future_timestamp + intercept
    # NOTE(review): this band scales the slope's standard error — it is a
    # rough indicator, not an exact prediction interval.
    confidence_interval = 1.96 * std_err * np.sqrt(1 + 1/len(timestamps))
    return {
        "current_value": values[-1],
        "predicted_value": max(0, predicted_value),  # clamp negatives to 0
        "days_ahead": days_ahead,
        "trend": "increasing" if slope > 0 else "decreasing" if slope < 0 else "stable",
        "slope": slope,
        "r_squared": r_value ** 2,
        "confidence_interval": {
            "lower": max(0, predicted_value - confidence_interval),
            "upper": predicted_value + confidence_interval
        },
        "threshold_breach_prediction": self.predict_threshold_breach(
            slope, intercept, timestamps[-1], values[-1]
        )
    }
def generate_performance_charts(self, performance_data: Dict[str, Any],
                                report_data: Dict[str, Any]):
    """Render a 2x2 dashboard PNG and save it to the working directory.

    Panels: CPU histogram, memory histogram, per-host average CPU bars,
    and a current-vs-predicted CPU forecast for one sample host.
    """
    plt.style.use('seaborn-v0_8')
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Performance Analysis Dashboard', fontsize=16)
    # CPU utilisation histogram across all hosts.
    cpu_data = []
    for host_data in performance_data.values():
        cpu_values = self.extract_metric_data(host_data, "system.cpu.util")
        if cpu_values:
            cpu_data.extend([point[1] for point in cpu_values])
    if cpu_data:
        axes[0, 0].hist(cpu_data, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
        axes[0, 0].axvline(x=self.performance_thresholds["cpu_warning"],
                           color='orange', linestyle='--', label='Warning')
        axes[0, 0].axvline(x=self.performance_thresholds["cpu_critical"],
                           color='red', linestyle='--', label='Critical')
        axes[0, 0].set_title('CPU Utilization Distribution')
        axes[0, 0].set_xlabel('CPU Usage (%)')
        axes[0, 0].set_ylabel('Frequency')
        axes[0, 0].legend()
    # Memory utilisation histogram.
    memory_data = []
    for host_data in performance_data.values():
        memory_values = self.extract_metric_data(host_data, "vm.memory.utilization")
        if memory_values:
            memory_data.extend([point[1] for point in memory_values])
    if memory_data:
        axes[0, 1].hist(memory_data, bins=30, alpha=0.7, color='lightgreen', edgecolor='black')
        axes[0, 1].axvline(x=self.performance_thresholds["memory_warning"],
                           color='orange', linestyle='--', label='Warning')
        axes[0, 1].axvline(x=self.performance_thresholds["memory_critical"],
                           color='red', linestyle='--', label='Critical')
        axes[0, 1].set_title('Memory Utilization Distribution')
        axes[0, 1].set_xlabel('Memory Usage (%)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()
    # Average CPU per host.
    if report_data["resource_analysis"]["cpu"]["hosts"]:
        host_names = [h["host_name"] for h in report_data["resource_analysis"]["cpu"]["hosts"]]
        cpu_avgs = [h["average"] for h in report_data["resource_analysis"]["cpu"]["hosts"]]
        axes[1, 0].bar(range(len(host_names)), cpu_avgs, color='lightcoral')
        axes[1, 0].set_title('Average CPU Usage by Host')
        axes[1, 0].set_xlabel('Hosts')
        axes[1, 0].set_ylabel('CPU Usage (%)')
        axes[1, 0].set_xticks(range(len(host_names)))
        axes[1, 0].set_xticklabels(host_names, rotation=45, ha='right')
    # Capacity forecast for one sample host (first key only).
    if "capacity_forecast" in report_data:
        forecast_data = report_data["capacity_forecast"]["host_forecasts"]
        if forecast_data:
            sample_host = list(forecast_data.keys())[0]
            if "cpu" in forecast_data[sample_host]:
                current = forecast_data[sample_host]["cpu"]["current_value"]
                predicted = forecast_data[sample_host]["cpu"]["predicted_value"]
                axes[1, 1].bar(['Current', 'Predicted (90 days)'], [current, predicted],
                               color=['blue', 'orange'])
                axes[1, 1].set_title(f'CPU Forecast - {sample_host}')
                axes[1, 1].set_ylabel('CPU Usage (%)')
    plt.tight_layout()
    plt.savefig(f'performance_report_{datetime.now().strftime("%Y%m%d")}.png',
                dpi=300, bbox_inches='tight')
    plt.close()
# Usage example
def main():
    """Build a 7-day performance report for sample host groups and save it.

    NOTE(review): `zabbix_api` is assumed to be provided by the
    surrounding context, and `json` must be importable at module level.
    """
    reporter = PerformanceReporter(zabbix_api)
    # 7-day report over the given host group IDs.
    report = reporter.generate_performance_report(
        hostgroup_ids=["1", "2", "3"],  # host group IDs
        period_days=7
    )
    # Persist alongside the chart PNG; keep non-ASCII text readable.
    with open(f"performance_report_{datetime.now().strftime('%Y%m%d')}.json", "w") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    print("Performance report generated successfully")

if __name__ == "__main__":
    main()
カスタムレポート作成
レポートテンプレート設計
柔軟なレポートフレームワーク
yaml
# カスタムレポートテンプレート
レポートテンプレート定義:
基本構造:
メタデータ:
- レポート名
- 作成者
- バージョン
- 説明
- カテゴリ
パラメータ:
- 対象期間
- ホストグループ
- メトリクス選択
- 出力形式
セクション:
- エグゼクティブサマリー
- 詳細データ
- チャート・グラフ
- 推奨事項
- 付録
# セクション定義例
セクションタイプ:
データテーブル:
- 集計データの表形式表示
- ソート・フィルタ機能
- 条件付き書式設定
チャート:
- 棒グラフ・円グラフ・線グラフ
- 時系列データ表示
- 比較分析
KPI指標:
- 数値指標
- 達成率表示
- トレンド矢印
テキスト:
- 分析コメント
- 推奨事項
- 免責事項
自動レポート配信
インテリジェント配信システム
python
#!/usr/bin/env python3
"""自動レポート配信システム"""
import json
from email.mime.application import MIMEApplication
from typing import Any, Dict

import openpyxl
import pdfkit
from jinja2 import Template
from openpyxl.chart import BarChart, Reference
class AutoReportDistribution:
    """Loads report/distribution config and builds formatted report artifacts."""

    def __init__(self, config_file: str):
        """Read the JSON configuration and prepare templates and routing rules.

        Args:
            config_file: Path to a UTF-8 encoded JSON configuration file.
        """
        # FIX: explicit encoding — JSON config files are UTF-8 regardless
        # of the platform's locale default.
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        self.report_templates = self.load_templates()
        self.distribution_rules = self.load_distribution_rules()
def create_executive_report(self, data: Dict[str, Any]) -> Dict[str, bytes]:
"""エグゼクティブレポート作成"""
# HTMLテンプレート
html_template = Template("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{{ report_title }}</title>
<style>
body { font-family: 'Helvetica', 'Arial', sans-serif; margin: 40px; }
.header { text-align: center; margin-bottom: 40px; }
.kpi-section { display: flex; justify-content: space-around; margin: 30px 0; }
.kpi-card { text-align: center; padding: 20px; border: 1px solid #ddd; border-radius: 8px; min-width: 150px; }
.kpi-value { font-size: 2.5em; font-weight: bold; color: #2c3e50; }
.kpi-label { font-size: 0.9em; color: #7f8c8d; text-transform: uppercase; }
.section { margin: 40px 0; }
.chart-container { text-align: center; margin: 30px 0; }
table { width: 100%; border-collapse: collapse; margin: 20px 0; }
th, td { padding: 12px; border: 1px solid #ddd; text-align: left; }
th { background-color: #f8f9fa; font-weight: bold; }
.status-good { color: #27ae60; }
.status-warning { color: #f39c12; }
.status-critical { color: #e74c3c; }
</style>
</head>
<body>
<div class="header">
<h1>{{ report_title }}</h1>
<p>{{ report_period }} | 生成日時: {{ generation_time }}</p>
</div>
<div class="kpi-section">
{% for kpi in kpis %}
<div class="kpi-card">
<div class="kpi-value {{ kpi.status_class }}">{{ kpi.value }}</div>
<div class="kpi-label">{{ kpi.label }}</div>
</div>
{% endfor %}
</div>
{% for section in sections %}
<div class="section">
<h2>{{ section.title }}</h2>
{% if section.type == 'table' %}
<table>
<thead>
<tr>
{% for header in section.headers %}
<th>{{ header }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in section.data %}
<tr>
{% for cell in row %}
<td{% if cell.class %} class="{{ cell.class }}"{% endif %}>
{{ cell.value }}
</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
{% elif section.type == 'text' %}
<p>{{ section.content }}</p>
{% endif %}
</div>
{% endfor %}
<div class="section">
<h2>推奨事項</h2>
<ul>
{% for recommendation in recommendations %}
<li><strong>{{ recommendation.priority }}:</strong> {{ recommendation.description }}</li>
{% endfor %}
</ul>
</div>
</body>
</html>
""")
# テンプレートレンダリング
html_content = html_template.render(**data)
# PDF生成
pdf_options = {
'page-size': 'A4',
'margin-top': '0.75in',
'margin-right': '0.75in',
'margin-bottom': '0.75in',
'margin-left': '0.75in',
'encoding': "UTF-8",
'no-outline': None
}
pdf_content = pdfkit.from_string(html_content, False, options=pdf_options)
return {
'html': html_content.encode('utf-8'),
'pdf': pdf_content
}
def create_technical_report_excel(self, data: Dict[str, Any]) -> bytes:
"""技術レポートExcel作成"""
wb = openpyxl.Workbook()
# サマリーシート
ws_summary = wb.active
ws_summary.title = "Summary"
# ヘッダー設定
ws_summary['A1'] = "技術レポート"
ws_summary['A2'] = f"期間: {data['period']}"
ws_summary['A3'] = f"生成日時: {data['generation_time']}"
# KPI表
kpi_row = 5
ws_summary[f'A{kpi_row}'] = "KPI"
ws_summary[f'B{kpi_row}'] = "値"
ws_summary[f'C{kpi_row}'] = "目標"
ws_summary[f'D{kpi_row}'] = "ステータス"
for i, kpi in enumerate(data['kpis'], start=1):
row = kpi_row + i
ws_summary[f'A{row}'] = kpi['label']
ws_summary[f'B{row}'] = kpi['value']
ws_summary[f'C{row}'] = kpi.get('target', 'N/A')
ws_summary[f'D{row}'] = kpi['status']
# パフォーマンスデータシート
ws_perf = wb.create_sheet("Performance Data")
# パフォーマンステーブル
perf_headers = ["Host", "CPU Avg", "Memory Avg", "Disk Avg", "Status"]
for i, header in enumerate(perf_headers, start=1):
ws_perf.cell(row=1, column=i, value=header)
for row_idx, perf_data in enumerate(data.get('performance_data', []), start=2):
ws_perf.cell(row=row_idx, column=1, value=perf_data['host'])
ws_perf.cell(row=row_idx, column=2, value=perf_data['cpu_avg'])
ws_perf.cell(row=row_idx, column=3, value=perf_data['memory_avg'])
ws_perf.cell(row=row_idx, column=4, value=perf_data['disk_avg'])
ws_perf.cell(row=row_idx, column=5, value=perf_data['status'])
# チャート追加
chart = BarChart()
chart.title = "リソース使用率"
chart.y_axis.title = "使用率 (%)"
chart.x_axis.title = "ホスト"
# データ範囲設定
data_range = Reference(ws_perf, min_col=2, min_row=1, max_col=4, max_row=len(data.get('performance_data', [])) + 1)
categories = Reference(ws_perf, min_col=1, min_row=2, max_row=len(data.get('performance_data', [])) + 1)
chart.add_data(data_range, titles_from_data=True)
chart.set_categories(categories)
ws_perf.add_chart(chart, "F5")
# SLAデータシート
if 'sla_data' in data:
ws_sla = wb.create_sheet("SLA Report")
sla_headers = ["Service", "Target SLA", "Actual SLA", "Status", "Downtime"]
for i, header in enumerate(sla_headers, start=1):
ws_sla.cell(row=1, column=i, value=header)
for row_idx, sla_item in enumerate(data['sla_data'], start=2):
ws_sla.cell(row=row_idx, column=1, value=sla_item['service'])
ws_sla.cell(row=row_idx, column=2, value=sla_item['target'])
ws_sla.cell(row=row_idx, column=3, value=sla_item['actual'])
ws_sla.cell(row=row_idx, column=4, value=sla_item['status'])
ws_sla.cell(row=row_idx, column=5, value=sla_item['downtime'])
# ファイル保存
from io import BytesIO
excel_buffer = BytesIO()
wb.save(excel_buffer)
excel_content = excel_buffer.getvalue()
excel_buffer.close()
return excel_content
def distribute_reports(self, reports: Dict[str, Dict[str, bytes]]):
"""レポート配信"""
for report_type, recipients in self.distribution_rules.items():
if report_type in reports:
report_content = reports[report_type]
for recipient_group in recipients:
# 受信者別カスタマイズ
customized_content = self.customize_for_recipient(
report_content, recipient_group
)
# メール送信
self.send_report_email(
recipient_group['emails'],
recipient_group['subject_template'],
customized_content,
recipient_group.get('attachments', ['pdf'])
)
def customize_for_recipient(self, content: Dict[str, bytes],
recipient_config: Dict[str, Any]) -> Dict[str, bytes]:
"""受信者別カスタマイズ"""
customized = content.copy()
# セキュリティレベルに応じた情報フィルタリング
security_level = recipient_config.get('security_level', 'standard')
if security_level == 'restricted':
# 機密情報をマスク
customized = self.mask_sensitive_information(customized)
return customized
def send_report_email(self, recipients: List[str], subject_template: str,
content: Dict[str, bytes], attachment_types: List[str]):
"""レポートメール送信"""
msg = MIMEMultipart()
msg['From'] = self.config['smtp']['from_address']
msg['Subject'] = subject_template.format(
date=datetime.now().strftime('%Y/%m/%d'),
time=datetime.now().strftime('%H:%M')
)
# HTML本文
if 'html' in content:
msg.attach(MIMEText(content['html'].decode('utf-8'), 'html', 'utf-8'))
# 添付ファイル
if 'pdf' in attachment_types and 'pdf' in content:
pdf_attachment = MIMEApplication(content['pdf'], 'pdf')
pdf_attachment.add_header(
'Content-Disposition',
f'attachment; filename="report_{datetime.now().strftime("%Y%m%d")}.pdf"'
)
msg.attach(pdf_attachment)
if 'excel' in attachment_types and 'excel' in content:
excel_attachment = MIMEApplication(content['excel'], 'vnd.openxmlformats-officedocument.spreadsheetml.sheet')
excel_attachment.add_header(
'Content-Disposition',
f'attachment; filename="report_{datetime.now().strftime("%Y%m%d")}.xlsx"'
)
msg.attach(excel_attachment)
# SMTP送信
with smtplib.SMTP(self.config['smtp']['server'], self.config['smtp']['port']) as server:
if self.config['smtp'].get('use_tls'):
server.starttls()
if self.config['smtp'].get('username'):
server.login(self.config['smtp']['username'], self.config['smtp']['password'])
for recipient in recipients:
msg['To'] = recipient
server.send_message(msg)
del msg['To']
# Usage example
def main():
    """Build the sample monthly reports and hand them to the distributor."""
    # Load SMTP settings and distribution rules from the config file.
    distributor = AutoReportDistribution('report_config.json')

    # Assemble the report context consumed by the templates.
    generated_at = datetime.now().strftime('%Y年%m月%d日 %H:%M')
    report_data = {
        'report_title': '月次インフラレポート',
        'report_period': '2024年1月',
        'generation_time': generated_at,
        'kpis': [
            {'label': '全体可用性', 'value': '99.8%', 'status_class': 'status-good'},
            {'label': 'SLA達成率', 'value': '95%', 'status_class': 'status-warning'},
            {'label': 'インシデント数', 'value': '3', 'status_class': 'status-good'},
        ],
        'sections': [],
        'recommendations': [
            {'priority': '高', 'description': 'データベースサーバーのメモリ増設を推奨'},
            {'priority': '中', 'description': 'ログローテーション設定の見直し'},
        ],
    }

    # Render each deliverable, then distribute by report type.
    reports = {
        'executive': distributor.create_executive_report(report_data),
        'technical': {'excel': distributor.create_technical_report_excel(report_data)},
    }
    distributor.distribute_reports(reports)
    print("Reports distributed successfully")


if __name__ == "__main__":
    main()
ベストプラクティス
レポート設計原則
効果的なレポート構成
yaml
# レポート設計ガイドライン
構成原則:
階層構造:
- エグゼクティブサマリー(1ページ)
- 主要指標(2-3ページ)
- 詳細分析(5-10ページ)
- 付録・生データ(必要に応じて)
視覚化原則:
- 1ページ1メッセージ
- 色使いの一貫性
- 適切なチャートタイプ選択
- 十分な余白とフォントサイズ
内容原則:
- データドリブンな分析
- アクショナブルな推奨事項
- 前回比較・トレンド表示
- リスクと機会の明示
# 品質保証
品質チェック項目:
データ精度:
- ソースデータの検証
- 計算ロジックの確認
- 異常値の検出・除外
- データ欠損の処理
レポート品質:
- 誤字脱字チェック
- 数値の整合性確認
- グラフの正確性
- レイアウトの統一性
配信確認:
- 受信者リストの更新
- 添付ファイルの確認
- 送信時刻の適切性
- セキュリティ要件遵守
運用管理
継続的改善プロセス
yaml
# レポート運用管理
運用プロセス:
定期レビュー:
- 月次:レポート内容の見直し
- 四半期:受信者リストの更新
- 半期:レポート形式の改善
- 年次:全体的な再設計検討
フィードバック収集:
- 受信者アンケート
- 利用状況分析
- 改善要望の収集
- 効果測定
パフォーマンス監視:
- 生成時間の監視
- 配信成功率の追跡
- システムリソース使用量
- エラー発生状況
# 自動化と効率化
効率化施策:
テンプレート化:
- 標準レポート形式
- 再利用可能コンポーネント
- 自動データ取得
- 動的コンテンツ生成
配信最適化:
- 配信時刻の最適化
- ファイルサイズ最適化
- 圧縮・暗号化
- 配信失敗時の再送
まとめ
効果的なレポート機能は、Zabbixで収集した監視データを組織的な知見と行動に変換する重要な機能です。
重要ポイント
- 自動化重視: 定期的なデータ収集・分析・配信の自動化
- 受信者別最適化: 対象者のニーズに応じたカスタマイズ
- アクション指向: 具体的な改善提案を含む実用的なレポート
- 継続的改善: フィードバックに基づく品質向上
次のステップ
これで第6部「可視化とレポート」が完了しました。ここまでで学習したダッシュボード、グラフ、マップ、レポートの知識を組み合わせることで、包括的で効果的な監視データ活用システムを構築できます。