OpenTelemetry による分散トレーシング実装

分散システムにおいて、リクエストが複数のサービスを横断する際の動作を理解することは重要です。OpenTelemetry を使用した分散トレーシングの実装方法を詳しく解説します。

OpenTelemetry とは?

OpenTelemetry は、アプリケーションのメトリクス、ログ、トレースを生成・収集・エクスポートするためのオープンソースの observability フレームワークです。

主要なコンポーネント

  • Traces: リクエストの経路と処理時間
  • Metrics: システムの健全性指標
  • Logs: 詳細なイベント情報

実装アーキテクチャ

mermaid
graph TD
    A[Client] --> B[API Gateway]
    B --> C[User Service]
    B --> D[Order Service]
    C --> E[Database]
    D --> F[Payment Service]
    D --> G[Inventory Service]
    
    H[OpenTelemetry Collector] --> I[Jaeger]
    H --> J[Prometheus]
    
    C -.-> H
    D -.-> H
    F -.-> H
    G -.-> H

Python での実装

1. 基本設定

python
# requirements.txt
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-instrumentation-fastapi==0.42b0
opentelemetry-instrumentation-requests==0.42b0
opentelemetry-instrumentation-sqlalchemy==0.42b0
opentelemetry-exporter-jaeger-thrift==1.21.0
opentelemetry-propagator-b3==1.21.0
python
# tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource

def configure_tracing(service_name: str, jaeger_endpoint: str):
    """OpenTelemetry トレーシングの設定"""
    
    # リソース情報の設定
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production"
    })
    
    # TraceProvider の設定
    trace.set_tracer_provider(TracerProvider(resource=resource))
    tracer = trace.get_tracer(__name__)
    
    # Jaeger エクスポーターの設定
    jaeger_exporter = JaegerExporter(
        agent_host_name="localhost",
        agent_port=6831,
    )
    
    # スパンプロセッサーの追加
    span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)
    
    # 自動計装の有効化
    FastAPIInstrumentor.instrument()
    RequestsInstrumentor.instrument()
    SQLAlchemyInstrumentor.instrument()
    
    return tracer

2. FastAPI アプリケーション

python
# main.py
from fastapi import FastAPI, Depends
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import requests
import asyncio
from tracing import configure_tracing

# トレーシングの初期化
tracer = configure_tracing("user-service", "http://localhost:14268")

app = FastAPI(title="User Service")

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """ユーザー情報を取得"""
    
    # 手動でスパンを作成
    with tracer.start_as_current_span("get_user_operation") as span:
        try:
            # スパンに属性を追加
            span.set_attribute("user.id", user_id)
            span.set_attribute("service.operation", "get_user")
            
            # データベースからユーザー情報を取得
            user_data = await fetch_user_from_db(user_id)
            
            if not user_data:
                span.set_status(Status(StatusCode.ERROR, "User not found"))
                raise HTTPException(status_code=404, detail="User not found")
            
            # 外部サービスから追加情報を取得
            preferences = await fetch_user_preferences(user_id)
            
            span.set_attribute("user.preferences_count", len(preferences))
            span.set_status(Status(StatusCode.OK))
            
            return {
                "user": user_data,
                "preferences": preferences
            }
            
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

async def fetch_user_from_db(user_id: int):
    """データベースからユーザー情報を取得"""
    
    with tracer.start_as_current_span("db_query_user") as span:
        span.set_attribute("db.operation", "SELECT")
        span.set_attribute("db.table", "users")
        span.set_attribute("db.user_id", user_id)
        
        # 模擬的なデータベースクエリ
        await asyncio.sleep(0.1)  # DB応答時間をシミュレート
        
        return {
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"
        }

async def fetch_user_preferences(user_id: int):
    """外部サービスからユーザー設定を取得"""
    
    with tracer.start_as_current_span("external_api_call") as span:
        span.set_attribute("http.method", "GET")
        span.set_attribute("http.url", f"http://preferences-service/users/{user_id}/preferences")
        
        try:
            # 外部API呼び出し(自動計装により自動的にトレース)
            response = requests.get(
                f"http://preferences-service/api/users/{user_id}/preferences",
                timeout=5
            )
            
            span.set_attribute("http.status_code", response.status_code)
            
            if response.status_code == 200:
                return response.json()
            else:
                span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}"))
                return []
                
        except requests.RequestException as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            return []

3. コンテキスト伝播

python
# context_propagation.py
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests

def make_traced_request(url: str, method: str = "GET", **kwargs):
    """トレースコンテキストを伝播するHTTPリクエスト"""
    
    # 現在のトレースコンテキストを取得
    headers = kwargs.get("headers", {})
    
    # コンテキストをヘッダーに注入
    inject(headers)
    kwargs["headers"] = headers
    
    with tracer.start_as_current_span(f"http_{method.lower()}") as span:
        span.set_attribute("http.method", method)
        span.set_attribute("http.url", url)
        
        response = requests.request(method, url, **kwargs)
        span.set_attribute("http.status_code", response.status_code)
        
        return response

def extract_trace_context(headers: dict):
    """受信したリクエストからトレースコンテキストを抽出"""
    
    # ヘッダーからコンテキストを抽出
    context = extract(headers)
    
    # 抽出したコンテキストを現在のスパンに設定
    token = trace.set_span_in_context(trace.get_current_span(), context)
    
    return token

Go での実装

go
// main.go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func initTracer() func() {
    // Jaeger エクスポーターの作成
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
    if err != nil {
        log.Fatal(err)
    }
    
    // トレースプロバイダーの設定
    tp := trace.NewTracerProvider(
        trace.WithBatcher(exp),
        trace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("order-service"),
            semconv.ServiceVersionKey.String("v1.0.0"),
        )),
    )
    
    otel.SetTracerProvider(tp)
    
    return func() {
        if err := tp.Shutdown(context.Background()); err != nil {
            log.Fatal(err)
        }
    }
}

func orderHandler(w http.ResponseWriter, r *http.Request) {
    tracer := otel.Tracer("order-service")
    
    ctx, span := tracer.Start(r.Context(), "process_order")
    defer span.End()
    
    // スパンに属性を追加
    span.SetAttributes(
        attribute.String("order.id", "order-123"),
        attribute.String("customer.id", "customer-456"),
    )
    
    // 支払い処理
    if err := processPayment(ctx, "order-123"); err != nil {
        span.RecordError(err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 在庫確認
    if err := checkInventory(ctx, "order-123"); err != nil {
        span.RecordError(err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    fmt.Fprintf(w, "Order processed successfully")
}

func processPayment(ctx context.Context, orderID string) error {
    tracer := otel.Tracer("order-service")
    
    _, span := tracer.Start(ctx, "process_payment")
    defer span.End()
    
    span.SetAttributes(
        attribute.String("payment.order_id", orderID),
        attribute.String("payment.method", "credit_card"),
    )
    
    // 支払い処理のロジック
    // ...
    
    return nil
}

func checkInventory(ctx context.Context, orderID string) error {
    tracer := otel.Tracer("order-service")
    
    _, span := tracer.Start(ctx, "check_inventory")
    defer span.End()
    
    span.SetAttributes(
        attribute.String("inventory.order_id", orderID),
    )
    
    // 在庫確認のロジック
    // ...
    
    return nil
}

func main() {
    cleanup := initTracer()
    defer cleanup()
    
    // HTTP クライアントとサーバーの計装
    client := http.Client{
        Transport: otelhttp.NewTransport(http.DefaultTransport),
    }
    
    handler := http.HandlerFunc(orderHandler)
    wrappedHandler := otelhttp.NewHandler(handler, "order_endpoint")
    
    http.Handle("/orders", wrappedHandler)
    
    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Docker Compose でのセットアップ

yaml
# docker-compose.yml
version: '3.8'

services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # Jaeger UI
      - "6831:6831/udp"  # Jaeger agent UDP
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"  # OTLP gRPC receiver
      - "4318:4318"  # OTLP HTTP receiver
    depends_on:
      - jaeger

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  user-service:
    build: ./user-service
    ports:
      - "8001:8000"
    environment:
      - JAEGER_ENDPOINT=http://jaeger:14268
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - otel-collector

  order-service:
    build: ./order-service
    ports:
      - "8002:8080"
    environment:
      - JAEGER_ENDPOINT=http://jaeger:14268
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - otel-collector

運用のベストプラクティス

1. サンプリング戦略

python
from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased, 
    ParentBased, 
    Decision
)

# 本番環境では適切なサンプリング率を設定
sampler = ParentBased(
    root=TraceIdRatioBased(rate=0.1)  # 10% のトレースをサンプリング
)

trace.set_tracer_provider(
    TracerProvider(
        resource=resource,
        sampler=sampler
    )
)

2. パフォーマンス監視

python
import time
from opentelemetry.metrics import get_meter

meter = get_meter(__name__)

# カスタムメトリクスの定義
request_duration = meter.create_histogram(
    name="http_request_duration_seconds",
    description="HTTP request duration",
    unit="s"
)

request_counter = meter.create_counter(
    name="http_requests_total",
    description="Total HTTP requests"
)

def monitor_request_performance(func):
    """リクエストパフォーマンスを監視するデコレーター"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            status = "success"
            return result
        except Exception as e:
            status = "error"
            raise
        finally:
            duration = time.time() - start_time
            
            # メトリクスを記録
            request_duration.record(
                duration,
                {"status": status, "endpoint": func.__name__}
            )
            request_counter.add(
                1,
                {"status": status, "endpoint": func.__name__}
            )
    
    return wrapper

まとめ

OpenTelemetry を使用した分散トレーシングにより、以下の利点が得られます:

  1. 可視性の向上 - マイクロサービス間の依存関係と通信パターンの把握
  2. パフォーマンス最適化 - ボトルネックの特定と解決
  3. 障害調査の効率化 - 問題発生箇所の迅速な特定
  4. SLA 監視 - サービスレベルの測定と改善

適切に実装された分散トレーシングは、複雑な分散システムの運用において不可欠なツールです。

タグ: OpenTelemetry, 分散トレーシング, 観測可能性, マイクロサービス, テレメトリ


関連記事: Monitoring Best Practices | Microservices Observability