Datadog入門 第5部 - ログ管理の実践完全ガイド
インフラストラクチャ監視とアプリケーション監視の基盤が整ったら、次はログ管理による深い洞察の獲得です。本記事では、ログ収集の最適化、パーサー設定、分析手法、アーカイブ戦略まで、Datadogログ管理の全領域を実践的に解説します。散在するログデータを統合し、障害の根本原因分析からビジネス洞察まで、包括的なログ活用を実現するためのガイドです。
5.1 Log Management
ログ管理の重要性とDatadogのアプローチ
現代システムにおけるログの課題
マイクロサービス、コンテナ化、クラウドネイティブ環境では、ログデータが複数のシステム、サービス、インフラストラクチャに分散し、従来のログ管理手法では可視性の欠如と分析の困難さが深刻な問題となっています。
従来のログ管理の課題:
分散システムでの課題:
- 複数システム間でのログ相関の困難さ
- サービス境界を超えたトラブルシューティングの複雑化
- マイクロサービス間のリクエストフローの不透明性
技術的課題:
- 大量ログデータの効率的な保存とクエリ
- 異なるログフォーマットの統一化
- リアルタイム分析とアラート機能の不足
運用上の課題:
- ログ検索の時間コストと専門知識要求
- セキュリティとコンプライアンス要件への対応
- ストレージコストの最適化とデータ保持ポリシー
Datadogによる解決アプローチ:
統合ログ管理:
- 統一された収集・解析・可視化プラットフォーム
- メトリクス・トレースとの自動相関
- リアルタイム処理とインタラクティブ検索
スケーラブルアーキテクチャ:
- ペタバイト規模のログデータ処理
- 自動スケーリングとコスト最適化
- 高可用性とデータ耐久性保証
Datadogログ管理の哲学
Datadogのログ管理は**「Logs as Data」**の哲学に基づき、ログを単なるテキストデータではなく、構造化された価値ある情報資産として扱います。
# Datadog Log Management の核心概念
class DatadogLogPhilosophy:
"""
Datadogログ管理の基本原則
"""
def __init__(self):
self.core_principles = {
# 1. すべてのログデータの統合
"unified_logging": {
"description": "異なるソースからのログを統一プラットフォームで管理",
"benefits": [
"クロスサービス分析の実現",
"単一検索インターフェース",
"一貫したアクセス制御"
]
},
# 2. 構造化ログの推進
"structured_logging": {
"description": "JSON構造でのログ出力推奨",
"advantages": [
"高速検索とフィルタリング",
"自動フィールド抽出",
"ビジュアライゼーション対応"
]
},
# 3. コンテキスト保持
"contextual_logging": {
"description": "メタデータとタグによる豊富なコンテキスト",
"elements": [
"サービス識別情報",
"環境・バージョン情報",
"ユーザー・セッション情報"
]
},
# 4. 相関分析
"correlation_analysis": {
"description": "ログ・メトリクス・トレースの自動相関",
"capabilities": [
"トレースIDによるログ関連付け",
"異常メトリクスとログの連携",
"タイムライン一元表示"
]
},
# 5. コスト効率性
"cost_efficiency": {
"description": "インテリジェントなデータ管理によるコスト最適化",
"strategies": [
"ログレベル別保持期間設定",
"自動アーカイブとコールドストレージ",
"サンプリングとフィルタリング"
]
}
}
def demonstrate_value_proposition(self):
"""
Datadogログ管理の価値提案
"""
return {
"開発チーム": [
"デバッグ時間の大幅短縮",
"プロダクション環境での詳細可視性",
"リアルタイムエラー検知とアラート"
],
"運用チーム": [
"一元化されたログ管理",
"自動化されたインシデント対応",
"予防的問題検知"
],
"セキュリティチーム": [
"セキュリティイベントの統合監視",
"コンプライアンス監査対応",
"異常行動パターン検出"
],
"ビジネス": [
"ユーザー行動の深い洞察",
"ビジネスKPI追跡",
"データ駆動型意思決定支援"
]
}
ログ収集の設定方法
Datadog Agentによるログ収集設定
Datadog Agentは、システム全体からのログ収集の中核となるコンポーネントです。効率的で信頼性の高いログ収集を実現するための設定手法を解説します。
# /etc/datadog-agent/datadog.yaml - メイン設定ファイル
---
# ログ収集の基本設定
logs_enabled: true
logs_config:
# ログ処理のパフォーマンス設定
container_collect_all: true
processing_rules:
- type: exclude_at_match
name: exclude_debug_logs
pattern: "DEBUG|TRACE"
# 注意: include_at_match を定義すると、一致するログ以外は送信されません。
# INFO/WARN も収集したい場合はこのルールを外してください(ここでは例示目的で記載)。
- type: include_at_match
name: include_error_logs
pattern: "ERROR|FATAL|CRITICAL"
# バッチ処理設定(パフォーマンス最適化)
batch_wait: 5
batch_max_concurrent_send: 10
batch_max_size: 1000
batch_max_content_size: 5000000
# タイムアウト設定
force_use_http: false
compression_level: 6
# ログフォワーダー設定
use_compression: true
use_proto: false
# セキュリティ設定
use_v2_api: true
# logs_dd_url: "agent-http-intake.logs.datadoghq.com"
# グローバルタグ設定
tags:
- env:production
- service:web-platform
- version:2.1.0
- team:backend
- datacenter:us-east-1
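datadog.yaml で logs_enabled を有効化しただけでは、どのログファイルを収集するかはまだ指定されていません。ファイルベースのログは、conf.d 配下のインテグレーション設定(またはカスタム設定ディレクトリ)の logs: セクションで定義します。以下はパスやサービス名を仮置きした最小構成のスケッチです(ファイル名・パスは環境に合わせて読み替えてください)。
# /etc/datadog-agent/conf.d/my_app.d/conf.yaml - カスタムログ収集設定の例(パスは想定値)
logs:
  - type: file
    path: /var/log/my_app/app.log
    service: web-platform
    source: python
    log_processing_rules:
      # 行頭が日付で始まる行を新しいログの開始とみなし、複数行を1エントリに集約する
      - type: multi_line
        name: log_start_with_date
        pattern: \d{4}-\d{2}-\d{2}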
アプリケーションログの設定
各種アプリケーションフレームワークでの構造化ログ設定例を示します。
# Python Flask アプリケーションの構造化ログ設定
import json
import logging
import time
from datetime import datetime
from flask import Flask, request, g
from ddtrace import tracer
import structlog
app = Flask(__name__)
# 構造化ログの設定
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
class DatadogLogFormatter:
"""
Datadog用カスタムログフォーマッター
"""
def format_log(self, level, message, **kwargs):
"""
構造化ログの生成
"""
# 基本ログ構造
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": level.upper(),
"message": message,
"service": "ecommerce-api",
"version": "2.1.0",
"env": "production"
}
# トレース情報の追加
span = tracer.current_span()
if span:
log_entry.update({
"dd.trace_id": str(span.trace_id),
"dd.span_id": str(span.span_id),
"dd.service": span.service,
"dd.version": span.get_tag("version")
})
# リクエストコンテキストの追加
if hasattr(g, 'request_context'):
log_entry.update({
"request": {
"id": g.request_context.get("request_id"),
"method": g.request_context.get("method"),
"path": g.request_context.get("path"),
"user_id": g.request_context.get("user_id"),
"ip_address": g.request_context.get("ip_address"),
"user_agent": g.request_context.get("user_agent")
}
})
# 追加コンテキスト
log_entry.update(kwargs)
return json.dumps(log_entry, ensure_ascii=False)
# カスタムログ関数
def log_info(message, **kwargs):
print(DatadogLogFormatter().format_log("info", message, **kwargs))
def log_error(message, error=None, **kwargs):
if error:
kwargs.update({
"error": {
"type": type(error).__name__,
"message": str(error),
"stack_trace": str(error.__traceback__) if hasattr(error, '__traceback__') else None
}
})
print(DatadogLogFormatter().format_log("error", message, **kwargs))
def log_warning(message, **kwargs):
print(DatadogLogFormatter().format_log("warning", message, **kwargs))
# リクエストミドルウェア
@app.before_request
def before_request():
g.start_time = time.time()
g.request_context = {
"request_id": generate_request_id(),
"method": request.method,
"path": request.path,
"ip_address": request.remote_addr,
"user_agent": request.headers.get("User-Agent"),
"user_id": get_current_user_id() # 認証システムから取得
}
log_info("Request started",
request_details=g.request_context,
query_params=dict(request.args),
headers={k: v for k, v in request.headers if k.lower() not in ['authorization', 'cookie']}
)
@app.after_request
def after_request(response):
duration = (time.time() - g.start_time) * 1000 # ミリ秒
log_info("Request completed",
request_id=g.request_context.get("request_id"),
status_code=response.status_code,
duration_ms=duration,
response_size=len(response.get_data()) if response.get_data() else 0
)
return response
# ビジネスロジックでのログ活用例
@app.route('/api/products/<int:product_id>')
def get_product(product_id):
try:
log_info("Product fetch initiated",
product_id=product_id,
operation="product_fetch"
)
# キャッシュから取得試行
cached_product = cache.get(f"product:{product_id}")
if cached_product:
log_info("Product cache hit",
product_id=product_id,
cache_key=f"product:{product_id}",
data_source="cache"
)
return cached_product
# データベースから取得
log_info("Database query initiated",
product_id=product_id,
query_type="product_select",
data_source="database"
)
product = Product.query.get(product_id)
if not product:
log_warning("Product not found",
product_id=product_id,
result="not_found"
)
return {"error": "Product not found"}, 404
# キャッシュに保存
cache.set(f"product:{product_id}", product.to_dict(), timeout=3600)
log_info("Product fetch completed",
product_id=product_id,
product_name=product.name,
product_category=product.category,
operation_result="success"
)
return product.to_dict()
except DatabaseError as e:
log_error("Database error occurred",
product_id=product_id,
error=e,
operation="product_fetch",
error_category="database"
)
return {"error": "Internal server error"}, 500
except Exception as e:
log_error("Unexpected error occurred",
product_id=product_id,
error=e,
operation="product_fetch",
error_category="unexpected"
)
return {"error": "Internal server error"}, 500
def generate_request_id():
"""リクエストID生成"""
import uuid
return str(uuid.uuid4())
def get_current_user_id():
"""現在のユーザーID取得(認証システムから)"""
# 実際の認証システムと連携
return request.headers.get("X-User-ID")
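上記の例ではトレースIDを手動でログへ埋め込んでいますが、標準の logging モジュールを使う場合は、ddtrace のログインジェクション機能で dd.trace_id / dd.span_id を自動付与する方法もあります。以下は最小構成のスケッチで、フォーマット文字列は一例です(ddtrace-run を使う場合は環境変数 DD_LOGS_INJECTION=true でも同様の効果が得られます)。
# ddtrace のログインジェクションによるログ・トレース相関の最小スケッチ
import logging

from ddtrace import patch

# 標準loggingのレコードに dd.trace_id / dd.span_id / dd.service などを注入する
patch(logging=True)

FORMAT = ('%(asctime)s %(levelname)s [%(name)s] '
          '[dd.service=%(dd.service)s dd.env=%(dd.env)s dd.version=%(dd.version)s '
          'dd.trace_id=%(dd.trace_id)s dd.span_id=%(dd.span_id)s] %(message)s')
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger(__name__)

log.info("order created")  # アクティブなスパンがあればトレースIDが自動で出力される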
コンテナ環境でのログ収集
DockerおよびKubernetes環境での効率的なログ収集設定を解説します。
# Docker Compose でのDatadog Agent設定
version: '3.8'
services:
datadog-agent:
image: gcr.io/datadoghq/agent:7
environment:
- DD_API_KEY=${DD_API_KEY}
- DD_SITE=datadoghq.com
- DD_LOGS_ENABLED=true
- DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL=true
- DD_CONTAINER_EXCLUDE=name:datadog-agent  # リスト形式では引用符を値に含めない
- DD_ENV=production
- DD_SERVICE=ecommerce-platform
- DD_VERSION=2.1.0
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- /proc/:/host/proc/:ro
- /opt/datadog-agent/run:/opt/datadog-agent/run:rw
- /sys/fs/cgroup/:/host/sys/fs/cgroup:ro
- /var/lib/docker/containers:/var/lib/docker/containers:ro
labels:
com.datadoghq.ad.logs: '[{"source": "datadog-agent", "service": "datadog-agent"}]'
web-app:
build: .
environment:
- DD_ENV=production
- DD_SERVICE=web-api
- DD_VERSION=2.1.0
labels:
# Datadog Autodiscoveryラベル
com.datadoghq.ad.logs: >
[{
"source": "python",
"service": "web-api",
"log_processing_rules": [
{
"type": "multi_line",
"name": "python_traceback",
"pattern": "Traceback \\(most recent call last\\):"
}
]
}]
depends_on:
- datadog-agent
nginx:
image: nginx:alpine
labels:
# Nginx ログの構造化パース設定
com.datadoghq.ad.logs: >
[{
"source": "nginx",
"service": "nginx",
"log_processing_rules": [
{
"type": "exclude_at_match",
"name": "exclude_healthcheck",
"pattern": "GET /health"
}
]
}]
# Kubernetes DaemonSet でのDatadog Agent設定
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: datadog-agent
namespace: default
spec:
selector:
matchLabels:
app: datadog-agent
template:
metadata:
labels:
app: datadog-agent
name: datadog-agent
spec:
serviceAccountName: datadog-agent
containers:
- image: gcr.io/datadoghq/agent:7
imagePullPolicy: Always
name: datadog-agent
ports:
- containerPort: 8125
name: dogstatsdport
protocol: UDP
- containerPort: 8126
name: traceport
protocol: TCP
env:
- name: DD_API_KEY
valueFrom:
secretKeyRef:
name: datadog-secret
key: api-key
- name: DD_SITE
value: "datadoghq.com"
- name: DD_LOGS_ENABLED
value: "true"
- name: DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL
value: "true"
- name: DD_LOGS_CONFIG_K8S_CONTAINER_USE_FILE
value: "true"
- name: DD_HEALTH_PORT
value: "5555"
- name: KUBERNETES
value: "true"
- name: DD_KUBERNETES_KUBELET_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
resources:
requests:
memory: "256Mi"
cpu: "200m"
limits:
memory: "512Mi"
cpu: "500m"
volumeMounts:
- name: dockersocketdir
mountPath: /var/run/docker.sock
readOnly: true
- name: procdir
mountPath: /host/proc
readOnly: true
- name: cgroups
mountPath: /host/sys/fs/cgroup
readOnly: true
- name: s6-run
mountPath: /var/run/s6
- name: logpodpath
mountPath: /var/log/pods
readOnly: true
- name: logcontainerpath
mountPath: /var/lib/docker/containers
readOnly: true
livenessProbe:
httpGet:
path: /health
port: 5555
initialDelaySeconds: 15
periodSeconds: 15
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
volumes:
- name: dockersocketdir
hostPath:
path: /var/run/docker.sock
- name: procdir
hostPath:
path: /proc
- name: cgroups
hostPath:
path: /sys/fs/cgroup
- name: s6-run
emptyDir: {}
- name: logpodpath
hostPath:
path: /var/log/pods
- name: logcontainerpath
hostPath:
path: /var/lib/docker/containers
---
# アプリケーションPodでのログ設定
apiVersion: apps/v1
kind: Deployment
metadata:
name: ecommerce-api
spec:
replicas: 3
selector:
matchLabels:
app: ecommerce-api
template:
metadata:
labels:
app: ecommerce-api
annotations:
# Datadog Autodiscovery annotations
ad.datadoghq.com/ecommerce-api.logs: >
[{
"source": "python",
"service": "ecommerce-api",
"log_processing_rules": [
{
"type": "multi_line",
"name": "python_traceback",
"pattern": "Traceback \\(most recent call last\\):"
},
{
"type": "exclude_at_match",
"name": "exclude_healthcheck",
"pattern": "GET /health"
}
]
}]
spec:
containers:
- name: ecommerce-api
image: ecommerce-api:2.1.0
env:
- name: DD_ENV
value: "production"
- name: DD_SERVICE
value: "ecommerce-api"
- name: DD_VERSION
value: "2.1.0"
ports:
- containerPort: 8080
ログパーサーとパイプライン
カスタムパーサーの作成
Datadogでは、様々なログフォーマットに対応するためのカスタムパーサーを作成できます。
# Datadog Log Pipeline API を使用したパーサー作成例
import requests
import json
class DatadogLogPipelineManager:
"""
Datadog Log Pipeline 管理クラス
"""
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.base_url = "https://api.datadoghq.com/api/v1"
self.headers = {
"Content-Type": "application/json",
"DD-API-KEY": api_key,
"DD-APPLICATION-KEY": app_key
}
def create_grok_parser(self, name, grok_pattern, samples=None):
"""
Grokパーサーの作成
"""
parser_config = {
"type": "grok-parser",
"name": name,
"is_enabled": True,
"source": "message",
"grok": {
"support_rules": "",
"match_rules": grok_pattern
}
}
if samples:
parser_config["samples"] = samples
return self._create_parser(parser_config)
def create_json_parser(self, name, source_field="message"):
"""
JSONパーサーの作成
"""
parser_config = {
"type": "json-parser",
"name": name,
"is_enabled": True,
"source": source_field
}
return self._create_parser(parser_config)
def create_date_parser(self, name, date_format, source_field="timestamp"):
"""
日付パーサーの作成
"""
parser_config = {
"type": "date-parser",
"name": name,
"is_enabled": True,
"source": source_field,
"target": "@timestamp",
"format": date_format
}
return self._create_parser(parser_config)
def create_nginx_pipeline(self):
"""
Nginxアクセスログ用パイプライン
"""
# Nginxログフォーマット:
# log_format combined '$remote_addr - $remote_user [$time_local] '
# '"$request" $status $body_bytes_sent '
# '"$http_referer" "$http_user_agent"';
grok_pattern = """
nginx_access %{IPORHOST:network.client.ip} - %{USER:user.name} \\[%{HTTPDATE:timestamp}\\] "%{WORD:http.method} %{URIPATH:http.url_details.path}(?:%{URIPARAM:http.url_details.queryString})? HTTP/%{NUMBER:http.version}" %{INT:http.status_code} %{INT:network.bytes_written} "%{DATA:http.referer}" "%{DATA:http.useragent}"
"""
pipeline_config = {
"name": "Nginx Access Logs",
"is_enabled": True,
"filter": {
"query": "source:nginx"
},
"processors": [
# 1. Grokパーサー
{
"type": "grok-parser",
"name": "Parse Nginx Access Log",
"is_enabled": True,
"source": "message",
"grok": {
"support_rules": "",
"match_rules": grok_pattern
}
},
# 2. 日付パーサー
{
"type": "date-parser",
"name": "Parse Timestamp",
"is_enabled": True,
"source": "timestamp",
"target": "@timestamp",
"format": "dd/MMM/yyyy:HH:mm:ss Z"
},
# 3. ステータスコードカテゴリ
{
"type": "category-processor",
"name": "Categorize Status Code",
"is_enabled": True,
"target": "http.status_category",
"categories": [
{"filter": {"query": "@http.status_code:[200 TO 299]"}, "name": "OK"},
{"filter": {"query": "@http.status_code:[300 TO 399]"}, "name": "Redirect"},
{"filter": {"query": "@http.status_code:[400 TO 499]"}, "name": "Client Error"},
{"filter": {"query": "@http.status_code:[500 TO 599]"}, "name": "Server Error"}
]
},
# 4. User Agentパーサー
{
"type": "user-agent-parser",
"name": "Parse User Agent",
"is_enabled": True,
"source": "http.useragent",
"target": "http.useragent_details"
},
# 5. URLパーサー
{
"type": "url-parser",
"name": "Parse URL",
"is_enabled": True,
"source": "http.url_details.path",
"target": "http.url_details"
},
# 6. 地理的情報の抽出
{
"type": "geo-ip-parser",
"name": "Enrich with GeoIP",
"is_enabled": True,
"source": "network.client.ip",
"target": "network.client.geoip"
}
]
}
return self._create_pipeline(pipeline_config)
def create_application_error_pipeline(self):
"""
アプリケーションエラーログ用パイプライン
"""
pipeline_config = {
"name": "Application Error Logs",
"is_enabled": True,
"filter": {
"query": "source:python level:error"
},
"processors": [
# 1. JSONパーサー
{
"type": "json-parser",
"name": "Parse JSON Log",
"is_enabled": True,
"source": "message"
},
# 2. ログレベルの正規化
{
"type": "string-builder-processor",
"name": "Normalize Log Level",
"is_enabled": True,
"target": "level",
"template": "%{level}",
"is_replace_missing": False
},
# 3. エラータイプの分類
{
"type": "category-processor",
"name": "Categorize Error Type",
"is_enabled": True,
"target": "error.category",
"categories": [
{"filter": {"query": "error.type:DatabaseError"}, "name": "Database"},
{"filter": {"query": "error.type:ValidationError"}, "name": "Validation"},
{"filter": {"query": "error.type:PermissionError"}, "name": "Authorization"},
{"filter": {"query": "error.type:TimeoutError"}, "name": "Timeout"},
{"filter": {"query": "error.type:ConnectionError"}, "name": "Network"}
]
},
# 4. トレースIDのリマップ(ログとAPMトレースの関連付け)
{
"type": "trace-id-remapper",
"name": "Map Trace ID",
"is_enabled": True,
"sources": ["dd.trace_id"]
},
# 5. アラート用タグ追加
{
"type": "string-builder-processor",
"name": "Add Alert Tag",
"is_enabled": True,
"target": "alert.required",
"template": "true",
"is_replace_missing": False
}
]
}
return self._create_pipeline(pipeline_config)
def _create_parser(self, parser_config):
"""
パーサー(プロセッサー)設定の内部メソッド
注意: プロセッサー単体を作成する公開エンドポイントはなく、プロセッサーは
パイプライン定義の processors 配列の一部として登録します。ここでは
設定オブジェクトをそのまま返し、パイプライン作成時に組み込みます。
"""
return parser_config
def _create_pipeline(self, pipeline_config):
"""
パイプライン作成の内部メソッド
"""
url = f"{self.base_url}/logs/config/pipelines"
response = requests.post(url, headers=self.headers, data=json.dumps(pipeline_config))
return response.json()
# 使用例
pipeline_manager = DatadogLogPipelineManager(
api_key="your-api-key",
app_key="your-app-key"
)
# Nginxパイプラインの作成
nginx_pipeline = pipeline_manager.create_nginx_pipeline()
# アプリケーションエラーパイプラインの作成
error_pipeline = pipeline_manager.create_application_error_pipeline()
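パイプライン作成後は、一覧取得エンドポイント(GET /api/v1/logs/config/pipelines)で登録結果を確認できます。以下は上記クラスを前提にした確認用のスケッチです(レスポンスはパイプラインオブジェクトの配列を想定しています)。
# 作成済みパイプラインの一覧確認スケッチ
def list_pipelines(manager):
    """登録済みのログパイプラインを取得する"""
    url = f"{manager.base_url}/logs/config/pipelines"
    response = requests.get(url, headers=manager.headers)
    response.raise_for_status()
    return response.json()

for pipeline in list_pipelines(pipeline_manager):
    print(pipeline.get("id"), pipeline.get("name"), pipeline.get("is_enabled"))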
高度なログ処理ルール
# ログ処理ルールの詳細設定例
log_processing_rules:
# 1. 機密情報のマスキング
- type: mask_sequences
name: mask_credit_card
pattern: '\b(?:\d{4}[-\s]?){3}\d{4}\b'
replace_placeholder: "[MASKED-CREDIT-CARD]"
- type: mask_sequences
name: mask_email
pattern: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
replace_placeholder: "[MASKED-EMAIL]"
- type: mask_sequences
name: mask_password
pattern: '"password":\s*"[^"]*"'
replace_placeholder: '"password": "[MASKED]"'
# 2. ノイズログの除外
- type: exclude_at_match
name: exclude_health_checks
pattern: 'GET /health|GET /ping|GET /status'
- type: exclude_at_match
name: exclude_debug_logs
pattern: 'level:debug|DEBUG|TRACE'
# 3. 重要ログの優先処理
- type: include_at_match
name: include_critical_errors
pattern: 'level:(error|critical|fatal)|ERROR|CRITICAL|FATAL'
# 4. マルチライン処理
- type: multi_line
name: java_stack_trace
pattern: '^Exception|^\s+at\s+|^\s+\.\.\.\s+\d+\s+more|^\s*Caused by:'
- type: multi_line
name: python_traceback
pattern: 'Traceback \(most recent call last\):'
# 5. ログサンプリング
# 注意: Agent の log_processing_rules にサンプリング用のルールタイプはありません。
# インデックス前のサンプリングは、Datadog 側のインデックス除外フィルター
# (Exclusion Filter)で保持率を指定して行います。
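マスキングや除外ルールの正規表現は、Agentへ配布する前にローカルで検証しておくと安全です。以下は標準ライブラリの re だけで上記パターンをサンプル行に適用してみる簡単なスケッチです(サンプル値は架空のものです)。
# マスキングルールの正規表現をローカルで事前検証するスケッチ
import re

mask_rules = [
    (r'\b(?:\d{4}[-\s]?){3}\d{4}\b', "[MASKED-CREDIT-CARD]"),
    (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', "[MASKED-EMAIL]"),
    (r'"password":\s*"[^"]*"', '"password": "[MASKED]"'),
]

samples = [
    'checkout failed user=alice@example.com card=4111 1111 1111 1111',
    '{"password": "s3cret", "action": "login"}',
]

for line in samples:
    masked = line
    for pattern, placeholder in mask_rules:
        masked = re.sub(pattern, placeholder, masked)
    print(masked)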
ログアーカイブと保持設定
インテリジェントアーカイブ戦略
# Datadog Log Archive API を使用したアーカイブ設定
class DatadogLogArchiveManager:
"""
Datadogログアーカイブ管理クラス
"""
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.base_url = "https://api.datadoghq.com/api/v2"
self.headers = {
"Content-Type": "application/json",
"DD-API-KEY": api_key,
"DD-APPLICATION-KEY": app_key
}
def create_s3_archive(self, name, bucket, path, account_id, role_name):
    """
    S3ログアーカイブの作成

    注意: S3アーカイブ連携はAWSロール委任(account_id / role_name)で設定します。
    アクセスキーを直接指定する方式ではありません。
    """
    archive_config = {
        "data": {
            "type": "archives",
            "attributes": {
                "name": name,
                "query": "*",  # すべてのログをアーカイブ
                "destination": {
                    "type": "s3",
                    "bucket": bucket,
                    "path": path,
                    "integration": {
                        "account_id": account_id,
                        "role_name": role_name
                    }
                },
                "rehydration_tags": ["env", "service", "version"],
                "include_tags": True
            }
        }
    }
url = f"{self.base_url}/logs/config/archives"
response = requests.post(url, headers=self.headers, data=json.dumps(archive_config))
return response.json()
def create_tiered_archive_strategy(self):
"""
階層化アーカイブ戦略の実装
"""
strategies = []
# ホットデータ(0-7日): インデックス済み、高速検索
hot_archive = {
"name": "Hot Data Archive",
"query": "*",
"retention_days": 7,
"storage_class": "hot",
"description": "Recent logs for real-time analysis"
}
strategies.append(hot_archive)
# ウォームデータ(8-30日): 圧縮済み、中速検索
warm_archive = {
"name": "Warm Data Archive",
"query": "level:(warn OR error OR critical)",
"retention_days": 30,
"storage_class": "warm",
"description": "Important logs for trend analysis"
}
strategies.append(warm_archive)
# コールドデータ(31日-1年): 高圧縮、検索には時間要
cold_archive = {
"name": "Cold Data Archive",
"query": "level:error OR source:security",
"retention_days": 365,
"storage_class": "cold",
"description": "Critical logs for compliance and audit"
}
strategies.append(cold_archive)
return strategies
def create_compliance_archive(self):
"""
コンプライアンス要件に対応するアーカイブ設定
"""
compliance_configs = {
# GDPR対応: 個人データの適切な管理
"gdpr_archive": {
"name": "GDPR Compliance Archive",
"query": "source:application @user_id:*",
"retention_days": 2555, # 7年間
"encryption": True,
"immutable": True,
"audit_logging": True,
"data_classification": "personal_data"
},
# SOX対応: 財務関連ログの長期保存
"sox_archive": {
"name": "SOX Compliance Archive",
"query": "source:financial OR service:payment",
"retention_days": 2555, # 7年間
"encryption": True,
"immutable": True,
"audit_logging": True,
"data_classification": "financial_data"
},
# PCI DSS対応: 決済関連ログ
"pci_archive": {
"name": "PCI DSS Compliance Archive",
"query": "source:payment OR @credit_card:*",
"retention_days": 365, # 1年間
"encryption": True,
"immutable": True,
"audit_logging": True,
"data_classification": "payment_data"
}
}
return compliance_configs
def calculate_storage_costs(self, daily_log_volume_gb, retention_days, storage_tier="warm"):
"""
ストレージコストの計算
"""
# AWS S3ストレージ料金例(2024年基準)
storage_rates = {
"hot": 0.023, # $0.023/GB/月 (S3 Standard)
"warm": 0.0125, # $0.0125/GB/月 (S3 Standard-IA)
"cold": 0.004 # $0.004/GB/月 (S3 Glacier)
}
# 月間ストレージ容量の計算
monthly_storage_gb = daily_log_volume_gb * retention_days
# 月間コスト
monthly_cost = monthly_storage_gb * storage_rates[storage_tier]
# 年間コスト
annual_cost = monthly_cost * 12
return {
"daily_volume_gb": daily_log_volume_gb,
"retention_days": retention_days,
"storage_tier": storage_tier,
"monthly_storage_gb": monthly_storage_gb,
"monthly_cost_usd": round(monthly_cost, 2),
"annual_cost_usd": round(annual_cost, 2),
"cost_per_gb_month": storage_rates[storage_tier]
}
def optimize_log_retention(self, services_config):
"""
サービス別ログ保持期間の最適化
"""
optimized_config = {}
for service, config in services_config.items():
log_volume = config.get("daily_volume_gb", 0)
criticality = config.get("criticality", "medium")
compliance_required = config.get("compliance_required", False)
# サービス重要度に基づく保持期間
if criticality == "critical":
hot_retention = 30
warm_retention = 90
cold_retention = 365
elif criticality == "high":
hot_retention = 14
warm_retention = 60
cold_retention = 180
elif criticality == "medium":
hot_retention = 7
warm_retention = 30
cold_retention = 90
else: # low
hot_retention = 3
warm_retention = 14
cold_retention = 30
# コンプライアンス要件の調整
if compliance_required:
cold_retention = max(cold_retention, 2555) # 7年間
optimized_config[service] = {
"hot_retention_days": hot_retention,
"warm_retention_days": warm_retention,
"cold_retention_days": cold_retention,
"estimated_costs": {
"hot": self.calculate_storage_costs(log_volume, hot_retention, "hot"),
"warm": self.calculate_storage_costs(log_volume, warm_retention, "warm"),
"cold": self.calculate_storage_costs(log_volume, cold_retention, "cold")
}
}
return optimized_config
# 使用例
archive_manager = DatadogLogArchiveManager(
api_key="your-api-key",
app_key="your-app-key"
)
# サービス設定例
services = {
"web-api": {
"daily_volume_gb": 5.2,
"criticality": "critical",
"compliance_required": True
},
"background-worker": {
"daily_volume_gb": 1.8,
"criticality": "high",
"compliance_required": False
},
"frontend": {
"daily_volume_gb": 0.5,
"criticality": "medium",
"compliance_required": False
}
}
# アーカイブ戦略の最適化
optimized_config = archive_manager.optimize_log_retention(services)
print(json.dumps(optimized_config, indent=2))
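アーカイブの登録状況は、一覧取得エンドポイント(GET /api/v2/logs/config/archives)で確認できます。以下は上記クラスを前提にした確認用のスケッチです(レスポンス構造は data 配列を想定しています)。
# 登録済みログアーカイブの一覧確認スケッチ
def list_archives(manager):
    """登録済みのログアーカイブ設定を取得する"""
    url = f"{manager.base_url}/logs/config/archives"
    response = requests.get(url, headers=manager.headers)
    response.raise_for_status()
    return response.json()

for archive in list_archives(archive_manager).get("data", []):
    attrs = archive.get("attributes", {})
    print(archive.get("id"), attrs.get("name"), attrs.get("query"))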
ログ検索とライブテール
高度なログ検索手法
# Datadog Log Search API を使用した高度検索
class DatadogLogSearcher:
"""
Datadog高度ログ検索クラス
"""
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.base_url = "https://api.datadoghq.com/api/v1"
self.headers = {
"Content-Type": "application/json",
"DD-API-KEY": api_key,
"DD-APPLICATION-KEY": app_key
}
def search_error_patterns(self, service, time_range="1h", limit=100):
"""
エラーパターンの検索と分析
"""
query = f"service:{service} level:error"
search_params = {
"query": query,
"time": {
"from": f"now-{time_range}",
"to": "now"
},
"sort": "desc",
"limit": limit
}
url = f"{self.base_url}/logs-queries/list"
response = requests.post(url, headers=self.headers, data=json.dumps(search_params))
if response.status_code == 200:
logs = response.json()
return self._analyze_error_patterns(logs)
else:
return {"error": "Search failed", "status": response.status_code}
def _analyze_error_patterns(self, logs):
"""
エラーパターンの分析
"""
error_analysis = {
"total_errors": len(logs.get("logs", [])),
"error_types": {},
"error_frequency": {},
"affected_endpoints": {},
"time_distribution": {}
}
for log in logs.get("logs", []):
# エラータイプの集計
error_type = log.get("attributes", {}).get("error", {}).get("type", "Unknown")
error_analysis["error_types"][error_type] = error_analysis["error_types"].get(error_type, 0) + 1
# エンドポイント分析
endpoint = log.get("attributes", {}).get("request", {}).get("path", "Unknown")
error_analysis["affected_endpoints"][endpoint] = error_analysis["affected_endpoints"].get(endpoint, 0) + 1
# 時間分布
timestamp = log.get("attributes", {}).get("@timestamp")
if timestamp:
hour = timestamp[:13] # YYYY-MM-DDTHH形式
error_analysis["time_distribution"][hour] = error_analysis["time_distribution"].get(hour, 0) + 1
return error_analysis
def search_performance_anomalies(self, service, duration_threshold_ms=5000, time_range="4h"):
"""
パフォーマンス異常の検索
"""
query = f"service:{service} @duration:>[{duration_threshold_ms}]"
search_params = {
"query": query,
"time": {
"from": f"now-{time_range}",
"to": "now"
},
"sort": "desc",
"limit": 100,
"aggregation": {
"type": "count",
"group_by": [
{"field": "@http.method"},
{"field": "@http.url_details.path"}
]
}
}
url = f"{self.base_url}/logs-queries/list"
response = requests.post(url, headers=self.headers, data=json.dumps(search_params))
return response.json()
def search_security_events(self, time_range="24h"):
"""
セキュリティイベントの検索
"""
security_queries = [
# 認証失敗
"source:auth level:warning @event_type:authentication_failed",
# 権限エラー
"level:error @http.status_code:403",
# 異常なアクセスパターン
"@network.client.ip:* @http.status_code:404",
# SQLインジェクション試行
"message:*UNION* OR message:*SELECT* AND level:warning"
]
security_analysis = {}
for query in security_queries:
search_params = {
"query": query,
"time": {
"from": f"now-{time_range}",
"to": "now"
},
"sort": "desc",
"limit": 50
}
url = f"{self.base_url}/logs-queries/list"
response = requests.post(url, headers=self.headers, data=json.dumps(search_params))
if response.status_code == 200:
security_analysis[query] = response.json()
return security_analysis
def create_saved_view(self, name, query, columns=None):
"""
保存されたビューの作成
"""
if columns is None:
columns = ["@timestamp", "level", "service", "message"]
view_config = {
"data": {
"type": "logs_saved_views",
"attributes": {
"name": name,
"query": query,
"columns": columns
}
}
}
url = f"{self.base_url}/logs/config/saved_views"
response = requests.post(url, headers=self.headers, data=json.dumps(view_config))
return response.json()
def live_tail_errors(self, service, callback_function):
"""
エラーログのライブテール(WebSocket使用)
"""
import websocket
import json
def on_message(ws, message):
log_data = json.loads(message)
if log_data.get("level") in ["error", "critical", "fatal"]:
callback_function(log_data)
def on_error(ws, error):
print(f"WebSocket error: {error}")
def on_close(ws, close_status_code, close_msg):
print("WebSocket connection closed")
def on_open(ws):
# ライブテールクエリの送信
subscribe_message = {
"action": "subscribe",
"query": f"service:{service} level:error"
}
ws.send(json.dumps(subscribe_message))
# 注意: 以下のWebSocketエンドポイントは説明用の想定例です。
# Live Tail は Datadog UI の機能であり、公開された WebSocket API としては文書化されていません。
websocket_url = f"wss://api.datadoghq.com/v1/logs/tail?dd-api-key={self.api_key}"
ws = websocket.WebSocketApp(
websocket_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.run_forever()
# 使用例
searcher = DatadogLogSearcher(
api_key="your-api-key",
app_key="your-app-key"
)
# エラーパターン分析
error_patterns = searcher.search_error_patterns("web-api", "2h")
print("Error Patterns:", json.dumps(error_patterns, indent=2))
# パフォーマンス異常検索
performance_issues = searcher.search_performance_anomalies("web-api", 3000, "1h")
# セキュリティイベント検索
security_events = searcher.search_security_events("6h")
# ライブテールの設定
def handle_error_log(log_data):
print(f"ALERT: New error in {log_data.get('service')}: {log_data.get('message')}")
# Slack通知やPagerDuty連携などの処理
# searcher.live_tail_errors("web-api", handle_error_log)
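v1 の logs-queries エンドポイントに加えて、集計だけが目的であれば v2 の Logs Aggregate API(POST /api/v2/logs/analytics/aggregate)を使う方法もあります。以下はサービス別のエラー件数を集計する想定例のスケッチです(リクエストボディ・レスポンスの構造は利用時に公式APIリファレンスで確認してください)。
# v2 Logs Aggregate API によるサービス別エラー件数集計のスケッチ
def aggregate_error_counts(searcher, time_range="1h"):
    """サービスごとのエラーログ件数を集計する(想定例)"""
    url = "https://api.datadoghq.com/api/v2/logs/analytics/aggregate"
    body = {
        "compute": [{"aggregation": "count"}],
        "filter": {
            "from": f"now-{time_range}",
            "to": "now",
            "query": "level:error"
        },
        "group_by": [{"facet": "service"}]
    }
    response = requests.post(url, headers=searcher.headers, data=json.dumps(body))
    response.raise_for_status()
    return response.json()

buckets = aggregate_error_counts(searcher).get("data", {}).get("buckets", [])
for bucket in buckets:
    print(bucket.get("by"), bucket.get("computes"))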
5.2 ログ分析と可視化
ログベースメトリクス
カスタムメトリクスの生成
ログベースメトリクスにより、ログデータからカスタムビジネスメトリクスを生成し、ダッシュボードやアラートで活用できます。
# Datadog Log-based Metrics API
class DatadogLogMetricsManager:
"""
Datadogログベースメトリクス管理クラス
"""
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.base_url = "https://api.datadoghq.com/api/v2"
self.headers = {
"Content-Type": "application/json",
"DD-API-KEY": api_key,
"DD-APPLICATION-KEY": app_key
}
def create_error_rate_metric(self):
"""
エラー率メトリクスの作成
"""
metric_config = {
"data": {
"type": "logs_metrics",
"id": "error_rate_by_service",
"attributes": {
"compute": {
"aggregation_type": "count"
},
"filter": {
"query": "level:error"
},
"group_by": [
{
"facet": "service",
"limit": 10,
"sort": {
"aggregation": "count",
"order": "desc"
}
}
]
}
}
}
return self._create_metric(metric_config)
def create_response_time_metric(self):
"""
レスポンス時間メトリクスの作成
"""
metric_config = {
"data": {
"type": "logs_metrics",
"id": "avg_response_time",
"attributes": {
"compute": {
"aggregation_type": "distribution",
"path": "@duration"
},
"filter": {
"query": "source:nginx @duration:*"
},
"group_by": [
{
"facet": "service",
"limit": 10
},
{
"facet": "@http.url_details.path",
"limit": 20
}
]
}
}
}
return self._create_metric(metric_config)
def create_business_kpi_metrics(self):
"""
ビジネスKPIメトリクスの作成
"""
business_metrics = []
# 1. ユーザー登録数
user_registration_metric = {
"data": {
"type": "logs_metrics",
"id": "user_registrations",
"attributes": {
"compute": {
"aggregation_type": "count"
},
"filter": {
"query": "source:application @event_type:user_registered"
},
"group_by": [
{
"facet": "@user.source",
"limit": 10
}
]
}
}
}
business_metrics.append(user_registration_metric)
# 2. 購入金額合計
purchase_amount_metric = {
"data": {
"type": "logs_metrics",
"id": "total_purchase_amount",
"attributes": {
"compute": {
"aggregation_type": "sum",
"path": "@order.total"
},
"filter": {
"query": "source:ecommerce @event_type:purchase_completed"
},
"group_by": [
{
"facet": "@order.payment_method",
"limit": 5
},
{
"facet": "@user.segment",
"limit": 10
}
]
}
}
}
business_metrics.append(purchase_amount_metric)
# 3. API使用量
api_usage_metric = {
"data": {
"type": "logs_metrics",
"id": "api_usage_by_endpoint",
"attributes": {
"compute": {
"aggregation_type": "count"
},
"filter": {
"query": "source:api-gateway @http.method:*"
},
"group_by": [
{
"facet": "@http.method",
"limit": 10
},
{
"facet": "@http.url_details.path",
"limit": 50
},
{
"facet": "@client.api_key",
"limit": 100
}
]
}
}
}
business_metrics.append(api_usage_metric)
return business_metrics
def create_security_metrics(self):
"""
セキュリティメトリクスの作成
"""
security_metrics = []
# 1. 認証失敗回数
auth_failures_metric = {
"data": {
"type": "logs_metrics",
"id": "authentication_failures",
"attributes": {
"compute": {
"aggregation_type": "count"
},
"filter": {
"query": "source:auth @event_type:authentication_failed"
},
"group_by": [
{
"facet": "@network.client.ip",
"limit": 100
},
{
"facet": "@auth.failure_reason",
"limit": 10
}
]
}
}
}
security_metrics.append(auth_failures_metric)
# 2. 異常アクセスパターン
anomalous_access_metric = {
"data": {
"type": "logs_metrics",
"id": "anomalous_access_patterns",
"attributes": {
"compute": {
"aggregation_type": "count"
},
"filter": {
"query": "@http.status_code:[400 TO 499] OR @suspicious_activity:true"
},
"group_by": [
{
"facet": "@network.client.ip",
"limit": 50
},
{
"facet": "@http.status_code",
"limit": 10
}
]
}
}
}
security_metrics.append(anomalous_access_metric)
return security_metrics
def _create_metric(self, metric_config):
"""
メトリクス作成の内部メソッド
"""
url = f"{self.base_url}/logs/config/metrics"
response = requests.post(url, headers=self.headers, data=json.dumps(metric_config))
return response.json()
# ログベースメトリクスの使用例
metrics_manager = DatadogLogMetricsManager(
api_key="your-api-key",
app_key="your-app-key"
)
# 各種メトリクスの作成
error_rate_metric = metrics_manager.create_error_rate_metric()
response_time_metric = metrics_manager.create_response_time_metric()
business_metrics = metrics_manager.create_business_kpi_metrics()
security_metrics = metrics_manager.create_security_metrics()
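作成したログベースメトリクスが実際にデータポイントを生成しているかは、通常のメトリクスと同様にタイムシリーズクエリAPI(GET /api/v1/query)で確認できます。以下はメトリクス名が作成時に指定した id(例: error_rate_by_service)になることを想定したスケッチです。
# ログベースメトリクスのデータ生成確認スケッチ
import time

def query_timeseries(manager, query, window_seconds=3600):
    """直近のタイムシリーズを取得する"""
    now = int(time.time())
    url = "https://api.datadoghq.com/api/v1/query"
    params = {"from": now - window_seconds, "to": now, "query": query}
    response = requests.get(url, headers=manager.headers, params=params)
    response.raise_for_status()
    return response.json()

result = query_timeseries(metrics_manager, "sum:error_rate_by_service{*} by {service}.as_count()")
print(f"{len(result.get('series', []))} series returned")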
ログパターン分析
自動パターン検出とクラスタリング
# ログパターン分析とクラスタリング
from collections import defaultdict, Counter
import re
from datetime import datetime, timedelta
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
class LogPatternAnalyzer:
"""
ログパターン分析とクラスタリングクラス
"""
def __init__(self):
self.pattern_cache = {}
self.anomaly_threshold = 0.8
self.min_cluster_size = 5
def extract_log_templates(self, log_messages, similarity_threshold=0.9):
"""
ログメッセージからテンプレートを抽出
"""
# 1. 前処理
processed_messages = []
for message in log_messages:
# 数値、ID、タイムスタンプを変数化
processed = self._normalize_log_message(message)
processed_messages.append(processed)
# 2. TF-IDFベクトル化
vectorizer = TfidfVectorizer(
ngram_range=(1, 3),
max_features=1000,
stop_words='english',
min_df=2
)
tfidf_matrix = vectorizer.fit_transform(processed_messages)
# 3. クラスタリング
clustering = DBSCAN(
eps=1-similarity_threshold,
min_samples=self.min_cluster_size,
metric='cosine'
)
cluster_labels = clustering.fit_predict(tfidf_matrix)
# 4. クラスター分析
clusters = defaultdict(list)
for idx, label in enumerate(cluster_labels):
if label != -1: # ノイズ除外
clusters[label].append({
'original_message': log_messages[idx],
'processed_message': processed_messages[idx],
'index': idx
})
# 5. テンプレート生成
templates = {}
for cluster_id, messages in clusters.items():
template = self._generate_template_from_cluster(messages)
templates[cluster_id] = {
'template': template,
'count': len(messages),
'examples': messages[:5], # 例を5つまで保存
'frequency': len(messages) / len(log_messages)
}
return templates
def _normalize_log_message(self, message):
"""
ログメッセージの正規化
"""
# タイムスタンプの正規化
message = re.sub(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', '<TIMESTAMP>', message)
# IPアドレスの正規化
message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message)
# UUID/IDの正規化
message = re.sub(r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b', '<UUID>', message)
# 数値の正規化
message = re.sub(r'\b\d+\b', '<NUMBER>', message)
# ファイルパスの正規化
message = re.sub(r'/[^\s]+', '<PATH>', message)
# メールアドレスの正規化
message = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '<EMAIL>', message)
return message
def _generate_template_from_cluster(self, messages):
"""
クラスターからテンプレートを生成
"""
# 最も頻出する要素を特定してテンプレート化
processed_messages = [msg['processed_message'] for msg in messages]
# 単語レベルでの分析
word_positions = defaultdict(lambda: defaultdict(int))
max_length = 0
for message in processed_messages:
words = message.split()
max_length = max(max_length, len(words))
for pos, word in enumerate(words):
word_positions[pos][word] += 1
# テンプレート構築
template_words = []
for pos in range(max_length):
if pos in word_positions:
# 最頻出単語を選択(頻度50%以上)
total_messages = len(messages)
most_common_word = max(word_positions[pos].items(), key=lambda x: x[1])
if most_common_word[1] / total_messages >= 0.5:
template_words.append(most_common_word[0])
else:
template_words.append('<VARIABLE>')
return ' '.join(template_words)
def detect_anomalous_patterns(self, log_messages, time_window_hours=1):
"""
異常なログパターンの検出
"""
current_time = datetime.now()
time_threshold = current_time - timedelta(hours=time_window_hours)
# 時間窓内のログパターンを分析
recent_patterns = defaultdict(int)
total_logs = 0
for message in log_messages:
normalized = self._normalize_log_message(message)
recent_patterns[normalized] += 1
total_logs += 1
# 異常パターンの特定
anomalies = []
for pattern, count in recent_patterns.items():
frequency = count / total_logs
# 履歴データとの比較(実際の実装では外部データソースを使用)
historical_frequency = self._get_historical_frequency(pattern)
if historical_frequency > 0:
deviation = abs(frequency - historical_frequency) / historical_frequency
if deviation > self.anomaly_threshold:
anomalies.append({
'pattern': pattern,
'current_frequency': frequency,
'historical_frequency': historical_frequency,
'deviation': deviation,
'count': count,
'severity': self._calculate_severity(deviation, count)
})
return sorted(anomalies, key=lambda x: x['severity'], reverse=True)
def _get_historical_frequency(self, pattern):
"""
パターンの履歴頻度を取得(デモ用)
"""
# 実際の実装では、過去のデータから頻度を計算
if pattern in self.pattern_cache:
return self.pattern_cache[pattern]
else:
# デフォルト値(新しいパターン)
return 0.01
def _calculate_severity(self, deviation, count):
"""
異常の重要度を計算
"""
# 偏差と発生回数を組み合わせて重要度を算出
severity_score = deviation * np.log(count + 1)
if severity_score > 5.0:
return "CRITICAL"
elif severity_score > 3.0:
return "HIGH"
elif severity_score > 1.0:
return "MEDIUM"
else:
return "LOW"
def analyze_error_patterns(self, error_logs):
"""
エラーパターンの詳細分析
"""
error_analysis = {
'total_errors': len(error_logs),
'error_categories': defaultdict(int),
'temporal_patterns': defaultdict(int),
'correlation_patterns': [],
'root_causes': []
}
for log in error_logs:
# エラーカテゴリの分類
category = self._categorize_error(log)
error_analysis['error_categories'][category] += 1
# 時間パターンの分析
timestamp = log.get('timestamp', '')
if timestamp:
hour = timestamp.split('T')[1][:2] if 'T' in timestamp else timestamp.split(' ')[1][:2]
error_analysis['temporal_patterns'][hour] += 1
# 相関パターンの検出
error_analysis['correlation_patterns'] = self._detect_error_correlations(error_logs)
# 根本原因の推定
error_analysis['root_causes'] = self._estimate_root_causes(error_logs)
return error_analysis
def _categorize_error(self, log):
"""
エラーの分類
"""
message = log.get('message', '').lower()
if any(keyword in message for keyword in ['database', 'sql', 'connection', 'timeout']):
return 'DATABASE'
elif any(keyword in message for keyword in ['network', 'connection refused', 'unreachable']):
return 'NETWORK'
elif any(keyword in message for keyword in ['permission', 'unauthorized', 'forbidden']):
return 'AUTHORIZATION'
elif any(keyword in message for keyword in ['validation', 'invalid', 'format']):
return 'VALIDATION'
elif any(keyword in message for keyword in ['memory', 'out of memory', 'heap']):
return 'MEMORY'
else:
return 'OTHER'
def _detect_error_correlations(self, error_logs):
"""
エラー間の相関パターン検出
"""
correlations = []
# 時間的相関の検出
time_sorted_logs = sorted(error_logs, key=lambda x: x.get('timestamp', ''))
for i in range(len(time_sorted_logs) - 1):
current_log = time_sorted_logs[i]
next_log = time_sorted_logs[i + 1]
# 時間差が5分以内の場合、相関の可能性
time_diff = self._calculate_time_diff(
current_log.get('timestamp', ''),
next_log.get('timestamp', '')
)
if time_diff <= 300: # 5分
correlations.append({
'first_error': current_log.get('message', '')[:100],
'second_error': next_log.get('message', '')[:100],
'time_diff_seconds': time_diff,
'correlation_strength': 1.0 / (time_diff + 1)
})
return correlations[:10] # 上位10個の相関
def _estimate_root_causes(self, error_logs):
"""
根本原因の推定
"""
root_causes = []
# パターンベースの根本原因推定
patterns = {
'cascading_failure': {
'pattern': r'(service|connection)\s+(unavailable|timeout|refused)',
'description': 'Cascading service failure detected',
'recommended_action': 'Check upstream service health'
},
'resource_exhaustion': {
'pattern': r'(memory|disk|cpu)\s+(limit|exhausted|full)',
'description': 'Resource exhaustion detected',
'recommended_action': 'Scale resources or optimize usage'
},
'configuration_error': {
'pattern': r'(config|configuration|property)\s+(missing|invalid|not found)',
'description': 'Configuration issue detected',
'recommended_action': 'Verify configuration settings'
}
}
for cause_name, cause_info in patterns.items():
matching_logs = [
log for log in error_logs
if re.search(cause_info['pattern'], log.get('message', ''), re.IGNORECASE)
]
if matching_logs:
root_causes.append({
'cause': cause_name,
'description': cause_info['description'],
'recommended_action': cause_info['recommended_action'],
'affected_logs_count': len(matching_logs),
'confidence': min(len(matching_logs) / len(error_logs) * 100, 100)
})
return sorted(root_causes, key=lambda x: x['confidence'], reverse=True)
def _calculate_time_diff(self, timestamp1, timestamp2):
"""
タイムスタンプ間の差分を秒で計算
"""
try:
dt1 = datetime.fromisoformat(timestamp1.replace('Z', '+00:00'))
dt2 = datetime.fromisoformat(timestamp2.replace('Z', '+00:00'))
return abs((dt2 - dt1).total_seconds())
except:
return float('inf')
# ログパターン分析の使用例
analyzer = LogPatternAnalyzer()
# サンプルログメッセージ
sample_logs = [
"2024-01-15T10:30:15Z INFO User 12345 logged in from 192.168.1.100",
"2024-01-15T10:30:16Z INFO User 67890 logged in from 10.0.0.50",
"2024-01-15T10:30:17Z ERROR Database connection timeout for user 12345",
"2024-01-15T10:30:18Z ERROR Database connection timeout for user 67890",
"2024-01-15T10:30:20Z INFO User 11111 logged in from 172.16.0.25"
]
# パターン分析の実行
templates = analyzer.extract_log_templates(sample_logs)
anomalies = analyzer.detect_anomalous_patterns(sample_logs)
print("Detected Templates:")
for cluster_id, template_info in templates.items():
print(f"Cluster {cluster_id}: {template_info['template']} (Count: {template_info['count']})")
print("\nDetected Anomalies:")
for anomaly in anomalies:
print(f"Pattern: {anomaly['pattern'][:50]}... (Severity: {anomaly['severity']})")
ログ相関分析とコンテキスト
マルチソースログ相関
# マルチソースログ相関分析
class MultiSourceLogCorrelator:
"""
複数ソースからのログ相関分析クラス
"""
def __init__(self):
self.correlation_rules = self._initialize_correlation_rules()
self.context_window_seconds = 300 # 5分
def _initialize_correlation_rules(self):
"""
相関ルールの初期化
"""
return {
'request_flow': {
'sources': ['nginx', 'application', 'database'],
'correlation_fields': ['request_id', 'trace_id'],
'time_tolerance_seconds': 30
},
'error_cascade': {
'sources': ['application', 'database', 'cache'],
'correlation_fields': ['session_id', 'user_id'],
'time_tolerance_seconds': 60
},
'security_incident': {
'sources': ['auth', 'firewall', 'application'],
'correlation_fields': ['client_ip', 'user_id'],
'time_tolerance_seconds': 120
},
'performance_degradation': {
'sources': ['loadbalancer', 'application', 'database', 'cache'],
'correlation_fields': ['service_name', 'endpoint'],
'time_tolerance_seconds': 180
}
}
def correlate_request_flow(self, logs_by_source, request_id):
"""
リクエストフローの相関分析
"""
correlated_events = {
'request_id': request_id,
'flow_timeline': [],
'performance_metrics': {},
'error_points': [],
'bottlenecks': []
}
# 各ソースからのログを時系列で整理
all_logs = []
for source, logs in logs_by_source.items():
for log in logs:
if self._matches_request(log, request_id):
log['source'] = source
all_logs.append(log)
# 時系列でソート
all_logs.sort(key=lambda x: x.get('timestamp', ''))
# フロー分析
flow_stages = {
'request_received': None,
'authentication': None,
'business_logic': None,
'database_query': None,
'response_sent': None
}
total_duration = 0
stage_durations = {}
for i, log in enumerate(all_logs):
timestamp = log.get('timestamp', '')
source = log.get('source', '')
# ステージ特定
stage = self._identify_stage(log)
if stage and not flow_stages[stage]:
flow_stages[stage] = {
'timestamp': timestamp,
'source': source,
'log': log
}
# パフォーマンス指標の抽出
if 'duration' in log:
stage_durations[stage] = log['duration']
# エラーポイントの特定
if log.get('level') in ['error', 'warning']:
correlated_events['error_points'].append({
'timestamp': timestamp,
'source': source,
'error': log.get('message', ''),
'stage': stage
})
correlated_events['flow_timeline'].append({
'timestamp': timestamp,
'source': source,
'stage': stage,
'message': log.get('message', '')[:100],
'level': log.get('level', 'info')
})
# パフォーマンス分析
if flow_stages['request_received'] and flow_stages['response_sent']:
total_duration = self._calculate_duration(
flow_stages['request_received']['timestamp'],
flow_stages['response_sent']['timestamp']
)
correlated_events['performance_metrics']['total_duration_ms'] = total_duration
# ボトルネック特定
correlated_events['bottlenecks'] = self._identify_bottlenecks(stage_durations, total_duration)
return correlated_events
def _matches_request(self, log, request_id):
"""
ログがリクエストIDにマッチするかチェック
"""
return (log.get('request_id') == request_id or
log.get('trace_id') == request_id or
request_id in log.get('message', ''))
def _identify_stage(self, log):
"""
ログのフローステージを特定
"""
message = log.get('message', '').lower()
source = log.get('source', '')
if source == 'nginx' and 'request' in message:
return 'request_received'
elif 'auth' in message or 'login' in message:
return 'authentication'
elif source == 'database' or 'sql' in message:
return 'database_query'
elif source == 'nginx' and 'response' in message:
return 'response_sent'
else:
return 'business_logic'
def _calculate_duration(self, start_timestamp, end_timestamp):
"""
期間の計算(ミリ秒)
"""
try:
start = datetime.fromisoformat(start_timestamp.replace('Z', '+00:00'))
end = datetime.fromisoformat(end_timestamp.replace('Z', '+00:00'))
return (end - start).total_seconds() * 1000
except:
return 0
def _identify_bottlenecks(self, stage_durations, total_duration):
"""
ボトルネックの特定
"""
bottlenecks = []
if total_duration > 0:
for stage, duration in stage_durations.items():
if duration and duration / total_duration > 0.3: # 30%以上
bottlenecks.append({
'stage': stage,
'duration_ms': duration,
'percentage': (duration / total_duration) * 100,
'severity': 'high' if duration / total_duration > 0.5 else 'medium'
})
return sorted(bottlenecks, key=lambda x: x['percentage'], reverse=True)
def analyze_error_cascade(self, logs_by_source, time_window_minutes=5):
"""
エラーカスケードの分析
"""
cascade_analysis = {
'cascade_detected': False,
'initial_error': None,
'cascade_timeline': [],
'affected_services': set(),
'impact_scope': 'unknown'
}
# 時間窓内のエラーログを収集
error_logs = []
for source, logs in logs_by_source.items():
for log in logs:
if log.get('level') in ['error', 'critical']:
log['source'] = source
error_logs.append(log)
# 時系列でソート
error_logs.sort(key=lambda x: x.get('timestamp', ''))
if len(error_logs) < 2:
return cascade_analysis
# カスケード検出ロジック
initial_error = error_logs[0]
cascade_analysis['initial_error'] = {
'timestamp': initial_error.get('timestamp'),
'source': initial_error.get('source'),
'message': initial_error.get('message', '')[:100]
}
# 後続エラーの分析
cascade_errors = []
for error in error_logs[1:]:
time_diff = self._calculate_duration(
initial_error.get('timestamp', ''),
error.get('timestamp', '')
)
if time_diff <= time_window_minutes * 60 * 1000: # ミリ秒変換
cascade_errors.append({
'timestamp': error.get('timestamp'),
'source': error.get('source'),
'message': error.get('message', '')[:100],
'time_since_initial_ms': time_diff
})
cascade_analysis['affected_services'].add(error.get('source', ''))
if len(cascade_errors) >= 2:
cascade_analysis['cascade_detected'] = True
cascade_analysis['cascade_timeline'] = cascade_errors
cascade_analysis['impact_scope'] = self._calculate_impact_scope(cascade_analysis['affected_services'])
# set のままでは json.dumps できないためリストへ変換
cascade_analysis['affected_services'] = sorted(cascade_analysis['affected_services'])
return cascade_analysis
def _calculate_impact_scope(self, affected_services):
"""
影響範囲の計算
"""
service_count = len(affected_services)
if service_count >= 5:
return 'critical'
elif service_count >= 3:
return 'high'
elif service_count >= 2:
return 'medium'
else:
return 'low'
def create_contextual_dashboard(self, correlation_results):
"""
相関分析結果のダッシュボード作成
"""
dashboard_config = {
"title": "Log Correlation Analysis Dashboard",
"description": "Multi-source log correlation and context analysis",
"widgets": []
}
# 1. リクエストフローのタイムライン
if 'flow_timeline' in correlation_results:
timeline_widget = {
"definition": {
"type": "timeseries",
"title": "Request Flow Timeline",
"requests": [
{
"q": "avg:request.duration{*} by {service,stage}",
"display_type": "line"
}
]
}
}
dashboard_config["widgets"].append(timeline_widget)
# 2. エラーカスケード表示
if correlation_results.get('cascade_detected'):
cascade_widget = {
"definition": {
"type": "alert_graph",
"title": "Error Cascade Detection",
"alert_id": "error_cascade_alert",
"viz_type": "timeseries"
}
}
dashboard_config["widgets"].append(cascade_widget)
# 3. パフォーマンスボトルネック
bottleneck_widget = {
"definition": {
"type": "toplist",
"title": "Performance Bottlenecks",
"requests": [
{
"q": "top(avg:stage.duration{*} by {stage}, 10, 'mean', 'desc')"
}
]
}
}
dashboard_config["widgets"].append(bottleneck_widget)
return dashboard_config
# 相関分析の使用例
correlator = MultiSourceLogCorrelator()
# サンプルログデータ
sample_logs_by_source = {
'nginx': [
{
'timestamp': '2024-01-15T10:30:00Z',
'message': 'Request received GET /api/users/123',
'request_id': 'req-123',
'level': 'info'
}
],
'application': [
{
'timestamp': '2024-01-15T10:30:01Z',
'message': 'Processing user request',
'request_id': 'req-123',
'level': 'info',
'duration': 150
}
],
'database': [
{
'timestamp': '2024-01-15T10:30:02Z',
'message': 'SELECT * FROM users WHERE id = 123',
'request_id': 'req-123',
'level': 'info',
'duration': 50
}
]
}
# リクエストフロー分析
flow_analysis = correlator.correlate_request_flow(sample_logs_by_source, 'req-123')
print("Request Flow Analysis:", json.dumps(flow_analysis, indent=2))
# エラーカスケード分析
cascade_analysis = correlator.analyze_error_cascade(sample_logs_by_source)
print("Cascade Analysis:", json.dumps(cascade_analysis, indent=2))
ログからのアラート設定
インテリジェントアラート設定
# ログベースアラートの高度設定
class DatadogLogAlerting:
"""
Datadogログアラート管理クラス
"""
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.base_url = "https://api.datadoghq.com/api/v1"
self.headers = {
"Content-Type": "application/json",
"DD-API-KEY": api_key,
"DD-APPLICATION-KEY": app_key
}
def create_error_rate_alert(self, service_name, threshold_percent=5.0):
"""
エラー率アラートの作成
"""
alert_config = {
"name": f"High Error Rate - {service_name}",
"type": "log alert",
"query": f"logs(\"service:{service_name} level:error\").index(\"*\").rollup(\"count\").by(\"service\").last(\"5m\") > {threshold_percent}",
"message": f"""
🚨 **High Error Rate Detected**
Service: {service_name}
Error Rate: {{{{value}}}}% (Threshold: {threshold_percent}%)
Time: {{{{last_triggered_ts}}}}
**Immediate Actions:**
1. Check service health dashboard
2. Review recent deployments
3. Investigate error patterns in logs
**Links:**
- [Service Dashboard](https://app.datadoghq.com/dashboard/service-{service_name})
- [Error Logs](https://app.datadoghq.com/logs?query=service:{service_name} level:error)
@slack-alerts @pagerduty
""",
"tags": ["service:" + service_name, "alert_type:error_rate"],
"options": {
"thresholds": {
"critical": threshold_percent * 2,
"warning": threshold_percent
},
"notify_audit": False,
"locked": False,
"timeout_h": 0,
"include_tags": True,
"no_data_timeframe": 10,
"notify_no_data": True,
"renotify_interval": 30,
"escalation_message": "Error rate is still high! Escalating to on-call engineer.",
"evaluation_delay": 60
}
}
return self._create_monitor(alert_config)
def create_anomaly_detection_alert(self, service_name):
"""
異常検知アラートの作成
"""
alert_config = {
"name": f"Log Anomaly Detection - {service_name}",
"type": "log alert",
"query": f"anomalies(logs(\"service:{service_name}\").index(\"*\").rollup(\"count\").by(\"service\"), \"basic\", 2, direction=\"both\", alert_window=\"last_10m\", interval=60, count_default_zero=\"true\") >= 1",
"message": f"""
🔍 **Log Anomaly Detected**
Service: {service_name}
Anomaly Score: {{{{value}}}}
Pattern: {{{{anomaly_type}}}}
**Context:**
- Expected pattern based on historical data
- Current behavior significantly deviates from normal
- Potential impact: {{{{impact_assessment}}}}
**Investigation Steps:**
1. Check concurrent metric anomalies
2. Review recent changes or deployments
3. Analyze log pattern changes
4. Validate against business events
@slack-dev-ops
""",
"tags": ["service:" + service_name, "alert_type:anomaly"],
"options": {
"thresholds": {
"critical": 2.0,
"warning": 1.0
},
"notify_audit": False,
"locked": False,
"include_tags": True,
"no_data_timeframe": 10,
"notify_no_data": True,
"renotify_interval": 60
}
}
return self._create_monitor(alert_config)
def create_security_alert(self):
"""
セキュリティアラートの作成
"""
security_alerts = []
# 1. 認証失敗の急増
auth_failure_alert = {
"name": "Authentication Failure Spike",
"type": "log alert",
"query": "logs(\"source:auth event_type:authentication_failed\").index(\"*\").rollup(\"count\").last(\"5m\") > 50",
"message": """
🛡️ **Security Alert: Authentication Failure Spike**
Failed Authentication Attempts: {{value}} in last 5 minutes
Threshold: 50 attempts
**Potential Threats:**
- Brute force attack
- Credential stuffing
- Account takeover attempt
**Immediate Response:**
1. Check source IP addresses
2. Review affected user accounts
3. Consider temporary rate limiting
4. Alert security team
@slack-security @pagerduty-security
""",
"tags": ["alert_type:security", "threat:auth_failure"],
"options": {
"thresholds": {
"critical": 100,
"warning": 50
},
"renotify_interval": 15,
"escalation_message": "Continued authentication failures - possible ongoing attack!"
}
}
security_alerts.append(auth_failure_alert)
# 2. 異常なIPアクセスパターン
suspicious_ip_alert = {
"name": "Suspicious IP Access Pattern",
"type": "log alert",
"query": "logs(\"@network.client.ip:* @http.status_code:[400 TO 499]\").index(\"*\").rollup(\"count\").by(\"@network.client.ip\").last(\"10m\") > 100",
"message": """
🚨 **Security Alert: Suspicious IP Activity**
IP Address: {{client_ip}}
Failed Requests: {{value}} in last 10 minutes
**Analysis:**
- High volume of failed requests from single IP
- Potential scanning or attack activity
- Geographic location: {{geo_location}}
**Response Actions:**
1. Investigate IP reputation
2. Check request patterns and targets
3. Consider IP blocking
4. Review security logs
@slack-security
""",
"tags": ["alert_type:security", "threat:suspicious_ip"],
"options": {
"thresholds": {
"critical": 200,
"warning": 100
},
"renotify_interval": 30
}
}
security_alerts.append(suspicious_ip_alert)
return security_alerts
def create_business_kpi_alert(self, kpi_name, threshold_value, comparison="below"):
"""
ビジネスKPIアラートの作成
"""
operator = "<" if comparison == "below" else ">"
alert_config = {
"name": f"Business KPI Alert - {kpi_name}",
"type": "log alert",
"query": f"logs(\"@kpi.name:{kpi_name}\").index(\"*\").rollup(\"avg\", \"@kpi.value\").last(\"15m\") {operator} {threshold_value}",
"message": f"""
📊 **Business KPI Alert**
KPI: {kpi_name}
Current Value: {{{{value}}}}
Threshold: {threshold_value} ({comparison})
**Business Impact:**
- Performance indicator is {comparison} expected levels
- Potential revenue or user experience impact
- Requires immediate business review
**Escalation:**
- Business stakeholders notified
- Technical teams investigating
- Trending analysis needed
@slack-business @business-stakeholders
""",
"tags": ["alert_type:business_kpi", f"kpi:{kpi_name}"],
"options": {
"thresholds": {
"critical": threshold_value * 0.8 if comparison == "below" else threshold_value * 1.2,
"warning": threshold_value
},
"renotify_interval": 60,
"evaluation_delay": 300 # 5分間の評価遅延
}
}
return self._create_monitor(alert_config)
def create_multi_alert_composite(self, service_name):
"""
複合アラートの作成(複数条件の組み合わせ)
"""
composite_alert = {
"name": f"Service Health Composite Alert - {service_name}",
"type": "composite",
"query": f"(avg(last_5m):logs(\"service:{service_name} level:error\").index(\"*\").rollup(\"count\") > 10) && (avg(last_5m):avg:service.response_time{{service:{service_name}}} > 2000)",
"message": f"""
🔥 **Critical Service Health Alert**
Service: {service_name}
Multiple health indicators triggered simultaneously:
**Error Conditions:**
- High error rate: {{{{error_rate}}}}
- Slow response time: {{{{response_time}}}}ms
**Severity Assessment:**
- Compound service degradation detected
- User experience significantly impacted
- Immediate intervention required
**Emergency Response:**
1. Page on-call engineer immediately
2. Activate incident response process
3. Prepare rollback procedures
4. Brief executive team if prolonged
@pagerduty-critical @slack-incidents
""",
"tags": ["service:" + service_name, "alert_type:composite", "severity:critical"],
"options": {
"notify_audit": True,
"locked": True,
"include_tags": True,
"renotify_interval": 5, # 5分ごとに再通知
"escalation_message": "SERVICE CRITICAL - Multiple failures detected!"
}
}
return self._create_monitor(composite_alert)
def _create_monitor(self, alert_config):
"""
モニター作成の内部メソッド
"""
url = f"{self.base_url}/monitor"
response = requests.post(url, headers=self.headers, data=json.dumps(alert_config))
return response.json()
def setup_alert_escalation_policies(self):
"""
アラートエスカレーションポリシーの設定
"""
escalation_policies = {
"low_severity": {
"initial_notification": ["slack-dev"],
"escalation_time_minutes": 30,
"escalation_targets": ["team-lead"],
"max_escalations": 2
},
"medium_severity": {
"initial_notification": ["slack-ops", "email-ops"],
"escalation_time_minutes": 15,
"escalation_targets": ["on-call-engineer", "team-lead"],
"max_escalations": 3
},
"high_severity": {
"initial_notification": ["pagerduty", "slack-ops", "sms-ops"],
"escalation_time_minutes": 10,
"escalation_targets": ["senior-engineer", "engineering-manager"],
"max_escalations": 4
},
"critical_severity": {
"initial_notification": ["pagerduty-critical", "phone-call", "slack-executives"],
"escalation_time_minutes": 5,
"escalation_targets": ["director-engineering", "cto"],
"max_escalations": 5,
"executive_notification": True
}
}
return escalation_policies
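# 補足(仮のヘルパー例): setup_alert_escalation_policies() が返すポリシーを
# 重大度とエスカレーション段階から解決し、通知先ハンドルのリストを組み立てる最小スケッチ。
# resolve_notification_targets は本記事用の仮の関数名で、Datadog APIの一部ではない。
def resolve_notification_targets(escalation_policies, severity, escalation_level=0):
    """重大度に対応するポリシーから通知先リストを返す(仮実装)"""
    policy = escalation_policies.get(f"{severity}_severity")
    if policy is None:
        return []
    targets = list(policy["initial_notification"])
    if escalation_level > 0:
        # 最大エスカレーション回数を超えない範囲で追加の通知先を加える
        capped_level = min(escalation_level, policy["max_escalations"])
        targets.extend(policy["escalation_targets"][:capped_level])
    return targets
# 例: resolve_notification_targets(policies, "high", escalation_level=1)
# -> ["pagerduty", "slack-ops", "sms-ops", "senior-engineer"]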
# アラート設定の使用例
alerting = DatadogLogAlerting(
api_key="your-api-key",
app_key="your-app-key"
)
# エラー率アラートの作成
error_alert = alerting.create_error_rate_alert("web-api", 3.0)
# 異常検知アラートの作成
anomaly_alert = alerting.create_anomaly_detection_alert("web-api")
# セキュリティアラートの作成(create_security_alert は設定のリストを返すため、個別に登録する)
security_alerts = alerting.create_security_alert()
for config in security_alerts:
    alerting._create_monitor(config)
# ビジネスKPIアラートの作成
kpi_alert = alerting.create_business_kpi_alert("daily_revenue", 10000, "below")
# 複合アラートの作成
composite_alert = alerting.create_multi_alert_composite("web-api")
print("All alerts configured successfully!")
ログ分析のベストプラクティス
効率的なログ分析戦略
# ログ分析ベストプラクティス実装
class LogAnalysisBestPractices:
"""
ログ分析のベストプラクティス実装クラス
"""
def __init__(self):
self.analysis_strategies = self._initialize_strategies()
self.performance_thresholds = self._set_performance_thresholds()
def _initialize_strategies(self):
"""
分析戦略の初期化
"""
return {
"structured_logging": {
"description": "構造化ログによる効率的な分析",
"implementation": self._implement_structured_logging,
"benefits": [
"高速検索・フィルタリング",
"自動フィールド抽出",
"一貫性のある分析"
]
},
"contextual_enrichment": {
"description": "コンテキスト情報による分析の充実",
"implementation": self._implement_contextual_enrichment,
"benefits": [
"包括的な問題分析",
"根本原因の特定",
"影響範囲の評価"
]
},
"temporal_analysis": {
"description": "時系列分析による傾向把握",
"implementation": self._implement_temporal_analysis,
"benefits": [
"パターン識別",
"予測分析",
"季節性の理解"
]
},
"automated_insights": {
"description": "自動化された洞察抽出",
"implementation": self._implement_automated_insights,
"benefits": [
"24/7監視",
"即座の問題検出",
"プロアクティブな対応"
]
}
}
def _set_performance_thresholds(self):
"""
パフォーマンス閾値の設定
"""
return {
"query_response_time": {
"excellent": 100, # < 100ms
"good": 500, # < 500ms
"acceptable": 2000, # < 2s
"poor": 5000 # < 5s
},
"log_volume": {
"low": 1000, # < 1K logs/min
"medium": 10000, # < 10K logs/min
"high": 100000, # < 100K logs/min
"extreme": 1000000 # < 1M logs/min
},
"error_rate": {
"excellent": 0.1, # < 0.1%
"good": 1.0, # < 1%
"acceptable": 5.0, # < 5%
"critical": 10.0 # < 10%
}
}
def _implement_structured_logging(self):
"""
構造化ログの実装ガイドライン
"""
return {
"field_standards": {
"timestamp": "ISO 8601形式(UTC)",
"level": "DEBUG/INFO/WARN/ERROR/FATAL",
"service": "サービス名",
"version": "セマンティックバージョニング",
"environment": "dev/staging/production",
"correlation_id": "リクエスト追跡ID",
"user_id": "ユーザー識別子",
"session_id": "セッション識別子"
},
"message_guidelines": {
"clarity": "明確で理解しやすいメッセージ",
"consistency": "一貫した形式と用語",
"actionability": "対応可能な情報を含む",
"context": "十分なコンテキスト情報"
},
"implementation_example": {
"json_format": {
"timestamp": "2024-01-15T10:30:00.000Z",
"level": "ERROR",
"service": "payment-service",
"version": "2.1.0",
"environment": "production",
"correlation_id": "req-123-456",
"user_id": "user-789",
"message": "Payment processing failed",
"error": {
"type": "PaymentGatewayError",
"code": "INSUFFICIENT_FUNDS",
"details": "Card ending in 1234 has insufficient funds"
},
"context": {
"amount": 99.99,
"currency": "USD",
"payment_method": "credit_card",
"gateway": "stripe"
}
}
}
}
def _implement_contextual_enrichment(self):
"""
コンテキスト充実の実装
"""
return {
"enrichment_sources": {
"user_context": [
"ユーザー属性(プラン、地域、デバイス)",
"セッション情報",
"行動履歴"
],
"system_context": [
"サーバー情報(ホスト、コンテナID)",
"リソース使用状況",
"ネットワーク情報"
],
"business_context": [
"機能フラグ状態",
"A/Bテストバリアント",
"ビジネスイベント"
]
},
"enrichment_pipeline": [
{
"stage": "collection",
"action": "基本ログデータの収集",
"tools": ["Datadog Agent", "Custom loggers"]
},
{
"stage": "parsing",
"action": "構造化データの抽出",
"tools": ["Grok parsers", "JSON parsers"]
},
{
"stage": "enrichment",
"action": "外部データソースとの結合",
"tools": ["User DB", "Config service", "GeoIP"]
},
{
"stage": "tagging",
"action": "検索・分析用タグの付与",
"tools": ["Tag processors", "Category processors"]
}
]
}
def _implement_temporal_analysis(self):
"""
時系列分析の実装
"""
return {
"analysis_patterns": {
"trend_analysis": {
"description": "長期トレンドの識別",
"timeframes": ["daily", "weekly", "monthly", "quarterly"],
"metrics": ["volume", "error_rate", "performance"],
"techniques": ["moving_averages", "linear_regression", "seasonal_decomposition"]
},
"anomaly_detection": {
"description": "異常パターンの検出",
"algorithms": ["statistical", "machine_learning", "threshold_based"],
"sensitivity": ["low", "medium", "high"],
"confidence_levels": [95, 99, 99.9]
},
"cyclical_patterns": {
"description": "周期的パターンの識別",
"cycles": ["hourly", "daily", "weekly", "seasonal"],
"applications": ["capacity_planning", "alert_tuning", "performance_optimization"]
}
},
"implementation_tools": {
"datadog_features": [
"Anomaly Detection",
"Forecasting",
"Outlier Detection"
],
"custom_analytics": [
"Time series decomposition",
"Change point detection",
"Pattern matching"
]
}
}
def _implement_automated_insights(self):
"""
自動洞察の実装
"""
return {
"insight_categories": {
"performance_insights": {
"slow_queries": "応答時間の異常なクエリの特定",
"resource_bottlenecks": "リソース制約の識別",
"scaling_opportunities": "スケーリングが必要な箇所"
},
"reliability_insights": {
"error_patterns": "エラーの傾向と根本原因",
"failure_predictions": "障害の予兆検出",
"recovery_analysis": "復旧時間とパターン"
},
"security_insights": {
"threat_detection": "セキュリティ脅威の特定",
"access_anomalies": "異常なアクセスパターン",
"compliance_violations": "コンプライアンス違反"
},
"business_insights": {
"user_behavior": "ユーザー行動の分析",
"feature_adoption": "機能利用状況",
"conversion_optimization": "コンバージョン改善機会"
}
},
"automation_framework": {
"data_collection": "自動ログ収集とパース",
"pattern_recognition": "機械学習によるパターン識別",
"insight_generation": "AIによる洞察生成",
"action_recommendation": "推奨アクションの提示",
"continuous_learning": "フィードバックによる改善"
}
}
def create_analysis_playbook(self, use_case):
"""
用途別分析プレイブックの作成
"""
playbooks = {
"incident_response": {
"phase_1_detection": [
"アラート情報の確認",
"影響範囲の初期評価",
"類似インシデントの検索"
],
"phase_2_investigation": [
"エラーログの時系列分析",
"関連システムの状態確認",
"相関するイベントの特定"
],
"phase_3_resolution": [
"根本原因の特定",
"修正アクションの実施",
"回復の確認"
],
"phase_4_postmortem": [
"インシデントタイムラインの作成",
"改善アクションの策定",
"予防策の実装"
]
},
"performance_optimization": {
"baseline_establishment": [
"現在のパフォーマンス指標測定",
"ユーザー体験ベンチマーク",
"リソース使用状況分析"
],
"bottleneck_identification": [
"応答時間分析",
"スループット測定",
"エラー率評価"
],
"optimization_planning": [
"改善機会の優先順位付け",
"実装計画の策定",
"成功指標の定義"
],
"validation": [
"改善効果の測定",
"パフォーマンス回帰テスト",
"継続監視の設定"
]
},
"security_analysis": {
"threat_hunting": [
"異常アクセスパターン検索",
"権限昇格試行の検出",
"データ流出の兆候確認"
],
"compliance_monitoring": [
"アクセスログの監査",
"データ保護規則の遵守確認",
"セキュリティポリシー違反の検出"
],
"incident_investigation": [
"セキュリティイベントの時系列分析",
"攻撃ベクトルの特定",
"影響範囲の評価"
]
}
}
return playbooks.get(use_case, {})
def optimize_log_analysis_performance(self):
"""
ログ分析パフォーマンスの最適化
"""
optimization_strategies = {
"query_optimization": {
"index_usage": [
"頻繁に検索されるフィールドのインデックス化",
"複合インデックスの活用",
"時間範囲の適切な指定"
],
"query_patterns": [
"ワイルドカード検索の最小化",
"フィルタ条件の最適化",
"結果セットサイズの制限"
]
},
"data_organization": {
"partitioning": [
"時間ベースパーティショニング",
"サービス別データ分離",
"ログレベル別ストレージ"
],
"retention_policies": [
"ホット/ウォーム/コールドストレージ活用",
"データライフサイクル管理",
"コスト効率的な長期保存"
]
},
"analysis_automation": {
"pre_computed_metrics": [
"定期的な集計処理",
"ダッシュボード用データ準備",
"アラート条件の事前計算"
],
"caching_strategies": [
"頻繁なクエリ結果のキャッシュ",
"ダッシュボードデータの事前生成",
"分析結果の再利用"
]
}
}
return optimization_strategies
# ベストプラクティスの使用例
import json
best_practices = LogAnalysisBestPractices()
# 構造化ログの実装指針
structured_logging = best_practices._implement_structured_logging()
print("Structured Logging Guidelines:", json.dumps(structured_logging, indent=2, ensure_ascii=False))
# インシデント対応プレイブック
incident_playbook = best_practices.create_analysis_playbook("incident_response")
print("Incident Response Playbook:", json.dumps(incident_playbook, indent=2, ensure_ascii=False))
# パフォーマンス最適化策
optimization = best_practices.optimize_log_analysis_performance()
print("Performance Optimization:", json.dumps(optimization, indent=2, ensure_ascii=False))
この第5部では、Datadogのログ管理機能を包括的に解説しました。効率的なログ収集から高度な分析手法、インテリジェントなアラート設定まで、ログデータを価値ある洞察へ変換するための実践的な手法を扱っています。
次の記事では、さらに高度なDatadog活用手法について詳しく解説していく予定です。構造化ログ、相関分析、自動化されたインサイト抽出を通じて、データ駆動型の運用とプロアクティブな問題解決を実現しましょう。