OpenTelemetry Collector Linux ホストに直接インストールして、セルフホスト型Apache Kafka クラスターを監視します。
インストレーションと設定
以下の手順に従って、ブローカーにOpenTelemetry Javaエージェントをインストールし、コレクターをデプロイしてメトリクスを収集してNew Relicに送信することで、包括的な Kafka 監視をセットアップします。
OpenTelemetry Javaエージェントをダウンロードする
OpenTelemetry Javaエージェントは、 Kafka ブローカーに接続されたJavaエージェントとして実行され、Kafka と JMX メトリクスを収集し、OTLP 経由でコレクターに送信します。
$# Create directory for OpenTelemetry components$mkdir -p ~/opentelemetry$
$# Download OpenTelemetry Java Agent$curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \> https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarJMXカスタム設定の作成
OpenTelemetry Javaエージェント JMX 設定ファイルを作成して、JMX MBean から Kafka メトリクスを収集します。
次の設定でファイル~/opentelemetry/jmx-custom-config.yamlを作成します。
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline) - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker. unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count - increasing indicates broker failures unit: "{election}"
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentileKafkaブローカーを構成する
Kafka を開始する前に、 KAFKA_OPTS環境変数を設定して、 OpenTelemetry Javaエージェントを Kafka ブローカーに接続します。
単一ブローカーの例:
$OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"$JMX_CONFIG="$HOME/opentelemetry/jmx-custom-config.yaml"$
$nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \> -Dotel.jmx.enabled=true \> -Dotel.jmx.config=$JMX_CONFIG \> -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \> -Dotel.exporter.otlp.endpoint=http://localhost:4317 \> -Dotel.exporter.otlp.protocol=grpc \> -Dotel.metrics.exporter=otlp \> -Dotel.metric.export.interval=30000" \> bin/kafka-server-start.sh config/server.properties &重要
マルチブローカーの集中: 複数のブローカーの場合は、各ブローカーの-Dotel.resource.attributes問題で一意のbroker.id値 (例: broker.id=1 、 broker.id=2 、 broker.id=3) を持つ同じ設定を使用します。
コレクター設定の作成
~/opentelemetry/kafka-config.yamlにメインのOpenTelemetry Collector設定を作成します。
receivers: # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry otlp: protocols: grpc: endpoint: "0.0.0.0:4317"
# Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES} protocol_version: 2.0.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: # Remove broker.id from resource attributes for cluster-level metrics - context: resource statements: - delete_key(attributes, "broker.id")
transform/remove_extra_attributes: metric_statements: - context: resource statements: # Delete all attributes starting with "process." - delete_matching_keys(attributes, "^process\\..*") # Delete all attributes starting with "telemetry." - delete_matching_keys(attributes, "^telemetry\\..*") - delete_key(attributes, "host.arch") - delete_key(attributes, "os.description")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum - include: kafka.partition.replicas action: insert new_name: kafka.partition.replicas.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum
filter/remove_partition_level_replicas: metrics: exclude: match_type: strict metric_names: - kafka.partition.replicas_in_sync
exporters: otlp/newrelic: endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT} headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: # Broker metrics pipeline (excludes cluster-level metrics) metrics/broker: receivers: [otlp, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, filter/remove_partition_level_replicas, batch/aggregation] exporters: [otlp/newrelic]
# Cluster metrics pipeline (only cluster-level metrics, no broker.id) metrics/cluster: receivers: [otlp] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]環境変数の設定
コレクターをインストールする前に、必要な環境変数を設定します。
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="localhost:9092"$export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region交換する:
YOUR_LICENSE_KEYNew Relicライセンスキーを使用してmy-kafka-clusterKafka クラスターの一意の名前を付けるlocalhost:9092Kafka ブートストラップ ブローカーのアドレスを入力します。ブローカーが複数ある場合は、コンマ区切りのリストを使用します。broker1:9092,broker2:9092,broker3:9092- OTLP エンドポイント:
https://otlp.nr-data.net:4317(米国リージョン) またはhttps://otlp.eu01.nr-data.net:4317(EU リージョン) を使用します。その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
コレクターをインストールして起動する
NRDOT Collector (New Relic のディストリビューション) または OpenTelemetry Collector を選択します。
ヒント
NRDOT Collector は、New Relic のサポートを受けた OpenTelemetry Collector の New Relic ディストリビューションです。
バイナリをダウンロードしてインストールする
ホスト OS 用の NRDOT Collectorバイナリをダウンロードしてインストールします。 以下の例は linux_amd64 アーキテクチャーの場合です。
$# Set version and architecture$NRDOT_VERSION="1.9.0"$ARCH="amd64" # or arm64$
$# Download and extract$curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \> --location --output collector.tar.gz$tar -xzf collector.tar.gz$
$# Move to a location in PATH (optional)$sudo mv nrdot-collector /usr/local/bin/$
$# Verify installation$nrdot-collector --version重要
他の OS およびアーキテクチャーについては、 NRDOT Collectorリリースにアクセスし、システムに適切なバイナリをダウンロードしてください。
コレクターを起動する
設定ファイルを使用してコレクターを実行して、監視を開始します。
$nrdot-collector --config ~/opentelemetry/kafka-config.yamlコレクターは、数分以内に Kafka メトリクスのNew Relicへの送信を開始します。
バイナリをダウンロードしてインストールする
ホスト OS 用のOpenTelemetry Collector Contrib バイナリをダウンロードしてインストールします。 以下の例は linux_amd64 アーキテクチャーの場合です。
$# Set version and architecture$# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version$OTEL_VERSION="<collector_version>"$ARCH="amd64"$
$# Download the collector$curl -L -o otelcol-contrib.tar.gz \> "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"$
$# Extract the binary$tar -xzf otelcol-contrib.tar.gz$
$# Move to a location in PATH (optional)$sudo mv otelcol-contrib /usr/local/bin/$
$# Verify installation$otelcol-contrib --versionその他の OS については、 OpenTelemetry Collectorリリースページをご覧ください。
コレクターを起動する
設定ファイルを使用してコレクターを実行して、監視を開始します。
$otelcol-contrib --config ~/opentelemetry/kafka-config.yamlコレクターは、数分以内に Kafka メトリクスのNew Relicへの送信を開始します。
(オプション) 計装プロデューサーまたは消費者アプリケーション
重要
言語サポート: 現在、 OpenTelemetry Javaエージェントを使用した Kafka クライアント計装ではJavaアプリケーションのみがサポートされています。
Kafka プロデューサおよびコンシューマアプリケーションからアプリケーションレベルのテレメトリーを収集するには、ステップ 1でダウンロードしたOpenTelemetry Javaエージェントを使用します。
エージェントを使用してアプリケーションを開始します。
$java \> -javaagent:$HOME/opentelemetry/opentelemetry-javaagent.jar \> -Dotel.service.name="order-process-service" \> -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \> -Dotel.exporter.otlp.endpoint=http://localhost:4317 \> -Dotel.exporter.otlp.protocol="grpc" \> -Dotel.metrics.exporter="otlp" \> -Dotel.traces.exporter="otlp" \> -Dotel.logs.exporter="otlp" \> -Dotel.instrumentation.kafka.experimental-span-attributes="true" \> -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \> -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \> -Dotel.instrumentation.kafka.enabled="true" \> -jar your-kafka-application.jar交換する:
order-process-serviceプロデューサーまたは消費者アプリケーションの一意の名前を付けるmy-kafka-clusterコレクター設定で使用されているのと同じクラスタ名を持つ
ヒント
上記の設定は、localhost:4317 で実行されているOpenTelemetry Collectorにテレメトリーを送信します。
これにより、処理をカスタマイズしたり、フィルターを追加したり、複数のバックエンドにルーティングしたりできるようになります。その他のエンドポイントの設定については、 「OTLP エンドポイントの設定」を参照してください。
Javaエージェントは、コード変更なしですぐに使用できる Kafka 計装を提供し、以下をキャプチャします。
- リクエストのレイテンシ
- スループット メトリクス
- エラー率
- 分散型トレース
高度な設定については、 Kafka 計装ドキュメントを参照してください。
(オプション)Kafkaブローカーログを転送する
Kafka ブローカー ログを収集して New Relic に送信するには、OpenTelemetry Collector でファイルログ レシーバーを構成します。
詳細: メトリクス コレクションをカスタマイズする
jmx-custom-config.yamlルールを拡張することで、さらに Kafka メトリクスを追加できます。
- OpenTelemetry JMX メトリクス設定構文について学ぶ
- 利用可能なMBean名はKafka監視ドキュメントで確認してください。
これにより、特定の監視ニーズに基づいて、Kafka ブローカーによって公開された JMX メトリクスを収集できるようになります。
データを検索する
数分後、Kafka メトリクスがNew Relicに表示されるはずです。 New Relic UI のさまざまなビューで Kafka メトリクスを探索する詳細な手順については、 「データの検索」を参照してください。
NRQL を使用してデータをクエリすることもできます。
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'トラブルシューティング
次のステップ
- Kafka メトリクスを調べる- 完全なメトリクスリファレンスを見る
- カスタムダッシュボードの作成- Kafka データの視覚化を構築します
- アラートのセットアップ- 消費者のラグやレプリケーションが不十分なパーティションなどの重要なメトリクスを監視します