7.4 カスタマイゼーション

企業特有の要件に対応するZabbixカスタマイゼーション機能の包括的活用ガイド

概要

Zabbixカスタマイゼーションは、標準機能では対応できない特殊な監視要件や、企業固有のワークフローに合わせてZabbixを拡張する重要な機能群です。適切なカスタマイゼーションにより、監視の精度向上、運用効率の改善、そして組織のニーズに完全に適合した監視ソリューションを実現できます。

カスタマイゼーションの価値

カスタマイゼーション手法効果適用場面
カスタムスクリプト独自メトリクス収集業界特有の監視要件
ユーザーパラメータエージェント機能拡張標準にない監視項目
外部スクリプト自動化・通知拡張独自ワークフロー連携
プラグイン開発高性能監視実装大量データ・リアルタイム処理
フロントエンド改修UI/UX最適化企業ブランディング・操作性向上

ユーザーパラメータとカスタムスクリプト

ユーザーパラメータの基本実装

設定ファイルの構成

bash
# /etc/zabbix/zabbix_agentd.conf

# ユーザーパラメータの有効化
UnsafeUserParameters=1

# 基本的なユーザーパラメータ
UserParameter=system.users,who | wc -l
UserParameter=mysql.ping,mysqladmin -uroot ping | grep -c alive
UserParameter=apache.status[*],curl -s "http://localhost/server-status?auto" | grep "^$1:" | cut -d' ' -f2

# 引数付きユーザーパラメータ
UserParameter=custom.file.size[*],stat -c%s "$1" 2>/dev/null || echo 0
UserParameter=custom.dir.count[*],find "$1" -type f | wc -l
UserParameter=custom.log.errors[*],grep -c "$1" /var/log/messages 2>/dev/null || echo 0

# 複雑なスクリプト実行
UserParameter=custom.app.health[*],/opt/zabbix/scripts/check_app_health.sh "$1" "$2"
UserParameter=custom.db.connections[*],/opt/zabbix/scripts/db_monitor.py --database="$1" --metric="$2"

# JSON形式でのデータ返却
UserParameter=custom.system.info,/opt/zabbix/scripts/system_info.py
UserParameter=custom.services.discovery,/opt/zabbix/scripts/service_discovery.py

高度なカスタムスクリプト実装

アプリケーション監視スクリプト

bash
#!/bin/bash
# /opt/zabbix/scripts/check_app_health.sh

APP_NAME="$1"
CHECK_TYPE="$2"
TIMESTAMP=$(date +%s)

# ログファイル設定
LOG_FILE="/var/log/zabbix_custom.log"

# 関数定義
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> "$LOG_FILE"
}

check_process() {
    local app="$1"
    local pid_count=$(pgrep -c "$app")
    
    if [ "$pid_count" -gt 0 ]; then
        echo "1"
        log_message "INFO: $app process check - Running ($pid_count processes)"
    else
        echo "0"
        log_message "ERROR: $app process check - Not running"
    fi
}

check_port() {
    local app="$1"
    local port
    
    # アプリケーション別ポート設定
    case "$app" in
        "httpd"|"apache")
            port="80"
            ;;
        "nginx")
            port="80"
            ;;
        "mysql")
            port="3306"
            ;;
        "postgresql")
            port="5432"
            ;;
        *)
            echo "0"
            log_message "ERROR: Unknown application for port check: $app"
            return
            ;;
    esac
    
    if netstat -ln | grep -q ":$port "; then
        echo "1"
        log_message "INFO: $app port check - Port $port is listening"
    else
        echo "0"
        log_message "ERROR: $app port check - Port $port is not listening"
    fi
}

check_response_time() {
    local app="$1"
    local url
    
    case "$app" in
        "web")
            url="http://localhost/"
            ;;
        "api")
            url="http://localhost/api/health"
            ;;
        *)
            echo "0"
            log_message "ERROR: Unknown application for response time check: $app"
            return
            ;;
    esac
    
    local response_time=$(curl -o /dev/null -s -w '%{time_total}' "$url" 2>/dev/null)
    local exit_code=$?
    
    if [ $exit_code -eq 0 ]; then
        # ミリ秒に変換
        local ms_time=$(echo "$response_time * 1000" | bc)
        echo "${ms_time%.*}"
        log_message "INFO: $app response time - ${ms_time%.*}ms"
    else
        echo "0"
        log_message "ERROR: $app response time check failed"
    fi
}

check_memory_usage() {
    local app="$1"
    local memory_kb=$(ps -C "$app" -o rss= | awk '{sum+=$1} END {print sum}')
    
    if [ -n "$memory_kb" ] && [ "$memory_kb" -gt 0 ]; then
        local memory_mb=$((memory_kb / 1024))
        echo "$memory_mb"
        log_message "INFO: $app memory usage - ${memory_mb}MB"
    else
        echo "0"
        log_message "WARNING: $app memory usage - No data or process not found"
    fi
}

# メイン処理
case "$CHECK_TYPE" in
    "process")
        check_process "$APP_NAME"
        ;;
    "port")
        check_port "$APP_NAME"
        ;;
    "response_time")
        check_response_time "$APP_NAME"
        ;;
    "memory")
        check_memory_usage "$APP_NAME"
        ;;
    *)
        echo "0"
        log_message "ERROR: Unknown check type: $CHECK_TYPE"
        ;;
esac

データベース監視スクリプト

python
#!/usr/bin/env python3
# /opt/zabbix/scripts/db_monitor.py

import sys
import json
import logging
import argparse
from datetime import datetime
import mysql.connector
import psycopg2
import cx_Oracle

class DatabaseMonitor:
    def __init__(self, db_type, connection_params):
        self.db_type = db_type.lower()
        self.connection_params = connection_params
        self.setup_logging()
    
    def setup_logging(self):
        """ログ設定"""
        logging.basicConfig(
            filename='/var/log/zabbix_db_monitor.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def get_connection(self):
        """データベース接続の取得"""
        try:
            if self.db_type == 'mysql':
                return mysql.connector.connect(**self.connection_params)
            elif self.db_type == 'postgresql':
                return psycopg2.connect(**self.connection_params)
            elif self.db_type == 'oracle':
                dsn = f"{self.connection_params['host']}:{self.connection_params['port']}/{self.connection_params['database']}"
                return cx_Oracle.connect(
                    self.connection_params['user'],
                    self.connection_params['password'],
                    dsn
                )
            else:
                raise ValueError(f"Unsupported database type: {self.db_type}")
        
        except Exception as e:
            self.logger.error(f"Database connection failed: {e}")
            return None
    
    def execute_query(self, query):
        """クエリ実行"""
        conn = self.get_connection()
        if not conn:
            return None
        
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            result = cursor.fetchall()
            cursor.close()
            conn.close()
            return result
        
        except Exception as e:
            self.logger.error(f"Query execution failed: {e}")
            return None
    
    def get_connection_count(self):
        """接続数取得"""
        queries = {
            'mysql': "SHOW STATUS LIKE 'Threads_connected'",
            'postgresql': "SELECT count(*) FROM pg_stat_activity",
            'oracle': "SELECT count(*) FROM v$session WHERE status='ACTIVE'"
        }
        
        query = queries.get(self.db_type)
        if not query:
            return 0
        
        result = self.execute_query(query)
        if result:
            if self.db_type == 'mysql':
                return int(result[0][1])
            else:
                return int(result[0][0])
        return 0
    
    def get_slow_queries(self):
        """スロークエリ数取得"""
        queries = {
            'mysql': "SHOW STATUS LIKE 'Slow_queries'",
            'postgresql': """
                SELECT count(*) FROM pg_stat_statements 
                WHERE mean_time > 1000
            """,
            'oracle': """
                SELECT count(*) FROM v$sql 
                WHERE elapsed_time/executions > 1000000
            """
        }
        
        query = queries.get(self.db_type)
        if not query:
            return 0
        
        result = self.execute_query(query)
        if result:
            if self.db_type == 'mysql':
                return int(result[0][1])
            else:
                return int(result[0][0])
        return 0
    
    def get_database_size(self, database_name=None):
        """データベースサイズ取得"""
        queries = {
            'mysql': f"""
                SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) 
                FROM information_schema.tables 
                WHERE table_schema = '{database_name or "zabbix"}'
            """,
            'postgresql': f"""
                SELECT pg_size_pretty(pg_database_size('{database_name or "zabbix"}'))
            """,
            'oracle': """
                SELECT ROUND(SUM(bytes)/1024/1024, 2) 
                FROM user_segments
            """
        }
        
        query = queries.get(self.db_type)
        if not query:
            return 0
        
        result = self.execute_query(query)
        if result and result[0][0]:
            if self.db_type == 'postgresql':
                # PostgreSQLの場合、サイズ文字列から数値を抽出
                size_str = result[0][0]
                if 'MB' in size_str:
                    return float(size_str.replace(' MB', ''))
                elif 'GB' in size_str:
                    return float(size_str.replace(' GB', '')) * 1024
            else:
                return float(result[0][0])
        return 0
    
    def get_table_stats(self):
        """テーブル統計情報取得"""
        queries = {
            'mysql': """
                SELECT table_name, table_rows, 
                       ROUND((data_length + index_length) / 1024 / 1024, 2) as size_mb
                FROM information_schema.tables 
                WHERE table_schema = DATABASE()
                ORDER BY size_mb DESC LIMIT 10
            """,
            'postgresql': """
                SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
                FROM pg_stat_user_tables 
                ORDER BY n_tup_ins DESC LIMIT 10
            """,
            'oracle': """
                SELECT table_name, num_rows, 
                       ROUND(avg_row_len * num_rows / 1024 / 1024, 2) as size_mb
                FROM user_tables 
                WHERE num_rows > 0
                ORDER BY size_mb DESC
                FETCH FIRST 10 ROWS ONLY
            """
        }
        
        query = queries.get(self.db_type)
        if not query:
            return []
        
        result = self.execute_query(query)
        if result:
            return [list(row) for row in result]
        return []
    
    def discover_databases(self):
        """データベース一覧発見(LLDで使用)"""
        queries = {
            'mysql': "SHOW DATABASES",
            'postgresql': "SELECT datname FROM pg_database WHERE datistemplate = false",
            'oracle': "SELECT name FROM v$database"
        }
        
        query = queries.get(self.db_type)
        if not query:
            return {"data": []}
        
        result = self.execute_query(query)
        if result:
            databases = []
            for row in result:
                databases.append({
                    "{#DATABASE}": row[0],
                    "{#DBTYPE}": self.db_type.upper()
                })
            return {"data": databases}
        
        return {"data": []}

def main():
    parser = argparse.ArgumentParser(description="Database Monitor for Zabbix")
    parser.add_argument("--database", required=True, help="Database name")
    parser.add_argument("--metric", required=True, 
                       choices=['connections', 'slow_queries', 'size', 'discovery', 'table_stats'])
    parser.add_argument("--host", default="localhost", help="Database host")
    parser.add_argument("--port", type=int, help="Database port")
    parser.add_argument("--user", default="zabbix", help="Database user")
    parser.add_argument("--password", default="", help="Database password")
    parser.add_argument("--db-type", default="mysql", 
                       choices=['mysql', 'postgresql', 'oracle'], help="Database type")
    
    args = parser.parse_args()
    
    # デフォルトポート設定
    if not args.port:
        port_defaults = {'mysql': 3306, 'postgresql': 5432, 'oracle': 1521}
        args.port = port_defaults.get(args.db_type, 3306)
    
    # 接続パラメータ設定
    connection_params = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database
    }
    
    monitor = DatabaseMonitor(args.db_type, connection_params)
    
    try:
        if args.metric == 'connections':
            print(monitor.get_connection_count())
        elif args.metric == 'slow_queries':
            print(monitor.get_slow_queries())
        elif args.metric == 'size':
            print(monitor.get_database_size())
        elif args.metric == 'discovery':
            print(json.dumps(monitor.discover_databases(), indent=2))
        elif args.metric == 'table_stats':
            print(json.dumps(monitor.get_table_stats(), indent=2))
    
    except Exception as e:
        logging.error(f"Monitor execution failed: {e}")
        print("0")

if __name__ == "__main__":
    main()

システム情報収集スクリプト

python
#!/usr/bin/env python3
# /opt/zabbix/scripts/system_info.py

import json
import psutil
import subprocess
import platform
import socket
from datetime import datetime

class SystemInfoCollector:
    def __init__(self):
        self.info = {
            'timestamp': datetime.now().isoformat(),
            'hostname': socket.gethostname(),
            'platform': platform.platform()
        }
    
    def collect_cpu_info(self):
        """CPU情報収集"""
        return {
            'physical_cores': psutil.cpu_count(logical=False),
            'logical_cores': psutil.cpu_count(logical=True),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'load_average': list(psutil.getloadavg()) if hasattr(psutil, 'getloadavg') else [],
            'cpu_freq': {
                'current': psutil.cpu_freq().current if psutil.cpu_freq() else 0,
                'min': psutil.cpu_freq().min if psutil.cpu_freq() else 0,
                'max': psutil.cpu_freq().max if psutil.cpu_freq() else 0
            }
        }
    
    def collect_memory_info(self):
        """メモリ情報収集"""
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        
        return {
            'total': memory.total,
            'available': memory.available,
            'used': memory.used,
            'free': memory.free,
            'percent': memory.percent,
            'buffers': getattr(memory, 'buffers', 0),
            'cached': getattr(memory, 'cached', 0),
            'swap': {
                'total': swap.total,
                'used': swap.used,
                'free': swap.free,
                'percent': swap.percent
            }
        }
    
    def collect_disk_info(self):
        """ディスク情報収集"""
        disk_info = {}
        
        # ディスク使用量
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_info[partition.mountpoint] = {
                    'device': partition.device,
                    'fstype': partition.fstype,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free,
                    'percent': (usage.used / usage.total) * 100
                }
            except PermissionError:
                continue
        
        # ディスクI/O統計
        disk_io = psutil.disk_io_counters()
        if disk_io:
            disk_info['io_stats'] = {
                'read_count': disk_io.read_count,
                'write_count': disk_io.write_count,
                'read_bytes': disk_io.read_bytes,
                'write_bytes': disk_io.write_bytes,
                'read_time': disk_io.read_time,
                'write_time': disk_io.write_time
            }
        
        return disk_info
    
    def collect_network_info(self):
        """ネットワーク情報収集"""
        network_info = {}
        
        # ネットワークインターフェース情報
        for interface, addresses in psutil.net_if_addrs().items():
            interface_info = []
            for addr in addresses:
                interface_info.append({
                    'family': str(addr.family),
                    'address': addr.address,
                    'netmask': addr.netmask,
                    'broadcast': addr.broadcast
                })
            network_info[interface] = interface_info
        
        # ネットワークI/O統計
        net_io = psutil.net_io_counters()
        if net_io:
            network_info['io_stats'] = {
                'bytes_sent': net_io.bytes_sent,
                'bytes_recv': net_io.bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv,
                'errin': net_io.errin,
                'errout': net_io.errout,
                'dropin': net_io.dropin,
                'dropout': net_io.dropout
            }
        
        return network_info
    
    def collect_process_info(self):
        """プロセス情報収集"""
        processes = []
        
        for proc in psutil.process_iter(['pid', 'name', 'username', 'memory_percent', 'cpu_percent']):
            try:
                processes.append({
                    'pid': proc.info['pid'],
                    'name': proc.info['name'],
                    'username': proc.info['username'],
                    'memory_percent': proc.info['memory_percent'],
                    'cpu_percent': proc.info['cpu_percent']
                })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        
        # メモリ使用量でソート(上位20プロセス)
        processes.sort(key=lambda x: x['memory_percent'] or 0, reverse=True)
        
        return {
            'total_processes': len(processes),
            'top_memory_processes': processes[:20]
        }
    
    def collect_service_info(self):
        """サービス状態収集"""
        services = {}
        
        # systemctl でサービス状態を取得
        try:
            result = subprocess.run(
                ['systemctl', 'list-units', '--type=service', '--all', '--no-pager'],
                capture_output=True, text=True, timeout=30
            )
            
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')[1:]  # ヘッダーをスキップ
                for line in lines:
                    if '.service' in line:
                        parts = line.split()
                        if len(parts) >= 4:
                            service_name = parts[0].replace('.service', '')
                            services[service_name] = {
                                'loaded': parts[1],
                                'active': parts[2],
                                'sub': parts[3],
                                'description': ' '.join(parts[4:]) if len(parts) > 4 else ''
                            }
        except Exception as e:
            services['error'] = str(e)
        
        return services
    
    def collect_all_info(self):
        """全情報の収集"""
        self.info.update({
            'cpu': self.collect_cpu_info(),
            'memory': self.collect_memory_info(),
            'disk': self.collect_disk_info(),
            'network': self.collect_network_info(),
            'processes': self.collect_process_info(),
            'services': self.collect_service_info()
        })
        
        return self.info

def main():
    collector = SystemInfoCollector()
    system_info = collector.collect_all_info()
    print(json.dumps(system_info, indent=2))

if __name__ == "__main__":
    main()

Zabbix Agent 2 プラグイン開発

Go言語プラグインの基本構造

go
// custom_plugin.go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    
    "git.zabbix.com/ap/plugin-support/plugin"
    "git.zabbix.com/ap/plugin-support/zbxerr"
)

const pluginName = "CustomMonitor"

// Plugin interface implementation
type Plugin struct {
    plugin.Base
    options Options
}

type Options struct {
    Endpoint string `conf:"endpoint,https://api.example.com/metrics"`
    Timeout  int    `conf:"timeout,30"`
    Token    string `conf:"token"`
}

// メトリクス構造
type MetricData struct {
    Name      string            `json:"name"`
    Value     interface{}       `json:"value"`
    Timestamp int64            `json:"timestamp"`
    Tags      map[string]string `json:"tags"`
}

type APIResponse struct {
    Status  string       `json:"status"`
    Data    []MetricData `json:"data"`
    Message string       `json:"message"`
}

var impl Plugin

// プラグイン初期化
func init() {
    plugin.RegisterMetrics(&impl, pluginName,
        "custom.api.health", "API health check status.",
        "custom.api.response_time", "API response time in milliseconds.",
        "custom.api.metrics[*]", "Custom metrics from API endpoint.",
        "custom.discovery.services", "Discover available services.",
    )
}

// 設定検証
func (p *Plugin) Validate(options interface{}) error {
    o, ok := options.(*Options)
    if !ok {
        return fmt.Errorf("invalid options type")
    }
    
    if o.Endpoint == "" {
        return fmt.Errorf("endpoint cannot be empty")
    }
    
    if o.Timeout <= 0 {
        o.Timeout = 30
    }
    
    p.options = *o
    return nil
}

// メトリクス収集の実装
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
    switch key {
    case "custom.api.health":
        return p.checkAPIHealth(), nil
        
    case "custom.api.response_time":
        return p.measureResponseTime(), nil
        
    case "custom.api.metrics":
        if len(params) == 0 {
            return nil, zbxerr.ErrorInvalidParams
        }
        return p.getCustomMetric(params[0]), nil
        
    case "custom.discovery.services":
        return p.discoverServices(), nil
        
    default:
        return nil, zbxerr.ErrorUnsupportedMetric
    }
}

// API ヘルスチェック
func (p *Plugin) checkAPIHealth() int {
    client := &http.Client{
        Timeout: time.Duration(p.options.Timeout) * time.Second,
    }
    
    req, err := http.NewRequest("GET", p.options.Endpoint+"/health", nil)
    if err != nil {
        return 0
    }
    
    if p.options.Token != "" {
        req.Header.Add("Authorization", "Bearer "+p.options.Token)
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return 0
    }
    defer resp.Body.Close()
    
    if resp.StatusCode == 200 {
        return 1
    }
    return 0
}

// レスポンス時間測定
func (p *Plugin) measureResponseTime() int64 {
    start := time.Now()
    
    client := &http.Client{
        Timeout: time.Duration(p.options.Timeout) * time.Second,
    }
    
    req, err := http.NewRequest("GET", p.options.Endpoint+"/ping", nil)
    if err != nil {
        return 0
    }
    
    if p.options.Token != "" {
        req.Header.Add("Authorization", "Bearer "+p.options.Token)
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return 0
    }
    defer resp.Body.Close()
    
    duration := time.Since(start)
    return duration.Nanoseconds() / 1000000 // ミリ秒で返す
}

// カスタムメトリクス取得
func (p *Plugin) getCustomMetric(metricName string) interface{} {
    client := &http.Client{
        Timeout: time.Duration(p.options.Timeout) * time.Second,
    }
    
    url := fmt.Sprintf("%s/metrics/%s", p.options.Endpoint, metricName)
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return 0
    }
    
    if p.options.Token != "" {
        req.Header.Add("Authorization", "Bearer "+p.options.Token)
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return 0
    }
    defer resp.Body.Close()
    
    var apiResp APIResponse
    if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil {
        return 0
    }
    
    if apiResp.Status == "success" && len(apiResp.Data) > 0 {
        return apiResp.Data[0].Value
    }
    
    return 0
}

// サービス発見
func (p *Plugin) discoverServices() string {
    client := &http.Client{
        Timeout: time.Duration(p.options.Timeout) * time.Second,
    }
    
    req, err := http.NewRequest("GET", p.options.Endpoint+"/services", nil)
    if err != nil {
        return `{"data":[]}`
    }
    
    if p.options.Token != "" {
        req.Header.Add("Authorization", "Bearer "+p.options.Token)
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return `{"data":[]}`
    }
    defer resp.Body.Close()
    
    var services []map[string]string
    if err := json.NewDecoder(resp.Body).Decode(&services); err != nil {
        return `{"data":[]}`
    }
    
    // Zabbix LLD形式に変換
    discovery := map[string][]map[string]string{
        "data": make([]map[string]string, 0),
    }
    
    for _, service := range services {
        discovery["data"] = append(discovery["data"], map[string]string{
            "{#SERVICE_NAME}": service["name"],
            "{#SERVICE_TYPE}": service["type"],
            "{#SERVICE_PORT}": service["port"],
        })
    }
    
    result, _ := json.Marshal(discovery)
    return string(result)
}

func main() {
    plugin.Start(&impl, pluginName)
}

プラグインビルドとデプロイ

makefile
# Makefile
PLUGIN_NAME = custom_monitor
VERSION = 1.0.0
BUILD_DIR = build
GO_SRC = custom_plugin.go

.PHONY: all build clean install test

all: build

build:
	@echo "Building $(PLUGIN_NAME) plugin..."
	@mkdir -p $(BUILD_DIR)
	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
		-ldflags="-s -w -X main.version=$(VERSION)" \
		-o $(BUILD_DIR)/$(PLUGIN_NAME) $(GO_SRC)

install: build
	@echo "Installing $(PLUGIN_NAME) plugin..."
	sudo cp $(BUILD_DIR)/$(PLUGIN_NAME) /usr/lib/zabbix/agent2/plugins/
	sudo chown zabbix:zabbix /usr/lib/zabbix/agent2/plugins/$(PLUGIN_NAME)
	sudo chmod 755 /usr/lib/zabbix/agent2/plugins/$(PLUGIN_NAME)

test:
	go test -v ./...

clean:
	rm -rf $(BUILD_DIR)

package: build
	@echo "Creating package..."
	tar -czf $(BUILD_DIR)/$(PLUGIN_NAME)-$(VERSION).tar.gz \
		-C $(BUILD_DIR) $(PLUGIN_NAME)

Agent2設定ファイルの更新

bash
# /etc/zabbix/zabbix_agent2.conf

# プラグイン設定
Plugins.CustomMonitor.Endpoint=https://api.example.com/v1
Plugins.CustomMonitor.Timeout=30
Plugins.CustomMonitor.Token=your_api_token_here

# ログレベル設定
DebugLevel=4

# プラグインディレクトリ
PluginPath=/usr/lib/zabbix/agent2/plugins/

外部スクリプトとアクション

高度な外部スクリプト実装

自動復旧スクリプト

python
#!/usr/bin/env python3
# /usr/lib/zabbix/externalscripts/auto_remediation.py

import sys
import json
import subprocess
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import requests

class AutoRemediation:
    def __init__(self):
        self.setup_logging()
        self.config = self.load_config()
    
    def setup_logging(self):
        """ログ設定"""
        logging.basicConfig(
            filename='/var/log/zabbix_remediation.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def load_config(self):
        """設定読み込み"""
        try:
            with open('/etc/zabbix/remediation_config.json', 'r') as f:
                return json.load(f)
        except Exception as e:
            self.logger.error(f"Config load failed: {e}")
            return {}
    
    def parse_zabbix_params(self, args):
        """Zabbixパラメータの解析"""
        if len(args) < 6:
            raise ValueError("Insufficient parameters")
        
        return {
            'event_id': args[1],
            'event_date': args[2],
            'event_time': args[3],
            'trigger_name': args[4],
            'host_name': args[5],
            'trigger_severity': args[6] if len(args) > 6 else 'Unknown',
            'item_name': args[7] if len(args) > 7 else '',
            'item_value': args[8] if len(args) > 8 else ''
        }
    
    def disk_space_cleanup(self, host_name, threshold=90):
        """ディスク容量不足の自動対応"""
        self.logger.info(f"Starting disk cleanup for {host_name}")
        
        cleanup_commands = [
            # 一時ファイルクリーンアップ
            f"ssh {host_name} 'find /tmp -type f -atime +7 -delete'",
            # ログローテーション強制実行
            f"ssh {host_name} 'logrotate -f /etc/logrotate.conf'",
            # パッケージキャッシュクリーンアップ
            f"ssh {host_name} 'apt-get clean || yum clean all'",
            # ジャーナルログクリーンアップ
            f"ssh {host_name} 'journalctl --vacuum-time=7d'"
        ]
        
        results = []
        for cmd in cleanup_commands:
            try:
                result = subprocess.run(
                    cmd.split(), 
                    capture_output=True, 
                    text=True, 
                    timeout=300
                )
                results.append({
                    'command': cmd,
                    'success': result.returncode == 0,
                    'output': result.stdout[:500],  # 出力を制限
                    'error': result.stderr[:500] if result.stderr else None
                })
                self.logger.info(f"Command executed: {cmd} - Success: {result.returncode == 0}")
            except Exception as e:
                self.logger.error(f"Command failed: {cmd} - Error: {e}")
                results.append({
                    'command': cmd,
                    'success': False,
                    'error': str(e)
                })
        
        return results
    
    def service_restart(self, host_name, service_name):
        """サービス再起動"""
        self.logger.info(f"Restarting service {service_name} on {host_name}")
        
        # サービス状態確認
        check_cmd = f"ssh {host_name} 'systemctl is-active {service_name}'"
        
        try:
            # 現在の状態をチェック
            check_result = subprocess.run(
                check_cmd.split(),
                capture_output=True,
                text=True,
                timeout=30
            )
            
            current_status = check_result.stdout.strip()
            self.logger.info(f"Current service status: {current_status}")
            
            # サービス再起動
            restart_cmd = f"ssh {host_name} 'sudo systemctl restart {service_name}'"
            restart_result = subprocess.run(
                restart_cmd.split(),
                capture_output=True,
                text=True,
                timeout=60
            )
            
            # 再起動後の状態確認
            time.sleep(5)  # 少し待つ
            final_check = subprocess.run(
                check_cmd.split(),
                capture_output=True,
                text=True,
                timeout=30
            )
            
            final_status = final_check.stdout.strip()
            success = restart_result.returncode == 0 and final_status == 'active'
            
            self.logger.info(f"Service restart result: {success}, Final status: {final_status}")
            
            return {
                'success': success,
                'previous_status': current_status,
                'final_status': final_status,
                'restart_output': restart_result.stdout,
                'restart_error': restart_result.stderr
            }
            
        except Exception as e:
            self.logger.error(f"Service restart failed: {e}")
            return {'success': False, 'error': str(e)}
    
    def high_cpu_investigation(self, host_name):
        """高CPU使用率の調査"""
        self.logger.info(f"Investigating high CPU on {host_name}")
        
        investigation_commands = [
            # トップ10プロセス
            f"ssh {host_name} 'ps aux --sort=-%cpu | head -11'",
            # システム負荷
            f"ssh {host_name} 'uptime'",
            # メモリ使用状況
            f"ssh {host_name} 'free -h'",
            # ディスクI/O状況
            f"ssh {host_name} 'iostat -x 1 1'",
            # ネットワーク接続
            f"ssh {host_name} 'netstat -tuln | wc -l'"
        ]
        
        investigation_results = {}
        for cmd in investigation_commands:
            try:
                result = subprocess.run(
                    cmd.split(),
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                
                cmd_name = cmd.split("'")[1].split()[0]  # コマンド名を抽出
                investigation_results[cmd_name] = {
                    'output': result.stdout,
                    'error': result.stderr,
                    'success': result.returncode == 0
                }
                
            except Exception as e:
                self.logger.error(f"Investigation command failed: {cmd} - {e}")
        
        return investigation_results
    
    def send_notification(self, event_data, remediation_result):
        """通知送信"""
        try:
            # Slack通知
            if 'slack_webhook' in self.config:
                self.send_slack_notification(event_data, remediation_result)
            
            # メール通知
            if 'email' in self.config:
                self.send_email_notification(event_data, remediation_result)
            
        except Exception as e:
            self.logger.error(f"Notification failed: {e}")
    
    def send_slack_notification(self, event_data, remediation_result):
        """Slack通知"""
        webhook_url = self.config['slack_webhook']
        
        color = "good" if remediation_result.get('success') else "danger"
        
        payload = {
            "attachments": [{
                "color": color,
                "title": f"Auto-Remediation: {event_data['trigger_name']}",
                "fields": [
                    {"title": "Host", "value": event_data['host_name'], "short": True},
                    {"title": "Event ID", "value": event_data['event_id'], "short": True},
                    {"title": "Status", "value": "Success" if remediation_result.get('success') else "Failed", "short": True},
                    {"title": "Time", "value": f"{event_data['event_date']} {event_data['event_time']}", "short": True}
                ],
                "text": f"Details: {json.dumps(remediation_result, indent=2)}"
            }]
        }
        
        requests.post(webhook_url, json=payload, timeout=10)
    
    def execute_remediation(self, event_data):
        """自動復旧の実行"""
        trigger_name = event_data['trigger_name'].lower()
        host_name = event_data['host_name']
        
        # トリガー名に基づく自動復旧ロジック
        if 'disk space' in trigger_name or 'filesystem' in trigger_name:
            result = self.disk_space_cleanup(host_name)
            result['remediation_type'] = 'disk_cleanup'
            
        elif 'service' in trigger_name and 'down' in trigger_name:
            # サービス名を抽出(例:httpd service is down)
            service_name = self.extract_service_name(trigger_name)
            if service_name:
                result = self.service_restart(host_name, service_name)
                result['remediation_type'] = 'service_restart'
                result['service_name'] = service_name
            else:
                result = {'success': False, 'error': 'Could not determine service name'}
                
        elif 'high cpu' in trigger_name or 'cpu utilization' in trigger_name:
            result = self.high_cpu_investigation(host_name)
            result['remediation_type'] = 'cpu_investigation'
            result['success'] = True  # 調査は常に成功
            
        else:
            result = {
                'success': False,
                'error': 'No remediation rule found for this trigger',
                'remediation_type': 'none'
            }
        
        # 通知送信
        self.send_notification(event_data, result)
        
        return result
    
    def extract_service_name(self, trigger_name):
        """トリガー名からサービス名を抽出"""
        common_services = ['httpd', 'nginx', 'mysql', 'postgresql', 'ssh', 'postfix']
        
        for service in common_services:
            if service in trigger_name.lower():
                return service
        
        return None

def main():
    if len(sys.argv) < 6:
        print("Usage: auto_remediation.py event_id date time trigger_name host_name [severity] [item_name] [item_value]")
        sys.exit(1)
    
    remediation = AutoRemediation()
    
    try:
        event_data = remediation.parse_zabbix_params(sys.argv)
        result = remediation.execute_remediation(event_data)
        
        # 結果をJSON形式で出力
        print(json.dumps({
            'event_data': event_data,
            'remediation_result': result,
            'timestamp': datetime.now().isoformat()
        }, indent=2))
        
    except Exception as e:
        error_result = {
            'success': False,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
        print(json.dumps(error_result, indent=2))
        logging.error(f"Auto-remediation failed: {e}")

if __name__ == "__main__":
    main()

設定ファイル例

json
{
  "slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
  "email": {
    "smtp_server": "smtp.company.com",
    "smtp_port": 587,
    "username": "[email protected]",
    "password": "email_password",
    "recipients": ["[email protected]", "[email protected]"]
  },
  "remediation_rules": {
    "disk_space": {
      "enabled": true,
      "threshold": 90,
      "max_cleanup_size_gb": 10
    },
    "service_restart": {
      "enabled": true,
      "allowed_services": ["httpd", "nginx", "mysql", "postgresql"],
      "max_restart_attempts": 3
    },
    "cpu_investigation": {
      "enabled": true,
      "cpu_threshold": 80
    }
  },
  "security": {
    "allowed_hosts": ["web-*", "db-*", "app-*"],
    "ssh_key_path": "/home/zabbix/.ssh/id_rsa",
    "sudo_required": true
  }
}

フロントエンドカスタマイゼーション

カスタムダッシュボードウィジェット

javascript
// custom_widget.js
class CustomMetricsWidget {
    constructor(widgetId, options) {
        this.widgetId = widgetId;
        this.options = options;
        this.container = document.getElementById(widgetId);
        this.refreshInterval = options.refresh_interval || 30000;
        this.init();
    }
    
    init() {
        this.createContainer();
        this.loadData();
        this.startAutoRefresh();
    }
    
    createContainer() {
        this.container.innerHTML = `
            <div class="custom-widget">
                <div class="widget-header">
                    <h3>${this.options.title || 'Custom Metrics'}</h3>
                    <div class="widget-controls">
                        <button class="refresh-btn" onclick="window.customWidgets['${this.widgetId}'].loadData()">
                            🔄
                        </button>
                    </div>
                </div>
                <div class="widget-content" id="${this.widgetId}-content">
                    <div class="loading">Loading...</div>
                </div>
            </div>
        `;
    }
    
    async loadData() {
        try {
            const response = await fetch('/zabbix/api_jsonrpc.php', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    jsonrpc: '2.0',
                    method: 'item.get',
                    params: {
                        output: ['itemid', 'name', 'lastvalue', 'units'],
                        hostids: this.options.hostids,
                        search: {
                            key_: this.options.item_pattern
                        },
                        monitored: true
                    },
                    auth: this.getAuthToken(),
                    id: 1
                })
            });
            
            const data = await response.json();
            
            if (data.result) {
                this.renderData(data.result);
            } else {
                this.showError('Failed to load data');
            }
        } catch (error) {
            this.showError('Network error: ' + error.message);
        }
    }
    
    renderData(items) {
        const content = document.getElementById(`${this.widgetId}-content`);
        
        if (items.length === 0) {
            content.innerHTML = '<div class="no-data">No data available</div>';
            return;
        }
        
        const html = `
            <div class="metrics-grid">
                ${items.map(item => this.renderMetricCard(item)).join('')}
            </div>
        `;
        
        content.innerHTML = html;
    }
    
    renderMetricCard(item) {
        const value = parseFloat(item.lastvalue);
        const colorClass = this.getValueColorClass(value, item.name);
        
        return `
            <div class="metric-card ${colorClass}">
                <div class="metric-name">${item.name}</div>
                <div class="metric-value">
                    ${this.formatValue(value, item.units)}
                </div>
                <div class="metric-trend">
                    ${this.getTrendIcon(value)}
                </div>
            </div>
        `;
    }
    
    getValueColorClass(value, itemName) {
        // カスタムしきい値ロジック
        if (itemName.includes('CPU') && value > 80) return 'critical';
        if (itemName.includes('Memory') && value > 85) return 'warning';
        if (itemName.includes('Disk') && value > 90) return 'critical';
        
        return 'normal';
    }
    
    formatValue(value, units) {
        if (units === '%') {
            return `${value.toFixed(1)}%`;
        } else if (units === 'B') {
            return this.formatBytes(value);
        } else {
            return `${value.toFixed(2)} ${units}`;
        }
    }
    
    formatBytes(bytes) {
        const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
        if (bytes === 0) return '0 B';
        const i = Math.floor(Math.log(bytes) / Math.log(1024));
        return `${(bytes / Math.pow(1024, i)).toFixed(2)} ${sizes[i]}`;
    }
    
    getTrendIcon(value) {
        // 簡単なトレンド表示(実際には履歴データが必要)
        return '📈'; // 実装に応じて動的に変更
    }
    
    showError(message) {
        const content = document.getElementById(`${this.widgetId}-content`);
        content.innerHTML = `<div class="error">${message}</div>`;
    }
    
    getAuthToken() {
        // セッションからトークンを取得
        return sessionStorage.getItem('zabbix_auth_token') || '';
    }
    
    startAutoRefresh() {
        setInterval(() => {
            this.loadData();
        }, this.refreshInterval);
    }
}

// グローバル管理
window.customWidgets = window.customWidgets || {};

// ウィジェット初期化関数
function initCustomWidget(widgetId, options) {
    window.customWidgets[widgetId] = new CustomMetricsWidget(widgetId, options);
}

カスタムスタイル

css
/* custom_widgets.css */
.custom-widget {
    border: 1px solid #ddd;
    border-radius: 8px;
    background: #fff;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    margin: 10px;
}

.widget-header {
    display: flex;
    justify-content: space-between;
    align-items: center;
    padding: 15px 20px;
    border-bottom: 1px solid #eee;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    border-radius: 7px 7px 0 0;
}

.widget-header h3 {
    margin: 0;
    font-size: 18px;
    font-weight: 600;
}

.widget-controls {
    display: flex;
    gap: 10px;
}

.refresh-btn {
    background: rgba(255,255,255,0.2);
    border: none;
    color: white;
    padding: 8px 12px;
    border-radius: 4px;
    cursor: pointer;
    transition: background 0.3s;
}

.refresh-btn:hover {
    background: rgba(255,255,255,0.3);
}

.widget-content {
    padding: 20px;
    min-height: 200px;
}

.metrics-grid {
    display: grid;
    grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
    gap: 15px;
}

.metric-card {
    padding: 20px;
    border-radius: 8px;
    text-align: center;
    transition: transform 0.3s, box-shadow 0.3s;
    position: relative;
    overflow: hidden;
}

.metric-card:hover {
    transform: translateY(-5px);
    box-shadow: 0 5px 15px rgba(0,0,0,0.2);
}

.metric-card.normal {
    background: linear-gradient(135deg, #a8edea 0%, #fed6e3 100%);
    border-left: 4px solid #00c851;
}

.metric-card.warning {
    background: linear-gradient(135deg, #ffeaa7 0%, #fab1a0 100%);
    border-left: 4px solid #ff8800;
}

.metric-card.critical {
    background: linear-gradient(135deg, #fd79a8 0%, #fdcb6e 100%);
    border-left: 4px solid #cc0000;
}

.metric-name {
    font-size: 14px;
    color: #666;
    margin-bottom: 10px;
    font-weight: 500;
}

.metric-value {
    font-size: 28px;
    font-weight: bold;
    color: #333;
    margin-bottom: 10px;
}

.metric-trend {
    font-size: 18px;
    position: absolute;
    top: 10px;
    right: 15px;
}

.loading {
    text-align: center;
    padding: 50px;
    color: #666;
}

.loading::after {
    content: '';
    display: inline-block;
    width: 20px;
    height: 20px;
    border: 3px solid #f3f3f3;
    border-top: 3px solid #3498db;
    border-radius: 50%;
    animation: spin 1s linear infinite;
    margin-left: 10px;
}

@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}

.error {
    color: #cc0000;
    text-align: center;
    padding: 30px;
    background: #ffe6e6;
    border-radius: 4px;
}

.no-data {
    text-align: center;
    padding: 30px;
    color: #999;
    font-style: italic;
}

/* レスポンシブ対応 */
@media (max-width: 768px) {
    .metrics-grid {
        grid-template-columns: 1fr;
    }
    
    .widget-header {
        padding: 10px 15px;
    }
    
    .widget-content {
        padding: 15px;
    }
    
    .metric-card {
        padding: 15px;
    }
    
    .metric-value {
        font-size: 24px;
    }
}

/* ダークモード対応 */
@media (prefers-color-scheme: dark) {
    .custom-widget {
        background: #2c3e50;
        border-color: #34495e;
        color: #ecf0f1;
    }
    
    .widget-header {
        border-bottom-color: #34495e;
    }
    
    .metric-name {
        color: #bdc3c7;
    }
    
    .metric-value {
        color: #ecf0f1;
    }
    
    .loading {
        color: #bdc3c7;
    }
}

テーマカスタマイゼーション

php
<?php
// include/classes/html/CTheme.php への追加

class CustomTheme extends CTheme {
    
    public function __construct() {
        parent::__construct();
        $this->addCustomStyles();
        $this->addCustomScripts();
    }
    
    private function addCustomStyles() {
        // カスタムCSS の追加
        $this->addExternalCSS('assets/css/custom_theme.css');
        $this->addExternalCSS('assets/css/custom_widgets.css');
    }
    
    private function addCustomScripts() {
        // カスタムJavaScript の追加
        $this->addExternalJS('assets/js/custom_widgets.js');
        $this->addExternalJS('assets/js/custom_functions.js');
    }
    
    public function getCustomLogo() {
        // カスタムロゴの設定
        return 'assets/img/custom_logo.png';
    }
    
    public function getCustomFavicon() {
        // カスタムファビコンの設定
        return 'assets/img/custom_favicon.ico';
    }
}

セキュリティとベストプラクティス

セキュアなカスタムスクリプト実装

python
#!/usr/bin/env python3
# セキュアなカスタムスクリプトの例

import os
import sys
import subprocess
import json
import logging
import hashlib
import time
from pathlib import Path

class SecureCustomScript:
    def __init__(self):
        self.setup_logging()
        self.validate_environment()
        self.load_security_config()
    
    def setup_logging(self):
        """セキュアなログ設定"""
        log_file = "/var/log/zabbix_secure_custom.log"
        
        # ログファイルの権限確認
        if os.path.exists(log_file):
            stat = os.stat(log_file)
            if stat.st_mode & 0o077:  # 所有者以外に権限がある
                raise PermissionError("Log file has insecure permissions")
        
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format='%(asctime)s - PID:%(process)d - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def validate_environment(self):
        """実行環境の検証"""
        # 実行ユーザーの確認
        if os.getuid() == 0:
            raise PermissionError("Script should not run as root")
        
        # 実行ユーザーがzabbixかどうか確認
        import pwd
        current_user = pwd.getpwuid(os.getuid()).pw_name
        if current_user != 'zabbix':
            self.logger.warning(f"Script running as {current_user}, expected zabbix")
        
        # umask設定
        os.umask(0o077)
    
    def load_security_config(self):
        """セキュリティ設定の読み込み"""
        config_file = "/etc/zabbix/secure_custom.json"
        
        try:
            with open(config_file, 'r') as f:
                self.security_config = json.load(f)
        except Exception as e:
            self.logger.error(f"Failed to load security config: {e}")
            self.security_config = {
                "allowed_commands": [],
                "max_execution_time": 300,
                "allowed_paths": ["/usr/bin", "/bin"],
                "rate_limit": {"max_calls": 100, "time_window": 3600}
            }
    
    def validate_input(self, input_data):
        """入力値の検証"""
        if not isinstance(input_data, (str, int, float)):
            raise ValueError("Invalid input type")
        
        if isinstance(input_data, str):
            # 危険な文字の検証
            dangerous_chars = [';', '&', '|', '`', '$', '(', ')', '<', '>']
            for char in dangerous_chars:
                if char in input_data:
                    raise ValueError(f"Dangerous character found: {char}")
            
            # 長さ制限
            if len(input_data) > 1000:
                raise ValueError("Input too long")
        
        return input_data
    
    def execute_safe_command(self, command, args=None):
        """安全なコマンド実行"""
        # コマンドのホワイトリストチェック
        if command not in self.security_config.get("allowed_commands", []):
            raise PermissionError(f"Command not allowed: {command}")
        
        # 実行可能ファイルのパス検証
        command_path = None
        for allowed_path in self.security_config.get("allowed_paths", []):
            potential_path = os.path.join(allowed_path, command)
            if os.path.isfile(potential_path) and os.access(potential_path, os.X_OK):
                command_path = potential_path
                break
        
        if not command_path:
            raise FileNotFoundError(f"Command not found in allowed paths: {command}")
        
        # 引数の検証
        if args:
            validated_args = [self.validate_input(arg) for arg in args]
        else:
            validated_args = []
        
        # レート制限チェック
        self.check_rate_limit()
        
        try:
            # タイムアウト付きでコマンド実行
            result = subprocess.run(
                [command_path] + validated_args,
                capture_output=True,
                text=True,
                timeout=self.security_config.get("max_execution_time", 300),
                env={"PATH": ":".join(self.security_config.get("allowed_paths", []))}
            )
            
            self.logger.info(f"Command executed: {command} {' '.join(validated_args)}")
            
            return {
                "returncode": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr
            }
            
        except subprocess.TimeoutExpired:
            self.logger.error(f"Command timeout: {command}")
            raise TimeoutError("Command execution timeout")
        
        except Exception as e:
            self.logger.error(f"Command execution failed: {command} - {e}")
            raise
    
    def check_rate_limit(self):
        """レート制限のチェック"""
        rate_file = "/tmp/zabbix_custom_rate_limit"
        current_time = int(time.time())
        
        # レート制限ファイルの読み込み
        try:
            with open(rate_file, 'r') as f:
                rate_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            rate_data = {"calls": [], "last_cleanup": current_time}
        
        # 古いエントリのクリーンアップ
        time_window = self.security_config.get("rate_limit", {}).get("time_window", 3600)
        cutoff_time = current_time - time_window
        rate_data["calls"] = [call_time for call_time in rate_data["calls"] if call_time > cutoff_time]
        
        # レート制限チェック
        max_calls = self.security_config.get("rate_limit", {}).get("max_calls", 100)
        if len(rate_data["calls"]) >= max_calls:
            raise PermissionError("Rate limit exceeded")
        
        # 現在の呼び出しを記録
        rate_data["calls"].append(current_time)
        
        # ファイルに保存
        with open(rate_file, 'w') as f:
            json.dump(rate_data, f)
        
        os.chmod(rate_file, 0o600)  # 所有者のみ読み書き可能
    
    def calculate_checksum(self, data):
        """データのチェックサム計算"""
        return hashlib.sha256(str(data).encode()).hexdigest()
    
    def secure_output(self, data):
        """セキュアな出力処理"""
        # 機密情報のマスキング
        if isinstance(data, str):
            # パスワード系の情報をマスキング
            import re
            data = re.sub(r'(password|secret|key)[\s]*[:=][\s]*[^\s,}]+', 
                         r'\1: ****', data, flags=re.IGNORECASE)
        
        return data

# 使用例
def main():
    try:
        script = SecureCustomScript()
        
        # 安全なコマンド実行例
        if len(sys.argv) > 1 and sys.argv[1] == "disk_usage":
            result = script.execute_safe_command("df", ["-h"])
            output = script.secure_output(result["stdout"])
            print(output)
        
        elif len(sys.argv) > 1 and sys.argv[1] == "process_count":
            result = script.execute_safe_command("ps", ["-aux"])
            if result["returncode"] == 0:
                process_count = len(result["stdout"].strip().split('\n')) - 1
                print(process_count)
            else:
                print("0")
        
        else:
            print("Invalid or missing command")
            sys.exit(1)
    
    except Exception as e:
        logging.error(f"Script execution failed: {e}")
        print("Error")
        sys.exit(1)

if __name__ == "__main__":
    main()

参考リンク


本セクションでは、Zabbixのカスタマイゼーション機能について詳しく説明しました。これで第7部「高度な機能」が完成です。次の第8部では、運用とベストプラクティスについて説明する予定です。