7.4 カスタマイゼーション
企業特有の要件に対応するZabbixカスタマイゼーション機能の包括的活用ガイド
概要
Zabbixカスタマイゼーションは、標準機能では対応できない特殊な監視要件や、企業固有のワークフローに合わせてZabbixを拡張する重要な機能群です。適切なカスタマイゼーションにより、監視の精度向上、運用効率の改善、そして組織のニーズに完全に適合した監視ソリューションを実現できます。
カスタマイゼーションの価値
カスタマイゼーション手法 | 効果 | 適用場面 |
---|---|---|
カスタムスクリプト | 独自メトリクス収集 | 業界特有の監視要件 |
ユーザーパラメータ | エージェント機能拡張 | 標準にない監視項目 |
外部スクリプト | 自動化・通知拡張 | 独自ワークフロー連携 |
プラグイン開発 | 高性能監視実装 | 大量データ・リアルタイム処理 |
フロントエンド改修 | UI/UX最適化 | 企業ブランディング・操作性向上 |
ユーザーパラメータとカスタムスクリプト
ユーザーパラメータの基本実装
設定ファイルの構成
bash
# /etc/zabbix/zabbix_agentd.conf
# NOTE: UnsafeUserParameters does not "enable" user parameters; UserParameter
# lines work with the default value 0. Setting it to 1 disables the agent's
# filtering of special characters (quotes, $, ;, |, &, backticks, ...) in item
# key arguments, so every [*] command below receives them unfiltered.
# Keep 1 only if a key really needs such arguments and only trusted Zabbix
# servers can query this agent.
UnsafeUserParameters=1
# Basic user parameters
UserParameter=system.users,who | wc -l
UserParameter=mysql.ping,mysqladmin -uroot ping | grep -c alive
UserParameter=apache.status[*],curl -s "http://localhost/server-status?auto" | grep "^$1:" | cut -d' ' -f2
# User parameters that take arguments
UserParameter=custom.file.size[*],stat -c%s "$1" 2>/dev/null || echo 0
UserParameter=custom.dir.count[*],find "$1" -type f | wc -l
# grep -c prints "0" AND exits with status 1 when nothing matches, so
# "grep -c ... || echo 0" would output two lines ("0" twice). Capture the count
# and fall back to 0 only when grep printed nothing (e.g. unreadable file).
UserParameter=custom.log.errors[*],c=$(grep -c -e "$1" /var/log/messages 2>/dev/null); echo "${c:-0}"
# Delegating to external scripts
UserParameter=custom.app.health[*],/opt/zabbix/scripts/check_app_health.sh "$1" "$2"
UserParameter=custom.db.connections[*],/opt/zabbix/scripts/db_monitor.py --database="$1" --metric="$2"
# Scripts that return JSON
UserParameter=custom.system.info,/opt/zabbix/scripts/system_info.py
UserParameter=custom.services.discovery,/opt/zabbix/scripts/service_discovery.py
高度なカスタムスクリプト実装
アプリケーション監視スクリプト
bash
#!/bin/bash
# /opt/zabbix/scripts/check_app_health.sh
# Positional arguments: application name and the kind of check to run.
APP_NAME="$1"
CHECK_TYPE="$2"
TIMESTAMP=$(date +%s)

# Log file
LOG_FILE="/var/log/zabbix_custom.log"

# Append "<YYYY-mm-dd HH:MM:SS> - <message>" to $LOG_FILE.
#   $1 - message text
log_message() {
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    echo "$stamp - $1" >> "$LOG_FILE"
}
# Print 1 when pgrep counts at least one process matching $1, otherwise 0,
# and record the outcome in the log.
#   $1 - process name pattern handed to pgrep
check_process() {
    local app="$1"
    local pid_count
    pid_count=$(pgrep -c "$app")

    # Guard clause: anything that is not a positive count is "not running".
    if ! [ "$pid_count" -gt 0 ]; then
        echo "0"
        log_message "ERROR: $app process check - Not running"
        return
    fi

    echo "1"
    log_message "INFO: $app process check - Running ($pid_count processes)"
}
# Print 1 when the well-known port of application $1 shows up in the
# `netstat -ln` listing, otherwise 0. Unknown applications print 0.
#   $1 - application name (httpd, apache, nginx, mysql, postgresql)
check_port() {
    local app="$1"
    local port

    # Application name -> port
    case "$app" in
        httpd|apache|nginx) port="80" ;;
        mysql)              port="3306" ;;
        postgresql)         port="5432" ;;
        *)
            echo "0"
            log_message "ERROR: Unknown application for port check: $app"
            return
            ;;
    esac

    # The trailing space keeps ":80 " from matching longer port numbers.
    if ! netstat -ln | grep -q ":$port "; then
        echo "0"
        log_message "ERROR: $app port check - Port $port is not listening"
        return
    fi

    echo "1"
    log_message "INFO: $app port check - Port $port is listening"
}
# Print the HTTP response time of application $1 in whole milliseconds, or 0
# when the application is unknown or the request fails.
#   $1 - application name ("web" or "api")
check_response_time() {
    local app="$1"
    local url

    case "$app" in
        "web")
            url="http://localhost/"
            ;;
        "api")
            url="http://localhost/api/health"
            ;;
        *)
            echo "0"
            log_message "ERROR: Unknown application for response time check: $app"
            return
            ;;
    esac

    # Declare first, assign second: with `local var=$(cmd)` the value of $? is
    # the exit status of `local` (always 0), which hid every curl failure.
    local response_time exit_code
    response_time=$(curl -o /dev/null -s -w '%{time_total}' "$url" 2>/dev/null)
    exit_code=$?

    if [ "$exit_code" -ne 0 ]; then
        echo "0"
        log_message "ERROR: $app response time check failed"
        return
    fi

    # Convert seconds to truncated milliseconds. awk (already used elsewhere in
    # this script) always prints an integer, whereas stripping the fraction
    # from bc output such as ".500" produced an empty string.
    local ms_time
    ms_time=$(awk -v t="$response_time" 'BEGIN { printf "%d\n", t * 1000 }')
    echo "${ms_time:-0}"
    log_message "INFO: $app response time - ${ms_time:-0}ms"
}
# Print the summed resident memory (RSS) of all processes named $1 in MB
# (integer division of the KB total), or 0 when nothing is found.
#   $1 - command name passed to `ps -C`
check_memory_usage() {
    local app="$1"
    local memory_kb
    memory_kb=$(ps -C "$app" -o rss= | awk '{ total += $1 } END { print total }')

    if [ -n "$memory_kb" ] && [ "$memory_kb" -gt 0 ]; then
        local memory_mb=$(( memory_kb / 1024 ))
        echo "$memory_mb"
        log_message "INFO: $app memory usage - ${memory_mb}MB"
        return
    fi

    echo "0"
    log_message "WARNING: $app memory usage - No data or process not found"
}
# Main: dispatch on the requested check type; unknown types print 0.
case "$CHECK_TYPE" in
    process)       check_process "$APP_NAME" ;;
    port)          check_port "$APP_NAME" ;;
    response_time) check_response_time "$APP_NAME" ;;
    memory)        check_memory_usage "$APP_NAME" ;;
    *)
        echo "0"
        log_message "ERROR: Unknown check type: $CHECK_TYPE"
        ;;
esac
データベース監視スクリプト
python
#!/usr/bin/env python3
# /opt/zabbix/scripts/db_monitor.py
import sys
import json
import logging
import argparse
from datetime import datetime
import mysql.connector
import psycopg2
import cx_Oracle
class DatabaseMonitor:
    """Collect Zabbix metrics from a MySQL, PostgreSQL or Oracle database.

    Each metric method runs one query through ``execute_query`` (which opens
    and closes its own connection) and returns a neutral value (``0``, ``[]``
    or an empty discovery document) when the connection or query fails.
    """

    BYTES_PER_MIB = 1024 * 1024

    def __init__(self, db_type, connection_params):
        """Store the lower-cased database type and the connection settings.

        Args:
            db_type: ``'mysql'``, ``'postgresql'`` or ``'oracle'``.
            connection_params: Dict with ``host``, ``port``, ``user``,
                ``password`` and ``database``; passed as keyword arguments to
                the MySQL/PostgreSQL drivers.
        """
        self.db_type = db_type.lower()
        self.connection_params = connection_params
        self.setup_logging()

    def setup_logging(self):
        """Configure file logging and create ``self.logger``."""
        logging.basicConfig(
            filename='/var/log/zabbix_db_monitor.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def get_connection(self):
        """Open a new connection for ``self.db_type``.

        Returns:
            A driver connection object, or ``None`` when the type is
            unsupported or connecting fails (the error is logged).
        """
        try:
            if self.db_type == 'mysql':
                return mysql.connector.connect(**self.connection_params)
            elif self.db_type == 'postgresql':
                return psycopg2.connect(**self.connection_params)
            elif self.db_type == 'oracle':
                dsn = (
                    f"{self.connection_params['host']}:"
                    f"{self.connection_params['port']}/"
                    f"{self.connection_params['database']}"
                )
                return cx_Oracle.connect(
                    self.connection_params['user'],
                    self.connection_params['password'],
                    dsn
                )
            else:
                raise ValueError(f"Unsupported database type: {self.db_type}")
        except Exception as e:
            self.logger.error(f"Database connection failed: {e}")
            return None

    def execute_query(self, query, params=None):
        """Run ``query`` on a fresh connection and return all rows.

        Args:
            query: SQL text.
            params: Optional bind parameters handed to ``cursor.execute``;
                omitted entirely when ``None``.

        Returns:
            The ``fetchall()`` result, or ``None`` on any failure.
        """
        conn = self.get_connection()
        if not conn:
            return None

        cursor = None
        try:
            cursor = conn.cursor()
            if params is None:
                cursor.execute(query)
            else:
                cursor.execute(query, params)
            return cursor.fetchall()
        except Exception as e:
            self.logger.error(f"Query execution failed: {e}")
            return None
        finally:
            # Always release the cursor and connection, including when the
            # query raised; previously both leaked on the error path.
            for resource in (cursor, conn):
                if resource is None:
                    continue
                try:
                    resource.close()
                except Exception as e:
                    self.logger.warning(f"Resource close failed: {e}")

    def _single_value(self, queries):
        """Run the query for the current database type and return an int.

        MySQL ``SHOW STATUS`` rows are ``(name, value)`` so the value is read
        from column 1; the other databases return it in column 0.
        """
        query = queries.get(self.db_type)
        if not query:
            return 0

        result = self.execute_query(query)
        if not result:
            return 0

        column = 1 if self.db_type == 'mysql' else 0
        return int(result[0][column])

    def get_connection_count(self):
        """Return the number of current connections/sessions (0 on failure)."""
        return self._single_value({
            'mysql': "SHOW STATUS LIKE 'Threads_connected'",
            'postgresql': "SELECT count(*) FROM pg_stat_activity",
            'oracle': "SELECT count(*) FROM v$session WHERE status='ACTIVE'"
        })

    def get_slow_queries(self):
        """Return the slow-query count (0 on failure).

        NOTE(review): the PostgreSQL query relies on the ``pg_stat_statements``
        column ``mean_time``; newer PostgreSQL releases appear to name it
        ``mean_exec_time`` - confirm against the target server version.
        """
        return self._single_value({
            'mysql': "SHOW STATUS LIKE 'Slow_queries'",
            'postgresql': """
                SELECT count(*) FROM pg_stat_statements
                WHERE mean_time > 1000
            """,
            # executions > 0 avoids a divide-by-zero error on statements that
            # were parsed but never executed.
            'oracle': """
                SELECT count(*) FROM v$sql
                WHERE executions > 0
                  AND elapsed_time/executions > 1000000
            """
        })

    def get_database_size(self, database_name=None):
        """Return the database size in MB (0 on failure).

        Args:
            database_name: Schema/database to measure; defaults to
                ``"zabbix"``. Ignored for Oracle, which sums the current
                user's segments.
        """
        name = database_name or "zabbix"
        # The name is passed as a bind parameter rather than formatted into
        # the SQL text.
        queries = {
            'mysql': (
                """
                SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2)
                FROM information_schema.tables
                WHERE table_schema = %s
                """,
                (name,),
            ),
            # Ask for raw bytes: parsing pg_size_pretty() output only worked
            # for the "MB" and "GB" suffixes.
            'postgresql': ("SELECT pg_database_size(%s)", (name,)),
            'oracle': (
                """
                SELECT ROUND(SUM(bytes)/1024/1024, 2)
                FROM user_segments
                """,
                None,
            ),
        }

        entry = queries.get(self.db_type)
        if not entry:
            return 0

        query, params = entry
        result = self.execute_query(query, params)
        if not result or not result[0][0]:
            return 0

        size = float(result[0][0])
        if self.db_type == 'postgresql':
            size = round(size / self.BYTES_PER_MIB, 2)
        return size

    def get_table_stats(self):
        """Return up to ten table statistics rows as lists (``[]`` on failure).

        ``Decimal`` cells are converted to ``float`` so the rows can be passed
        to ``json.dumps``.
        """
        from decimal import Decimal

        queries = {
            'mysql': """
                SELECT table_name, table_rows,
                ROUND((data_length + index_length) / 1024 / 1024, 2) as size_mb
                FROM information_schema.tables
                WHERE table_schema = DATABASE()
                ORDER BY size_mb DESC LIMIT 10
            """,
            'postgresql': """
                SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
                FROM pg_stat_user_tables
                ORDER BY n_tup_ins DESC LIMIT 10
            """,
            'oracle': """
                SELECT table_name, num_rows,
                ROUND(avg_row_len * num_rows / 1024 / 1024, 2) as size_mb
                FROM user_tables
                WHERE num_rows > 0
                ORDER BY size_mb DESC
                FETCH FIRST 10 ROWS ONLY
            """
        }

        query = queries.get(self.db_type)
        if not query:
            return []

        result = self.execute_query(query)
        if not result:
            return []

        return [
            [float(cell) if isinstance(cell, Decimal) else cell for cell in row]
            for row in result
        ]

    def discover_databases(self):
        """Return a low-level-discovery document listing database names.

        Returns:
            ``{"data": [{"{#DATABASE}": name, "{#DBTYPE}": TYPE}, ...]}``; the
            list is empty on failure.
        """
        queries = {
            'mysql': "SHOW DATABASES",
            'postgresql': "SELECT datname FROM pg_database WHERE datistemplate = false",
            'oracle': "SELECT name FROM v$database"
        }

        query = queries.get(self.db_type)
        if not query:
            return {"data": []}

        result = self.execute_query(query)
        if not result:
            return {"data": []}

        db_type_label = self.db_type.upper()
        return {
            "data": [
                {"{#DATABASE}": row[0], "{#DBTYPE}": db_type_label}
                for row in result
            ]
        }
def main():
    """Parse the command line, collect one metric and print it for Zabbix.

    Scalar metrics are printed as plain numbers; ``discovery`` and
    ``table_stats`` are printed as JSON. Any exception while collecting is
    logged and ``0`` is printed instead.

    NOTE(review): ``--password`` on the command line is visible in the process
    list; consider reading credentials from a protected file.
    """
    parser = argparse.ArgumentParser(description="Database Monitor for Zabbix")
    parser.add_argument("--database", required=True, help="Database name")
    parser.add_argument("--metric", required=True,
                        choices=['connections', 'slow_queries', 'size', 'discovery', 'table_stats'])
    parser.add_argument("--host", default="localhost", help="Database host")
    parser.add_argument("--port", type=int, help="Database port")
    parser.add_argument("--user", default="zabbix", help="Database user")
    parser.add_argument("--password", default="", help="Database password")
    parser.add_argument("--db-type", default="mysql",
                        choices=['mysql', 'postgresql', 'oracle'], help="Database type")

    args = parser.parse_args()

    # Fall back to the conventional port of the selected database type.
    if not args.port:
        port_defaults = {'mysql': 3306, 'postgresql': 5432, 'oracle': 1521}
        args.port = port_defaults.get(args.db_type, 3306)

    connection_params = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database
    }

    monitor = DatabaseMonitor(args.db_type, connection_params)

    try:
        if args.metric == 'connections':
            print(monitor.get_connection_count())
        elif args.metric == 'slow_queries':
            print(monitor.get_slow_queries())
        elif args.metric == 'size':
            # Measure the database named on the command line; calling this
            # without an argument always reported the size of "zabbix".
            print(monitor.get_database_size(args.database))
        elif args.metric == 'discovery':
            print(json.dumps(monitor.discover_databases(), indent=2))
        elif args.metric == 'table_stats':
            print(json.dumps(monitor.get_table_stats(), indent=2))
    except Exception as e:
        logging.error(f"Monitor execution failed: {e}")
        print("0")


if __name__ == "__main__":
    main()
システム情報収集スクリプト
python
#!/usr/bin/env python3
# /opt/zabbix/scripts/system_info.py
import json
import psutil
import subprocess
import platform
import socket
from datetime import datetime
class SystemInfoCollector:
    """Gather a JSON-serialisable snapshot of the local system via psutil."""

    def __init__(self):
        """Start the snapshot with timestamp, hostname and platform string."""
        self.info = {
            'timestamp': datetime.now().isoformat(),
            'hostname': socket.gethostname(),
            'platform': platform.platform()
        }

    def collect_cpu_info(self):
        """Return core counts, a 1-second CPU utilisation sample, load
        averages (empty list when psutil lacks ``getloadavg``) and CPU
        frequencies (zeros when psutil reports none)."""
        # Query the frequency once instead of up to six times, so all three
        # values come from the same reading.
        freq = psutil.cpu_freq()
        return {
            'physical_cores': psutil.cpu_count(logical=False),
            'logical_cores': psutil.cpu_count(logical=True),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'load_average': list(psutil.getloadavg()) if hasattr(psutil, 'getloadavg') else [],
            'cpu_freq': {
                'current': freq.current if freq else 0,
                'min': freq.min if freq else 0,
                'max': freq.max if freq else 0
            }
        }

    def collect_memory_info(self):
        """Return virtual-memory and swap counters as reported by psutil."""
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()

        return {
            'total': memory.total,
            'available': memory.available,
            'used': memory.used,
            'free': memory.free,
            'percent': memory.percent,
            # buffers/cached are not present on every platform
            'buffers': getattr(memory, 'buffers', 0),
            'cached': getattr(memory, 'cached', 0),
            'swap': {
                'total': swap.total,
                'used': swap.used,
                'free': swap.free,
                'percent': swap.percent
            }
        }

    def collect_disk_info(self):
        """Return per-mountpoint usage plus global I/O counters.

        The result maps each mountpoint to its usage figures; the extra key
        ``'io_stats'`` holds the system-wide disk I/O counters when available.
        """
        disk_info = {}

        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                # Covers PermissionError as well as mountpoints that vanished
                # or are not ready; skip them instead of aborting.
                continue

            # Guard against filesystems that report a total size of zero.
            percent = (usage.used / usage.total) * 100 if usage.total else 0.0
            disk_info[partition.mountpoint] = {
                'device': partition.device,
                'fstype': partition.fstype,
                'total': usage.total,
                'used': usage.used,
                'free': usage.free,
                'percent': percent
            }

        disk_io = psutil.disk_io_counters()
        if disk_io:
            disk_info['io_stats'] = {
                'read_count': disk_io.read_count,
                'write_count': disk_io.write_count,
                'read_bytes': disk_io.read_bytes,
                'write_bytes': disk_io.write_bytes,
                'read_time': disk_io.read_time,
                'write_time': disk_io.write_time
            }

        return disk_info

    def collect_network_info(self):
        """Return addresses per interface plus global I/O counters under
        the key ``'io_stats'``."""
        network_info = {}

        for interface, addresses in psutil.net_if_addrs().items():
            network_info[interface] = [
                {
                    'family': str(addr.family),
                    'address': addr.address,
                    'netmask': addr.netmask,
                    'broadcast': addr.broadcast
                }
                for addr in addresses
            ]

        net_io = psutil.net_io_counters()
        if net_io:
            network_info['io_stats'] = {
                'bytes_sent': net_io.bytes_sent,
                'bytes_recv': net_io.bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv,
                'errin': net_io.errin,
                'errout': net_io.errout,
                'dropin': net_io.dropin,
                'dropout': net_io.dropout
            }

        return network_info

    def collect_process_info(self):
        """Return the process count and the 20 processes using most memory."""
        fields = ['pid', 'name', 'username', 'memory_percent', 'cpu_percent']
        processes = []

        for proc in psutil.process_iter(fields):
            try:
                processes.append({field: proc.info[field] for field in fields})
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        # memory_percent can be None when access was denied; sort those last.
        processes.sort(key=lambda x: x['memory_percent'] or 0, reverse=True)

        return {
            'total_processes': len(processes),
            'top_memory_processes': processes[:20]
        }

    @staticmethod
    def _parse_service_units(listing):
        """Parse ``systemctl list-units`` text into ``{name: state dict}``.

        Lines are expected as ``UNIT LOAD ACTIVE SUB DESCRIPTION``. A leading
        status marker column (shown in front of failed units) is dropped, and
        lines whose unit column does not end in ``.service`` are ignored.
        """
        suffix = '.service'
        services = {}
        for line in listing.splitlines():
            parts = line.split()
            if parts and not parts[0].endswith(suffix):
                parts = parts[1:]
            if len(parts) < 4 or not parts[0].endswith(suffix):
                continue
            services[parts[0][:-len(suffix)]] = {
                'loaded': parts[1],
                'active': parts[2],
                'sub': parts[3],
                'description': ' '.join(parts[4:])
            }
        return services

    def collect_service_info(self):
        """Return the state of all systemd service units.

        On any failure to run or parse ``systemctl`` the result contains a
        single ``'error'`` entry with the exception text.
        """
        services = {}

        try:
            # --plain/--no-legend remove the header, footer and tree glyphs so
            # every output line is a plain unit row.
            result = subprocess.run(
                ['systemctl', 'list-units', '--type=service', '--all',
                 '--no-pager', '--plain', '--no-legend'],
                capture_output=True, text=True, timeout=30
            )
            if result.returncode == 0:
                services = self._parse_service_units(result.stdout)
        except Exception as e:
            services['error'] = str(e)

        return services

    def collect_all_info(self):
        """Run every collector, merge the results into ``self.info`` and
        return it."""
        self.info.update({
            'cpu': self.collect_cpu_info(),
            'memory': self.collect_memory_info(),
            'disk': self.collect_disk_info(),
            'network': self.collect_network_info(),
            'processes': self.collect_process_info(),
            'services': self.collect_service_info()
        })

        return self.info
def main():
    """Collect the full system snapshot and print it as indented JSON."""
    snapshot = SystemInfoCollector().collect_all_info()
    print(json.dumps(snapshot, indent=2))


if __name__ == "__main__":
    main()
Zabbix Agent 2 プラグイン開発
Go言語プラグインの基本構造
go
// custom_plugin.go
package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
"git.zabbix.com/ap/plugin-support/plugin"
"git.zabbix.com/ap/plugin-support/zbxerr"
)
const pluginName = "CustomMonitor"
// Plugin interface implementation
// Plugin is the plugin object registered with the agent. It embeds
// plugin.Base and keeps the options stored by Validate.
type Plugin struct {
plugin.Base
options Options
}
// Options holds the plugin settings: API base URL, request timeout in seconds
// (used as time.Second multiples by the HTTP helpers) and an optional bearer
// token.
// NOTE(review): the `conf` tags presumably carry "key,default" for the
// plugin-support configuration loader - confirm against that package.
type Options struct {
Endpoint string `conf:"endpoint,https://api.example.com/metrics"`
Timeout int `conf:"timeout,30"`
Token string `conf:"token"`
}
// MetricData is one metric entry as decoded from the API's JSON response.
type MetricData struct {
Name string `json:"name"`
Value interface{} `json:"value"`
Timestamp int64 `json:"timestamp"`
Tags map[string]string `json:"tags"`
}
// APIResponse is the JSON envelope decoded by getCustomMetric; a Status of
// "success" with a non-empty Data slice is treated as a usable answer.
type APIResponse struct {
Status string `json:"status"`
Data []MetricData `json:"data"`
Message string `json:"message"`
}
var impl Plugin
// init registers the plugin's item keys, each followed by its description,
// under pluginName.
func init() {
plugin.RegisterMetrics(&impl, pluginName,
"custom.api.health", "API health check status.",
"custom.api.response_time", "API response time in milliseconds.",
"custom.api.metrics[*]", "Custom metrics from API endpoint.",
"custom.discovery.services", "Discover available services.",
)
}
// Validate checks the supplied options and, when they are acceptable, stores
// a copy in p.options. A non-positive Timeout is replaced by 30 before the
// copy is made. It returns an error when options is not *Options or when the
// endpoint is empty.
//
// NOTE(review): this assumes the agent passes an already-decoded *Options;
// confirm against the plugin-support version in use, which may hand over raw
// configuration that has to be unmarshalled first.
func (p *Plugin) Validate(options interface{}) error {
	opts, isOptions := options.(*Options)
	switch {
	case !isOptions:
		return fmt.Errorf("invalid options type")
	case opts.Endpoint == "":
		return fmt.Errorf("endpoint cannot be empty")
	}

	if opts.Timeout <= 0 {
		opts.Timeout = 30
	}

	p.options = *opts
	return nil
}
// Export returns the value for one item key. "custom.api.metrics" requires at
// least one parameter (the metric name); unknown keys yield
// zbxerr.ErrorUnsupportedMetric.
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
	if key == "custom.api.health" {
		return p.checkAPIHealth(), nil
	}
	if key == "custom.api.response_time" {
		return p.measureResponseTime(), nil
	}
	if key == "custom.api.metrics" {
		if len(params) == 0 {
			return nil, zbxerr.ErrorInvalidParams
		}
		return p.getCustomMetric(params[0]), nil
	}
	if key == "custom.discovery.services" {
		return p.discoverServices(), nil
	}
	return nil, zbxerr.ErrorUnsupportedMetric
}
// checkAPIHealth sends GET <Endpoint>/health (with a bearer token when one is
// configured) and returns 1 for an HTTP 200 answer, 0 for any other status or
// for request/transport errors.
func (p *Plugin) checkAPIHealth() int {
	request, err := http.NewRequest("GET", p.options.Endpoint+"/health", nil)
	if err != nil {
		return 0
	}
	if token := p.options.Token; token != "" {
		request.Header.Add("Authorization", "Bearer "+token)
	}

	httpClient := &http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return 0
	}
	defer response.Body.Close()

	if response.StatusCode != 200 {
		return 0
	}
	return 1
}
// measureResponseTime sends GET <Endpoint>/ping and returns the elapsed time
// in whole milliseconds, measured from function entry until the response
// headers arrive. It returns 0 when the request cannot be built or sent; the
// HTTP status code is not inspected.
func (p *Plugin) measureResponseTime() int64 {
	begin := time.Now()

	request, err := http.NewRequest("GET", p.options.Endpoint+"/ping", nil)
	if err != nil {
		return 0
	}
	if token := p.options.Token; token != "" {
		request.Header.Add("Authorization", "Bearer "+token)
	}

	httpClient := &http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return 0
	}
	defer response.Body.Close()

	// Integer division of the Duration truncates to milliseconds.
	return int64(time.Since(begin) / time.Millisecond)
}
// getCustomMetric sends GET <Endpoint>/metrics/<metricName>, decodes the body
// as APIResponse and returns the Value of the first data entry when Status is
// "success". Every failure (request, transport, JSON decoding, other status,
// empty data) yields 0.
//
// NOTE(review): metricName is placed into the URL path unescaped; names that
// contain "/", "?" or "#" will change the request target.
func (p *Plugin) getCustomMetric(metricName string) interface{} {
	target := p.options.Endpoint + "/metrics/" + metricName
	request, err := http.NewRequest("GET", target, nil)
	if err != nil {
		return 0
	}
	if token := p.options.Token; token != "" {
		request.Header.Add("Authorization", "Bearer "+token)
	}

	httpClient := &http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return 0
	}
	defer response.Body.Close()

	var payload APIResponse
	if decodeErr := json.NewDecoder(response.Body).Decode(&payload); decodeErr != nil {
		return 0
	}
	if payload.Status != "success" || len(payload.Data) == 0 {
		return 0
	}
	return payload.Data[0].Value
}
// discoverServices sends GET <Endpoint>/services, expects a JSON array of
// string-to-string objects and converts it into a Zabbix low-level-discovery
// document with the macros {#SERVICE_NAME}, {#SERVICE_TYPE} and
// {#SERVICE_PORT}. Any failure yields the empty document {"data":[]}.
func (p *Plugin) discoverServices() string {
	const emptyDiscovery = `{"data":[]}`

	request, err := http.NewRequest("GET", p.options.Endpoint+"/services", nil)
	if err != nil {
		return emptyDiscovery
	}
	if token := p.options.Token; token != "" {
		request.Header.Add("Authorization", "Bearer "+token)
	}

	httpClient := &http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return emptyDiscovery
	}
	defer response.Body.Close()

	var services []map[string]string
	if decodeErr := json.NewDecoder(response.Body).Decode(&services); decodeErr != nil {
		return emptyDiscovery
	}

	// Convert to the Zabbix LLD shape; the slice is non-nil so an empty
	// service list still marshals as [] rather than null.
	entries := make([]map[string]string, 0, len(services))
	for _, svc := range services {
		entries = append(entries, map[string]string{
			"{#SERVICE_NAME}": svc["name"],
			"{#SERVICE_TYPE}": svc["type"],
			"{#SERVICE_PORT}": svc["port"],
		})
	}

	encoded, _ := json.Marshal(map[string][]map[string]string{"data": entries})
	return string(encoded)
}
// main hands the plugin instance and its name to the plugin-support package.
// NOTE(review): confirm that plugin.Start exists with this signature in the
// plugin-support version being built against.
func main() {
plugin.Start(&impl, pluginName)
}
プラグインビルドとデプロイ
makefile
# Makefile
# Builds, installs and packages the Zabbix Agent 2 plugin binary.
# Recipe lines must start with a TAB character.
PLUGIN_NAME = custom_monitor
VERSION = 1.0.0
BUILD_DIR = build
GO_SRC = custom_plugin.go

# Every target here is a command, not a file; "package" was missing from the
# list, so a file or directory named "package" would have suppressed it.
.PHONY: all build clean install test package

all: build

# Static linux/amd64 build with symbols stripped. -X sets main.version at link
# time; it only takes effect if the Go source declares that variable.
build:
	@echo "Building $(PLUGIN_NAME) plugin..."
	@mkdir -p $(BUILD_DIR)
	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
		-ldflags="-s -w -X main.version=$(VERSION)" \
		-o $(BUILD_DIR)/$(PLUGIN_NAME) $(GO_SRC)

install: build
	@echo "Installing $(PLUGIN_NAME) plugin..."
	sudo cp $(BUILD_DIR)/$(PLUGIN_NAME) /usr/lib/zabbix/agent2/plugins/
	sudo chown zabbix:zabbix /usr/lib/zabbix/agent2/plugins/$(PLUGIN_NAME)
	sudo chmod 755 /usr/lib/zabbix/agent2/plugins/$(PLUGIN_NAME)

test:
	go test -v ./...

clean:
	rm -rf $(BUILD_DIR)

package: build
	@echo "Creating package..."
	tar -czf $(BUILD_DIR)/$(PLUGIN_NAME)-$(VERSION).tar.gz \
		-C $(BUILD_DIR) $(PLUGIN_NAME)
Agent2設定ファイルの更新
bash
# /etc/zabbix/zabbix_agent2.conf
# Plugin settings
Plugins.CustomMonitor.Endpoint=https://api.example.com/v1
Plugins.CustomMonitor.Timeout=30
# The token is stored in clear text; restrict read access to this file.
Plugins.CustomMonitor.Token=your_api_token_here
# Log level: 4 is debug output, useful while developing the plugin.
# Return to the default (3, warnings) for production.
DebugLevel=4
# Path to the loadable plugin executable. Agent 2 locates each loadable plugin
# through Plugins.<PluginName>.System.Path (full path to the binary);
# "PluginPath" is not an Agent 2 parameter.
Plugins.CustomMonitor.System.Path=/usr/lib/zabbix/agent2/plugins/custom_monitor
外部スクリプトとアクション
高度な外部スクリプト実装
自動復旧スクリプト
python
#!/usr/bin/env python3
# /usr/lib/zabbix/externalscripts/auto_remediation.py
import sys
import json
import subprocess
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import requests
class AutoRemediation:
    """Run automatic remediation for Zabbix trigger events over SSH.

    The trigger name selects one of three actions: disk cleanup, service
    restart or CPU investigation. The outcome is sent to Slack and/or e-mail
    when those sections exist in the configuration file.

    NOTE(review): the ``remediation_rules`` and ``security`` sections of the
    example configuration are not consulted by this class.
    """

    # Options added to every ssh call: never prompt for a password, because
    # the script runs unattended and a prompt would block until the timeout.
    SSH_OPTIONS = ['-o', 'BatchMode=yes']

    def __init__(self):
        self.setup_logging()
        self.config = self.load_config()

    def setup_logging(self):
        """Configure file logging and create ``self.logger``."""
        logging.basicConfig(
            filename='/var/log/zabbix_remediation.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load the JSON configuration; return ``{}`` (and log) on failure."""
        try:
            with open('/etc/zabbix/remediation_config.json', 'r') as f:
                return json.load(f)
        except Exception as e:
            self.logger.error(f"Config load failed: {e}")
            return {}

    def parse_zabbix_params(self, args):
        """Map the positional command-line arguments to an event dict.

        Args:
            args: ``sys.argv``-style list; index 0 is the program name.

        Raises:
            ValueError: fewer than five event arguments were supplied.
        """
        if len(args) < 6:
            raise ValueError("Insufficient parameters")

        return {
            'event_id': args[1],
            'event_date': args[2],
            'event_time': args[3],
            'trigger_name': args[4],
            'host_name': args[5],
            'trigger_severity': args[6] if len(args) > 6 else 'Unknown',
            'item_name': args[7] if len(args) > 7 else '',
            'item_value': args[8] if len(args) > 8 else ''
        }

    def _run_remote(self, host_name, remote_command, timeout):
        """Run ``remote_command`` on ``host_name`` through ssh.

        The remote command is passed to ssh as ONE argument so the remote
        shell receives it intact (pipes and ``||`` included). Splitting a
        quoted command line on whitespace made the remote shell look for a
        program literally named after the whole command.

        Raises:
            ValueError: ``host_name`` is empty or starts with ``-`` (it would
                be parsed as an ssh option).
        """
        if not host_name or host_name.startswith('-'):
            raise ValueError(f"Invalid host name: {host_name!r}")

        return subprocess.run(
            ['ssh', *self.SSH_OPTIONS, host_name, remote_command],
            capture_output=True,
            text=True,
            timeout=timeout
        )

    def disk_space_cleanup(self, host_name, threshold=90):
        """Run the cleanup commands on ``host_name``.

        Args:
            host_name: Target host.
            threshold: Accepted for compatibility; not used by the cleanup.

        Returns:
            One dict per command with ``command``, ``success`` and
            ``output``/``error``.
        """
        self.logger.info(f"Starting disk cleanup for {host_name}")

        remote_commands = [
            # temporary files not accessed for 7 days
            "find /tmp -type f -atime +7 -delete",
            # force log rotation
            "logrotate -f /etc/logrotate.conf",
            # package cache (Debian or RHEL family)
            "apt-get clean || yum clean all",
            # journal entries older than 7 days
            "journalctl --vacuum-time=7d"
        ]

        results = []
        for remote in remote_commands:
            cmd = f"ssh {host_name} '{remote}'"  # display form for logs/results
            try:
                result = self._run_remote(host_name, remote, timeout=300)
                results.append({
                    'command': cmd,
                    'success': result.returncode == 0,
                    'output': result.stdout[:500],  # keep notifications small
                    'error': result.stderr[:500] if result.stderr else None
                })
                self.logger.info(f"Command executed: {cmd} - Success: {result.returncode == 0}")
            except Exception as e:
                self.logger.error(f"Command failed: {cmd} - Error: {e}")
                results.append({
                    'command': cmd,
                    'success': False,
                    'error': str(e)
                })

        return results

    def service_restart(self, host_name, service_name):
        """Restart ``service_name`` on ``host_name`` and verify it is active.

        Returns:
            Dict with ``success`` plus the status before/after and the restart
            output, or ``{'success': False, 'error': ...}`` on exceptions.
        """
        # Local imports: neither module is in the file's import block.
        import shlex
        import time

        self.logger.info(f"Restarting service {service_name} on {host_name}")

        unit = shlex.quote(service_name)
        check_cmd = f"systemctl is-active {unit}"

        try:
            check_result = self._run_remote(host_name, check_cmd, timeout=30)
            current_status = check_result.stdout.strip()
            self.logger.info(f"Current service status: {current_status}")

            restart_result = self._run_remote(
                host_name, f"sudo systemctl restart {unit}", timeout=60
            )

            # Give the unit a moment to settle before re-checking.
            time.sleep(5)
            final_check = self._run_remote(host_name, check_cmd, timeout=30)
            final_status = final_check.stdout.strip()
            success = restart_result.returncode == 0 and final_status == 'active'

            self.logger.info(f"Service restart result: {success}, Final status: {final_status}")

            return {
                'success': success,
                'previous_status': current_status,
                'final_status': final_status,
                'restart_output': restart_result.stdout,
                'restart_error': restart_result.stderr
            }
        except Exception as e:
            self.logger.error(f"Service restart failed: {e}")
            return {'success': False, 'error': str(e)}

    def high_cpu_investigation(self, host_name):
        """Collect diagnostic output from ``host_name``.

        Returns:
            Dict keyed by the first word of each command (``ps``, ``uptime``,
            ``free``, ``iostat``, ``netstat``) with ``output``, ``error`` and
            ``success``; commands that raised are omitted.
        """
        self.logger.info(f"Investigating high CPU on {host_name}")

        remote_commands = [
            # top 10 processes by CPU (plus header line)
            "ps aux --sort=-%cpu | head -11",
            # load average
            "uptime",
            # memory usage
            "free -h",
            # disk I/O
            "iostat -x 1 1",
            # number of listening sockets
            "netstat -tuln | wc -l"
        ]

        investigation_results = {}
        for remote in remote_commands:
            try:
                result = self._run_remote(host_name, remote, timeout=30)
                investigation_results[remote.split()[0]] = {
                    'output': result.stdout,
                    'error': result.stderr,
                    'success': result.returncode == 0
                }
            except Exception as e:
                self.logger.error(
                    f"Investigation command failed: ssh {host_name} '{remote}' - {e}"
                )

        return investigation_results

    def send_notification(self, event_data, remediation_result):
        """Send the result to every configured channel.

        Each channel is attempted independently so a Slack failure does not
        suppress the e-mail (and vice versa); failures are logged.
        """
        channels = (
            ('slack_webhook', self.send_slack_notification),
            ('email', self.send_email_notification),
        )
        for config_key, sender in channels:
            if config_key not in self.config:
                continue
            try:
                sender(event_data, remediation_result)
            except Exception as e:
                self.logger.error(f"Notification failed: {e}")

    def send_slack_notification(self, event_data, remediation_result):
        """Post the result to the configured Slack webhook."""
        webhook_url = self.config['slack_webhook']
        succeeded = remediation_result.get('success')

        payload = {
            "attachments": [{
                "color": "good" if succeeded else "danger",
                "title": f"Auto-Remediation: {event_data['trigger_name']}",
                "fields": [
                    {"title": "Host", "value": event_data['host_name'], "short": True},
                    {"title": "Event ID", "value": event_data['event_id'], "short": True},
                    {"title": "Status", "value": "Success" if succeeded else "Failed", "short": True},
                    {"title": "Time", "value": f"{event_data['event_date']} {event_data['event_time']}", "short": True}
                ],
                "text": f"Details: {json.dumps(remediation_result, indent=2)}"
            }]
        }

        requests.post(webhook_url, json=payload, timeout=10)

    def send_email_notification(self, event_data, remediation_result):
        """Mail the result using the ``email`` section of the configuration.

        Reads ``smtp_server``, ``smtp_port``, ``username``, ``password`` and
        ``recipients``. This method was referenced by ``send_notification``
        but did not exist.
        """
        email_cfg = self.config['email']
        status = "Success" if remediation_result.get('success') else "Failed"

        message = MIMEMultipart()
        message['Subject'] = f"Auto-Remediation {status}: {event_data['trigger_name']}"
        message['From'] = email_cfg['username']
        message['To'] = ', '.join(email_cfg['recipients'])
        body = (
            f"Host: {event_data['host_name']}\n"
            f"Event ID: {event_data['event_id']}\n"
            f"Time: {event_data['event_date']} {event_data['event_time']}\n"
            f"Status: {status}\n\n"
            f"Details:\n{json.dumps(remediation_result, indent=2)}\n"
        )
        message.attach(MIMEText(body, 'plain', 'utf-8'))

        with smtplib.SMTP(email_cfg['smtp_server'],
                          email_cfg.get('smtp_port', 587),
                          timeout=30) as smtp:
            smtp.starttls()
            smtp.login(email_cfg['username'], email_cfg['password'])
            smtp.send_message(message)

    def execute_remediation(self, event_data):
        """Choose and run the remediation matching the trigger name.

        Returns:
            Result dict containing at least ``success``; it is also passed to
            ``send_notification``.
        """
        trigger_name = event_data['trigger_name'].lower()
        host_name = event_data['host_name']

        if 'disk space' in trigger_name or 'filesystem' in trigger_name:
            # disk_space_cleanup returns a list; wrap it in a dict so the
            # result has the same shape as the other branches (assigning a key
            # on the list raised TypeError).
            commands = self.disk_space_cleanup(host_name)
            result = {
                'success': bool(commands) and all(c.get('success') for c in commands),
                'commands': commands,
                'remediation_type': 'disk_cleanup'
            }

        elif 'service' in trigger_name and 'down' in trigger_name:
            # e.g. "httpd service is down"
            service_name = self.extract_service_name(trigger_name)
            if service_name:
                result = self.service_restart(host_name, service_name)
                result['remediation_type'] = 'service_restart'
                result['service_name'] = service_name
            else:
                result = {'success': False, 'error': 'Could not determine service name'}

        elif 'high cpu' in trigger_name or 'cpu utilization' in trigger_name:
            result = self.high_cpu_investigation(host_name)
            result['remediation_type'] = 'cpu_investigation'
            result['success'] = True  # an investigation always counts as done

        else:
            result = {
                'success': False,
                'error': 'No remediation rule found for this trigger',
                'remediation_type': 'none'
            }

        self.send_notification(event_data, result)

        return result

    def extract_service_name(self, trigger_name):
        """Return the first known service name contained in ``trigger_name``
        (case-insensitive), or ``None``."""
        common_services = ['httpd', 'nginx', 'mysql', 'postgresql', 'ssh', 'postfix']
        lowered = trigger_name.lower()

        for service in common_services:
            if service in lowered:
                return service

        return None
def main():
    """Entry point: parse the event arguments, run the remediation and print
    a JSON report.

    Exits with status 1 after printing usage when fewer than five event
    arguments are given. Any exception during remediation is reported as a
    JSON error object and logged.
    """
    if len(sys.argv) < 6:
        print("Usage: auto_remediation.py event_id date time trigger_name host_name [severity] [item_name] [item_value]")
        sys.exit(1)

    remediation = AutoRemediation()

    try:
        event_data = remediation.parse_zabbix_params(sys.argv)
        result = remediation.execute_remediation(event_data)

        # Emit the outcome as JSON
        report = {
            'event_data': event_data,
            'remediation_result': result,
            'timestamp': datetime.now().isoformat()
        }
        print(json.dumps(report, indent=2))

    except Exception as e:
        failure = {
            'success': False,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
        print(json.dumps(failure, indent=2))
        logging.error(f"Auto-remediation failed: {e}")


if __name__ == "__main__":
    main()
設定ファイル例
json
{
"slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"email": {
"smtp_server": "smtp.company.com",
"smtp_port": 587,
"username": "zabbix@company.com",
"password": "email_password",
"recipients": ["admin@company.com", "ops@company.com"]
},
"remediation_rules": {
"disk_space": {
"enabled": true,
"threshold": 90,
"max_cleanup_size_gb": 10
},
"service_restart": {
"enabled": true,
"allowed_services": ["httpd", "nginx", "mysql", "postgresql"],
"max_restart_attempts": 3
},
"cpu_investigation": {
"enabled": true,
"cpu_threshold": 80
}
},
"security": {
"allowed_hosts": ["web-*", "db-*", "app-*"],
"ssh_key_path": "/home/zabbix/.ssh/id_rsa",
"sudo_required": true
}
}
フロントエンドカスタマイゼーション
カスタムダッシュボードウィジェット
javascript
// custom_widget.js
/**
 * Dashboard widget that loads item values through the Zabbix JSON-RPC API and
 * renders them as metric cards inside the element with id `widgetId`.
 *
 * Options read by the widget: `title`, `refresh_interval` (ms, default 30000),
 * `hostids` and `item_pattern`.
 */
class CustomMetricsWidget {
    /**
     * @param {string} widgetId - id of the container element.
     * @param {object} [options] - widget options (see class description).
     */
    constructor(widgetId, options) {
        this.widgetId = widgetId;
        this.options = options || {};
        this.container = document.getElementById(widgetId);
        this.refreshInterval = this.options.refresh_interval || 30000;
        this.refreshTimer = null;
        this.init();
    }

    /** Build the markup, load the first data set and start auto refresh. */
    init() {
        if (!this.container) {
            // Without a container there is nothing to render into.
            console.error(`CustomMetricsWidget: element "${this.widgetId}" not found`);
            return;
        }
        this.createContainer();
        this.loadData();
        this.startAutoRefresh();
    }

    /**
     * Escape text for safe insertion into HTML. Item names, titles and error
     * messages are external data and were previously written to innerHTML
     * verbatim.
     * @param {*} text
     * @returns {string}
     */
    static escapeHtml(text) {
        const replacements = {
            '&': '&amp;',
            '<': '&lt;',
            '>': '&gt;',
            '"': '&quot;',
            "'": '&#39;'
        };
        return String(text).replace(/[&<>"']/g, (ch) => replacements[ch]);
    }

    /** Render the static widget frame (header, refresh button, content area). */
    createContainer() {
        const esc = CustomMetricsWidget.escapeHtml;
        this.container.innerHTML = `
            <div class="custom-widget">
                <div class="widget-header">
                    <h3>${esc(this.options.title || 'Custom Metrics')}</h3>
                    <div class="widget-controls">
                        <button class="refresh-btn" type="button">
                            🔄
                        </button>
                    </div>
                </div>
                <div class="widget-content" id="${esc(this.widgetId)}-content">
                    <div class="loading">Loading...</div>
                </div>
            </div>
        `;
        // Bind the handler directly instead of an inline onclick that depended
        // on the global window.customWidgets registry.
        const button = this.container.querySelector('.refresh-btn');
        if (button) {
            button.addEventListener('click', () => this.loadData());
        }
    }

    /**
     * Fetch the items through `item.get` and render them; show an error
     * message on HTTP, API or network failures.
     *
     * NOTE(review): the token is sent in the request's `auth` property; newer
     * Zabbix versions expect an `Authorization: Bearer` header instead -
     * confirm for the target version.
     */
    async loadData() {
        try {
            const response = await fetch('/zabbix/api_jsonrpc.php', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    jsonrpc: '2.0',
                    method: 'item.get',
                    params: {
                        output: ['itemid', 'name', 'lastvalue', 'units'],
                        hostids: this.options.hostids,
                        search: {
                            key_: this.options.item_pattern
                        },
                        monitored: true
                    },
                    auth: this.getAuthToken(),
                    id: 1
                })
            });

            if (!response.ok) {
                this.showError(`Failed to load data (HTTP ${response.status})`);
                return;
            }

            const data = await response.json();
            if (data.result) {
                this.renderData(data.result);
            } else {
                this.showError('Failed to load data');
            }
        } catch (error) {
            this.showError('Network error: ' + error.message);
        }
    }

    /**
     * Render one card per item, or a "no data" notice for an empty list.
     * @param {Array<object>} items - `item.get` result rows.
     */
    renderData(items) {
        const content = document.getElementById(`${this.widgetId}-content`);
        if (!content) {
            return;
        }

        if (items.length === 0) {
            content.innerHTML = '<div class="no-data">No data available</div>';
            return;
        }

        content.innerHTML = `
            <div class="metrics-grid">
                ${items.map(item => this.renderMetricCard(item)).join('')}
            </div>
        `;
    }

    /**
     * Build the HTML of a single metric card.
     * @param {{name: string, lastvalue: string, units: string}} item
     * @returns {string}
     */
    renderMetricCard(item) {
        const esc = CustomMetricsWidget.escapeHtml;
        const value = parseFloat(item.lastvalue);
        const colorClass = this.getValueColorClass(value, item.name);
        // Non-numeric values (text items, no data yet) are shown as-is rather
        // than as "NaN".
        const shown = Number.isFinite(value)
            ? this.formatValue(value, item.units)
            : (item.lastvalue == null ? '' : item.lastvalue);

        return `
            <div class="metric-card ${colorClass}">
                <div class="metric-name">${esc(item.name)}</div>
                <div class="metric-value">
                    ${esc(shown)}
                </div>
                <div class="metric-trend">
                    ${this.getTrendIcon(value)}
                </div>
            </div>
        `;
    }

    /**
     * Map a value to a CSS state class using name-based thresholds.
     * @returns {'critical'|'warning'|'normal'}
     */
    getValueColorClass(value, itemName) {
        const name = itemName || '';
        if (name.includes('CPU') && value > 80) return 'critical';
        if (name.includes('Memory') && value > 85) return 'warning';
        if (name.includes('Disk') && value > 90) return 'critical';
        return 'normal';
    }

    /**
     * Format a numeric value with its unit: percentages with one decimal,
     * bytes scaled to B..TB, everything else with two decimals.
     */
    formatValue(value, units) {
        if (units === '%') {
            return `${value.toFixed(1)}%`;
        } else if (units === 'B') {
            return this.formatBytes(value);
        } else {
            return `${value.toFixed(2)} ${units}`;
        }
    }

    /**
     * Scale a byte count to the largest fitting unit.
     * The unit index is clamped so values below 1 B or beyond the TB range no
     * longer select a non-existent unit.
     */
    formatBytes(bytes) {
        const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
        if (bytes === 0) return '0 B';
        const exponent = Math.floor(Math.log(Math.abs(bytes)) / Math.log(1024));
        const i = Math.min(Math.max(exponent, 0), sizes.length - 1);
        return `${(bytes / Math.pow(1024, i)).toFixed(2)} ${sizes[i]}`;
    }

    /** Placeholder trend indicator; a real trend would need history data. */
    getTrendIcon(value) {
        return '📈';
    }

    /** Replace the content area with an (escaped) error message. */
    showError(message) {
        const content = document.getElementById(`${this.widgetId}-content`);
        if (!content) {
            return;
        }
        content.innerHTML = `<div class="error">${CustomMetricsWidget.escapeHtml(message)}</div>`;
    }

    /** Read the API token from sessionStorage ('' when absent). */
    getAuthToken() {
        return sessionStorage.getItem('zabbix_auth_token') || '';
    }

    /** (Re)start the periodic reload; any previous timer is cleared first. */
    startAutoRefresh() {
        if (this.refreshTimer !== null) {
            clearInterval(this.refreshTimer);
        }
        this.refreshTimer = setInterval(() => {
            this.loadData();
        }, this.refreshInterval);
    }

    /** Stop the periodic reload, e.g. before removing the widget. */
    destroy() {
        if (this.refreshTimer !== null) {
            clearInterval(this.refreshTimer);
            this.refreshTimer = null;
        }
    }
}
// Global registry of widget instances, keyed by widget id.
if (!window.customWidgets) {
    window.customWidgets = {};
}

/**
 * Create a widget for `widgetId` and store it in window.customWidgets.
 * @param {string} widgetId - id of the container element.
 * @param {object} options - options forwarded to CustomMetricsWidget.
 */
function initCustomWidget(widgetId, options) {
    const widget = new CustomMetricsWidget(widgetId, options);
    window.customWidgets[widgetId] = widget;
}
カスタムスタイル
css
/* custom_widgets.css */
/* Widget frame: bordered white card with rounded corners and a soft shadow. */
.custom-widget {
border: 1px solid #ddd;
border-radius: 8px;
background: #fff;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin: 10px;
}
/* Header bar: flex row with its children pushed to opposite ends, on a
   diagonal gradient. Only the top corners are rounded (7px, i.e. the 8px
   frame radius minus the 1px frame border). */
.widget-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 15px 20px;
border-bottom: 1px solid #eee;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 7px 7px 0 0;
}
/* Header title: default heading margin removed. */
.widget-header h3 {
margin: 0;
font-size: 18px;
font-weight: 600;
}
/* Container for header controls, laid out in a row with a 10px gap. */
.widget-controls {
display: flex;
gap: 10px;
}
/* Refresh button: semi-transparent white fill on top of the header gradient. */
.refresh-btn {
background: rgba(255,255,255,0.2);
border: none;
color: white;
padding: 8px 12px;
border-radius: 4px;
cursor: pointer;
transition: background 0.3s;
}
/* Hover state: slightly more opaque fill (animated by the transition above). */
.refresh-btn:hover {
background: rgba(255,255,255,0.3);
}
/* Content area; min-height keeps the widget from collapsing when it holds little content. */
.widget-content {
padding: 20px;
min-height: 200px;
}
/* Card grid: as many columns as fit, each at least 250px wide, sharing the
   remaining space equally. */
.metrics-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 15px;
}
/* Base card. position: relative makes the card the containing block for the
   absolutely positioned .metric-trend icon. */
.metric-card {
padding: 20px;
border-radius: 8px;
text-align: center;
transition: transform 0.3s, box-shadow 0.3s;
position: relative;
overflow: hidden;
}
/* Hover state: the card lifts by 5px and gains a stronger shadow. */
.metric-card:hover {
transform: translateY(-5px);
box-shadow: 0 5px 15px rgba(0,0,0,0.2);
}
/* Severity variants: each sets a background gradient and a coloured left
   border (green / orange / red). */
.metric-card.normal {
background: linear-gradient(135deg, #a8edea 0%, #fed6e3 100%);
border-left: 4px solid #00c851;
}
.metric-card.warning {
background: linear-gradient(135deg, #ffeaa7 0%, #fab1a0 100%);
border-left: 4px solid #ff8800;
}
.metric-card.critical {
background: linear-gradient(135deg, #fd79a8 0%, #fdcb6e 100%);
border-left: 4px solid #cc0000;
}
/* Item name shown above the value. */
.metric-name {
font-size: 14px;
color: #666;
margin-bottom: 10px;
font-weight: 500;
}
/* Formatted metric value, the most prominent text on the card. */
.metric-value {
font-size: 28px;
font-weight: bold;
color: #333;
margin-bottom: 10px;
}
/* Trend icon pinned to the top-right corner of the card. */
.metric-trend {
font-size: 18px;
position: absolute;
top: 10px;
right: 15px;
}
/* Loading state: centred grey text. */
.loading {
text-align: center;
padding: 50px;
color: #666;
}
/* Spinner after the loading text: a 20px circle with one blue border
   segment, rotated continuously by the `spin` animation. */
.loading::after {
content: '';
display: inline-block;
width: 20px;
height: 20px;
border: 3px solid #f3f3f3;
border-top: 3px solid #3498db;
border-radius: 50%;
animation: spin 1s linear infinite;
margin-left: 10px;
}
/* One full clockwise turn per animation cycle. */
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* Error message box: red text on a light red background. */
.error {
color: #cc0000;
text-align: center;
padding: 30px;
background: #ffe6e6;
border-radius: 4px;
}
/* Placeholder shown when there is nothing to display. */
.no-data {
text-align: center;
padding: 30px;
color: #999;
font-style: italic;
}
/* Responsive layout: on viewports up to 768px wide the grid collapses to a
   single column and paddings / value font size are reduced. */
@media (max-width: 768px) {
.metrics-grid {
grid-template-columns: 1fr;
}
.widget-header {
padding: 10px 15px;
}
.widget-content {
padding: 15px;
}
.metric-card {
padding: 15px;
}
.metric-value {
font-size: 24px;
}
}
/* Dark mode: applied when the user agent reports a dark colour-scheme
   preference. Text colours are lightened here, so every surface that text
   sits on (widget frame, metric cards, error box) is darkened as well to
   keep the text readable. */
@media (prefers-color-scheme: dark) {
    .custom-widget {
        background: #2c3e50;
        border-color: #34495e;
        color: #ecf0f1;
    }
    .widget-header {
        border-bottom-color: #34495e;
    }
    /* The light pastel card gradients would leave the lightened name/value
       text nearly invisible; use dark gradients of the same hues instead.
       The coloured left border from the base rules still marks the severity. */
    .metric-card.normal {
        background: linear-gradient(135deg, #1f4d4a 0%, #4a2f3b 100%);
    }
    .metric-card.warning {
        background: linear-gradient(135deg, #5c4a12 0%, #5e3226 100%);
    }
    .metric-card.critical {
        background: linear-gradient(135deg, #6b1f3c 0%, #5e4513 100%);
    }
    .metric-name {
        color: #bdc3c7;
    }
    .metric-value {
        color: #ecf0f1;
    }
    .loading {
        color: #bdc3c7;
    }
    /* Error box and empty-state text adapted to the dark widget background. */
    .error {
        color: #ff8a80;
        background: #4a2323;
    }
    .no-data {
        color: #95a5a6;
    }
}
テーマカスタマイゼーション
php
<?php
// include/classes/html/CTheme.php への追加
/**
 * Theme subclass that registers the custom stylesheets and scripts and
 * exposes the paths of the custom logo and favicon.
 */
class CustomTheme extends CTheme {
    /**
     * Run the parent constructor, then register custom CSS followed by custom JS.
     */
    public function __construct() {
        parent::__construct();
        $this->addCustomStyles();
        $this->addCustomScripts();
    }

    /**
     * Register the custom stylesheets (theme first, then widgets).
     */
    private function addCustomStyles() {
        $stylesheets = [
            'assets/css/custom_theme.css',
            'assets/css/custom_widgets.css',
        ];
        foreach ($stylesheets as $stylesheet) {
            $this->addExternalCSS($stylesheet);
        }
    }

    /**
     * Register the custom JavaScript files (widgets first, then helper functions).
     */
    private function addCustomScripts() {
        $scripts = [
            'assets/js/custom_widgets.js',
            'assets/js/custom_functions.js',
        ];
        foreach ($scripts as $script) {
            $this->addExternalJS($script);
        }
    }

    /**
     * @return string Path of the custom logo image.
     */
    public function getCustomLogo() {
        $logoPath = 'assets/img/custom_logo.png';
        return $logoPath;
    }

    /**
     * @return string Path of the custom favicon.
     */
    public function getCustomFavicon() {
        $faviconPath = 'assets/img/custom_favicon.ico';
        return $faviconPath;
    }
}
セキュリティとベストプラクティス
セキュアなカスタムスクリプト実装
python
#!/usr/bin/env python3
# セキュアなカスタムスクリプトの例
import os
import sys
import subprocess
import json
import logging
import hashlib
import time
from pathlib import Path
class SecureCustomScript:
    """Run whitelisted external commands with input validation and rate limiting.

    Construction configures file logging, checks the executing user, and loads
    the security configuration (command whitelist, allowed directories,
    timeout, and rate limit) from a JSON file.
    """

    # JSON file holding the timestamps of recent calls for rate limiting.
    RATE_LIMIT_FILE = "/tmp/zabbix_custom_rate_limit"

    def __init__(self):
        self.setup_logging()
        self.validate_environment()
        self.load_security_config()

    def setup_logging(self):
        """Configure file logging and refuse a log file with loose permissions.

        Raises:
            PermissionError: If the existing log file grants any permission
                to group or others.
        """
        log_file = "/var/log/zabbix_secure_custom.log"
        # Restrict the mode of files created from here on. This has to happen
        # before basicConfig() creates the log file: with a default umask the
        # new file would be group/world readable and the permission check
        # below would reject it on the next run.
        os.umask(0o077)
        try:
            mode = os.stat(log_file).st_mode
        except FileNotFoundError:
            mode = 0  # No file yet; it is created below under the strict umask.
        if mode & 0o077:  # Someone other than the owner has access.
            raise PermissionError("Log file has insecure permissions")
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format='%(asctime)s - PID:%(process)d - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def validate_environment(self):
        """Check the executing user and tighten the process umask.

        Raises:
            PermissionError: If the script runs as root (uid 0).
        """
        if os.getuid() == 0:
            raise PermissionError("Script should not run as root")
        # Any non-root user is tolerated, but anything other than "zabbix" is logged.
        import pwd
        current_user = pwd.getpwuid(os.getuid()).pw_name
        if current_user != 'zabbix':
            self.logger.warning(f"Script running as {current_user}, expected zabbix")
        os.umask(0o077)

    def load_security_config(self):
        """Load the security configuration, falling back to restrictive defaults.

        Any failure to read or parse the file is logged and replaced by a
        default configuration whose command whitelist is empty, so a broken
        configuration denies every command instead of allowing one.
        """
        config_file = "/etc/zabbix/secure_custom.json"
        try:
            with open(config_file, 'r') as f:
                loaded = json.load(f)
            # The rest of the class calls .get() on the configuration.
            if not isinstance(loaded, dict):
                raise ValueError("top-level JSON value must be an object")
            self.security_config = loaded
        except Exception as e:
            self.logger.error(f"Failed to load security config: {e}")
            self.security_config = {
                "allowed_commands": [],
                "max_execution_time": 300,
                "allowed_paths": ["/usr/bin", "/bin"],
                "rate_limit": {"max_calls": 100, "time_window": 3600}
            }

    def validate_input(self, input_data):
        """Validate a single argument value and return it unchanged.

        Args:
            input_data: A str, int, or float.

        Returns:
            The value that was passed in.

        Raises:
            ValueError: If the type is not accepted, or a string contains one
                of the rejected characters or is longer than 1000 characters.
        """
        if not isinstance(input_data, (str, int, float)):
            raise ValueError("Invalid input type")
        if isinstance(input_data, str):
            dangerous_chars = [';', '&', '|', '`', '$', '(', ')', '<', '>']
            for char in dangerous_chars:
                if char in input_data:
                    raise ValueError(f"Dangerous character found: {char}")
            if len(input_data) > 1000:
                raise ValueError("Input too long")
        return input_data

    def execute_safe_command(self, command, args=None):
        """Run a whitelisted command without a shell and capture its output.

        Args:
            command: Command name; must be listed in "allowed_commands".
            args: Optional iterable of arguments, each checked by
                validate_input(). Numeric arguments are passed as text.

        Returns:
            dict with "returncode", "stdout", and "stderr".

        Raises:
            PermissionError: If the command is not whitelisted or the rate
                limit is exceeded.
            FileNotFoundError: If no executable file for the command exists
                directly inside one of the allowed directories.
            ValueError: If an argument fails validation.
            TimeoutError: If the command exceeds "max_execution_time".
        """
        if command not in self.security_config.get("allowed_commands", []):
            raise PermissionError(f"Command not allowed: {command}")

        allowed_paths = self.security_config.get("allowed_paths", [])
        command_path = None
        for allowed_path in allowed_paths:
            candidate = os.path.normpath(os.path.join(allowed_path, command))
            # A command containing ".." or sub-directories must not resolve
            # to a file outside the allowed directory itself.
            if os.path.dirname(candidate) != os.path.normpath(allowed_path):
                continue
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                command_path = candidate
                break
        if not command_path:
            raise FileNotFoundError(f"Command not found in allowed paths: {command}")

        # subprocess requires text arguments, so validated numbers are
        # converted to their string form.
        validated_args = [str(self.validate_input(arg)) for arg in args] if args else []

        self.check_rate_limit()
        try:
            result = subprocess.run(
                [command_path] + validated_args,
                capture_output=True,
                text=True,
                timeout=self.security_config.get("max_execution_time", 300),
                # The child sees nothing of the caller's environment except a
                # PATH limited to the allowed directories.
                env={"PATH": ":".join(allowed_paths)}
            )
            self.logger.info(f"Command executed: {command} {' '.join(validated_args)}")
            return {
                "returncode": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr
            }
        except subprocess.TimeoutExpired as e:
            self.logger.error(f"Command timeout: {command}")
            raise TimeoutError("Command execution timeout") from e
        except Exception as e:
            self.logger.error(f"Command execution failed: {command} - {e}")
            raise

    def check_rate_limit(self):
        """Record the current call and enforce the configured rate limit.

        Call timestamps are kept in RATE_LIMIT_FILE. The file is opened
        without following a symlink at its final path component, created with
        owner-only permissions, and locked exclusively for the whole
        read-check-write cycle so concurrent runs do not lose updates.
        Unreadable or malformed content is treated as an empty history.

        Raises:
            PermissionError: If "max_calls" calls were already recorded
                within the last "time_window" seconds.
            OSError: If the rate-limit file cannot be opened (for example
                because the path is a symlink).
        """
        import fcntl

        current_time = int(time.time())
        limits = self.security_config.get("rate_limit", {})
        time_window = limits.get("time_window", 3600)
        max_calls = limits.get("max_calls", 100)
        cutoff_time = current_time - time_window

        flags = os.O_RDWR | os.O_CREAT | getattr(os, "O_NOFOLLOW", 0)
        fd = os.open(self.RATE_LIMIT_FILE, flags, 0o600)
        with os.fdopen(fd, "r+") as f:
            fcntl.flock(f, fcntl.LOCK_EX)  # Released when the file is closed.
            os.fchmod(f.fileno(), 0o600)  # Owner read/write only, also for an old file.
            try:
                calls = json.load(f)["calls"]
                if not isinstance(calls, list):
                    raise TypeError("calls must be a list")
            except (ValueError, KeyError, TypeError):
                calls = []
            # Drop entries that fell out of the time window (and any junk values).
            calls = [
                call_time for call_time in calls
                if isinstance(call_time, (int, float)) and call_time > cutoff_time
            ]
            if len(calls) >= max_calls:
                raise PermissionError("Rate limit exceeded")
            calls.append(current_time)
            f.seek(0)
            f.truncate()
            json.dump({"calls": calls, "last_cleanup": current_time}, f)

    def calculate_checksum(self, data):
        """Return the SHA-256 hex digest of str(data)."""
        return hashlib.sha256(str(data).encode()).hexdigest()

    def secure_output(self, data):
        """Mask values that follow password/secret/key labels in a string.

        Non-string data is returned unchanged.
        """
        if isinstance(data, str):
            import re
            data = re.sub(r'(password|secret|key)[\s]*[:=][\s]*[^\s,}]+',
                          r'\1: ****', data, flags=re.IGNORECASE)
        return data
# Usage example
def main():
    """Run the action named by the first command-line argument.

    Supported actions:
        disk_usage: print the output of ``df -h`` after masking
            password/secret/key values.
        process_count: print the number of processes reported by ``ps -e``
            (output lines minus the header), or "0" when ps fails.

    Any other or missing action prints a message and exits with status 1.
    An exception raised while running an action is logged with its
    traceback, "Error" is printed, and the exit status is 1.
    """
    action = sys.argv[1] if len(sys.argv) > 1 else None
    try:
        script = SecureCustomScript()
        if action == "disk_usage":
            result = script.execute_safe_command("df", ["-h"])
            print(script.secure_output(result["stdout"]))
        elif action == "process_count":
            # "-e" selects every process using standard option syntax.
            # "ps -aux" instead asks for the processes of a user named "x"
            # and only works through a fallback in some ps implementations.
            result = script.execute_safe_command("ps", ["-e"])
            if result["returncode"] == 0:
                # One line per process plus one header line.
                process_count = len(result["stdout"].strip().split('\n')) - 1
                print(process_count)
            else:
                print("0")
        else:
            print("Invalid or missing command")
            sys.exit(1)
    except Exception as e:
        # logging.exception records the traceback in addition to the message.
        logging.exception(f"Script execution failed: {e}")
        print("Error")
        sys.exit(1)


if __name__ == "__main__":
    main()
参考リンク
本セクションでは、Zabbixのカスタマイゼーション機能について詳しく説明しました。これで第7部「高度な機能」が完成です。次の第8部では、運用とベストプラクティスについて説明する予定です。