New Relic APM入門 第5.2章 - 分散トレーシングとサービスマップ

📖 ナビゲーション

メイン: 第5章 New Relic APM(高度化)
前セクション: 第5.1章 APM基本機能
次セクション: 第5.3章 コードレベル分析


💡 この章で学べること

現代のアプリケーションは、複数のマイクロサービスが連携して動作します。1つのユーザーリクエストが、認証サービス→在庫サービス→決済サービス→通知サービスを横断する際、どこで問題が発生しているかを特定するのは困難です。

分散トレーシングは、この複雑なサービス間の処理フローを1本の線として可視化し、パフォーマンス問題の根本原因を迅速に特定する技術です。

学習目標

  • [ ] 分散トレーシングの基本概念:トレース、スパン、コンテキスト伝播を理解
  • [ ] New Relic実装:各言語でのDistributed Tracingセットアップ方法
  • [ ] OpenTelemetry統合:標準化されたテレメトリーデータ収集
  • [ ] サービスマップ活用:依存関係の可視化と問題特定
  • [ ] 実践的トラブルシューティング:実際の障害ケースでの分析手法
  • [ ] パフォーマンス最適化:トレーシングデータを活用した改善戦略

5.2.1 分散トレーシングの基本概念

分散システムの監視課題

従来のモノリシックアプリケーションでは、1つのサーバー内で処理が完結するため、パフォーマンス問題の特定は比較的簡単でした。しかし、マイクロサービスアーキテクチャでは、複数のサービスにまたがる処理の可視化が最大の課題となります。

典型的な問題シナリオ

yaml
# ECサイトのチェックアウトプロセス例
User_Checkout_Request:
  step_1:
    service: "API Gateway"
    duration: "50ms"
    status: "success"
    
  step_2:
    service: "User Service"
    action: "Authentication & Authorization"
    duration: "120ms" 
    status: "success"
    
  step_3:
    service: "Inventory Service"
    action: "Stock Check"
    duration: "2,500ms"  # ←問題!異常に遅い
    status: "success"
    dependencies:
      - "Product Database (MySQL)"
      - "Warehouse API (External)"
    
  step_4:
    service: "Payment Service"
    action: "Process Payment"
    duration: "timeout"  # ←step_3の遅延により実行されず
    status: "failed"
    
  step_5:
    service: "Order Service"
    action: "Create Order"
    status: "not_executed"

# 結果:ユーザーには「決済エラー」と表示されるが、
# 実際の原因は在庫サービスのデータベースクエリ遅延

従来の監視では

  • 各サービスは「正常」と表示(個別には機能している)
  • 決済サービスで「エラー」が記録される
  • 真の原因(在庫サービスの遅延)が見えない

分散トレーシングでは

  • 1つのトレース内で全ステップが可視化
  • 在庫チェック2.5秒が一目で判明
  • データベースクエリレベルまで詳細分析可能

トレーシングの基本要素

分散トレーシングは、以下の要素で構成されます:

1. トレース(Trace)

1つのユーザーリクエストの完全な処理履歴です。

javascript
// トレースの例(ECサイトのチェックアウト)
const checkoutTrace = {
  traceId: "abc123-def456-ghi789",  // 一意のトレースID
  duration: "3.2s",                 // 全体処理時間
  status: "error",                  // 最終ステータス
  services: [
    "api-gateway",
    "user-service", 
    "inventory-service",
    "payment-service"
  ],
  spans: [/* 後述のスパン配列 */],
  error: {
    service: "payment-service",
    message: "Request timeout",
    rootCause: "inventory-service slow query"  // 根本原因
  }
};

2. スパン(Span)

トレース内の個別の処理単位です。各サービスやメソッド呼び出しが1つのスパンになります。

python
# スパンの詳細構造例
checkout_spans = [
    {
        "span_id": "span-001",
        "parent_id": None,  # ルートスパン
        "service": "api-gateway",
        "operation": "POST /checkout",
        "start_time": "2025-07-21T10:00:00.000Z",
        "end_time": "2025-07-21T10:00:03.200Z",
        "duration": "3200ms",
        "tags": {
            "http.method": "POST",
            "http.url": "/api/checkout",
            "user.id": "user123",
            "request.size": "1.2KB"
        }
    },
    {
        "span_id": "span-002", 
        "parent_id": "span-001",  # 親スパン
        "service": "user-service",
        "operation": "authenticate_user",
        "start_time": "2025-07-21T10:00:00.050Z",
        "end_time": "2025-07-21T10:00:00.170Z",
        "duration": "120ms",
        "tags": {
            "auth.method": "JWT",
            "user.role": "premium",
            "cache.hit": True
        }
    },
    {
        "span_id": "span-003",
        "parent_id": "span-001", 
        "service": "inventory-service",
        "operation": "check_stock",
        "start_time": "2025-07-21T10:00:00.200Z", 
        "end_time": "2025-07-21T10:00:02.700Z",
        "duration": "2500ms",  # 問題のスパン!
        "tags": {
            "inventory.items": "3",
            "db.query.slow": True,  # 遅いクエリ検出
            "warehouse.api.timeout": True
        },
        "logs": [
            {
                "timestamp": "2025-07-21T10:00:02.600Z",
                "level": "WARNING", 
                "message": "Warehouse API response time > 2s"
            }
        ]
    }
]

3. コンテキスト伝播(Context Propagation)

トレース情報をサービス間で受け渡す仕組みです。

go
// Go言語でのコンテキスト伝播例
package main

import (
    "context"
    "net/http"
    "github.com/newrelic/go-agent/v3/newrelic"
)

// API Gateway側:トレースコンテキスト作成
func checkoutHandler(w http.ResponseWriter, r *http.Request) {
    // New Relicトランザクション開始
    txn := app.StartTransaction("checkout-process")
    defer txn.End()
    
    // HTTPヘッダーにトレーシング情報埋め込み
    ctx := newrelic.NewContext(context.Background(), txn)
    
    // 下流サービス呼び出し
    userResult := callUserService(ctx, r.Header.Get("Authorization"))
    inventoryResult := callInventoryService(ctx, r.Body)
    
    // 結果処理...
}

// User Service側:トレースコンテキスト受信
func callUserService(ctx context.Context, authToken string) UserResult {
    // New Relicトランザクション情報を引き継ぎ
    txn := newrelic.FromContext(ctx)
    
    // 子スパン作成
    segment := txn.StartSegment("user-authentication")
    defer segment.End()
    
    // JWT検証処理
    user, err := validateJWT(authToken)
    if err != nil {
        // エラー情報もトレースに記録
        segment.AddAttribute("auth.error", err.Error())
        return UserResult{Error: err}
    }
    
    segment.AddAttribute("user.id", user.ID)
    segment.AddAttribute("user.role", user.Role)
    
    return UserResult{User: user}
}

サービス間通信でのトレーシング

HTTP通信でのヘッダー伝播

javascript
// Node.js Express: 発信側
const axios = require('axios');
const newrelic = require('newrelic');

app.post('/checkout', async (req, res) => {
    try {
        // 在庫チェック呼び出し
        const inventoryResponse = await axios.post('http://inventory-service/check', {
            items: req.body.items
        }, {
            headers: {
                // New Relicトレーシングヘッダー自動付与
                ...newrelic.getBrowserTimingHeader(),
                'Content-Type': 'application/json'
            }
        });
        
        // 決済処理呼び出し
        const paymentResponse = await axios.post('http://payment-service/process', {
            amount: req.body.amount,
            method: req.body.payment_method
        });
        
        res.json({ success: true, orderId: generateOrderId() });
        
    } catch (error) {
        // エラーもトレースに自動記録
        newrelic.noticeError(error);
        res.status(500).json({ error: error.message });
    }
});

// Node.js Express: 受信側(在庫サービス)
app.post('/check', (req, res) => {
    // New Relicが自動的にトレーシングヘッダーを検出し、
    // 同一トレース内の子スパンとして記録
    
    const items = req.body.items;
    
    // 時間のかかるデータベース処理
    checkInventoryInDatabase(items)
        .then(result => {
            newrelic.addCustomAttribute('inventory.checked_items', items.length);
            newrelic.addCustomAttribute('inventory.available', result.available);
            
            res.json(result);
        })
        .catch(error => {
            newrelic.noticeError(error);
            res.status(500).json({ error: 'Inventory check failed' });
        });
});

gRPC通信でのメタデータ伝播

python
# Python gRPC: サーバー側
import grpc
import newrelic.agent

class InventoryService(inventory_pb2_grpc.InventoryServiceServicer):
    
    @newrelic.agent.function_trace('inventory.check_stock')
    def CheckStock(self, request, context):
        # gRPCメタデータからトレーシング情報抽出
        metadata = dict(context.invocation_metadata())
        
        # New Relicトレースコンテキスト復元
        newrelic.agent.add_custom_attribute('grpc.method', 'CheckStock')
        newrelic.agent.add_custom_attribute('request.items', len(request.items))
        
        try:
            # データベース処理(自動監視)
            availability = self.check_database_inventory(request.items)
            
            # 外部API呼び出し(ウェアハウス確認)
            with newrelic.agent.FunctionTrace('warehouse.api_call'):
                warehouse_stock = self.call_warehouse_api(request.items)
            
            result = inventory_pb2.StockResponse(
                available=availability and warehouse_stock,
                message="Stock check completed"
            )
            
            newrelic.agent.record_custom_metric('Custom/Inventory/CheckSuccess', 1)
            return result
            
        except Exception as e:
            newrelic.agent.notice_error()
            newrelic.agent.record_custom_metric('Custom/Inventory/CheckError', 1)
            
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f'Inventory check failed: {str(e)}')
            return inventory_pb2.StockResponse()

# Python gRPC: クライアント側  
def call_inventory_service(items):
    
    with grpc.insecure_channel('inventory-service:50051') as channel:
        stub = inventory_pb2_grpc.InventoryServiceStub(channel)
        
        # New Relicトレーシングメタデータ作成
        metadata = []
        
        # gRPC呼び出し(トレーシング情報自動伝播)
        with newrelic.agent.FunctionTrace('grpc.inventory_check'):
            response = stub.CheckStock(
                inventory_pb2.StockRequest(items=items),
                metadata=metadata
            )
        
        return response.available

5.2.2 New Relic Distributed Tracing 実装

基本セットアップ

New Relicの分散トレーシングは、最小限の設定で強力な可視化を実現します。

Node.js実装

javascript
// newrelic.js 設定ファイル
'use strict';

exports.config = {
  app_name: ['ECommerce-API'],
  license_key: process.env.NEW_RELIC_LICENSE_KEY,
  
  // 分散トレーシング有効化(重要!)
  distributed_tracing: {
    enabled: true
  },
  
  // トランザクション詳細設定
  transaction_tracer: {
    enabled: true,
    transaction_threshold: 0.5,      // 500ms以上を詳細記録
    record_sql: 'obfuscated',        // SQLクエリ記録(個人情報マスク)
    explain_enabled: true,           // 実行計画記録
    explain_threshold: 0.5           // 500ms以上のクエリの実行計画
  },
  
  // スパン詳細化
  span_events: {
    enabled: true,
    max_samples_stored: 2000
  }
};

// メインアプリケーション
require('newrelic');  // 最初の行で必須
const express = require('express');
const axios = require('axios');
const app = express();

// チェックアウトAPI(分散トレーシング統合)
app.post('/api/checkout', async (req, res) => {
    // New Relicが自動的にトレース開始
    const newrelic = require('newrelic');
    
    // カスタム属性でビジネスコンテキスト追加
    newrelic.addCustomAttribute('checkout.user_id', req.body.user_id);
    newrelic.addCustomAttribute('checkout.total_amount', req.body.amount);
    newrelic.addCustomAttribute('checkout.item_count', req.body.items.length);
    
    try {
        // ステップ1:ユーザー認証(自動トレーシング)
        const authResult = await callAuthService(req.headers.authorization);
        
        // ステップ2:在庫確認(自動トレーシング)
        const inventoryResult = await checkInventory(req.body.items);
        
        // ステップ3:決済処理(自動トレーシング)
        const paymentResult = await processPayment({
            amount: req.body.amount,
            method: req.body.payment_method,
            user_id: authResult.user_id
        });
        
        // ステップ4:注文作成(自動トレーシング)
        const orderResult = await createOrder({
            user_id: authResult.user_id,
            items: req.body.items,
            payment_id: paymentResult.transaction_id
        });
        
        // 成功メトリクス
        newrelic.recordMetric('Custom/Checkout/Success', 1);
        newrelic.recordMetric('Custom/Checkout/Amount', req.body.amount);
        
        res.json({
            success: true,
            order_id: orderResult.order_id,
            estimated_delivery: orderResult.delivery_date
        });
        
    } catch (error) {
        // エラー詳細記録(トレースと自動関連付け)
        newrelic.noticeError(error, {
            'checkout.step': error.step || 'unknown',
            'checkout.service': error.service || 'unknown'
        });
        
        newrelic.recordMetric('Custom/Checkout/Error', 1);
        
        res.status(500).json({
            error: 'Checkout failed',
            reference: newrelic.getTraceMetadata().traceId
        });
    }
});

// サービス呼び出し関数(詳細トレーシング)
async function callAuthService(authHeader) {
    // カスタムスパン作成
    return await newrelic.startBackgroundTransaction('auth_service_call', async () => {
        const response = await axios.post('http://auth-service/validate', {}, {
            headers: { 'Authorization': authHeader },
            timeout: 5000
        });
        
        return response.data;
    });
}

async function checkInventory(items) {
    return await newrelic.startBackgroundTransaction('inventory_check', async () => {
        // 詳細な監視属性
        newrelic.addCustomAttribute('inventory.items_to_check', items.length);
        
        const response = await axios.post('http://inventory-service/check', {
            items: items
        });
        
        newrelic.addCustomAttribute('inventory.available', response.data.available);
        
        if (!response.data.available) {
            const error = new Error('Items not available');
            error.step = 'inventory_check';
            error.service = 'inventory-service';
            throw error;
        }
        
        return response.data;
    });
}

Python (Django/Flask) 実装

python
# Django settings.py
import newrelic.agent

# New Relic設定
NEW_RELIC_CONFIG = {
    'distributed_tracing.enabled': True,
    'transaction_tracer.enabled': True,
    'transaction_tracer.record_sql': 'obfuscated',
    'span_events.enabled': True,
    'span_events.max_samples_stored': 2000
}

# wsgi.py
import newrelic.agent
newrelic.agent.initialize('/path/to/newrelic.ini')

# views.py - チェックアウトAPI
import requests
import newrelic.agent
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
@newrelic.agent.function_trace('checkout_api')
def checkout_view(request):
    """チェックアウト処理(分散トレーシング統合)"""
    
    # ビジネスコンテキスト追加
    newrelic.agent.add_custom_attribute('user.id', request.POST.get('user_id'))
    newrelic.agent.add_custom_attribute('checkout.amount', request.POST.get('amount'))
    
    try:
        # ステップ1:認証サービス呼び出し
        with newrelic.agent.FunctionTrace('auth_service_call'):
            auth_response = call_auth_service(request.META.get('HTTP_AUTHORIZATION'))
            
            if not auth_response['valid']:
                raise AuthenticationError("Invalid token")
        
        # ステップ2:在庫サービス呼び出し  
        with newrelic.agent.FunctionTrace('inventory_service_call'):
            inventory_response = call_inventory_service(request.POST.get('items'))
            
            if not inventory_response['available']:
                raise InventoryError("Items not available")
        
        # ステップ3:決済サービス呼び出し
        with newrelic.agent.FunctionTrace('payment_service_call'):
            payment_response = call_payment_service({
                'amount': request.POST.get('amount'),
                'method': request.POST.get('payment_method'),
                'user_id': auth_response['user_id']
            })
        
        # ステップ4:注文作成
        with newrelic.agent.FunctionTrace('order_creation'):
            order = create_order_record(
                user_id=auth_response['user_id'],
                items=request.POST.get('items'),
                payment_id=payment_response['transaction_id']
            )
        
        # 成功メトリクス
        newrelic.agent.record_custom_metric('Custom/Checkout/Success', 1)
        
        return JsonResponse({
            'success': True,
            'order_id': order.id,
            'trace_id': newrelic.agent.current_trace_id()  # デバッグ用
        })
        
    except Exception as e:
        # エラー詳細(トレース自動関連付け)
        newrelic.agent.notice_error(attributes={
            'error.step': getattr(e, 'step', 'unknown'),
            'error.service': getattr(e, 'service', 'unknown')
        })
        
        return JsonResponse({
            'error': str(e),
            'trace_id': newrelic.agent.current_trace_id()
        }, status=500)

def call_auth_service(auth_header):
    """認証サービス呼び出し(詳細監視)"""
    
    newrelic.agent.add_custom_attribute('auth.method', 'jwt_validation')
    
    try:
        response = requests.post('http://auth-service/validate', 
            headers={'Authorization': auth_header},
            timeout=5
        )
        
        result = response.json()
        
        # 認証結果属性
        newrelic.agent.add_custom_attribute('auth.user_id', result.get('user_id'))
        newrelic.agent.add_custom_attribute('auth.user_role', result.get('role'))
        
        return result
        
    except requests.exceptions.Timeout:
        error = AuthenticationError("Auth service timeout")
        error.service = 'auth-service'
        raise error
    except Exception as e:
        error = AuthenticationError(f"Auth service error: {str(e)}")
        error.service = 'auth-service' 
        raise error

def call_inventory_service(items_json):
    """在庫サービス呼び出し(詳細監視)"""
    
    items = json.loads(items_json)
    newrelic.agent.add_custom_attribute('inventory.items_count', len(items))
    
    try:
        response = requests.post('http://inventory-service/check', 
            json={'items': items},
            timeout=10
        )
        
        result = response.json()
        
        # 在庫確認結果
        newrelic.agent.add_custom_attribute('inventory.available', result.get('available'))
        newrelic.agent.add_custom_attribute('inventory.reserved_count', result.get('reserved_count'))
        
        return result
        
    except requests.exceptions.Timeout:
        error = InventoryError("Inventory service timeout")
        error.service = 'inventory-service'
        raise error

Java (Spring Boot) 実装

java
// application.yml
newrelic:
  config:
    distributed_tracing:
      enabled: true
    transaction_tracer:
      enabled: true
      record_sql: obfuscated
    span_events:
      enabled: true
      max_samples_stored: 2000

// CheckoutController.java
@RestController
@RequestMapping("/api")
public class CheckoutController {
    
    @Autowired
    private AuthServiceClient authServiceClient;
    
    @Autowired
    private InventoryServiceClient inventoryServiceClient;
    
    @Autowired
    private PaymentServiceClient paymentServiceClient;
    
    @PostMapping("/checkout")
    @Trace(dispatcher = true)  // New Relicトレース有効化
    public ResponseEntity<CheckoutResponse> processCheckout(
            @RequestBody CheckoutRequest request,
            @RequestHeader("Authorization") String authHeader) {
        
        // ビジネスコンテキスト追加
        NewRelic.addCustomAttribute("user.id", request.getUserId());
        NewRelic.addCustomAttribute("checkout.amount", request.getAmount());
        NewRelic.addCustomAttribute("checkout.items", request.getItems().size());
        
        try {
            // ステップ1:認証(自動分散トレーシング)
            AuthResult authResult = authServiceClient.validateUser(authHeader);
            
            // ステップ2:在庫確認(自動分散トレーシング)
            InventoryResult inventoryResult = inventoryServiceClient.checkStock(request.getItems());
            
            if (!inventoryResult.isAvailable()) {
                throw new InventoryNotAvailableException("Requested items not available");
            }
            
            // ステップ3:決済処理(自動分散トレーシング)
            PaymentRequest paymentRequest = PaymentRequest.builder()
                .amount(request.getAmount())
                .method(request.getPaymentMethod())
                .userId(authResult.getUserId())
                .build();
                
            PaymentResult paymentResult = paymentServiceClient.processPayment(paymentRequest);
            
            // ステップ4:注文作成(ローカル処理、自動監視)
            Order order = createOrder(authResult.getUserId(), request.getItems(), paymentResult.getTransactionId());
            
            // 成功メトリクス
            NewRelic.recordMetric("Custom/Checkout/Success", 1);
            NewRelic.recordMetric("Custom/Checkout/Amount", request.getAmount().floatValue());
            
            return ResponseEntity.ok(CheckoutResponse.builder()
                .success(true)
                .orderId(order.getId())
                .traceId(NewRelic.getTraceMetadata().getTraceId())  // デバッグ用
                .build());
                
        } catch (Exception e) {
            // エラー詳細(分散トレーシング自動関連付け)
            NewRelic.noticeError(e, Map.of(
                "checkout.step", getErrorStep(e),
                "checkout.service", getErrorService(e)
            ));
            
            NewRelic.recordMetric("Custom/Checkout/Error", 1);
            
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(CheckoutResponse.builder()
                    .success(false)
                    .error(e.getMessage())
                    .traceId(NewRelic.getTraceMetadata().getTraceId())
                    .build());
        }
    }
    
    @Trace(metricName = "Custom/Order/Creation")  // カスタムメトリクス名
    private Order createOrder(String userId, List<OrderItem> items, String paymentId) {
        // 注文作成処理(データベースアクセス等、自動監視)
        Order order = new Order();
        order.setUserId(userId);
        order.setItems(items);
        order.setPaymentId(paymentId);
        order.setCreatedAt(LocalDateTime.now());
        
        // New Relic属性追加
        NewRelic.addCustomAttribute("order.items_count", items.size());
        NewRelic.addCustomAttribute("order.total_amount", 
            items.stream().mapToDouble(OrderItem::getPrice).sum());
            
        return orderRepository.save(order);
    }
}

// FeignClient(マイクロサービス呼び出し、自動分散トレーシング)
@FeignClient(name = "auth-service")
public interface AuthServiceClient {
    
    @PostMapping("/validate")
    AuthResult validateUser(@RequestHeader("Authorization") String authHeader);
}

@FeignClient(name = "inventory-service") 
public interface InventoryServiceClient {
    
    @PostMapping("/check")
    InventoryResult checkStock(@RequestBody List<OrderItem> items);
}

5.2.3 OpenTelemetry統合

OpenTelemetryとは

OpenTelemetry(OTel)は、テレメトリーデータ(メトリクス、ログ、トレース)を標準化された方法で収集・送信するオープンソースフレームワークです。New Relicは、OpenTelemetryを完全サポートし、ベンダーロックインを回避しながら最高品質の監視を実現できます。

OpenTelemetry の優位性

yaml
# OpenTelemetryの利点
Vendor_Independence:
  benefit: "監視ツール乗り換え時のコスト削減"
  example: "New Relic → Jaeger → Datadog への移行が容易"
  
Standardized_Instrumentation:
  benefit: "一度の実装で複数ツール対応"
  example: "同一コードでNew Relic + Prometheus同時送信可能"
  
Rich_Ecosystem:
  benefit: "豊富なライブラリとインテグレーション"
  example: "HTTP、gRPC、データベース、Redis等の自動計装"
  
Future_Proof:
  benefit: "CNCF標準による長期サポート保証"
  example: "Kubernetes、Istio等との深い統合"

Node.js + OpenTelemetry + New Relic

javascript
// package.json 依存関係
{
  "dependencies": {
    "@opentelemetry/api": "^1.7.0",
    "@opentelemetry/sdk-node": "^0.45.0",
    "@opentelemetry/auto-instrumentations-node": "^0.40.0",
    "@newrelic/opentelemetry-exporter": "^1.0.0"
  }
}

// otel-config.js - OpenTelemetry設定
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { NewRelicExporter } = require('@newrelic/opentelemetry-exporter');

// New Relic エクスポーター設定
const newRelicExporter = new NewRelicExporter({
  apiKey: process.env.NEW_RELIC_LICENSE_KEY,
  endpoint: 'https://otlp.nr-data.net:4317'
});

// OpenTelemetry SDK初期化
const sdk = new NodeSDK({
  serviceName: 'ecommerce-checkout-service',
  serviceVersion: '1.2.3',
  
  // 自動計装(HTTP、Express、MySQL、Redis等)
  instrumentations: [getNodeAutoInstrumentations({
    '@opentelemetry/instrumentation-fs': {
      enabled: false  // ファイルシステムアクセス除外
    }
  })],
  
  // トレースエクスポーター(New Relic)
  traceExporter: newRelicExporter,
  
  // メトリクスエクスポーター(New Relic)
  metricExporter: newRelicExporter,
  
  // リソース属性(サービス識別情報)
  resource: {
    'service.name': 'ecommerce-checkout',
    'service.version': '1.2.3',
    'deployment.environment': process.env.NODE_ENV || 'development',
    'service.namespace': 'ecommerce',
    'service.instance.id': require('os').hostname()
  }
});

// SDK開始
sdk.start();
console.log('OpenTelemetry tracing initialized');

// メインアプリケーション
// app.js
require('./otel-config');  // OpenTelemetry初期化

const express = require('express');
const opentelemetry = require('@opentelemetry/api');
const app = express();

// OpenTelemetry tracer取得
const tracer = opentelemetry.trace.getTracer('checkout-service', '1.0.0');

// チェックアウトAPI(OpenTelemetry手動計装)
app.post('/api/checkout', async (req, res) => {
    // カスタムスパン作成
    const span = tracer.startSpan('checkout_process', {
        attributes: {
            'checkout.user_id': req.body.user_id,
            'checkout.amount': req.body.amount,
            'checkout.items_count': req.body.items.length,
            'http.method': req.method,
            'http.url': req.url
        }
    });
    
    try {
        // 認証処理(子スパン)
        const authResult = await tracer.startActiveSpan('user_authentication', async (authSpan) => {
            try {
                const result = await authenticateUser(req.headers.authorization);
                
                authSpan.setAttributes({
                    'auth.user_id': result.user_id,
                    'auth.user_role': result.role,
                    'auth.method': 'jwt'
                });
                
                authSpan.setStatus({ code: opentelemetry.SpanStatusCode.OK });
                return result;
                
            } catch (error) {
                authSpan.recordException(error);
                authSpan.setStatus({ 
                    code: opentelemetry.SpanStatusCode.ERROR, 
                    message: error.message 
                });
                throw error;
            } finally {
                authSpan.end();
            }
        });
        
        // 在庫確認処理(子スパン) 
        const inventoryResult = await tracer.startActiveSpan('inventory_check', async (inventorySpan) => {
            try {
                const result = await checkInventory(req.body.items);
                
                inventorySpan.setAttributes({
                    'inventory.items_checked': req.body.items.length,
                    'inventory.all_available': result.available,
                    'inventory.reserved': result.reserved_count
                });
                
                inventorySpan.addEvent('inventory_validation_complete', {
                    'validation.duration_ms': result.validation_time_ms
                });
                
                inventorySpan.setStatus({ code: opentelemetry.SpanStatusCode.OK });
                return result;
                
            } catch (error) {
                inventorySpan.recordException(error);
                inventorySpan.setStatus({ 
                    code: opentelemetry.SpanStatusCode.ERROR,
                    message: error.message 
                });
                throw error;
            } finally {
                inventorySpan.end();
            }
        });
        
        // 決済処理(子スパン)
        const paymentResult = await tracer.startActiveSpan('payment_processing', async (paymentSpan) => {
            try {
                const result = await processPayment({
                    amount: req.body.amount,
                    method: req.body.payment_method,
                    user_id: authResult.user_id
                });
                
                paymentSpan.setAttributes({
                    'payment.amount': req.body.amount,
                    'payment.method': req.body.payment_method,
                    'payment.transaction_id': result.transaction_id,
                    'payment.provider': result.provider
                });
                
                paymentSpan.setStatus({ code: opentelemetry.SpanStatusCode.OK });
                return result;
                
            } catch (error) {
                paymentSpan.recordException(error);
                paymentSpan.setStatus({
                    code: opentelemetry.SpanStatusCode.ERROR,
                    message: error.message
                });
                throw error;
            } finally {
                paymentSpan.end();
            }
        });
        
        // 注文作成
        const order = await createOrder({
            user_id: authResult.user_id,
            items: req.body.items,
            payment_id: paymentResult.transaction_id
        });
        
        // 成功属性とメトリクス
        span.setAttributes({
            'checkout.order_id': order.id,
            'checkout.success': true
        });
        
        span.addEvent('checkout_completed', {
            'order.id': order.id,
            'completion.timestamp': new Date().toISOString()
        });
        
        // カスタムメトリクス記録
        const meter = opentelemetry.metrics.getMeter('checkout-service');
        const successCounter = meter.createCounter('checkout_success_total', {
            description: 'Total successful checkouts'
        });
        successCounter.add(1, { 
            payment_method: req.body.payment_method,
            user_tier: authResult.role 
        });
        
        span.setStatus({ code: opentelemetry.SpanStatusCode.OK });
        
        res.json({
            success: true,
            order_id: order.id,
            trace_id: span.spanContext().traceId
        });
        
    } catch (error) {
        // エラー詳細記録
        span.recordException(error);
        span.setStatus({
            code: opentelemetry.SpanStatusCode.ERROR,
            message: error.message
        });
        
        span.setAttributes({
            'checkout.success': false,
            'error.type': error.constructor.name
        });
        
        // エラーメトリクス
        const meter = opentelemetry.metrics.getMeter('checkout-service');
        const errorCounter = meter.createCounter('checkout_error_total');
        errorCounter.add(1, { 
            error_type: error.constructor.name,
            error_step: error.step || 'unknown'
        });
        
        res.status(500).json({
            error: error.message,
            trace_id: span.spanContext().traceId
        });
        
    } finally {
        span.end();
    }
});

// サービス関数(OpenTelemetry自動計装活用)
async function authenticateUser(authHeader) {
    // HTTP呼び出しは自動計装される
    const response = await fetch('http://auth-service/validate', {
        method: 'POST',
        headers: {
            'Authorization': authHeader,
            'Content-Type': 'application/json'
        }
    });
    
    if (!response.ok) {
        const error = new Error('Authentication failed');
        error.step = 'authentication';
        throw error;
    }
    
    return await response.json();
}

async function checkInventory(items) {
    // HTTP + データベース呼び出しも自動計装
    const response = await fetch('http://inventory-service/check', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ items })
    });
    
    if (!response.ok) {
        const error = new Error('Inventory check failed');
        error.step = 'inventory_check';
        throw error;
    }
    
    return await response.json();
}

Python + OpenTelemetry + New Relic

python
# requirements.txt
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-instrumentation-flask==0.42b0
opentelemetry-instrumentation-requests==0.42b0
opentelemetry-instrumentation-sqlalchemy==0.42b0
opentelemetry-exporter-otlp==1.21.0

# otel_config.py - OpenTelemetry設定
import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

def initialize_otel():
    """OpenTelemetry + New Relic初期化"""
    
    # リソース設定(サービス識別情報)
    resource = Resource.create({
        "service.name": "ecommerce-checkout",
        "service.version": "1.2.3", 
        "deployment.environment": os.getenv("ENVIRONMENT", "development"),
        "service.namespace": "ecommerce"
    })
    
    # New Relic エクスポーター設定
    otlp_span_exporter = OTLPSpanExporter(
        endpoint="https://otlp.nr-data.net:4317",
        headers={"api-key": os.getenv("NEW_RELIC_LICENSE_KEY")}
    )
    
    otlp_metric_exporter = OTLPMetricExporter(
        endpoint="https://otlp.nr-data.net:4317", 
        headers={"api-key": os.getenv("NEW_RELIC_LICENSE_KEY")}
    )
    
    # トレース設定
    trace.set_tracer_provider(TracerProvider(resource=resource))
    span_processor = BatchSpanProcessor(otlp_span_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)
    
    # メトリクス設定
    metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter, export_interval_millis=30000)
    metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[metric_reader]))
    
    # 自動計装有効化
    FlaskInstrumentor().instrument()           # Flask自動計装
    RequestsInstrumentor().instrument()        # Requests自動計装  
    SQLAlchemyInstrumentor().instrument()      # SQLAlchemy自動計装
    
    print("OpenTelemetry initialized with New Relic")

# app.py - Flask アプリケーション
from flask import Flask, request, jsonify
import requests
from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode
from otel_config import initialize_otel

# OpenTelemetry初期化
initialize_otel()

app = Flask(__name__)

# トレーサー取得
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

# カスタムメトリクス定義
checkout_counter = meter.create_counter(
    "checkout_requests_total",
    description="Total checkout requests"
)
checkout_duration = meter.create_histogram(
    "checkout_duration_seconds", 
    description="Checkout processing duration"
)

@app.route('/api/checkout', methods=['POST'])
def checkout():
    """チェックアウトAPI(OpenTelemetry統合)"""
    
    # メインスパン開始
    with tracer.start_as_current_span("checkout_process") as span:
        
        # ビジネス属性追加
        span.set_attributes({
            "checkout.user_id": request.json.get('user_id'),
            "checkout.amount": request.json.get('amount'),
            "checkout.items_count": len(request.json.get('items', [])),
            "http.method": request.method,
            "http.route": request.endpoint
        })
        
        try:
            # 認証処理
            with tracer.start_as_current_span("user_authentication") as auth_span:
                auth_result = authenticate_user(request.headers.get('Authorization'))
                
                auth_span.set_attributes({
                    "auth.user_id": auth_result.get('user_id'),
                    "auth.user_role": auth_result.get('role'),
                    "auth.success": True
                })
            
            # 在庫確認処理
            with tracer.start_as_current_span("inventory_check") as inventory_span:
                inventory_result = check_inventory(request.json.get('items'))
                
                inventory_span.set_attributes({
                    "inventory.items_checked": len(request.json.get('items', [])),
                    "inventory.all_available": inventory_result.get('available'),
                    "inventory.processing_time_ms": inventory_result.get('processing_time')
                })
                
                if not inventory_result.get('available'):
                    raise InventoryNotAvailableError("Items not available")
            
            # 決済処理 
            with tracer.start_as_current_span("payment_processing") as payment_span:
                payment_result = process_payment({
                    'amount': request.json.get('amount'),
                    'method': request.json.get('payment_method'),
                    'user_id': auth_result.get('user_id')
                })
                
                payment_span.set_attributes({
                    "payment.amount": request.json.get('amount'),
                    "payment.method": request.json.get('payment_method'),
                    "payment.transaction_id": payment_result.get('transaction_id'),
                    "payment.success": True
                })
            
            # 注文作成
            with tracer.start_as_current_span("order_creation") as order_span:
                order = create_order({
                    'user_id': auth_result.get('user_id'),
                    'items': request.json.get('items'),
                    'payment_id': payment_result.get('transaction_id')
                })
                
                order_span.set_attributes({
                    "order.id": order.get('id'),
                    "order.creation_success": True
                })
            
            # 成功メトリクス
            checkout_counter.add(1, {
                "status": "success",
                "payment_method": request.json.get('payment_method')
            })
            
            span.set_attributes({
                "checkout.success": True,
                "checkout.order_id": order.get('id')
            })
            
            span.add_event("checkout_completed", {
                "order_id": order.get('id'),
                "completion_time": str(datetime.now())
            })
            
            span.set_status(Status(StatusCode.OK))
            
            return jsonify({
                "success": True,
                "order_id": order.get('id'),
                "trace_id": format(span.get_span_context().trace_id, '032x')
            })
            
        except Exception as e:
            # エラー処理
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.set_attributes({
                "checkout.success": False,
                "error.type": type(e).__name__
            })
            
            # エラーメトリクス
            checkout_counter.add(1, {
                "status": "error", 
                "error_type": type(e).__name__
            })
            
            return jsonify({
                "error": str(e),
                "trace_id": format(span.get_span_context().trace_id, '032x')
            }), 500

def authenticate_user(auth_header):
    """認証サービス呼び出し(自動計装)"""
    response = requests.post('http://auth-service/validate', 
        headers={'Authorization': auth_header},
        timeout=5
    )
    
    if not response.ok:
        raise AuthenticationError("Authentication failed")
    
    return response.json()

def check_inventory(items):
    """在庫確認サービス呼び出し(自動計装)"""
    response = requests.post('http://inventory-service/check',
        json={'items': items},
        timeout=10
    )
    
    if not response.ok:
        raise InventoryServiceError("Inventory check failed")
    
    return response.json()

5.2.4 サービスマップの活用

サービス依存関係の可視化

サービスマップは、分散トレーシングデータから自動生成される、サービス間の依存関係とパフォーマンス状況を可視化したダイアグラムです。

サービスマップが解決する課題

yaml
# 複雑なマイクロサービス環境での課題
Dependency_Complexity:
  problem: "50+のサービスがどう連携しているか不明"
  solution: "サービスマップで依存関係を自動可視化"
  
Performance_Bottleneck:
  problem: "どのサービスが全体のパフォーマンスを悪化させているか"
  solution: "リアルタイム応答時間とエラー率表示"
  
Impact_Analysis:
  problem: "1つのサービス障害が他に与える影響範囲が不明"
  solution: "障害波及範囲の即座特定"
  
Capacity_Planning:
  problem: "どのサービスがスケーリング必要か判断困難"
  solution: "トラフィックパターンとリソース使用状況可視化"

New Relic サービスマップの読み方

javascript
// New Relic UI でのサービスマップ要素説明
const serviceMapElements = {
    
    // ノード(各サービス)
    serviceNode: {
        name: "checkout-service",
        health: "healthy|warning|critical",
        metrics: {
            throughput: "245 rpm",        // リクエスト/分
            response_time: "267ms",       // 平均レスポンス時間
            error_rate: "0.8%",          // エラー率
            apdex: "0.92"               // ユーザー満足度スコア
        },
        alerts: {
            active_violations: 2,         // アクティブなアラート数
            recent_deployments: 1         // 最近のデプロイメント
        }
    },
    
    // エッジ(サービス間接続)
    serviceConnection: {
        from: "checkout-service",
        to: "payment-service", 
        metrics: {
            call_count: "156 calls/min",  // 呼び出し頻度
            avg_duration: "445ms",        // 平均呼び出し時間
            error_percentage: "2.1%",     // 失敗率
            timeout_count: "3/hour"       // タイムアウト発生数
        },
        health_indicator: "yellow"        // 接続健全性
    },
    
    // 外部サービス
    externalService: {
        name: "Stripe Payment API",
        type: "external_web_service",
        metrics: {
            response_time: "1.2s",
            availability: "99.2%",
            recent_issues: ["high_latency_detected"]
        }
    },
    
    // データベース  
    database: {
        name: "orders-postgresql",
        type: "database",
        metrics: {
            query_time: "89ms",
            connection_pool: "78% used",
            slow_queries: 5
        }
    }
};

実践的なサービスマップ活用例

ケース1: パフォーマンス問題の根本原因分析

状況: チェックアウト処理が突然遅くなった(平均300ms → 3秒)

yaml
# サービスマップでの問題発見プロセス
Step_1_Overview:
  observation: "checkout-serviceが赤色(critical)表示"
  action: "クリックして詳細確認"
  
Step_2_Dependencies:
  observation: 
    - "auth-service: 120ms(正常)"
    - "inventory-service: 2.8s(異常)← ここが問題!"
    - "payment-service: timeout(連鎖障害)"
  action: "inventory-serviceを詳しく分析"
  
Step_3_DrillDown:
  target: "inventory-service"
  findings:
    database_connection: "orders-db: 2.5s(異常遅延)"
    external_api: "warehouse-api: 300ms(正常)"
  conclusion: "データベースクエリが原因"
  
Step_4_Solution:
  immediate: "遅いクエリの特定・最適化"
  preventive: "データベース監視アラート強化"

ケース2: 新機能デプロイ後の影響分析

状況: 推薦エンジンの新バージョンをデプロイ後、全体パフォーマンス低下

javascript
// デプロイ前後のサービスマップ比較分析
const deploymentImpactAnalysis = {
    
    // デプロイ前(ベースライン)
    before_deployment: {
        "product-service": {
            throughput: "500 rpm",
            response_time: "150ms", 
            error_rate: "0.2%"
        },
        "recommendation-service": {
            throughput: "200 rpm",
            response_time: "300ms",
            error_rate: "0.1%"
        },
        "user-service": {
            throughput: "800 rpm", 
            response_time: "80ms",
            error_rate: "0.1%"
        }
    },
    
    // デプロイ後(問題発生)
    after_deployment: {
        "product-service": {
            throughput: "500 rpm",     // 変化なし
            response_time: "150ms",    // 変化なし
            error_rate: "0.2%"        // 変化なし
        },
        "recommendation-service": {
            throughput: "200 rpm",
            response_time: "2.8s",     // ← 大幅悪化!
            error_rate: "0.1%",
            cpu_usage: "95%",          // CPU使用率急上昇
            memory_usage: "87%"        // メモリ使用率上昇
        },
        "user-service": {
            throughput: "600 rpm",     // ← 減少(連鎖影響)
            response_time: "180ms",    // ← 悪化(連鎖影響)
            error_rate: "1.2%"        // ← 悪化(連鎖影響)
        }
    },
    
    // 根本原因特定
    root_cause_analysis: {
        primary_issue: "recommendation-service の新アルゴリズムがCPU集約的",
        cascade_effects: [
            "recommendation-service の応答遅延",
            "user-service のタイムアウト増加", 
            "全体的なユーザーエクスペリエンス悪化"
        ],
        solution: [
            "recommendation-service のアルゴリズム最適化",
            "CPU/メモリリソースのスケールアップ",
            "非同期処理への変更"
        ]
    }
};

ケース3: 外部API障害の影響範囲特定

状況: 支払いプロバイダー(Stripe)で障害発生、影響範囲の迅速な特定が必要

python
# サービスマップを活用した障害影響分析
def analyze_external_service_outage(external_service_name):
    """外部サービス障害の影響分析"""
    
    impact_analysis = {
        "affected_external_service": external_service_name,
        "direct_dependencies": [],      # 直接影響を受けるサービス
        "indirect_dependencies": [],    # 間接的に影響を受けるサービス 
        "business_impact": {},         # ビジネス影響
        "mitigation_actions": []       # 対処法
    }
    
    # Stripeの例
    if external_service_name == "Stripe Payment API":
        impact_analysis.update({
            "direct_dependencies": [
                {
                    "service": "payment-service",
                    "impact": "critical",
                    "error_rate": "100%",           # 全決済が失敗
                    "fallback_available": False
                }
            ],
            "indirect_dependencies": [
                {
                    "service": "checkout-service", 
                    "impact": "critical",
                    "error_rate": "85%",            # チェックアウトの85%が失敗
                    "degraded_functions": ["payment_processing"]
                },
                {
                    "service": "order-service",
                    "impact": "medium",
                    "error_rate": "25%",            # 注文の25%が未完了状態
                    "degraded_functions": ["order_completion"]
                },
                {
                    "service": "user-service",
                    "impact": "low", 
                    "error_rate": "5%",             # ユーザー体験の軽微な悪化
                    "degraded_functions": ["purchase_history_update"]
                }
            ],
            "business_impact": {
                "revenue_loss_per_hour": "$12,000",
                "affected_customers": 450,
                "conversion_rate_drop": "78%"
            },
            "mitigation_actions": [
                "backup_payment_provider を有効化(PayPal)",
                "checkout プロセスでの明確なエラーメッセージ表示", 
                "顧客サポートチームへの状況連絡",
                "ソーシャルメディアでの障害告知"
            ]
        })
    
    return impact_analysis

# 使用例:リアルタイム監視
def monitor_service_map_health():
    """サービスマップの健全性リアルタイム監視"""
    
    service_health = get_current_service_map_status()
    
    for service in service_health:
        # 異常検出
        if service['error_rate'] > 5.0:  # 5%以上のエラー率
            alert_data = {
                "severity": "critical",
                "service": service['name'],
                "issue": f"High error rate: {service['error_rate']}%",
                "affected_dependencies": get_downstream_services(service['name']),
                "business_impact": calculate_business_impact(service['name'])
            }
            
            send_alert_to_oncall(alert_data)
            trigger_auto_remediation(service['name'])
        
        elif service['response_time'] > service['sla_threshold']:
            alert_data = {
                "severity": "warning", 
                "service": service['name'],
                "issue": f"SLA violation: {service['response_time']}ms > {service['sla_threshold']}ms",
                "trend": get_performance_trend(service['name'], hours=6)
            }
            
            send_alert_to_team(alert_data)

5.2.5 実践的トラブルシューティング

分散トレーシングを活用した障害分析

実際の本番障害では、分散トレーシングが問題解決の時間を大幅に短縮します。

シナリオ1: 間欠的なタイムアウト障害

状況:

  • ユーザーから「たまにチェックアウトが失敗する」という報告
  • 従来監視では「全体的には正常」と表示
  • 分散トレーシングで真の原因を特定
javascript
// 間欠的障害のトレース分析例
const intermittentIssueAnalysis = {
    
    // 正常なトレース(ベースライン)
    normal_trace: {
        trace_id: "normal-abc123",
        total_duration: "450ms",
        spans: [
            { service: "api-gateway", duration: "15ms", status: "success" },
            { service: "auth-service", duration: "85ms", status: "success" },
            { service: "inventory-service", duration: "120ms", status: "success" },
            { service: "payment-service", duration: "180ms", status: "success" },
            { service: "order-service", duration: "50ms", status: "success" }
        ],
        outcome: "checkout_success"
    },
    
    // 問題のあるトレース
    problematic_trace: {
        trace_id: "problem-def456", 
        total_duration: "15.2s",    // 異常な長時間
        spans: [
            { service: "api-gateway", duration: "15ms", status: "success" },
            { service: "auth-service", duration: "85ms", status: "success" },
            { 
                service: "inventory-service", 
                duration: "12.8s",          // ← 異常!
                status: "success",          // ステータスは成功だが異常に遅い
                details: {
                    database_query: "12.5s", // データベースクエリが異常に遅い
                    query: "SELECT * FROM inventory WHERE product_id IN (...)",
                    slow_query_reason: "missing_index_on_large_table"
                }
            },
            { 
                service: "payment-service", 
                duration: "timeout",        // inventory遅延により実行されず
                status: "timeout_error" 
            }
        ],
        outcome: "checkout_timeout_failure"
    },
    
    // パターン分析
    pattern_analysis: {
        frequency: "5% of requests (intermittent)",
        trigger_condition: "large product catalogs with >100 items",
        time_correlation: "occurs during peak hours (high DB load)",
        affected_user_segment: "enterprise customers with large orders"
    },
    
    // 解決策
    resolution: {
        immediate: "add database index on (product_id, warehouse_id)",
        short_term: "implement query result caching", 
        long_term: "partition inventory table by product category"
    }
};

// トレース分析自動化
function analyzeTracePatterns(traces) {
    """トレースパターンの自動分析"""
    
    const analysis = {
        slow_traces: [],
        error_traces: [],
        timeout_traces: [],
        patterns: {}
    };
    
    traces.forEach(trace => {
        // 遅延トレース検出
        if (trace.duration > 5000) {  // 5秒以上
            analysis.slow_traces.push({
                trace_id: trace.id,
                duration: trace.duration,
                bottleneck_service: findBottleneckService(trace.spans),
                business_impact: calculateBusinessImpact(trace)
            });
        }
        
        // エラートレース検出
        if (trace.spans.some(span => span.status === 'error')) {
            analysis.error_traces.push({
                trace_id: trace.id,
                error_service: trace.spans.find(span => span.status === 'error').service,
                error_message: trace.spans.find(span => span.status === 'error').error,
                cascade_effect: analyzeCascadeEffect(trace.spans)
            });
        }
    });
    
    // パターン分析
    analysis.patterns = {
        common_bottlenecks: findCommonBottlenecks(analysis.slow_traces),
        frequent_errors: findFrequentErrors(analysis.error_traces),
        service_correlation: analyzeServiceCorrelation(traces)
    };
    
    return analysis;
}

シナリオ2: カスケード障害の分析

状況:

  • 1つのマイクロサービスの障害が全システムに波及
  • どのサービスが起点かを迅速に特定する必要
python
# カスケード障害の分析例
import json
from datetime import datetime, timedelta

class CascadeFailureAnalyzer:
    
    def __init__(self, nr_client):
        self.nr_client = nr_client
        
    def analyze_cascade_failure(self, incident_start_time):
        """カスケード障害の起点と波及パターンを分析"""
        
        # 障害発生時間帯のトレースを取得
        traces = self.get_traces_in_timerange(
            incident_start_time,
            incident_start_time + timedelta(hours=1)
        )
        
        # 時系列での障害発生パターン分析
        failure_timeline = self.build_failure_timeline(traces)
        
        # 起点サービス特定
        root_cause_service = self.identify_root_cause(failure_timeline)
        
        # 波及経路分析
        propagation_path = self.trace_failure_propagation(failure_timeline, root_cause_service)
        
        return {
            "root_cause_service": root_cause_service,
            "failure_timeline": failure_timeline,
            "propagation_path": propagation_path,
            "business_impact": self.calculate_cascade_impact(propagation_path)
        }
    
    def build_failure_timeline(self, traces):
        """障害発生の時系列分析"""
        
        timeline = {}
        
        for trace in traces:
            trace_time = trace['start_time']
            
            for span in trace['spans']:
                if span['status'] == 'error':
                    service = span['service']
                    
                    if service not in timeline:
                        timeline[service] = {
                            'first_error': trace_time,
                            'error_count': 0,
                            'error_types': {}
                        }
                    
                    timeline[service]['error_count'] += 1
                    
                    error_type = span.get('error_type', 'unknown')
                    if error_type not in timeline[service]['error_types']:
                        timeline[service]['error_types'][error_type] = 0
                    timeline[service]['error_types'][error_type] += 1
        
        # 時刻順にソート
        return dict(sorted(timeline.items(), 
                          key=lambda x: x[1]['first_error']))
    
    def identify_root_cause(self, failure_timeline):
        """起点サービスの特定"""
        
        # 最初にエラーが発生したサービス
        first_failure_service = next(iter(failure_timeline))
        
        # 検証:そのサービスが他サービスの依存関係上流にあるか
        dependency_graph = self.get_service_dependency_graph()
        
        if self.is_upstream_service(first_failure_service, dependency_graph):
            return {
                "service": first_failure_service,
                "confidence": "high",
                "reasoning": "First to fail and upstream dependency"
            }
        else:
            # より詳細な分析が必要
            return self.deep_root_cause_analysis(failure_timeline)
    
    def trace_failure_propagation(self, timeline, root_cause):
        """障害波及経路の分析"""
        
        propagation = {
            "root": root_cause["service"],
            "waves": []  # 波及の段階
        }
        
        dependency_graph = self.get_service_dependency_graph()
        root_service = root_cause["service"]
        
        # 第1波:直接依存サービス
        first_wave = []
        for service, deps in dependency_graph.items():
            if root_service in deps:
                if service in timeline:
                    first_wave.append({
                        "service": service,
                        "failure_delay": self.calculate_delay(
                            timeline[root_service]['first_error'],
                            timeline[service]['first_error']
                        ),
                        "impact_severity": self.assess_impact_severity(service, timeline)
                    })
        
        propagation["waves"].append({
            "wave_number": 1,
            "affected_services": first_wave,
            "propagation_mechanism": "direct_dependency"
        })
        
        # 第2波以降:間接的影響
        second_wave_services = set()
        for first_wave_service in first_wave:
            for service, deps in dependency_graph.items():
                if first_wave_service["service"] in deps:
                    second_wave_services.add(service)
        
        # ... 続きの波及分析
        
        return propagation
    
    def generate_cascade_report(self, analysis):
        """カスケード障害レポート生成"""
        
        report = f"""
        # Cascade Failure Analysis Report
        
        ## Root Cause
        - **Service**: {analysis['root_cause_service']['service']}
        - **Confidence**: {analysis['root_cause_service']['confidence']}
        - **First Error**: {analysis['failure_timeline'][analysis['root_cause_service']['service']]['first_error']}
        
        ## Propagation Pattern
        """
        
        for wave in analysis['propagation_path']['waves']:
            report += f"""
            ### Wave {wave['wave_number']} ({wave['propagation_mechanism']})
            """
            for service in wave['affected_services']:
                report += f"""
            - **{service['service']}**: Failed {service['failure_delay']} after root cause
            """
        
        report += f"""
        ## Business Impact
        - **Total Affected Users**: {analysis['business_impact']['affected_users']}
        - **Revenue Impact**: ${analysis['business_impact']['revenue_loss']}
        - **Recovery Time**: {analysis['business_impact']['recovery_time']}
        """
        
        return report

シナリオ3: パフォーマンス回帰の特定

状況:

  • 新しいデプロイ後、全体的にレスポンスが遅くなった
  • どの変更が影響しているかを特定
javascript
// デプロイメント影響分析
class DeploymentImpactAnalyzer {
    
    constructor(newRelicClient) {
        this.nr = newRelicClient;
    }
    
    async analyzeDeploymentImpact(deploymentTime, compareHours = 2) {
        """デプロイメント前後のパフォーマンス比較分析"""
        
        const beforeStart = new Date(deploymentTime.getTime() - (compareHours * 60 * 60 * 1000));
        const beforeEnd = deploymentTime;
        const afterStart = deploymentTime; 
        const afterEnd = new Date(deploymentTime.getTime() + (compareHours * 60 * 60 * 1000));
        
        // 前後のトレースデータ取得
        const beforeTraces = await this.getTracesInPeriod(beforeStart, beforeEnd);
        const afterTraces = await this.getTracesInPeriod(afterStart, afterEnd);
        
        // サービス別パフォーマンス比較
        const performanceComparison = this.compareServicePerformance(beforeTraces, afterTraces);
        
        // 回帰検出
        const regressions = this.detectPerformanceRegressions(performanceComparison);
        
        // 根本原因分析
        const rootCauseAnalysis = await this.analyzeRootCause(regressions, deploymentTime);
        
        return {
            deployment_time: deploymentTime,
            performance_comparison: performanceComparison,
            detected_regressions: regressions,
            root_cause_analysis: rootCauseAnalysis,
            recommendations: this.generateRecommendations(regressions)
        };
    }
    
    compareServicePerformance(beforeTraces, afterTraces) {
        """サービス別パフォーマンス比較"""
        
        const beforeStats = this.calculateServiceStats(beforeTraces);
        const afterStats = this.calculateServiceStats(afterTraces);
        
        const comparison = {};
        
        // 全サービスについて比較
        const allServices = new Set([...Object.keys(beforeStats), ...Object.keys(afterStats)]);
        
        for (const service of allServices) {
            const before = beforeStats[service] || this.getDefaultStats();
            const after = afterStats[service] || this.getDefaultStats();
            
            comparison[service] = {
                response_time: {
                    before: before.avg_response_time,
                    after: after.avg_response_time,
                    change_percent: ((after.avg_response_time - before.avg_response_time) / before.avg_response_time) * 100,
                    significance: this.assessStatisticalSignificance(before.response_times, after.response_times)
                },
                error_rate: {
                    before: before.error_rate,
                    after: after.error_rate,
                    change_percent: ((after.error_rate - before.error_rate) / before.error_rate) * 100
                },
                throughput: {
                    before: before.throughput,
                    after: after.throughput, 
                    change_percent: ((after.throughput - before.throughput) / before.throughput) * 100
                }
            };
        }
        
        return comparison;
    }
    
    detectPerformanceRegressions(comparison) {
        """パフォーマンス回帰の検出"""
        
        const regressions = [];
        
        for (const [service, metrics] of Object.entries(comparison)) {
            
            // レスポンス時間回帰
            if (metrics.response_time.change_percent > 20 && 
                metrics.response_time.significance === 'significant') {
                regressions.push({
                    service: service,
                    type: 'response_time_regression',
                    severity: this.assessRegressionSeverity(metrics.response_time.change_percent),
                    details: {
                        before: `${metrics.response_time.before}ms`,
                        after: `${metrics.response_time.after}ms`,
                        increase: `${metrics.response_time.change_percent.toFixed(1)}%`
                    }
                });
            }
            
            // エラー率回帰
            if (metrics.error_rate.change_percent > 50) {
                regressions.push({
                    service: service,
                    type: 'error_rate_regression', 
                    severity: 'critical',
                    details: {
                        before: `${metrics.error_rate.before}%`,
                        after: `${metrics.error_rate.after}%`,
                        increase: `${metrics.error_rate.change_percent.toFixed(1)}%`
                    }
                });
            }
            
            // スループット低下
            if (metrics.throughput.change_percent < -15) {
                regressions.push({
                    service: service,
                    type: 'throughput_regression',
                    severity: this.assessRegressionSeverity(Math.abs(metrics.throughput.change_percent)),
                    details: {
                        before: `${metrics.throughput.before} rpm`,
                        after: `${metrics.throughput.after} rpm`, 
                        decrease: `${Math.abs(metrics.throughput.change_percent).toFixed(1)}%`
                    }
                });
            }
        }
        
        return regressions.sort((a, b) => {
            const severityOrder = { 'critical': 3, 'high': 2, 'medium': 1, 'low': 0 };
            return severityOrder[b.severity] - severityOrder[a.severity];
        });
    }
    
    async analyzeRootCause(regressions, deploymentTime) {
        """回帰の根本原因分析"""
        
        const analysis = {
            deployment_correlation: {},
            code_changes: {},
            infrastructure_changes: {},
            dependency_impact: {}
        };
        
        // デプロイメント相関分析
        const deployments = await this.getDeploymentsNear(deploymentTime);
        
        for (const regression of regressions) {
            const serviceDeployments = deployments.filter(d => d.service === regression.service);
            
            if (serviceDeployments.length > 0) {
                analysis.deployment_correlation[regression.service] = {
                    deployment_found: true,
                    deployment_time: serviceDeployments[0].timestamp,
                    version_change: `${serviceDeployments[0].previous_version} → ${serviceDeployments[0].new_version}`,
                    confidence: 'high'
                };
            }
        }
        
        // 依存関係影響分析
        for (const regression of regressions) {
            const dependencies = await this.getServiceDependencies(regression.service);
            const affectedDependencies = dependencies.filter(dep => 
                regressions.some(r => r.service === dep)
            );
            
            if (affectedDependencies.length > 0) {
                analysis.dependency_impact[regression.service] = {
                    affected_dependencies: affectedDependencies,
                    impact_type: 'cascading_effect'
                };
            }
        }
        
        return analysis;
    }
}

5.2.6 パフォーマンス最適化戦略

トレーシングデータを活用した最適化

分散トレーシングから得られるデータは、システム全体の最適化戦略立案に極めて有効です。

データ駆動型最適化アプローチ

python
# トレーシングデータ分析による最適化提案システム
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

class PerformanceOptimizationAdvisor:
    
    def __init__(self, newrelic_client):
        self.nr = newrelic_client
        
    def generate_optimization_recommendations(self, analysis_period_days=30):
        """パフォーマンス最適化推奨事項生成"""
        
        # 過去のトレーシングデータ分析
        traces_data = self.collect_traces_data(analysis_period_days)
        
        # 最適化機会の特定
        optimization_opportunities = {
            'slow_services': self.identify_slow_services(traces_data),
            'inefficient_queries': self.identify_slow_queries(traces_data),
            'external_api_issues': self.analyze_external_api_performance(traces_data),
            'resource_bottlenecks': self.identify_resource_bottlenecks(traces_data),
            'caching_opportunities': self.identify_caching_opportunities(traces_data)
        }
        
        # 優先度付きレコメンデーション生成
        recommendations = self.prioritize_optimizations(optimization_opportunities)
        
        # ROI予測
        roi_predictions = self.predict_optimization_roi(recommendations)
        
        return {
            'analysis_period': f"{analysis_period_days} days",
            'total_traces_analyzed': len(traces_data),
            'optimization_opportunities': optimization_opportunities,
            'prioritized_recommendations': recommendations,
            'roi_predictions': roi_predictions
        }
    
    def identify_slow_services(self, traces_data):
        """遅いサービスの特定と分析"""
        
        service_performance = {}
        
        for trace in traces_data:
            for span in trace['spans']:
                service = span['service']
                duration = span['duration']
                
                if service not in service_performance:
                    service_performance[service] = {
                        'durations': [],
                        'error_count': 0,
                        'total_calls': 0
                    }
                
                service_performance[service]['durations'].append(duration)
                service_performance[service]['total_calls'] += 1
                
                if span.get('error', False):
                    service_performance[service]['error_count'] += 1
        
        # 統計計算と問題サービス特定
        slow_services = []
        
        for service, perf_data in service_performance.items():
            durations = np.array(perf_data['durations'])
            
            metrics = {
                'service': service,
                'avg_duration': np.mean(durations),
                'p95_duration': np.percentile(durations, 95),
                'p99_duration': np.percentile(durations, 99),
                'error_rate': perf_data['error_count'] / perf_data['total_calls'],
                'total_calls': perf_data['total_calls']
            }
            
            # 遅いサービスの判定基準
            if (metrics['avg_duration'] > 1000 or        # 平均1秒以上
                metrics['p95_duration'] > 3000 or       # P95が3秒以上
                metrics['error_rate'] > 0.05):          # エラー率5%以上
                
                metrics['optimization_potential'] = self.calculate_optimization_potential(metrics)
                slow_services.append(metrics)
        
        return sorted(slow_services, key=lambda x: x['optimization_potential'], reverse=True)
    
    def identify_slow_queries(self, traces_data):
        """遅いデータベースクエリの特定"""
        
        database_operations = []
        
        for trace in traces_data:
            for span in trace['spans']:
                if span.get('type') == 'database' or 'sql' in span.get('tags', {}):
                    database_operations.append({
                        'service': span['service'],
                        'query': span.get('tags', {}).get('sql', 'unknown'),
                        'duration': span['duration'],
                        'table': self.extract_table_name(span.get('tags', {}).get('sql', '')),
                        'operation': self.extract_operation_type(span.get('tags', {}).get('sql', ''))
                    })
        
        # クエリパフォーマンス分析
        query_analysis = {}
        
        for op in database_operations:
            query_signature = self.normalize_query(op['query'])
            
            if query_signature not in query_analysis:
                query_analysis[query_signature] = {
                    'durations': [],
                    'services': set(),
                    'tables': set(),
                    'operation_type': op['operation']
                }
            
            query_analysis[query_signature]['durations'].append(op['duration'])
            query_analysis[query_signature]['services'].add(op['service'])
            query_analysis[query_signature]['tables'].add(op['table'])
        
        # 遅いクエリの特定
        slow_queries = []
        
        for query_sig, analysis in query_analysis.items():
            durations = np.array(analysis['durations'])
            
            if np.mean(durations) > 500:  # 平均500ms以上
                slow_queries.append({
                    'query_signature': query_sig,
                    'avg_duration': np.mean(durations),
                    'p95_duration': np.percentile(durations, 95),
                    'execution_count': len(durations),
                    'affected_services': list(analysis['services']),
                    'affected_tables': list(analysis['tables']),
                    'optimization_suggestions': self.suggest_query_optimizations(query_sig, analysis)
                })
        
        return sorted(slow_queries, key=lambda x: x['avg_duration'] * x['execution_count'], reverse=True)
    
    def analyze_external_api_performance(self, traces_data):
        """外部API呼び出しパフォーマンス分析"""
        
        external_apis = {}
        
        for trace in traces_data:
            for span in trace['spans']:
                if span.get('type') == 'http' and self.is_external_call(span):
                    api_endpoint = self.normalize_api_endpoint(span.get('url', ''))
                    
                    if api_endpoint not in external_apis:
                        external_apis[api_endpoint] = {
                            'durations': [],
                            'status_codes': [],
                            'calling_services': set(),
                            'errors': []
                        }
                    
                    external_apis[api_endpoint]['durations'].append(span['duration'])
                    external_apis[api_endpoint]['status_codes'].append(span.get('http_status', 200))
                    external_apis[api_endpoint]['calling_services'].add(span['service'])
                    
                    if span.get('error', False):
                        external_apis[api_endpoint]['errors'].append(span.get('error_message', ''))
        
        # 問題のある外部API特定
        problematic_apis = []
        
        for api, data in external_apis.items():
            durations = np.array(data['durations'])
            error_rate = len(data['errors']) / len(durations)
            
            if np.mean(durations) > 2000 or error_rate > 0.10:  # 平均2秒以上 or エラー率10%以上
                problematic_apis.append({
                    'api_endpoint': api,
                    'avg_duration': np.mean(durations),
                    'p95_duration': np.percentile(durations, 95),
                    'error_rate': error_rate,
                    'call_count': len(durations),
                    'calling_services': list(data['calling_services']),
                    'optimization_recommendations': self.suggest_external_api_optimizations(api, data)
                })
        
        return sorted(problematic_apis, 
                     key=lambda x: x['avg_duration'] * x['call_count'], 
                     reverse=True)
    
    def suggest_query_optimizations(self, query_signature, analysis):
        """データベースクエリ最適化提案"""
        
        suggestions = []
        
        # SELECT * の検出
        if 'SELECT *' in query_signature.upper():
            suggestions.append({
                'type': 'query_optimization',
                'suggestion': 'Select specific columns instead of SELECT *',
                'expected_improvement': '20-40% performance gain',
                'implementation': 'Replace SELECT * with specific column names'
            })
        
        # LIMIT句の不足検出
        if 'LIMIT' not in query_signature.upper() and 'SELECT' in query_signature.upper():
            suggestions.append({
                'type': 'query_optimization', 
                'suggestion': 'Add LIMIT clause to prevent large result sets',
                'expected_improvement': '30-60% performance gain',
                'implementation': 'Add appropriate LIMIT based on business requirements'
            })
        
        # JOINの多用検出
        join_count = query_signature.upper().count('JOIN')
        if join_count > 3:
            suggestions.append({
                'type': 'architecture_optimization',
                'suggestion': f'Complex query with {join_count} JOINs detected',
                'expected_improvement': '40-70% performance gain',
                'implementation': 'Consider query decomposition or denormalization'
            })
        
        # インデックス推奨
        if analysis['operation_type'] in ['SELECT', 'UPDATE', 'DELETE']:
            suggestions.append({
                'type': 'indexing',
                'suggestion': 'Analyze and optimize indexes for this query pattern',
                'expected_improvement': '50-80% performance gain',
                'implementation': 'Run EXPLAIN PLAN and create appropriate indexes'
            })
        
        return suggestions
    
    def suggest_external_api_optimizations(self, api_endpoint, data):
        """外部API最適化提案"""
        
        suggestions = []
        
        avg_duration = np.mean(data['durations'])
        error_rate = len(data['errors']) / len(data['durations'])
        
        # 高レイテンシー対策
        if avg_duration > 2000:
            suggestions.append({
                'type': 'caching',
                'suggestion': 'Implement response caching for this API',
                'expected_improvement': '60-90% latency reduction',
                'implementation': 'Cache responses for 5-15 minutes based on data freshness requirements'
            })
            
            suggestions.append({
                'type': 'async_processing',
                'suggestion': 'Move API calls to background processing',
                'expected_improvement': '95% user-facing latency reduction',
                'implementation': 'Queue API calls and process asynchronously'
            })
        
        # 高エラー率対策
        if error_rate > 0.05:
            suggestions.append({
                'type': 'resilience',
                'suggestion': 'Implement circuit breaker pattern',
                'expected_improvement': 'Prevent cascade failures',
                'implementation': 'Use circuit breaker with fallback mechanisms'
            })
            
            suggestions.append({
                'type': 'retry_strategy',
                'suggestion': 'Implement exponential backoff retry',
                'expected_improvement': '30-50% error rate reduction', 
                'implementation': 'Add retry logic with exponential backoff'
            })
        
        return suggestions
    
    def generate_optimization_roadmap(self, recommendations):
        """最適化ロードマップ生成"""
        
        roadmap = {
            'phase_1_quick_wins': [],     # 1-2週間で実装可能
            'phase_2_medium_effort': [],  # 1-2ヶ月で実装
            'phase_3_major_changes': []   # 3-6ヶ月の大規模変更
        }
        
        for rec in recommendations:
            effort = self.estimate_implementation_effort(rec)
            impact = self.estimate_business_impact(rec)
            
            if effort == 'low' and impact >= 'medium':
                roadmap['phase_1_quick_wins'].append(rec)
            elif effort == 'medium':
                roadmap['phase_2_medium_effort'].append(rec)
            else:
                roadmap['phase_3_major_changes'].append(rec)
        
        return roadmap

まとめ

本章では、分散トレーシングとサービスマップの実装から活用まで、マイクロサービス環境での監視を包括的に解説しました。

🎯 重要なポイント

1. 分散トレーシングの価値

  • 複雑なサービス間の処理フローを1本の線として可視化
  • 根本原因の迅速な特定:障害時のMTTR大幅短縮
  • パフォーマンスボトルネックの正確な局在化

2. New Relic + OpenTelemetry の優位性

  • ベンダーニュートラル:ツール乗り換え時のコスト削減
  • 標準化された実装:一度の実装で複数ツール対応
  • 豊富なエコシステム:自動計装の幅広いサポート

3. サービスマップの戦略的活用

  • リアルタイム依存関係可視化:システム全体の健全性把握
  • 障害影響範囲の即座特定:カスケード障害の予防と対処
  • 容量計画・アーキテクチャ改善:データ駆動型の意思決定

4. 実践的トラブルシューティング

  • 間欠的障害の特定:従来監視では見えない問題の発見
  • カスケード障害分析:起点サービスと波及経路の解明
  • パフォーマンス回帰検出:デプロイメント影響の定量的分析

💡 次のステップ

実装のロードマップ

  1. 基本分散トレーシング:主要サービスでの分散トレーシング有効化
  2. OpenTelemetry統合:標準化されたテレメトリー収集の実装
  3. サービスマップ活用:依存関係可視化とアラート連携
  4. 高度な分析:自動化されたパフォーマンス分析と最適化

期待される成果

  • MTTR短縮: 障害対応時間60-80%削減
  • パフォーマンス向上: ボトルネック特定・解消による20-40%改善
  • システム信頼性: カスケード障害の予防と影響最小化
  • 開発生産性: デバッグ時間短縮による開発効率向上

分散トレーシングは、現代のマイクロサービス環境において不可欠な技術です。New RelicとOpenTelemetryを活用することで、複雑なシステムの可視性を劇的に向上させ、高い信頼性とパフォーマンスを実現できます。


📖 ナビゲーション

メイン: 第5章 New Relic APM(高度化)
前セクション: 第5.1章 APM基本機能
次セクション: 第5.3章 コードレベル分析

関連記事:
第3章: New Relicの機能
第4章: New Relic Infrastructure
OpenTelemetry実践ガイド