Logstashを使用したNew Relicログ統合 - Elastic Stackとの連携実装

Logstashは、Elastic Stackの中核コンポーネントとして、強力なデータ処理エンジンとしての地位を確立しています。New Relicとの統合により、既存のElastic Stack環境を活用しながら、包括的なオブザーバビリティを実現できます。本記事では、LogstashからNew Relicへのログ統合の詳細な実装方法と、ハイブリッド監視アーキテクチャの構築について解説します。

Logstashとは

Logstashは、様々なソースからデータを収集し、変換処理を行って、複数の出力先に送信するサーバーサイドデータ処理パイプラインです。Ruby で開発され、JVMで実行されるため、高いスケーラビリティとパフォーマンスを提供します。

Logstashのアーキテクチャ概要

Logstashのデータ処理パイプラインは、入力(Input)、フィルター(Filter)、出力(Output)の3つのステージから構成されています。

入力ステージでは、ファイル、ネットワーク、メッセージキュー、データベースなど、多様なデータソースからログを収集します。プラグインベースの設計により、ほぼすべてのデータソースに対応できます。

フィルターステージでは、収集されたデータの変換、正規化、エンリッチメントを行います。Grokパターン、JSON パース、日時変換、地理情報追加など、豊富な処理機能を提供します。

出力ステージでは、処理されたデータを最終的な出力先に送信します。Elasticsearch、New Relic、ファイル、データベースなど、複数の出力先への同時送信が可能です。

New Relic出力プラグインの設定

LogstashからNew Relicにログデータを送信するには、HTTP出力プラグインを使用します。New Relic専用プラグインは存在しないため、REST APIを直接呼び出す形で実装します。

基本設定の実装

最も基本的な設定例から始めましょう。

ruby
input {
  file {
    path => "/var/log/application/*.log"
    start_position => "beginning"
    codec => json
  }
}

filter {
  mutate {
    add_field => { "service" => "myapp" }
    add_field => { "environment" => "production" }
  }
}

output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    http_method => "post"
    headers => {
      "Content-Type" => "application/json"
      "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
    }
    format => "json_batch"
    mapping => {
      "logs" => "[logs]"
    }
  }
}

この設定では、アプリケーションログファイルを監視し、JSON形式でNew Relicに送信します。

高度なデータ変換

LogstashとNew Relicの統合において、適切なデータ変換は重要です。以下の例では、複雑なデータ処理を実装しています。

ruby
filter {
  # Grokパターンによる非構造化ログの解析
  if [source] =~ /nginx/ {
    grok {
      match => { 
        "message" => '%{NGINXACCESS}' 
      }
    }
    
    # IPアドレスからGeoIP情報を取得
    geoip {
      source => "clientip"
      target => "geoip"
    }
    
    # ユーザーエージェントの解析
    useragent {
      source => "agent"
      target => "user_agent"
    }
  }
  
  # アプリケーションログの処理
  if [fields][service] == "myapp" {
    # JSON ログのパース
    json {
      source => "message"
      target => "app_data"
    }
    
    # レスポンス時間の数値変換
    mutate {
      convert => { "[app_data][response_time]" => "float" }
    }
    
    # エラーレベルの正規化
    translate {
      field => "[app_data][level]"
      destination => "log_level"
      dictionary => {
        "ERROR" => "error"
        "WARN" => "warning"
        "INFO" => "info"
        "DEBUG" => "debug"
      }
    }
  }
  
  # New Relic形式への変換
  mutate {
    add_field => { 
      "timestamp" => "%{@timestamp}"
      "hostname" => "%{host}"
    }
    remove_field => [ "@version", "@timestamp" ]
  }
}

マルチ出力設定

Logstashの強力な機能の一つは、複数の出力先への同時送信です。ElasticsearchとNew Relicの両方にデータを送信する設定例を示します。

Elasticsearch + New Relic設定

ruby
input {
  beats {
    port => 5044
  }
  
  file {
    path => "/var/log/syslog"
    start_position => "beginning"
  }
}

filter {
  # 共通の前処理
  mutate {
    add_field => { 
      "collected_at" => "%{@timestamp}"
      "logstash_host" => "%{host}"
    }
  }
  
  # ログタイプ別の処理
  if [fields][logtype] == "application" {
    json {
      source => "message"
    }
  } else if [fields][logtype] == "system" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:logsource} %{PROG:program}: %{GREEDYDATA:logmessage}" }
    }
  }
}

output {
  # Elasticsearchへの出力
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  
  # New Relicへの出力
  http {
    url => "https://log-api.newrelic.com/log/v1"
    http_method => "post"
    headers => {
      "Content-Type" => "application/json"
      "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
    }
    format => "json"
    mapping => {
      "timestamp" => "%{@timestamp}"
      "message" => "%{message}"
      "service" => "%{[fields][service]}"
      "environment" => "%{[fields][environment]}"
      "host" => "%{host}"
      "logtype" => "%{[fields][logtype]}"
    }
  }
  
  # デバッグ用のファイル出力
  if [log_level] == "debug" {
    file {
      path => "/var/log/logstash/debug.log"
      codec => json_lines
    }
  }
}

条件付き出力とルーティング

ログの内容や属性に応じて、異なる処理や出力先を設定できます。

エラーログの優先処理

ruby
filter {
  # エラーレベルの判定
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_field => { "priority" => "high" }
      add_field => { "alert_required" => "true" }
    }
  }
  
  # セキュリティログの特別処理
  if [program] == "sshd" or [program] == "sudo" {
    mutate {
      add_field => { "security_event" => "true" }
      add_field => { "category" => "security" }
    }
  }
}

output {
  # 高優先度ログの即座送信
  if [priority] == "high" {
    http {
      url => "https://log-api.newrelic.com/log/v1"
      http_method => "post"
      headers => {
        "Content-Type" => "application/json"
        "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
      }
      format => "json"
    }
  }
  
  # セキュリティログの専用出力
  if [security_event] == "true" {
    http {
      url => "https://log-api.newrelic.com/log/v1"
      http_method => "post"
      headers => {
        "Content-Type" => "application/json"
        "X-License-Key" => "${NEW_RELIC_SECURITY_LICENSE_KEY}"
      }
      format => "json"
      mapping => {
        "category" => "security"
        "event_type" => "%{program}"
        "timestamp" => "%{@timestamp}"
        "message" => "%{message}"
      }
    }
  }
  
  # 通常ログのバッチ送信
  else {
    http {
      url => "https://log-api.newrelic.com/log/v1"
      http_method => "post"
      headers => {
        "Content-Type" => "application/json"
        "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
      }
      format => "json_batch"
    }
  }
}

パフォーマンス最適化

大量のログデータを処理する環境では、Logstashのパフォーマンス最適化が重要です。

JVM設定の最適化

Logstashのjvm.optionsファイルでJVMパラメータを調整します。

# ヒープサイズの設定(システムメモリの50-75%)
-Xms4g
-Xmx4g

# ガベージコレクションの最適化
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200

# JIT コンパイラの最適化
-XX:+TieredCompilation
-XX:TieredStopAtLevel=1

パイプライン設定の最適化

logstash.ymlでパイプライン設定を最適化します。

yaml
# ワーカースレッド数(CPUコア数と同等)
pipeline.workers: 8

# バッチサイズ(メモリ使用量とレイテンシのバランス)
pipeline.batch.size: 1000

# バッチ遅延(リアルタイム性との兼ね合い)
pipeline.batch.delay: 50

出力バッファリング

HTTP出力プラグインでバッファリングを設定し、ネットワーク効率を向上させます。

ruby
output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    http_method => "post"
    headers => {
      "Content-Type" => "application/json"
      "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
    }
    format => "json_batch"
    
    # HTTP設定の最適化
    keepalive => true
    pool_max => 10
    pool_max_per_route => 2
    
    # 再試行設定
    retry_non_idempotent => true
    retry_on_failure => 3
    
    # タイムアウト設定
    connect_timeout => 30
    socket_timeout => 60
  }
}

監視とメトリクス

Logstash自体の監視とパフォーマンストラッキングについて説明します。

内部メトリクスの監視

Logstashの内部メトリクスAPIを有効にします。

yaml
# logstash.yml
http.host: "0.0.0.0"
http.port: 9600

# メトリクス収集の有効化
monitoring.enabled: true

メトリクス情報をNew Relicに送信する設定例です。

ruby
input {
  http_poller {
    urls => {
      logstash_stats => "http://localhost:9600/_node/stats"
    }
    request_timeout => 60
    interval => 60
    codec => "json"
    tags => ["logstash_metrics"]
  }
}

filter {
  if "logstash_metrics" in [tags] {
    mutate {
      add_field => { "metric_type" => "logstash_performance" }
    }
  }
}

output {
  if "logstash_metrics" in [tags] {
    http {
      url => "https://log-api.newrelic.com/log/v1"
      http_method => "post"
      headers => {
        "Content-Type" => "application/json"
        "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
      }
      format => "json"
    }
  }
}

セキュリティとコンプライアンス

本番環境でのLogstash運用におけるセキュリティベストプラクティスを説明します。

認証情報の保護

環境変数やキーストアを使用して認証情報を保護します。

bash
# 環境変数での設定
export NEW_RELIC_LICENSE_KEY="your_license_key_here"

# Logstashキーストアの使用
bin/logstash-keystore create
bin/logstash-keystore add NEW_RELIC_LICENSE_KEY

設定ファイルでキーストアの値を参照します。

ruby
output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    headers => {
      "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
    }
  }
}

強化されたセキュリティ設定

ruby
output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    http_method => "post"
    headers => {
      "Content-Type" => "application/json"
      "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
      "User-Agent" => "Logstash/8.9.0"
    }
    
    # 強化されたSSL/TLS設定
    ssl_verification_mode => "full"
    ssl_certificate_authorities => ["/etc/ssl/certs/ca-certificates.crt"]
    ssl_cipher_suites => ["TLS_AES_128_GCM_SHA256", "TLS_AES_256_GCM_SHA384", "TLS_CHACHA20_POLY1305_SHA256"]
    ssl_supported_protocols => ["TLSv1.2", "TLSv1.3"]
    ssl_verify_hostname => true
    
    # 接続タイムアウト設定
    connect_timeout => 30
    socket_timeout => 60
    request_timeout => 90
    
    # 接続プール設定
    pool_max => 20
    pool_max_per_route => 5
    keepalive => true
    keepalive_timeout => 120
  }
}

データマスキング

機密情報をマスキングまたは除去する設定例です。

ruby
filter {
  # クレジットカード番号のマスキング
  mutate {
    gsub => [
      "message", "\d{4}-\d{4}-\d{4}-\d{4}", "****-****-****-****"
    ]
  }
  
  # 機密フィールドの除去
  mutate {
    remove_field => ["password", "api_key", "secret"]
  }
  
  # IPアドレスの匿名化
  anonymize {
    fields => ["client_ip"]
    algorithm => "SHA1"
    key => "${ANONYMIZE_KEY}"
  }
}

トラブルシューティング

一般的な問題と解決方法について説明します。

パフォーマンス問題の診断

bash
# Logstashプロセスの監視
curl -X GET "localhost:9600/_node/stats/pipeline"

# JVMメモリ使用量の確認
curl -X GET "localhost:9600/_node/stats/jvm"

# パイプライン統計の確認
curl -X GET "localhost:9600/_node/stats/pipeline?pretty"

接続問題の解決

ruby
output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    # デバッグログの有効化
    codec => rubydebug
    
    # 詳細ログの設定
    metadata => true
    
    # 失敗時のファイル出力
    retry_on_failure => 3
  }
  
  # 失敗ログの保存
  file {
    path => "/var/log/logstash/failed-newrelic.log"
    codec => json_lines
  }
}

コスト最適化戦略

大規模環境でのLogstashによるログ管理コストを効率的に制御するための戦略について説明します。

ログ量削減のサンプリング戦略

ruby
filter {
  # 環境別サンプリング設定
  if [environment] == "production" {
    if [level] == "DEBUG" {
      # 本番環境のDEBUGログを95%削減
      ruby {
        code => '
          if rand() > 0.05
            event.cancel
          end
        '
      }
    }
  }
  
  # 高頻度アクセスログのサンプリング
  if [fields][logtype] == "access" {
    if [request_uri] =~ /^\/health/ {
      # ヘルスチェックログを90%削減
      ruby {
        code => '
          if rand() > 0.1
            event.cancel
          end
        '
      }
    }
  }
  
  # 重要度別保存期間設定
  mutate {
    add_field => {
      "retention_days" => "%{level == 'ERROR' ? '90' : level == 'WARN' ? '30' : '7'}"
      "retention_policy" => "%{level == 'ERROR' ? 'long' : level == 'WARN' ? 'standard' : 'short'}"
    }
  }
}

効率的なバッチ処理設定

ruby
output {
  http {
    url => "https://log-api.newrelic.com/log/v1"
    http_method => "post"
    
    # コスト効率を考慮した設定
    format => "json_batch"
    batch_events => 2000
    batch_timeout => 30
    
    # 圧縮有効化
    http_compression => true
    compression_level => 6
    
    # 効率的な再試行設定
    retry_on_failure => 5
    retry_non_idempotent => true
    automatic_retries => 3
    retry_initial_interval => 2
    retry_max_interval => 300
  }
}

保存期間の最適化手法

ruby
filter {
  # カテゴリ別保存期間設定
  if [fields][category] == "security" {
    mutate {
      add_field => {
        "data_retention_days" => "365"
        "retention_policy" => "compliance"
        "archive_required" => "true"
      }
    }
  } else if [fields][category] == "audit" {
    mutate {
      add_field => {
        "data_retention_days" => "180"
        "retention_policy" => "regulatory"
      }
    }
  } else if [level] == "DEBUG" and [environment] == "development" {
    mutate {
      add_field => {
        "data_retention_days" => "3"
        "retention_policy" => "temporary"
      }
    }
  }
}

運用監視の拡充

ログ転送エージェントのヘルスチェック

ruby
input {
  http_poller {
    urls => {
      logstash_node_stats => "http://localhost:9600/_node/stats"
    }
    request_timeout => 30
    interval => 60
    codec => "json"
    tags => ["logstash_monitoring"]
  }
}

filter {
  if "logstash_monitoring" in [tags] {
    # メトリクス処理
    ruby {
      code => '
        jvm = event.get("[jvm]") || {}
        mem = jvm["mem"] || {}
        heap_used_percent = mem["heap_used_percent"] || 0
        
        pipeline = event.get("[pipeline]") || {}
        events = pipeline["events"] || {}
        
        # メモリ使用量チェック(85%超過でアラート)
        if heap_used_percent > 85
          event.set("alert_type", "high_memory_usage")
          event.set("severity", "critical")
        end
        
        # イベント処理停止チェック
        events_in = events["in"] || 0
        events_out = events["out"] || 0
        if events_in > 0 && events_out == 0
          event.set("alert_type", "events_stalled")
          event.set("severity", "critical")
        end
        
        # キュー使用率チェック
        queue = pipeline["queue"] || {}
        queue_events = queue["events"] || 0
        if queue_events > 10000
          event.set("alert_type", "high_queue_usage")
          event.set("severity", "warning")
        end
        
        event.set("service", "logstash-monitoring")
        event.set("monitoring_timestamp", Time.now.iso8601)
      '
    }
  }
}

output {
  if "logstash_monitoring" in [tags] {
    http {
      url => "https://log-api.newrelic.com/log/v1"
      http_method => "post"
      headers => {
        "Content-Type" => "application/json"
        "X-License-Key" => "${NEW_RELIC_LICENSE_KEY}"
      }
      format => "json"
    }
  }
}

パフォーマンスメトリクスの監視設定

ruby
input {
  exec {
    command => "sh -c 'cat /proc/loadavg; free -m | grep Mem; df -h / | tail -1'"
    interval => 60
    codec => "plain"
    tags => ["system_performance"]
  }
}

filter {
  if "system_performance" in [tags] {
    ruby {
      code => '
        lines = event.get("message").split("\n")
        
        # Load average解析
        load_avg = lines[0].split(" ")[0].to_f
        
        # Memory使用量解析
        mem_line = lines[1].split
        mem_total = mem_line[1].to_i
        mem_used = mem_line[2].to_i
        mem_usage_percent = (mem_used.to_f / mem_total * 100).round(1)
        
        # Disk使用量解析
        disk_line = lines[2].split
        disk_usage_percent = disk_line[4].gsub("%", "").to_i
        
        # パフォーマンスアラート判定
        alerts = []
        if load_avg > 4.0
          alerts << "high_load_average"
        end
        if mem_usage_percent > 85
          alerts << "high_memory_usage"
        end
        if disk_usage_percent > 90
          alerts << "high_disk_usage"
        end
        
        event.set("load_average", load_avg)
        event.set("memory_usage_percent", mem_usage_percent)
        event.set("disk_usage_percent", disk_usage_percent)
        event.set("performance_alerts", alerts)
        event.set("service", "logstash-performance")
        event.set("severity", alerts.empty? ? "info" : "warning")
      '
    }
  }
}

まとめ

LogstashとNew Relicの統合により、既存のElastic Stack環境を活用しながら包括的なログ管理システムを構築できます。強力なデータ変換機能と柔軟な出力設定により、複雑な要件にも対応可能です。

コスト最適化戦略として、サンプリング設定、保存期間の最適化、効率的なバッチ処理により、大規模環境でも経済的なログ管理が可能です。運用監視の拡充により、Logstashの健全性とパフォーマンスを継続的に監視し、安定した運用を実現できます。

適切なパフォーマンス最適化とセキュリティ設定を実装することで、スケーラブルで安全なハイブリッド監視アーキテクチャを実現できます。Elasticsearchでの詳細分析とNew Relicでの統合監視を組み合わせることで、最大の価値を得られます。

次のステップとして、Filebeatを使用した軽量ログ転送の実装について学んでいきましょう。ELK StackのBeatsファミリーとNew Relicの効率的な統合手法を詳しく解説していきます。


関連記事: Filebeatを使用したNew Relicログ統合関連記事: Fluentdを使用したNew Relicログ統合