Install the OpenTelemetry Collector directly on a Linux host to monitor your self-hosted Kafka cluster.
Install and configure
Follow these steps to set up comprehensive Kafka monitoring: install the OpenTelemetry Java agent on your brokers and deploy a collector that gathers the metrics and sends them to New Relic.
Download the OpenTelemetry Java agent
The OpenTelemetry Java agent runs attached to each Kafka broker, collects Kafka and JMX metrics, and sends them to the collector via OTLP.
```shell
# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java Agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
```

Create a custom JMX configuration
Create an OpenTelemetry JMX configuration file to collect Kafka metrics from JMX MBeans.
Create the file ~/opentelemetry/jmx-custom-config.yaml with the following configuration:
```yaml
---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"

  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"

  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"

  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"

  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms

  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"

  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)

  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: current heap usage
        type: gauge
      HeapMemoryUsage.max:
        metric: heap.max
        desc: current heap usage
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge

  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)

  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit

  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count

  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)

  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests

  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"

  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"

  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"

  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"

  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"

  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile
```

Configure Kafka brokers
Before starting Kafka, set the KAFKA_OPTS environment variable to attach the OpenTelemetry Java agent to the Kafka broker.
Single broker example:
```shell
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/jmx-custom-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &
```

Important
Multi-broker clusters: For multiple brokers, use the same settings on each broker, but give each one a unique broker.id value in -Dotel.resource.attributes (for example, broker.id=1, broker.id=2, broker.id=3).
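For instance, a second broker's launch would differ only in the broker.id attribute and the properties file it loads. This is a sketch assuming the same file layout as the single-broker example; the per-broker properties file name config/server-2.properties is an assumption about your layout, not a Kafka default:

```shell
# Same agent and JMX config paths as broker 1
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/jmx-custom-config.yaml"

# Only broker.id and the server properties file change per broker
nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=2,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server-2.properties &
```

Keeping kafka.cluster.name identical across brokers is what lets New Relic group them into a single cluster view.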
Create the collector configuration
Create the main OpenTelemetry Collector configuration at ~/opentelemetry/kafka-config.yaml:
```yaml
receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    topic_match: ".*"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false

processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s

  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true

  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}

  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id from resource attributes for cluster-level metrics
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")

  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")

  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""

  cumulativetodelta:

  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s

service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, filter/remove_partition_level_replicas, batch/aggregation]
      exporters: [otlp/newrelic]

    # Cluster metrics pipeline (only cluster-level metrics, no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, batch/aggregation]
      exporters: [otlp/newrelic]
```

Set environment variables
Before starting the collector, set the required environment variables:
```shell
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="localhost:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
```

Replace:

- `YOUR_LICENSE_KEY`: your New Relic license key.
- `my-kafka-cluster`: a unique name for your Kafka cluster.
- `localhost:9092`: your Kafka bootstrap broker address. For multiple brokers, use a comma-separated list: `broker1:9092,broker2:9092,broker3:9092`.
- OTLP endpoint: use `https://otlp.nr-data.net:4317` (US region) or `https://otlp.eu01.nr-data.net:4317` (EU region). For other endpoint configurations, see OTLP endpoint configuration.
Install and start the collector
Choose between the NRDOT Collector (New Relic's distribution) and the OpenTelemetry Collector.
Tip
The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector, with support provided by New Relic.
Download and install the binary
Download and install the NRDOT Collector binary for your host operating system. The example below is for the linux_amd64 architecture.
```shell
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version
```

Important

For other operating systems and architectures, visit the NRDOT Collector releases page and download the binary appropriate for your system.
Start the collector
Run the collector with your configuration file to start collecting:
```shell
nrdot-collector --config ~/opentelemetry/kafka-config.yaml
```

The collector will begin sending Kafka metrics to New Relic within a few minutes.
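To keep the collector running after logout and across reboots, you can wrap the command above in a systemd unit. This is a minimal sketch, not part of the official setup: the unit file path, the EnvironmentFile location, and the config path under /home/kafka are all assumptions to adapt to your host:

```ini
# /etc/systemd/system/nrdot-collector.service (hypothetical path)
[Unit]
Description=NRDOT Collector for Kafka monitoring
After=network.target

[Service]
# Put the exports from the previous step in this file as KEY=value lines
EnvironmentFile=/etc/nrdot/collector.env
ExecStart=/usr/local/bin/nrdot-collector --config /home/kafka/opentelemetry/kafka-config.yaml
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Enable it with `sudo systemctl daemon-reload && sudo systemctl enable --now nrdot-collector`. Note that systemd does not expand `~`, so the config path must be absolute.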
Download and install the binary
Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The example below is for the linux_amd64 architecture.
```shell
# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version
```

For other operating systems, visit the OpenTelemetry Collector releases page.
Start the collector
Run the collector with your configuration file to start collecting:
```shell
otelcol-contrib --config ~/opentelemetry/kafka-config.yaml
```

The collector will begin sending Kafka metrics to New Relic within a few minutes.
(Optional) Instrument Kafka producers and consumers
Important
Language support: Kafka client instrumentation is currently supported only for Java applications using the OpenTelemetry Java agent.
To collect client-side telemetry from your Kafka producers and consumers, use the OpenTelemetry Java agent you downloaded in step 1.
Start your application with the agent:
```shell
java \
  -javaagent:$HOME/opentelemetry/opentelemetry-javaagent.jar \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -jar your-kafka-application.jar
```

Replace:

- `order-process-service`: a unique name for your producer or consumer application.
- `my-kafka-cluster`: the same cluster name used in the collector configuration.
Tip
The configuration above sends telemetry to the OpenTelemetry Collector running at localhost:4317.
This lets you customize processing, add filters, and route to multiple backends. For other endpoint configurations, see OTLP endpoint configuration.
The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing:
- Request latency
- Throughput metrics
- Error rates
- Distributed traces
For advanced configuration, see the Kafka instrumentation documentation.
(Optional) Forward Kafka broker logs
To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
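A minimal sketch of such a setup, added to kafka-config.yaml: the log path /opt/kafka/logs/server.log is an assumption (it depends on where Kafka is installed), and the logs pipeline reuses the processors and exporter defined earlier in this guide:

```yaml
receivers:
  filelog:
    include:
      - /opt/kafka/logs/server.log   # assumed broker log path; adjust for your install
    start_at: end                    # tail new entries only; use "beginning" to backfill

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]
```

Merge these keys into the existing receivers and service.pipelines sections rather than adding duplicate top-level blocks.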
Advanced: Customize your collection
You can extend the rules in jmx-custom-config.yaml to collect additional Kafka metrics:

- Learn the OpenTelemetry JMX metrics configuration syntax
- Find available MBean names in the Kafka monitoring documentation

This lets you collect any JMX metric your Kafka brokers expose, based on your specific monitoring needs.
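For example, a rule for the network processor idle ratio follows the same pattern as the rules above. The MBean is a standard Kafka broker MBean; the metric name kafka.network.processor.idle is our own choice for illustration, not part of the configuration shipped earlier in this guide:

```yaml
rules:
  # Fraction of time the network processor threads are idle (0.0-1.0);
  # values approaching 0 indicate network thread saturation
  - bean: kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
    mapping:
      Value:
        metric: kafka.network.processor.idle
        type: gauge
        desc: Average network processor idle ratio
        unit: '1'
```

Append new rules under the existing top-level rules: key in jmx-custom-config.yaml, then restart the broker for the agent to pick them up.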
Find your data
After a few minutes, your Kafka data will appear in New Relic. For detailed guidance on exploring Kafka metrics across the different New Relic UI views, see "Find your data".
You can also query your data using NRQL:
```sql
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'
```

Troubleshooting
Next steps

- Explore Kafka metrics: review the complete metric reference
- Create custom dashboards: build visualizations for your Kafka data
- Set up alerts: monitor critical metrics such as consumer lag and under-replicated partitions
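As one illustration of the last item, an NRQL alert condition for under-replicated partitions could be built on a query like the following. The metric name comes from the JMX configuration above; the facet choice and the usual "alert when the value stays above 0" threshold are suggestions to adapt, not prescribed settings:

```sql
FROM Metric
SELECT latest(kafka.partition.under_replicated)
WHERE kafka.cluster.name = 'my-kafka-cluster'
FACET host.name
```

Faceting by host.name gives one alert signal per broker, so a single degraded broker does not hide behind a cluster-wide aggregate.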