8.1 セキュリティ
企業レベルのZabbix監視環境における包括的セキュリティ実装と管理手法
概要
Zabbixセキュリティは、監視インフラストラクチャを悪意ある攻撃から保護し、監視データの機密性・完全性・可用性を確保する重要な要素です。適切なセキュリティ実装により、信頼性の高い監視環境を構築し、コンプライアンス要件を満たすことができます。
セキュリティの重要性
要素 | リスク | 対策効果 |
---|---|---|
認証・認可 | 不正アクセス・データ漏洩 | 多層防御・権限最小化 |
通信暗号化 | 盗聴・中間者攻撃 | エンドツーエンド暗号化 |
データ保護 | 情報漏洩・改ざん | 暗号化・署名検証 |
アクセス制御 | 内部不正・誤操作 | 最小権限・監査ログ |
脆弱性管理 | システム侵害 | 定期更新・脆弱性スキャン |
認証とアクセス制御
多要素認証(MFA)
LDAP/Active Directory統合
yaml
# LDAP認証設定
LDAP設定:
サーバー: "ldaps://dc.example.com:636"
ベースDN: "dc=example,dc=com"
検索属性: "sAMAccountName"
バインドDN: "CN=zabbix-service,OU=Service Accounts,DC=example,DC=com"
# セキュリティ設定
暗号化: "LDAPS (SSL/TLS)"
証明書検証: "有効"
タイムアウト: "30秒"
# ユーザーマッピング
ユーザー属性:
名前: "displayName"
メール: "mail"
グループ: "memberOf"
# グループマッピング
グループ設定:
管理者: "CN=IT-Admins,OU=Groups,DC=example,DC=com"
運用者: "CN=IT-Operations,OU=Groups,DC=example,DC=com"
閲覧者: "CN=IT-Viewers,OU=Groups,DC=example,DC=com"
SAML統合
yaml
# SAML 2.0設定
SAML設定:
IdP URL: "https://sso.example.com/simplesaml/saml2/idp/SSOService.php"
# エンティティ設定
SP Entity ID: "zabbix.example.com"
IdP Entity ID: "https://sso.example.com/simplesaml/saml2/idp/metadata.php"
# 証明書設定
署名証明書: "/etc/ssl/certs/saml-signing.crt"
暗号化証明書: "/etc/ssl/certs/saml-encryption.crt"
# 属性マッピング
属性設定:
ユーザー名: "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"
表示名: "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname"
メール: "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"
グループ: "http://schemas.microsoft.com/ws/2008/06/identity/claims/groups"
API認証強化
python
#!/usr/bin/env python3
"""強化されたZabbix API認証システム"""
import jwt
import hashlib
import secrets
import time
from typing import Dict, Optional
from datetime import datetime, timedelta
class ZabbixSecureAuth:
def __init__(self, secret_key: str, token_expiry: int = 3600):
self.secret_key = secret_key
self.token_expiry = token_expiry
self.active_sessions = {}
self.failed_attempts = {}
self.max_attempts = 5
self.lockout_duration = 300 # 5分
def authenticate_user(self, username: str, password: str,
client_ip: str, user_agent: str) -> Optional[Dict]:
"""セキュア認証処理"""
# レート制限チェック
if self._is_locked_out(username, client_ip):
raise Exception("Account temporarily locked due to multiple failed attempts")
# 認証検証
if not self._verify_credentials(username, password):
self._record_failed_attempt(username, client_ip)
raise Exception("Invalid credentials")
# セッション作成
session_data = {
'username': username,
'ip_address': client_ip,
'user_agent': user_agent,
'login_time': time.time(),
'last_activity': time.time()
}
# JWTトークン生成
token_payload = {
'username': username,
'session_id': secrets.token_hex(32),
'ip_address': client_ip,
'exp': datetime.utcnow() + timedelta(seconds=self.token_expiry),
'iat': datetime.utcnow()
}
token = jwt.encode(token_payload, self.secret_key, algorithm='HS256')
# セッション保存
self.active_sessions[token_payload['session_id']] = session_data
# 失敗回数リセット
self._reset_failed_attempts(username, client_ip)
return {
'token': token,
'session_id': token_payload['session_id'],
'expires_at': token_payload['exp']
}
def validate_token(self, token: str, client_ip: str) -> Optional[Dict]:
"""トークン検証"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
session_id = payload['session_id']
# セッション確認
if session_id not in self.active_sessions:
raise Exception("Session not found")
session = self.active_sessions[session_id]
# IP確認
if session['ip_address'] != client_ip:
raise Exception("IP address mismatch")
# セッション更新
session['last_activity'] = time.time()
return {
'username': payload['username'],
'session_id': session_id,
'valid': True
}
except jwt.ExpiredSignatureError:
raise Exception("Token expired")
except jwt.InvalidTokenError:
raise Exception("Invalid token")
def _verify_credentials(self, username: str, password: str) -> bool:
"""認証情報検証(実装例)"""
# 実際の実装では、データベースやLDAPと連携
password_hash = hashlib.sha256(password.encode()).hexdigest()
# パスワード複雑性チェック
if not self._validate_password_complexity(password):
return False
# データベース検証(簡略化)
return self._check_user_in_database(username, password_hash)
def _validate_password_complexity(self, password: str) -> bool:
"""パスワード複雑性検証"""
if len(password) < 12:
return False
checks = [
any(c.isupper() for c in password), # 大文字
any(c.islower() for c in password), # 小文字
any(c.isdigit() for c in password), # 数字
any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password) # 特殊文字
]
return sum(checks) >= 3
def _is_locked_out(self, username: str, client_ip: str) -> bool:
"""ロックアウト確認"""
key = f"{username}:{client_ip}"
if key not in self.failed_attempts:
return False
attempts = self.failed_attempts[key]
if attempts['count'] >= self.max_attempts:
if time.time() - attempts['last_attempt'] < self.lockout_duration:
return True
else:
# ロックアウト期間経過、リセット
del self.failed_attempts[key]
return False
def _record_failed_attempt(self, username: str, client_ip: str):
"""失敗試行記録"""
key = f"{username}:{client_ip}"
current_time = time.time()
if key not in self.failed_attempts:
self.failed_attempts[key] = {'count': 0, 'last_attempt': 0}
# 1時間以内の試行のみカウント
if current_time - self.failed_attempts[key]['last_attempt'] > 3600:
self.failed_attempts[key]['count'] = 0
self.failed_attempts[key]['count'] += 1
self.failed_attempts[key]['last_attempt'] = current_time
def _reset_failed_attempts(self, username: str, client_ip: str):
"""失敗回数リセット"""
key = f"{username}:{client_ip}"
if key in self.failed_attempts:
del self.failed_attempts[key]
def _check_user_in_database(self, username: str, password_hash: str) -> bool:
"""データベースユーザー確認(実装例)"""
# 実際の実装では適切なデータベース接続を使用
return True # 簡略化
# 使用例
auth_system = ZabbixSecureAuth(
secret_key="your-secret-key-here",
token_expiry=3600
)
# 認証実行
try:
auth_result = auth_system.authenticate_user(
username="admin",
password="complex-password-123!",
client_ip="192.168.1.100",
user_agent="Mozilla/5.0..."
)
print(f"Authentication successful: {auth_result['token']}")
except Exception as e:
print(f"Authentication failed: {e}")
権限管理
最小権限原則実装
yaml
# 役割ベースアクセス制御(RBAC)
RBAC設計:
# 基本役割定義
役割定義:
システム管理者:
権限:
- 全システム管理
- ユーザー管理
- 設定変更
- API フルアクセス
制限:
- 監査ログ削除不可
インフラ管理者:
権限:
- ホスト管理
- テンプレート管理
- アイテム・トリガー設定
- ダッシュボード管理
制限:
- ユーザー管理不可
- システム設定変更不可
アプリケーション管理者:
権限:
- アプリケーションホスト監視
- 関連ダッシュボード編集
- 問題確認・応答
制限:
- インフラ設定変更不可
- 他アプリケーション不可視
運用者:
権限:
- ダッシュボード閲覧
- 問題確認
- レポート生成
制限:
- 設定変更一切不可
- 削除操作不可
監査者:
権限:
- 読み取り専用アクセス
- 監査ログ閲覧
- レポート出力
制限:
- 変更操作一切不可
- 個人情報アクセス制限
# データアクセス制御
データアクセス制御:
# ホストグループベース制御
ホストグループアクセス:
"Production Servers":
アクセス許可: ["システム管理者", "インフラ管理者"]
読み取り専用: ["運用者", "監査者"]
"Development Servers":
アクセス許可: ["開発チーム", "アプリケーション管理者"]
読み取り専用: ["運用者"]
"Database Servers":
アクセス許可: ["DBA", "システム管理者"]
読み取り専用: ["インフラ管理者", "運用者"]
# 時間ベース制御
時間制限:
営業時間: "08:00-18:00"
メンテナンス時間: "02:00-04:00"
制限:
"運用者": "営業時間のみアクセス可能"
"保守要員": "メンテナンス時間のみ変更操作可能"
通信暗号化
TLS/SSL実装
Webインターフェース暗号化
nginx
# Nginx SSL設定(推奨設定)
server {
listen 443 ssl http2;
server_name zabbix.example.com;
# SSL証明書設定
ssl_certificate /etc/ssl/certs/zabbix.example.com.crt;
ssl_certificate_key /etc/ssl/private/zabbix.example.com.key;
# セキュリティ強化設定
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
# HSTS設定
add_header Strict-Transport-Security "max-age=63072000" always;
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Referrer-Policy "strict-origin-when-cross-origin";
# CSP設定
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';";
# セッション設定
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;
# OCSP Stapling
ssl_stapling on;
ssl_stapling_verify on;
ssl_trusted_certificate /etc/ssl/certs/ca-certificates.crt;
resolver 8.8.8.8 8.8.4.4 valid=300s;
resolver_timeout 5s;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# セキュリティヘッダー
proxy_set_header X-Forwarded-Ssl on;
proxy_redirect off;
# タイムアウト設定
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
# HTTP -> HTTPS リダイレクト
server {
listen 80;
server_name zabbix.example.com;
return 301 https://$server_name$request_uri;
}
エージェント通信暗号化
bash
# Zabbixエージェント暗号化設定
cat > /etc/zabbix/zabbix_agentd.conf.d/encryption.conf << 'EOF'
# TLS暗号化設定
TLSConnect=psk
TLSAccept=psk
# PSK設定
TLSPSKIdentity=PSK-Agent-001
TLSPSKFile=/etc/zabbix/zabbix_agentd.psk
# 証明書ベース暗号化(高セキュリティ環境)
# TLSConnect=cert
# TLSAccept=cert
# TLSCertFile=/etc/zabbix/certs/zabbix_agentd.crt
# TLSKeyFile=/etc/zabbix/certs/zabbix_agentd.key
# TLSCAFile=/etc/zabbix/certs/ca.crt
# サーバー認証
TLSServerCertIssuer=CN=Zabbix-CA,O=Example Corp,C=JP
TLSServerCertSubject=CN=Zabbix-Server,O=Example Corp,C=JP
# ログレベル(デバッグ時のみ)
# DebugLevel=4
EOF
# PSKキー生成
openssl rand -hex 32 > /etc/zabbix/zabbix_agentd.psk
chmod 600 /etc/zabbix/zabbix_agentd.psk
chown zabbix:zabbix /etc/zabbix/zabbix_agentd.psk
# 証明書ベース暗号化の証明書生成
mkdir -p /etc/zabbix/certs
# CA証明書作成
openssl genrsa -out /etc/zabbix/certs/ca.key 4096
openssl req -new -x509 -days 3650 -key /etc/zabbix/certs/ca.key \
-out /etc/zabbix/certs/ca.crt \
-subj "/C=JP/O=Example Corp/CN=Zabbix-CA"
# エージェント証明書作成
openssl genrsa -out /etc/zabbix/certs/zabbix_agentd.key 2048
openssl req -new -key /etc/zabbix/certs/zabbix_agentd.key \
-out /etc/zabbix/certs/zabbix_agentd.csr \
-subj "/C=JP/O=Example Corp/CN=agent.example.com"
openssl x509 -req -in /etc/zabbix/certs/zabbix_agentd.csr \
-CA /etc/zabbix/certs/ca.crt \
-CAkey /etc/zabbix/certs/ca.key \
-CAcreateserial \
-out /etc/zabbix/certs/zabbix_agentd.crt \
-days 365
# 権限設定
chmod 600 /etc/zabbix/certs/*
chown zabbix:zabbix /etc/zabbix/certs/*
データベース暗号化
保存データ暗号化
sql
-- MySQL/MariaDB暗号化設定
-- 1. 暗号化キー設定
INSTALL PLUGIN file_key_management SONAME 'file_key_management.so';
-- my.cnf設定例
/*
[mysqld]
# 暗号化プラグイン
plugin_load_add = file_key_management
file_key_management_filename = /etc/mysql/encryption/keyfile
file_key_management_filekey = FILE:/etc/mysql/encryption/keyfile.key
file_key_management_encryption_algorithm = AES_CTR
# テーブル暗号化
encrypt_tables = ON
encrypt_tmp_disk_tables = ON
encrypt_tmp_files = ON
encrypt_binlog = ON
# InnoDB暗号化
innodb_encrypt_tables = ON
innodb_encrypt_log = ON
innodb_encryption_threads = 4
innodb_encryption_rotate_key_age = 1
*/
-- 2. 暗号化テーブル作成例
CREATE TABLE history_encrypted (
itemid bigint(20) unsigned NOT NULL,
clock int(11) NOT NULL DEFAULT '0',
value double(16,4) NOT NULL DEFAULT '0.0000',
ns int(11) NOT NULL DEFAULT '0',
KEY history_1 (itemid,clock)
) ENGINE=InnoDB
ENCRYPTED=YES
ENCRYPTION_KEY_ID=1
PARTITION BY RANGE (clock) (
PARTITION p2024_01 VALUES LESS THAN (UNIX_TIMESTAMP('2024-02-01 00:00:00')),
PARTITION p2024_02 VALUES LESS THAN (UNIX_TIMESTAMP('2024-03-01 00:00:00')),
PARTITION pmax VALUES LESS THAN MAXVALUE
);
-- 3. 既存テーブル暗号化
ALTER TABLE history ENCRYPTED=YES ENCRYPTION_KEY_ID=1;
ALTER TABLE history_uint ENCRYPTED=YES ENCRYPTION_KEY_ID=1;
ALTER TABLE history_str ENCRYPTED=YES ENCRYPTION_KEY_ID=1;
ALTER TABLE history_text ENCRYPTED=YES ENCRYPTION_KEY_ID=1;
ALTER TABLE history_log ENCRYPTED=YES ENCRYPTION_KEY_ID=1;
通信暗号化
ini
# PostgreSQL SSL設定
# postgresql.conf
ssl = on
ssl_cert_file = '/etc/ssl/certs/server.crt'
ssl_key_file = '/etc/ssl/private/server.key'
ssl_ca_file = '/etc/ssl/certs/ca.crt'
ssl_crl_file = ''
# 暗号化強度設定
ssl_ciphers = 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384'
ssl_prefer_server_ciphers = on
ssl_min_protocol_version = 'TLSv1.2'
# pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
hostssl zabbix zabbix 192.168.1.0/24 md5 clientcert=1
hostssl all all ::1/128 md5 clientcert=1
脆弱性管理
セキュリティ更新管理
自動更新システム
bash
#!/bin/bash
# Zabbix セキュリティ更新チェックスクリプト
LOG_FILE="/var/log/zabbix/security_updates.log"
ZABBIX_CONFIG="/etc/zabbix/zabbix_server.conf"
BACKUP_DIR="/backup/zabbix/security_updates"
# ログ関数
log_message() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 現在のバージョン確認
check_current_version() {
local current_version=$(zabbix_server --version | head -1 | awk '{print $3}')
log_message "Current Zabbix version: $current_version"
echo "$current_version"
}
# セキュリティアドバイザリ確認
check_security_advisories() {
log_message "Checking Zabbix security advisories..."
# Zabbix公式セキュリティページの確認
curl -s "https://www.zabbix.com/rn/rn6.0.0" | \
grep -i "security\|vulnerability\|CVE" > /tmp/security_check.txt
if [ -s /tmp/security_check.txt ]; then
log_message "Security advisories found:"
cat /tmp/security_check.txt | tee -a "$LOG_FILE"
return 0
else
log_message "No security advisories found"
return 1
fi
}
# バックアップ作成
create_backup() {
local backup_date=$(date '+%Y%m%d_%H%M%S')
local backup_path="$BACKUP_DIR/zabbix_backup_$backup_date"
log_message "Creating backup: $backup_path"
mkdir -p "$backup_path"
# 設定ファイルバックアップ
cp -r /etc/zabbix "$backup_path/"
# データベースバックアップ
mysqldump --single-transaction --routines --triggers \
zabbix > "$backup_path/zabbix_db_backup.sql"
# Webファイルバックアップ
cp -r /usr/share/zabbix "$backup_path/"
log_message "Backup completed: $backup_path"
}
# 更新前検証
pre_update_verification() {
log_message "Running pre-update verification..."
# サービス状態確認
systemctl is-active zabbix-server >/dev/null 2>&1
if [ $? -ne 0 ]; then
log_message "ERROR: Zabbix server is not running"
return 1
fi
# データベース接続確認
mysql -u zabbix -p$ZABBIX_DB_PASSWORD -e "SELECT COUNT(*) FROM hosts;" zabbix >/dev/null 2>&1
if [ $? -ne 0 ]; then
log_message "ERROR: Database connection failed"
return 1
fi
# ディスク容量確認
local available_space=$(df /var/lib/mysql | awk 'NR==2 {print $4}')
if [ "$available_space" -lt 1048576 ]; then # 1GB = 1048576 KB
log_message "WARNING: Low disk space: ${available_space}KB available"
fi
log_message "Pre-update verification completed successfully"
return 0
}
# セキュリティ更新実行
perform_security_update() {
log_message "Starting security update process..."
# パッケージリスト更新
apt update
# セキュリティ更新確認
apt list --upgradable | grep zabbix
# 更新実行(テスト環境での事前確認推奨)
log_message "Installing security updates..."
apt upgrade -y zabbix-server-mysql zabbix-frontend-php zabbix-apache-conf zabbix-agent
# サービス再起動
log_message "Restarting Zabbix services..."
systemctl restart zabbix-server
systemctl restart zabbix-agent
systemctl restart apache2
# 更新後確認
sleep 10
systemctl is-active zabbix-server
if [ $? -eq 0 ]; then
log_message "Security update completed successfully"
return 0
else
log_message "ERROR: Zabbix server failed to start after update"
return 1
fi
}
# 更新後検証
post_update_verification() {
log_message "Running post-update verification..."
local new_version=$(check_current_version)
log_message "Updated to version: $new_version"
# API接続確認
curl -s -X POST \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"apiinfo.version","params":{},"id":1}' \
http://localhost/zabbix/api_jsonrpc.php | jq -r '.result' > /tmp/api_version.txt
if [ -s /tmp/api_version.txt ]; then
log_message "API version: $(cat /tmp/api_version.txt)"
else
log_message "WARNING: API connection test failed"
fi
# データ整合性確認
local host_count=$(mysql -u zabbix -p$ZABBIX_DB_PASSWORD -s -e "SELECT COUNT(*) FROM hosts;" zabbix)
log_message "Host count after update: $host_count"
log_message "Post-update verification completed"
}
# セキュリティ通知
send_security_notification() {
local status="$1"
local message="$2"
# Slack通知例
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🔒 Zabbix Security Update: $status\\n$message\"}" \
"$SLACK_WEBHOOK_URL"
# メール通知
echo "$message" | mail -s "Zabbix Security Update: $status" [email protected]
}
# メイン処理
main() {
log_message "Starting Zabbix security update check..."
# セキュリティアドバイザリ確認
if check_security_advisories; then
log_message "Security updates available - proceeding with update..."
# バックアップ作成
create_backup
# 更新前検証
if pre_update_verification; then
# セキュリティ更新実行
if perform_security_update; then
# 更新後検証
post_update_verification
send_security_notification "SUCCESS" "Security updates applied successfully"
else
send_security_notification "FAILED" "Security update failed - manual intervention required"
exit 1
fi
else
send_security_notification "FAILED" "Pre-update verification failed"
exit 1
fi
else
log_message "No security updates required"
fi
log_message "Security update process completed"
}
# スクリプト実行
main "$@"
脆弱性スキャン
自動脆弱性スキャンシステム
python
#!/usr/bin/env python3
"""Zabbix脆弱性スキャンシステム"""
import requests
import json
import subprocess
import re
from typing import Dict, List
from datetime import datetime
class ZabbixVulnerabilityScanner:
def __init__(self, zabbix_url: str, api_token: str):
self.zabbix_url = zabbix_url
self.api_token = api_token
self.cve_database_url = "https://cve.circl.lu/api"
def scan_zabbix_installation(self) -> Dict:
"""Zabbix環境の脆弱性スキャン"""
results = {
'scan_date': datetime.now().isoformat(),
'vulnerabilities': [],
'recommendations': [],
'severity_summary': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
}
# 1. バージョン確認
version_info = self._get_version_info()
results['version_info'] = version_info
# 2. CVE検索
cve_results = self._search_cves(version_info['version'])
results['vulnerabilities'].extend(cve_results)
# 3. 設定セキュリティ確認
config_issues = self._check_configuration_security()
results['vulnerabilities'].extend(config_issues)
# 4. ネットワークセキュリティ確認
network_issues = self._check_network_security()
results['vulnerabilities'].extend(network_issues)
# 5. 権限設定確認
permission_issues = self._check_permission_security()
results['vulnerabilities'].extend(permission_issues)
# 6. 深刻度集計
for vuln in results['vulnerabilities']:
severity = vuln.get('severity', 'low').lower()
if severity in results['severity_summary']:
results['severity_summary'][severity] += 1
# 7. 推奨事項生成
results['recommendations'] = self._generate_recommendations(results['vulnerabilities'])
return results
def _get_version_info(self) -> Dict:
"""Zabbixバージョン情報取得"""
try:
# API経由でバージョン取得
response = requests.post(
f"{self.zabbix_url}/api_jsonrpc.php",
json={
"jsonrpc": "2.0",
"method": "apiinfo.version",
"params": {},
"id": 1
},
headers={"Authorization": f"Bearer {self.api_token}"}
)
api_version = response.json().get('result', 'unknown')
# システムバージョン取得
try:
server_version = subprocess.check_output(
['zabbix_server', '--version'],
stderr=subprocess.STDOUT
).decode().split('\n')[0]
version_match = re.search(r'(\d+\.\d+\.\d+)', server_version)
system_version = version_match.group(1) if version_match else 'unknown'
except:
system_version = 'unknown'
return {
'api_version': api_version,
'system_version': system_version,
'version': api_version # CVE検索用
}
except Exception as e:
return {'error': str(e), 'version': 'unknown'}
def _search_cves(self, version: str) -> List[Dict]:
"""CVE検索"""
vulnerabilities = []
try:
# Zabbix CVE検索
search_response = requests.get(
f"{self.cve_database_url}/search/zabbix/{version}",
timeout=30
)
if search_response.status_code == 200:
cve_data = search_response.json()
for cve in cve_data:
vulnerability = {
'type': 'CVE',
'id': cve.get('id', ''),
'summary': cve.get('summary', ''),
'severity': self._determine_cvss_severity(cve.get('cvss', 0)),
'published': cve.get('Published', ''),
'modified': cve.get('Modified', ''),
'references': cve.get('references', [])
}
vulnerabilities.append(vulnerability)
except Exception as e:
vulnerabilities.append({
'type': 'ERROR',
'message': f"CVE search failed: {str(e)}",
'severity': 'unknown'
})
return vulnerabilities
def _check_configuration_security(self) -> List[Dict]:
"""設定セキュリティ確認"""
issues = []
try:
# 設定ファイル確認
config_checks = [
{
'file': '/etc/zabbix/zabbix_server.conf',
'checks': [
('DBPassword', 'データベースパスワードが設定されているか'),
('TLSConnect', 'TLS暗号化が有効か'),
('LogFile', 'ログファイルが適切に設定されているか')
]
}
]
for config in config_checks:
try:
with open(config['file'], 'r') as f:
content = f.read()
for param, description in config['checks']:
if param not in content or content.find(f"{param}=") == -1:
issues.append({
'type': 'CONFIGURATION',
'file': config['file'],
'parameter': param,
'description': description,
'severity': 'medium',
'recommendation': f"{param}を適切に設定してください"
})
except FileNotFoundError:
issues.append({
'type': 'CONFIGURATION',
'file': config['file'],
'description': '設定ファイルが見つかりません',
'severity': 'high'
})
except Exception as e:
issues.append({
'type': 'ERROR',
'message': f"Configuration check failed: {str(e)}",
'severity': 'unknown'
})
return issues
def _check_network_security(self) -> List[Dict]:
"""ネットワークセキュリティ確認"""
issues = []
try:
# ポート確認
port_checks = [
('10051', 'Zabbix server port'),
('10050', 'Zabbix agent port'),
('80', 'HTTP port (should redirect to HTTPS)'),
('443', 'HTTPS port')
]
for port, description in port_checks:
try:
result = subprocess.run(
['netstat', '-ln'],
capture_output=True,
text=True
)
if f":{port}" in result.stdout:
if port == '80':
issues.append({
'type': 'NETWORK',
'port': port,
'description': f"{description} - HTTPSリダイレクトを推奨",
'severity': 'low',
'recommendation': 'HTTPをHTTPSにリダイレクトしてください'
})
except Exception:
pass
# SSL/TLS設定確認
try:
ssl_check = subprocess.run(
['openssl', 's_client', '-connect', f'{self.zabbix_url}:443'],
input='',
capture_output=True,
text=True,
timeout=10
)
if 'TLSv1.3' not in ssl_check.stdout and 'TLSv1.2' not in ssl_check.stdout:
issues.append({
'type': 'NETWORK',
'description': 'TLS 1.2以上が使用されていません',
'severity': 'high',
'recommendation': 'TLS 1.2以上を有効にしてください'
})
except Exception:
pass
except Exception as e:
issues.append({
'type': 'ERROR',
'message': f"Network security check failed: {str(e)}",
'severity': 'unknown'
})
return issues
def _check_permission_security(self) -> List[Dict]:
"""権限設定確認"""
issues = []
try:
# ファイル権限確認
file_permission_checks = [
('/etc/zabbix/zabbix_server.conf', '640'),
('/etc/zabbix/zabbix_agentd.conf', '644'),
('/var/log/zabbix/', '755')
]
for file_path, expected_perm in file_permission_checks:
try:
import stat
import os
file_stat = os.stat(file_path)
actual_perm = oct(file_stat.st_mode)[-3:]
if actual_perm != expected_perm:
issues.append({
'type': 'PERMISSION',
'file': file_path,
'expected': expected_perm,
'actual': actual_perm,
'severity': 'medium',
'recommendation': f"chmod {expected_perm} {file_path}"
})
except Exception:
issues.append({
'type': 'PERMISSION',
'file': file_path,
'description': 'ファイル権限の確認に失敗',
'severity': 'low'
})
except Exception as e:
issues.append({
'type': 'ERROR',
'message': f"Permission check failed: {str(e)}",
'severity': 'unknown'
})
return issues
def _determine_cvss_severity(self, cvss_score: float) -> str:
"""CVSSスコアから深刻度判定"""
if cvss_score >= 9.0:
return 'critical'
elif cvss_score >= 7.0:
return 'high'
elif cvss_score >= 4.0:
return 'medium'
else:
return 'low'
def _generate_recommendations(self, vulnerabilities: List[Dict]) -> List[str]:
"""推奨事項生成"""
recommendations = set()
# 深刻度に基づく推奨事項
critical_count = sum(1 for v in vulnerabilities if v.get('severity') == 'critical')
high_count = sum(1 for v in vulnerabilities if v.get('severity') == 'high')
if critical_count > 0:
recommendations.add("🚨 Critical vulnerabilities found - immediate action required")
recommendations.add("📋 Schedule emergency maintenance window")
recommendations.add("🔄 Apply critical security patches immediately")
if high_count > 0:
recommendations.add("⚠️ High severity vulnerabilities found")
recommendations.add("📅 Plan security updates within 24-48 hours")
# 特定タイプの推奨事項
vuln_types = [v.get('type') for v in vulnerabilities]
if 'CVE' in vuln_types:
recommendations.add("🔍 Review CVE details and apply vendor patches")
recommendations.add("📰 Subscribe to Zabbix security announcements")
if 'CONFIGURATION' in vuln_types:
recommendations.add("⚙️ Review and harden Zabbix configuration")
recommendations.add("📋 Implement configuration management")
if 'NETWORK' in vuln_types:
recommendations.add("🌐 Review network security settings")
recommendations.add("🔒 Implement network segmentation")
if 'PERMISSION' in vuln_types:
recommendations.add("👥 Review file and directory permissions")
recommendations.add("🔐 Implement least privilege principle")
# 基本推奨事項
recommendations.add("📊 Schedule regular vulnerability scans")
recommendations.add("🛡️ Implement defense in depth strategy")
recommendations.add("📝 Document security procedures")
return sorted(list(recommendations))
def generate_report(self, scan_results: Dict) -> str:
"""レポート生成"""
report = f"""
# Zabbix脆弱性スキャンレポート
**スキャン日時**: {scan_results['scan_date']}
**バージョン**: {scan_results.get('version_info', {}).get('version', 'Unknown')}
## 📊 深刻度サマリー
- 🚨 Critical: {scan_results['severity_summary']['critical']}
- ⚠️ High: {scan_results['severity_summary']['high']}
- 🔶 Medium: {scan_results['severity_summary']['medium']}
- 🔷 Low: {scan_results['severity_summary']['low']}
## 🔍 発見された脆弱性
"""
for vuln in scan_results['vulnerabilities']:
severity_emoji = {
'critical': '🚨',
'high': '⚠️',
'medium': '🔶',
'low': '🔷'
}.get(vuln.get('severity'), '❓')
report += f"### {severity_emoji} {vuln.get('type', 'Unknown')}\n"
if 'id' in vuln:
report += f"**ID**: {vuln['id']}\n"
if 'summary' in vuln:
report += f"**概要**: {vuln['summary']}\n"
if 'recommendation' in vuln:
report += f"**推奨事項**: {vuln['recommendation']}\n"
report += "\n"
report += "## 💡 推奨事項\n\n"
for rec in scan_results['recommendations']:
report += f"- {rec}\n"
return report
# 使用例
def main():
scanner = ZabbixVulnerabilityScanner(
zabbix_url="https://zabbix.example.com",
api_token="your-api-token"
)
# 脆弱性スキャン実行
results = scanner.scan_zabbix_installation()
# レポート生成
report = scanner.generate_report(results)
# ファイル出力
with open(f"vulnerability_report_{datetime.now().strftime('%Y%m%d')}.md", 'w') as f:
f.write(report)
print("脆弱性スキャン完了")
print(f"Critical: {results['severity_summary']['critical']}")
print(f"High: {results['severity_summary']['high']}")
if __name__ == "__main__":
main()
監査とコンプライアンス
監査ログ管理
包括的監査システム
python
#!/usr/bin/env python3
"""Zabbix監査ログ管理システム"""
import json
import hashlib
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
class ZabbixAuditManager:
def __init__(self, audit_db_path: str = "/var/log/zabbix/audit.db"):
self.audit_db_path = audit_db_path
self.setup_database()
self.setup_logging()
def setup_database(self):
"""監査データベース初期化"""
conn = sqlite3.connect(self.audit_db_path)
cursor = conn.cursor()
# 監査ログテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
user_id TEXT NOT NULL,
username TEXT NOT NULL,
action TEXT NOT NULL,
object_type TEXT,
object_id TEXT,
object_name TEXT,
old_values TEXT,
new_values TEXT,
ip_address TEXT,
user_agent TEXT,
session_id TEXT,
severity TEXT DEFAULT 'info',
checksum TEXT NOT NULL,
created_at TEXT NOT NULL
)
''')
# ログイン監査テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS login_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
username TEXT NOT NULL,
ip_address TEXT NOT NULL,
user_agent TEXT,
status TEXT NOT NULL,
failure_reason TEXT,
session_id TEXT,
created_at TEXT NOT NULL
)
''')
# システムイベント監査テーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
event_type TEXT NOT NULL,
component TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
details TEXT,
checksum TEXT NOT NULL,
created_at TEXT NOT NULL
)
''')
# インデックス作成
cursor.execute('CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_logs(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_logs(username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_logs(action)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_login_timestamp ON login_audit(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_login_username ON login_audit(username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_system_timestamp ON system_audit(timestamp)')
conn.commit()
conn.close()
def setup_logging(self):
"""ログ設定"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/zabbix/audit_manager.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def log_user_action(self, user_data: Dict) -> str:
"""ユーザーアクション監査ログ"""
conn = sqlite3.connect(self.audit_db_path)
cursor = conn.cursor()
# チェックサム計算
checksum_data = f"{user_data['timestamp']}{user_data['username']}{user_data['action']}"
checksum = hashlib.sha256(checksum_data.encode()).hexdigest()
cursor.execute('''
INSERT INTO audit_logs (
timestamp, user_id, username, action, object_type, object_id,
object_name, old_values, new_values, ip_address, user_agent,
session_id, severity, checksum, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
user_data['timestamp'],
user_data.get('user_id', ''),
user_data['username'],
user_data['action'],
user_data.get('object_type', ''),
user_data.get('object_id', ''),
user_data.get('object_name', ''),
json.dumps(user_data.get('old_values', {})),
json.dumps(user_data.get('new_values', {})),
user_data.get('ip_address', ''),
user_data.get('user_agent', ''),
user_data.get('session_id', ''),
user_data.get('severity', 'info'),
checksum,
datetime.now().isoformat()
))
conn.commit()
audit_id = cursor.lastrowid
conn.close()
self.logger.info(f"User action logged: {user_data['action']} by {user_data['username']}")
return str(audit_id)
def log_login_attempt(self, login_data: Dict) -> str:
"""ログイン試行監査ログ"""
conn = sqlite3.connect(self.audit_db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO login_audit (
timestamp, username, ip_address, user_agent, status,
failure_reason, session_id, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
login_data['timestamp'],
login_data['username'],
login_data['ip_address'],
login_data.get('user_agent', ''),
login_data['status'],
login_data.get('failure_reason', ''),
login_data.get('session_id', ''),
datetime.now().isoformat()
))
conn.commit()
audit_id = cursor.lastrowid
conn.close()
self.logger.info(f"Login attempt logged: {login_data['status']} for {login_data['username']}")
return str(audit_id)
def log_system_event(self, event_data: Dict) -> str:
"""システムイベント監査ログ"""
conn = sqlite3.connect(self.audit_db_path)
cursor = conn.cursor()
# チェックサム計算
checksum_data = f"{event_data['timestamp']}{event_data['component']}{event_data['message']}"
checksum = hashlib.sha256(checksum_data.encode()).hexdigest()
cursor.execute('''
INSERT INTO system_audit (
timestamp, event_type, component, severity, message,
details, checksum, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
event_data['timestamp'],
event_data['event_type'],
event_data['component'],
event_data['severity'],
event_data['message'],
json.dumps(event_data.get('details', {})),
checksum,
datetime.now().isoformat()
))
conn.commit()
audit_id = cursor.lastrowid
conn.close()
self.logger.info(f"System event logged: {event_data['event_type']} in {event_data['component']}")
return str(audit_id)
def generate_audit_report(self, start_date: str, end_date: str) -> Dict:
"""監査レポート生成"""
conn = sqlite3.connect(self.audit_db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
report = {
'period': {'start': start_date, 'end': end_date},
'summary': {},
'user_actions': [],
'login_attempts': [],
'system_events': [],
'security_incidents': []
}
# ユーザーアクションサマリー
cursor.execute('''
SELECT action, COUNT(*) as count, COUNT(DISTINCT username) as unique_users
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
GROUP BY action
ORDER BY count DESC
''', (start_date, end_date))
user_actions_summary = {}
for row in cursor.fetchall():
user_actions_summary[row['action']] = {
'count': row['count'],
'unique_users': row['unique_users']
}
report['summary']['user_actions'] = user_actions_summary
# ログイン試行サマリー
cursor.execute('''
SELECT status, COUNT(*) as count, COUNT(DISTINCT username) as unique_users
FROM login_audit
WHERE timestamp BETWEEN ? AND ?
GROUP BY status
''', (start_date, end_date))
login_summary = {}
for row in cursor.fetchall():
login_summary[row['status']] = {
'count': row['count'],
'unique_users': row['unique_users']
}
report['summary']['login_attempts'] = login_summary
# セキュリティインシデント検出
# 1. 連続ログイン失敗
cursor.execute('''
SELECT username, ip_address, COUNT(*) as failure_count
FROM login_audit
WHERE timestamp BETWEEN ? AND ? AND status = 'failed'
GROUP BY username, ip_address
HAVING failure_count >= 5
''', (start_date, end_date))
for row in cursor.fetchall():
report['security_incidents'].append({
'type': 'multiple_login_failures',
'username': row['username'],
'ip_address': row['ip_address'],
'failure_count': row['failure_count'],
'severity': 'high'
})
# 2. 異常時間帯ログイン
cursor.execute('''
SELECT username, ip_address, timestamp
FROM login_audit
WHERE timestamp BETWEEN ? AND ?
AND status = 'success'
AND (CAST(strftime('%H', timestamp) AS INTEGER) < 6
OR CAST(strftime('%H', timestamp) AS INTEGER) > 22)
''', (start_date, end_date))
for row in cursor.fetchall():
report['security_incidents'].append({
'type': 'unusual_hour_login',
'username': row['username'],
'ip_address': row['ip_address'],
'timestamp': row['timestamp'],
'severity': 'medium'
})
# 3. 特権操作
cursor.execute('''
SELECT username, action, object_type, timestamp
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
AND action IN ('user.create', 'user.delete', 'settings.update', 'template.delete')
''', (start_date, end_date))
for row in cursor.fetchall():
report['security_incidents'].append({
'type': 'privileged_operation',
'username': row['username'],
'action': row['action'],
'object_type': row['object_type'],
'timestamp': row['timestamp'],
'severity': 'info'
})
conn.close()
return report
def verify_log_integrity(self, start_date: str, end_date: str) -> Dict:
"""ログ整合性検証"""
conn = sqlite3.connect(self.audit_db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
verification_result = {
'period': {'start': start_date, 'end': end_date},
'total_records': 0,
'corrupted_records': [],
'integrity_status': 'intact'
}
# 監査ログ検証
cursor.execute('''
SELECT id, timestamp, username, action, checksum
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
''', (start_date, end_date))
for row in cursor.fetchall():
verification_result['total_records'] += 1
# チェックサム再計算
checksum_data = f"{row['timestamp']}{row['username']}{row['action']}"
expected_checksum = hashlib.sha256(checksum_data.encode()).hexdigest()
if row['checksum'] != expected_checksum:
verification_result['corrupted_records'].append({
'record_id': row['id'],
'expected_checksum': expected_checksum,
'actual_checksum': row['checksum']
})
if verification_result['corrupted_records']:
verification_result['integrity_status'] = 'compromised'
conn.close()
return verification_result
def export_compliance_report(self, compliance_type: str,
start_date: str, end_date: str) -> str:
"""コンプライアンスレポート出力"""
if compliance_type.upper() == 'SOX':
return self._generate_sox_report(start_date, end_date)
elif compliance_type.upper() == 'GDPR':
return self._generate_gdpr_report(start_date, end_date)
elif compliance_type.upper() == 'HIPAA':
return self._generate_hipaa_report(start_date, end_date)
else:
return self._generate_general_compliance_report(start_date, end_date)
def _generate_sox_report(self, start_date: str, end_date: str) -> str:
"""SOX法対応レポート"""
conn = sqlite3.connect(self.audit_db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
report = f"""
# SOX法対応監査レポート
**期間**: {start_date} から {end_date}
**生成日時**: {datetime.now().isoformat()}
## 内部統制関連アクティビティ
### アクセス制御変更
"""
# アクセス制御変更
cursor.execute('''
SELECT timestamp, username, action, object_name, old_values, new_values
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
AND action IN ('user.create', 'user.update', 'user.delete', 'usergroup.update')
ORDER BY timestamp DESC
''', (start_date, end_date))
for row in cursor.fetchall():
report += f"- **{row['timestamp']}**: {row['username']} - {row['action']} - {row['object_name']}\n"
# システム設定変更
report += "\n### システム設定変更\n"
cursor.execute('''
SELECT timestamp, username, action, object_name
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
AND action LIKE '%.update'
AND object_type IN ('settings', 'configuration')
ORDER BY timestamp DESC
''', (start_date, end_date))
for row in cursor.fetchall():
report += f"- **{row['timestamp']}**: {row['username']} - {row['action']} - {row['object_name']}\n"
conn.close()
return report
def _generate_gdpr_report(self, start_date: str, end_date: str) -> str:
"""GDPR対応レポート"""
conn = sqlite3.connect(self.audit_db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
report = f"""
# GDPR対応監査レポート
**期間**: {start_date} から {end_date}
**生成日時**: {datetime.now().isoformat()}
## 個人データアクセスログ
### データアクセス
"""
# データアクセスログ
cursor.execute('''
SELECT timestamp, username, action, ip_address
FROM audit_logs
WHERE timestamp BETWEEN ? AND ?
AND (action LIKE '%user%' OR action LIKE '%host%')
ORDER BY timestamp DESC
''', (start_date, end_date))
for row in cursor.fetchall():
report += f"- **{row['timestamp']}**: {row['username']} ({row['ip_address']}) - {row['action']}\n"
conn.close()
return report
def _generate_general_compliance_report(self, start_date: str, end_date: str) -> str:
"""一般的なコンプライアンスレポート"""
audit_report = self.generate_audit_report(start_date, end_date)
integrity_report = self.verify_log_integrity(start_date, end_date)
report = f"""
# 一般監査コンプライアンスレポート
**期間**: {start_date} から {end_date}
**生成日時**: {datetime.now().isoformat()}
## サマリー
### ユーザーアクション
"""
for action, data in audit_report['summary']['user_actions'].items():
report += f"- {action}: {data['count']}回 ({data['unique_users']}ユーザー)\n"
report += "\n### ログイン試行\n"
for status, data in audit_report['summary']['login_attempts'].items():
report += f"- {status}: {data['count']}回 ({data['unique_users']}ユーザー)\n"
report += f"\n## データ整合性\n"
report += f"- 総レコード数: {integrity_report['total_records']}\n"
report += f"- 破損レコード数: {len(integrity_report['corrupted_records'])}\n"
report += f"- 整合性ステータス: {integrity_report['integrity_status']}\n"
if audit_report['security_incidents']:
report += "\n## セキュリティインシデント\n"
for incident in audit_report['security_incidents']:
report += f"- **{incident['type']}**: {incident.get('username', 'N/A')} - 深刻度: {incident['severity']}\n"
return report
# 使用例
def main():
audit_manager = ZabbixAuditManager()
# ユーザーアクション記録例
user_action = {
'timestamp': datetime.now().isoformat(),
'user_id': '1',
'username': 'admin',
'action': 'host.create',
'object_type': 'host',
'object_id': '12345',
'object_name': 'web-server-01',
'old_values': {},
'new_values': {'name': 'web-server-01', 'ip': '192.168.1.100'},
'ip_address': '192.168.1.50',
'user_agent': 'Mozilla/5.0...',
'session_id': 'sess_123456',
'severity': 'info'
}
audit_manager.log_user_action(user_action)
# コンプライアンスレポート生成
report = audit_manager.export_compliance_report(
'SOX',
(datetime.now() - timedelta(days=30)).isoformat(),
datetime.now().isoformat()
)
print(report)
if __name__ == "__main__":
main()
まとめ
Zabbixセキュリティの実装は、監視インフラストラクチャの信頼性と安全性を確保する重要な要素です。
重要ポイント
- 多層防御: 認証・暗号化・アクセス制御・監査の組み合わせ
- 継続的監視: 脆弱性管理と定期的なセキュリティ評価
- コンプライアンス: 業界標準と法的要件への準拠
- インシデント対応: セキュリティ事案への迅速な対応体制
次のステップ
次章では、日常的なZabbix運用管理手法について学習し、安定した監視環境の維持・改善方法を習得します。