New Relicデータエクスポート方法 - APIとツール統合による外部連携

New Relicで収集されたテレメトリデータを外部システムで活用することで、より幅広い分析や自動化が可能になります。この記事では、New Relicが提供する各種エクスポート方法から、サードパーティツールとの統合まで、実践的なデータ活用手法を包括的に解説します。効率的なデータ利用により、監視データの価値を最大化する方法を学んでいきましょう。

データエクスポートの概要と利用場面

New Relicからのデータエクスポートは、監視データを他のシステムやツールで活用するための重要な機能です。主な利用場面として、ビジネスインテリジェンス(BI)ツールでの長期トレンド分析、カスタムレポートの自動生成、他の監視ツールとの統合、データサイエンス分析のためのデータ準備などがあります。

New Relicでは、リアルタイムでのデータアクセスから大容量データの一括取得まで、様々なニーズに対応したエクスポート方法が用意されています。各方法にはそれぞれ特徴と適用場面があり、目的に応じて適切な手法を選択することが重要です。

データエクスポートの際には、データ保持期間、レート制限、認証方法、データ形式といった技術的制約を理解しておく必要があります。NerdGraph APIは1分間に3,000リクエスト、1時間に200,000リクエストの制限があり、REST APIは1分間に1,000リクエストまでとなっています。これらの要素を考慮した設計により、安定したデータ連携システムを構築できます。

NerdGraph GraphQL APIによるデータアクセス

NerdGraph APIは、New Relicの統一された現代的なAPIインターフェースです。GraphQLベースの設計により、必要なデータのみを効率的に取得でき、複数のデータソースを単一のクエリで組み合わせることが可能です。

基本的なNerdGraphクエリ構造

javascript
const query = `
  {
    actor {
      account(id: YOUR_ACCOUNT_ID) {
        nrql(query: "SELECT count(*) FROM Transaction FACET appName SINCE 1 hour ago") {
          results
          metadata {
            timeWindow {
              begin
              end
            }
            facets
          }
        }
      }
    }
  }
`;

// Node.js での実装例
const fetch = require('node-fetch');

async function queryNewRelicData(graphqlQuery, apiKey) {
  const response = await fetch('https://api.newrelic.com/graphql', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'API-Key': apiKey
    },
    body: JSON.stringify({ query: graphqlQuery })
  });
  
  const data = await response.json();
  return data;
}

複雑なデータ取得クエリ例

javascript
const complexQuery = `
  {
    actor {
      account(id: YOUR_ACCOUNT_ID) {
        # アプリケーションパフォーマンスデータ
        applicationPerformance: nrql(
          query: "SELECT average(duration), percentile(duration, 95), count(*) FROM Transaction FACET appName SINCE 24 hours ago"
        ) {
          results
        }
        
        # エラー率データ
        errorRates: nrql(
          query: "SELECT percentage(count(*), WHERE error IS true) FROM Transaction FACET appName SINCE 24 hours ago"
        ) {
          results
        }
        
        # インフラストラクチャメトリクス
        infrastructureMetrics: nrql(
          query: "SELECT average(cpuPercent), average(memoryUsedPercent) FROM SystemSample FACET entityName SINCE 1 hour ago"
        ) {
          results
        }
      }
    }
  }
`;

REST APIとデータ取得パターン

従来のREST APIも引き続きサポートされており、特定の用途では使いやすい場合があります。特に、単純なメトリクス取得や既存システムとの統合において有効です。

Python での REST API 活用例

python
import requests
import json
from datetime import datetime, timedelta

class NewRelicDataExporter:
    def __init__(self, api_key, account_id):
        self.api_key = api_key
        self.account_id = account_id
        self.base_url = 'https://api.newrelic.com/v2'
        self.headers = {
            'X-Api-Key': api_key,
            'Content-Type': 'application/json'
        }
    
    def get_application_metrics(self, app_id, metric_names, start_time=None, end_time=None):
        """アプリケーションメトリクスの取得"""
        if not start_time:
            start_time = datetime.now() - timedelta(hours=1)
        if not end_time:
            end_time = datetime.now()
        
        url = f"{self.base_url}/applications/{app_id}/metrics/data.json"
        params = {
            'names[]': metric_names,
            'from': start_time.isoformat(),
            'to': end_time.isoformat(),
            'summarize': 'true'
        }
        
        response = requests.get(url, headers=self.headers, params=params)
        return response.json()
    
    def export_to_csv(self, data, filename):
        """データをCSV形式でエクスポート"""
        import csv
        
        with open(filename, 'w', newline='') as csvfile:
            if data and 'metric_data' in data:
                fieldnames = ['timestamp', 'metric_name', 'value']
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                
                for metric in data['metric_data']['metrics']:
                    metric_name = metric['name']
                    for datapoint in metric['timeslices']:
                        writer.writerow({
                            'timestamp': datapoint['from'],
                            'metric_name': metric_name,
                            'value': datapoint['values'].get('average_value', 0)
                        })

# 使用例
exporter = NewRelicDataExporter('YOUR_API_KEY', 'YOUR_ACCOUNT_ID')
metrics = exporter.get_application_metrics(
    app_id='123456',
    metric_names=['Apdex', 'HttpDispatcher', 'Errors/all']
)
exporter.export_to_csv(metrics, 'newrelic_metrics.csv')

データストリーミングとリアルタイム統合

大量のデータを継続的に処理する場合や、リアルタイムでのデータ統合が必要な場合は、ストリーミングAPIや Webhook機能を活用できます。

Webhook による リアルタイムアラートデータの取得

javascript
const express = require('express');
const app = express();

app.use(express.json());

// New Relic からのWebhookを受信
app.post('/newrelic-webhook', (req, res) => {
  const alertData = req.body;
  
  // アラートデータの処理
  processAlertData(alertData);
  
  res.status(200).send('OK');
});

function processAlertData(alertData) {
  // Slack通知
  sendSlackNotification(alertData);
  
  // データベースへの保存
  saveToDatabase(alertData);
  
  // 他システムへの連携
  forwardToTicketingSystem(alertData);
}

async function sendSlackNotification(alertData) {
  const slackMessage = {
    text: `🚨 New Relic Alert: ${alertData.condition_name}`,
    attachments: [{
      color: alertData.current_state === 'OPEN' ? 'danger' : 'good',
      fields: [
        { title: 'Application', value: alertData.targets[0].name, short: true },
        { title: 'Condition', value: alertData.condition_name, short: true },
        { title: 'State', value: alertData.current_state, short: true },
        { title: 'Details', value: alertData.details, short: false }
      ]
    }]
  };
  
  // Slack API への送信処理
  // ...
}

サードパーティツール統合

New Relicデータを活用した外部ツールとの統合により、より包括的な分析環境を構築できます。

Tableau との統合例

python
import pandas as pd
import tableau_server_client as TSC
from newrelic_data_client import NewRelicClient

class TableauNewRelicIntegrator:
    def __init__(self, nr_api_key, tableau_server_url, tableau_token):
        self.nr_client = NewRelicClient(nr_api_key)
        self.tableau_server = tableau_server_url
        self.tableau_token = tableau_token
    
    def create_performance_dashboard_data(self):
        """パフォーマンスダッシュボード用データの準備"""
        
        # New Relic からデータ取得
        performance_data = self.nr_client.query_nrql("""
            SELECT average(duration) as avg_response_time,
                   percentile(duration, 95) as p95_response_time,
                   count(*) as request_count,
                   percentage(count(*), WHERE error IS true) as error_rate
            FROM Transaction
            FACET appName, DATE(timestamp)
            SINCE 30 days ago
        """)
        
        # Pandas DataFrame に変換
        df = pd.DataFrame(performance_data['results'])
        df['date'] = pd.to_datetime(df['facet.date'])
        df['application'] = df['facet.appName']
        
        # Tableau 用にデータ整形
        tableau_data = df[['date', 'application', 'avg_response_time', 
                          'p95_response_time', 'request_count', 'error_rate']]
        
        return tableau_data
    
    def publish_to_tableau(self, data, datasource_name):
        """Tableau Server へのデータ公開"""
        server = TSC.Server(self.tableau_server, use_server_version=True)
        
        with server.auth.sign_in(self.tableau_token):
            # データソースとして公開
            datasource = TSC.DatasourceItem()
            datasource.name = datasource_name
            
            # CSV として一時保存してアップロード
            csv_path = f'/tmp/{datasource_name}.csv'
            data.to_csv(csv_path, index=False)
            
            # Tableau Server に公開
            server.datasources.publish(datasource, csv_path, 'Overwrite')

Power BI との統合

python
class PowerBINewRelicConnector:
    def __init__(self, nr_api_key, powerbi_settings):
        self.nr_client = NewRelicClient(nr_api_key)
        self.powerbi_settings = powerbi_settings
    
    def create_business_metrics_report(self):
        """ビジネスメトリクスレポートの生成"""
        
        # カスタムイベントからビジネスデータ取得
        business_data = self.nr_client.query_nrql("""
            SELECT count(*) as transaction_count,
                   sum(orderAmount) as total_revenue,
                   average(orderAmount) as avg_order_value,
                   uniqueCount(customerId) as unique_customers
            FROM PurchaseCompleted
            FACET DATE(timestamp), productCategory
            SINCE 90 days ago
        """)
        
        # アプリケーションパフォーマンスデータ
        performance_data = self.nr_client.query_nrql("""
            SELECT average(duration) as avg_response_time,
                   count(*) as total_requests
            FROM Transaction
            WHERE appName = 'ecommerce-api'
            FACET DATE(timestamp)
            SINCE 90 days ago
        """)
        
        # データ結合とPower BI 用フォーマット
        combined_data = self.merge_business_and_performance_data(
            business_data, performance_data
        )
        
        return combined_data
    
    def schedule_data_refresh(self):
        """定期的なデータ更新スケジュール"""
        # Power BI REST API を使用した自動更新設定
        pass

大規模データ処理とバッチエクスポート

大量のヒストリカルデータを処理する場合は、効率的なバッチ処理パターンを採用する必要があります。

python
import asyncio
import aiohttp
from datetime import datetime, timedelta
import json

class BatchDataExporter:
    def __init__(self, api_key, account_id):
        self.api_key = api_key
        self.account_id = account_id
        self.session = None
    
    async def export_historical_data(self, start_date, end_date, chunk_hours=6):
        """大量ヒストリカルデータのバッチエクスポート"""
        
        async with aiohttp.ClientSession() as session:
            self.session = session
            
            current_date = start_date
            all_data = []
            
            while current_date < end_date:
                chunk_end = min(current_date + timedelta(hours=chunk_hours), end_date)
                
                # 並列でデータ取得
                tasks = [
                    self.fetch_application_data(current_date, chunk_end),
                    self.fetch_infrastructure_data(current_date, chunk_end),
                    self.fetch_browser_data(current_date, chunk_end)
                ]
                
                chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
                
                # エラーハンドリング
                for result in chunk_results:
                    if isinstance(result, Exception):
                        print(f"Error processing chunk {current_date}: {result}")
                    else:
                        all_data.extend(result)
                
                current_date = chunk_end
                
                # レート制限回避のための待機
                await asyncio.sleep(1)
            
            return all_data
    
    async def fetch_application_data(self, start_time, end_time):
        """アプリケーションデータの取得"""
        query = f"""
        SELECT timestamp, appName, duration, error
        FROM Transaction
        WHERE timestamp >= '{start_time.isoformat()}'
        AND timestamp < '{end_time.isoformat()}'
        LIMIT MAX
        """
        
        return await self.execute_nrql_query(query)
    
    async def execute_nrql_query(self, nrql_query):
        """NRQL クエリの実行"""
        graphql_query = {
            "query": f"""
            {{
              actor {{
                account(id: {self.account_id}) {{
                  nrql(query: "{nrql_query}") {{
                    results
                  }}
                }}
              }}
            }}
            """
        }
        
        headers = {
            'Content-Type': 'application/json',
            'API-Key': self.api_key
        }
        
        async with self.session.post(
            'https://api.newrelic.com/graphql',
            json=graphql_query,
            headers=headers
        ) as response:
            data = await response.json()
            return data['data']['actor']['account']['nrql']['results']

# 使用例
async def main():
    exporter = BatchDataExporter('YOUR_API_KEY', 'YOUR_ACCOUNT_ID')
    
    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 1, 31)
    
    historical_data = await exporter.export_historical_data(start_date, end_date)
    
    # データを JSON ファイルに保存
    with open('historical_data.json', 'w') as f:
        json.dump(historical_data, f, indent=2, default=str)

# 実行
asyncio.run(main())

エクスポート時の考慮事項とベストプラクティス

データエクスポートを実装する際には、いくつかの重要な考慮事項があります。

レート制限とスロットリングに対する適切な対応が必要です。New Relic APIには使用量制限があり、これを超えるとリクエストが拒否される可能性があります。

セキュリティとアクセス制御も重要な要素です。APIキーは適切に管理し、最小権限の原則に従ってアクセススコープを設定します。

データ品質とエラーハンドリングを考慮した堅牢な実装が求められます。ネットワーク障害やAPI の一時的な問題に対応できる再試行機構を実装しましょう。

一般的なエラーとその対処法:

  • 429 Too Many Requests: 指数バックオフによる再試行
  • 403 Forbidden: APIキーとアクセス権限の確認
  • 400 Bad Request: クエリ構文とパラメータの検証
  • 504 Gateway Timeout: クエリの複雑度とデータ量の削減

コスト最適化の観点から、必要なデータのみを取得し、不要な重複処理を避ける設計が重要です。

まとめ

New Relicのデータエクスポート機能を活用することで、監視データの価値を大幅に拡張できます。NerdGraph APIによる柔軟なデータアクセス、REST APIによる簡単な統合、リアルタイムストリーミング、そしてサードパーティツールとの連携により、包括的なデータ活用環境を構築できます。

適切なエクスポート手法の選択と実装により、New Relicで収集した貴重なテレメトリデータを、ビジネス価値創出のための重要な資産として活用していきましょう。


関連記事: NRQL基礎関連記事: カスタムダッシュボード作成