OpenTelemetry による分散トレーシング実装
分散システムにおいて、リクエストが複数のサービスを横断する際の動作を理解することは重要です。OpenTelemetry を使用した分散トレーシングの実装方法を詳しく解説します。
OpenTelemetry とは?
OpenTelemetry は、アプリケーションのメトリクス、ログ、トレースを生成・収集・エクスポートするためのオープンソースの observability フレームワークです。
主要なコンポーネント
- Traces: リクエストの経路と処理時間
- Metrics: システムの健全性指標
- Logs: 詳細なイベント情報
実装アーキテクチャ
mermaid
graph TD
A[Client] --> B[API Gateway]
B --> C[User Service]
B --> D[Order Service]
C --> E[Database]
D --> F[Payment Service]
D --> G[Inventory Service]
H[OpenTelemetry Collector] --> I[Jaeger]
H --> J[Prometheus]
C -.-> H
D -.-> H
F -.-> H
G -.-> H
Python での実装
1. 基本設定
python
# requirements.txt
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-instrumentation-fastapi==0.42b0
opentelemetry-instrumentation-requests==0.42b0
opentelemetry-instrumentation-sqlalchemy==0.42b0
opentelemetry-exporter-jaeger-thrift==1.21.0
opentelemetry-propagator-b3==1.21.0
python
# tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
def configure_tracing(service_name: str, jaeger_endpoint: str):
"""OpenTelemetry トレーシングの設定"""
# リソース情報の設定
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
"deployment.environment": "production"
})
# TraceProvider の設定
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
# Jaeger エクスポーターの設定
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
# スパンプロセッサーの追加
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 自動計装の有効化
FastAPIInstrumentor.instrument()
RequestsInstrumentor.instrument()
SQLAlchemyInstrumentor.instrument()
return tracer
2. FastAPI アプリケーション
python
# main.py
from fastapi import FastAPI, Depends
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import requests
import asyncio
from tracing import configure_tracing
# トレーシングの初期化
tracer = configure_tracing("user-service", "http://localhost:14268")
app = FastAPI(title="User Service")
@app.get("/users/{user_id}")
async def get_user(user_id: int):
"""ユーザー情報を取得"""
# 手動でスパンを作成
with tracer.start_as_current_span("get_user_operation") as span:
try:
# スパンに属性を追加
span.set_attribute("user.id", user_id)
span.set_attribute("service.operation", "get_user")
# データベースからユーザー情報を取得
user_data = await fetch_user_from_db(user_id)
if not user_data:
span.set_status(Status(StatusCode.ERROR, "User not found"))
raise HTTPException(status_code=404, detail="User not found")
# 外部サービスから追加情報を取得
preferences = await fetch_user_preferences(user_id)
span.set_attribute("user.preferences_count", len(preferences))
span.set_status(Status(StatusCode.OK))
return {
"user": user_data,
"preferences": preferences
}
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
async def fetch_user_from_db(user_id: int):
"""データベースからユーザー情報を取得"""
with tracer.start_as_current_span("db_query_user") as span:
span.set_attribute("db.operation", "SELECT")
span.set_attribute("db.table", "users")
span.set_attribute("db.user_id", user_id)
# 模擬的なデータベースクエリ
await asyncio.sleep(0.1) # DB応答時間をシミュレート
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
async def fetch_user_preferences(user_id: int):
"""外部サービスからユーザー設定を取得"""
with tracer.start_as_current_span("external_api_call") as span:
span.set_attribute("http.method", "GET")
span.set_attribute("http.url", f"http://preferences-service/users/{user_id}/preferences")
try:
# 外部API呼び出し(自動計装により自動的にトレース)
response = requests.get(
f"http://preferences-service/api/users/{user_id}/preferences",
timeout=5
)
span.set_attribute("http.status_code", response.status_code)
if response.status_code == 200:
return response.json()
else:
span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}"))
return []
except requests.RequestException as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
return []
3. コンテキスト伝播
python
# context_propagation.py
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import requests
def make_traced_request(url: str, method: str = "GET", **kwargs):
"""トレースコンテキストを伝播するHTTPリクエスト"""
# 現在のトレースコンテキストを取得
headers = kwargs.get("headers", {})
# コンテキストをヘッダーに注入
inject(headers)
kwargs["headers"] = headers
with tracer.start_as_current_span(f"http_{method.lower()}") as span:
span.set_attribute("http.method", method)
span.set_attribute("http.url", url)
response = requests.request(method, url, **kwargs)
span.set_attribute("http.status_code", response.status_code)
return response
def extract_trace_context(headers: dict):
"""受信したリクエストからトレースコンテキストを抽出"""
# ヘッダーからコンテキストを抽出
context = extract(headers)
# 抽出したコンテキストを現在のスパンに設定
token = trace.set_span_in_context(trace.get_current_span(), context)
return token
Go での実装
go
// main.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
func initTracer() func() {
// Jaeger エクスポーターの作成
exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
if err != nil {
log.Fatal(err)
}
// トレースプロバイダーの設定
tp := trace.NewTracerProvider(
trace.WithBatcher(exp),
trace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("order-service"),
semconv.ServiceVersionKey.String("v1.0.0"),
)),
)
otel.SetTracerProvider(tp)
return func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}
}
func orderHandler(w http.ResponseWriter, r *http.Request) {
tracer := otel.Tracer("order-service")
ctx, span := tracer.Start(r.Context(), "process_order")
defer span.End()
// スパンに属性を追加
span.SetAttributes(
attribute.String("order.id", "order-123"),
attribute.String("customer.id", "customer-456"),
)
// 支払い処理
if err := processPayment(ctx, "order-123"); err != nil {
span.RecordError(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 在庫確認
if err := checkInventory(ctx, "order-123"); err != nil {
span.RecordError(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Order processed successfully")
}
func processPayment(ctx context.Context, orderID string) error {
tracer := otel.Tracer("order-service")
_, span := tracer.Start(ctx, "process_payment")
defer span.End()
span.SetAttributes(
attribute.String("payment.order_id", orderID),
attribute.String("payment.method", "credit_card"),
)
// 支払い処理のロジック
// ...
return nil
}
func checkInventory(ctx context.Context, orderID string) error {
tracer := otel.Tracer("order-service")
_, span := tracer.Start(ctx, "check_inventory")
defer span.End()
span.SetAttributes(
attribute.String("inventory.order_id", orderID),
)
// 在庫確認のロジック
// ...
return nil
}
func main() {
cleanup := initTracer()
defer cleanup()
// HTTP クライアントとサーバーの計装
client := http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
handler := http.HandlerFunc(orderHandler)
wrappedHandler := otelhttp.NewHandler(handler, "order_endpoint")
http.Handle("/orders", wrappedHandler)
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Docker Compose でのセットアップ
yaml
# docker-compose.yml
version: '3.8'
services:
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # Jaeger UI
- "6831:6831/udp" # Jaeger agent UDP
environment:
- COLLECTOR_OTLP_ENABLED=true
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
depends_on:
- jaeger
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
user-service:
build: ./user-service
ports:
- "8001:8000"
environment:
- JAEGER_ENDPOINT=http://jaeger:14268
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
depends_on:
- otel-collector
order-service:
build: ./order-service
ports:
- "8002:8080"
environment:
- JAEGER_ENDPOINT=http://jaeger:14268
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
depends_on:
- otel-collector
運用のベストプラクティス
1. サンプリング戦略
python
from opentelemetry.sdk.trace.sampling import (
TraceIdRatioBased,
ParentBased,
Decision
)
# 本番環境では適切なサンプリング率を設定
sampler = ParentBased(
root=TraceIdRatioBased(rate=0.1) # 10% のトレースをサンプリング
)
trace.set_tracer_provider(
TracerProvider(
resource=resource,
sampler=sampler
)
)
2. パフォーマンス監視
python
import time
from opentelemetry.metrics import get_meter
meter = get_meter(__name__)
# カスタムメトリクスの定義
request_duration = meter.create_histogram(
name="http_request_duration_seconds",
description="HTTP request duration",
unit="s"
)
request_counter = meter.create_counter(
name="http_requests_total",
description="Total HTTP requests"
)
def monitor_request_performance(func):
"""リクエストパフォーマンスを監視するデコレーター"""
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
status = "success"
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
# メトリクスを記録
request_duration.record(
duration,
{"status": status, "endpoint": func.__name__}
)
request_counter.add(
1,
{"status": status, "endpoint": func.__name__}
)
return wrapper
まとめ
OpenTelemetry を使用した分散トレーシングにより、以下の利点が得られます:
- 可視性の向上 - マイクロサービス間の依存関係と通信パターンの把握
- パフォーマンス最適化 - ボトルネックの特定と解決
- 障害調査の効率化 - 問題発生箇所の迅速な特定
- SLA 監視 - サービスレベルの測定と改善
適切に実装された分散トレーシングは、複雑な分散システムの運用において不可欠なツールです。
タグ: OpenTelemetry, 分散トレーシング, 観測可能性, マイクロサービス, テレメトリ
関連記事: Monitoring Best Practices | Microservices Observability