Monitor your self-hosted Apache Kafka cluster by installing the OpenTelemetry Collector directly on Linux hosts.
Installation and configuration
Follow these steps to set up complete Kafka monitoring: install the OpenTelemetry Java agent on your brokers, then deploy a collector that gathers the metrics and sends them to New Relic.
Before you begin
Make sure you have:
- A New Relic account with a license key
- Network access from the collector to the Kafka bootstrap server port (typically 9092)
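To confirm the second prerequisite, you can test TCP reachability from the collector host to each bootstrap broker. The helpers below are a minimal sketch, not New Relic tooling: the function names are hypothetical, and the sketch assumes bash (for its /dev/tcp redirection) and the coreutils timeout command are installed.

```shell
# Hypothetical preflight helpers (not part of New Relic or OpenTelemetry tooling).
# Assumes bash is installed (for /dev/tcp) and coreutils `timeout` is available.

# Print one host:port entry per line from a comma-separated list, trimming spaces.
split_brokers() {
  printf '%s\n' "$1" | tr ',' '\n' | sed 's/^[[:space:]]*//; s/[[:space:]]*$//; /^$/d'
}

# Return 0 if a TCP connection to host $1 on port $2 succeeds within 3 seconds.
# Host and port are passed as arguments to bash -c, not interpolated into the script.
check_broker_port() {
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$1" "$2" 2>/dev/null
}

# Check every broker in a comma-separated list; return 1 if any is unreachable.
check_bootstrap_brokers() {
  local rc=0 entry host port
  for entry in $(split_brokers "$1"); do
    host=${entry%:*}
    port=${entry##*:}
    if check_broker_port "$host" "$port"; then
      echo "OK      $entry"
    else
      echo "FAILED  $entry"
      rc=1
    fi
  done
  return "$rc"
}
```

For example, check_bootstrap_brokers "broker1:9092,broker2:9092" prints one OK or FAILED line per broker and returns a non-zero status if any broker cannot be reached.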
Download the OpenTelemetry Java agent
The OpenTelemetry Java agent runs attached to your Kafka brokers, collects Kafka and JMX metrics, and sends them to the collector over OTLP:
```shell
# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java Agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
```
Create a custom JMX configuration
Create a JMX configuration file that tells the OpenTelemetry Java agent which Kafka metrics to collect from JMX MBeans.
Create the file ~/opentelemetry/jmx-custom-config.yaml with the following configuration:
```yaml
---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"

  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"

  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"

  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"

  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms

  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"

  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)

  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: committed heap memory
        type: gauge
      HeapMemoryUsage.max:
        metric: heap.max
        desc: maximum heap memory
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge

  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)

  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit

  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count

  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)

  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests

  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"

  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"

  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"

  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"

  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"

  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile
```
Configure the Kafka broker
Attach the OpenTelemetry Java agent to your Kafka broker by setting the KAFKA_OPTS environment variable before you start Kafka.
Single-broker example:
```shell
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/jmx-custom-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &
```
Important
Multi-broker clusters: For multiple brokers, use the same configuration on every broker, but give each one a unique broker.id value (for example broker.id=1, broker.id=2, broker.id=3) in its -Dotel.resource.attributes parameter.
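On a multi-broker cluster it is easy to copy the example to every host and forget to change broker.id=1. One way to avoid that is to read the ID from each broker's own server.properties: broker.id in ZooKeeper mode, node.id in KRaft mode. The sketch below uses hypothetical function names; build_kafka_opts simply reproduces the KAFKA_OPTS value from the single-broker example with the broker ID and cluster name filled in.

```shell
# Hypothetical helpers: keep the broker.id resource attribute in sync with
# the ID configured in each broker's server.properties.

# Print broker.id (ZooKeeper mode) or node.id (KRaft mode) from a properties file.
# Commented-out lines are ignored; prints nothing if neither key is set.
kafka_broker_id() {
  sed -n -E 's/^[[:space:]]*(broker|node)\.id[[:space:]]*=[[:space:]]*([0-9]+)[[:space:]]*$/\2/p' "$1" | head -n 1
}

# Print the KAFKA_OPTS value from the single-broker example for one broker.
# $1 = broker ID, $2 = cluster name, $3 = agent JAR path, $4 = JMX config path
build_kafka_opts() {
  printf '%s' \
    "-javaagent:$3" \
    " -Dotel.jmx.enabled=true" \
    " -Dotel.jmx.config=$4" \
    " -Dotel.resource.attributes=broker.id=$1,kafka.cluster.name=$2" \
    " -Dotel.exporter.otlp.endpoint=http://localhost:4317" \
    " -Dotel.exporter.otlp.protocol=grpc" \
    " -Dotel.metrics.exporter=otlp" \
    " -Dotel.metric.export.interval=30000"
}

# Usage (from the Kafka installation directory, with OTEL_AGENT and JMX_CONFIG set):
#   BROKER_ID=$(kafka_broker_id config/server.properties)
#   KAFKA_OPTS="$(build_kafka_opts "$BROKER_ID" my-kafka-cluster "$OTEL_AGENT" "$JMX_CONFIG")" \
#     nohup bin/kafka-server-start.sh config/server.properties &
```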
Create a collector configuration
Create the main OpenTelemetry Collector configuration at ~/opentelemetry/kafka-config.yaml:
```yaml
receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    topic_match: ".*"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false

processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s

  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true

  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}

  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id from resource attributes for cluster-level metrics
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")

  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")

  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""

  cumulativetodelta:

  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s

service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, filter/remove_partition_level_replicas, batch/aggregation]
      exporters: [otlp/newrelic]

    # Cluster metrics pipeline (only cluster-level metrics, no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/remove_extra_attributes, transform/des_units, cumulativetodelta, batch/aggregation]
      exporters: [otlp/newrelic]
```
Set environment variables
Set the required environment variables before installing the collector:
```shell
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="localhost:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
```
Replace:
- YOUR_LICENSE_KEY with your New Relic license key
- my-kafka-cluster with a unique name for your Kafka cluster
- localhost:9092 with the address (or addresses) of your Kafka bootstrap broker. For multiple brokers, use a comma-separated list: broker1:9092,broker2:9092,broker3:9092
- OTLP endpoint: use https://otlp.nr-data.net:4317 (US region) or https://otlp.eu01.nr-data.net:4317 (EU region). For other endpoint configurations, see Configure your OTLP endpoint.
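The collector reads these values through the ${env:NAME} references in kafka-config.yaml, so a variable that is unset, or set without export, typically leaves the matching setting empty. As a hypothetical pre-start check (not a collector feature; the function names are illustrative), the sketch below lists every ${env:NAME} reference in a config file and reports any variable that is not exported in the current shell.

```shell
# Hypothetical pre-start check for ${env:NAME} references in a collector config.

# Print the unique environment variable names referenced as ${env:NAME} in file $1.
config_env_vars() {
  grep -o '\${env:[A-Za-z_][A-Za-z0-9_]*}' "$1" | sed 's/^\${env://; s/}$//' | sort -u
}

# Report each referenced variable that is missing or empty; return 1 if any is.
# printenv only sees exported variables, which matches what the collector process inherits.
check_config_env() {
  local missing=0 name
  for name in $(config_env_vars "$1"); do
    if [ -z "$(printenv "$name")" ]; then
      echo "Missing environment variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, run check_config_env ~/opentelemetry/kafka-config.yaml in the same shell you will start the collector from; it prints nothing and returns 0 when all four variables above are exported.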
Install and start the collector
Choose either the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:
Tip
NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and is backed by New Relic support.
Download and install the binary
Download and install the NRDOT Collector binary for your host operating system. The example below is for the linux_amd64 architecture:
```shell
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version
```
Important
For other operating systems and architectures, see the NRDOT Collector releases and download the binary that matches your system.
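On Linux hosts you can derive ARCH from uname -m instead of editing it by hand. The sketch below is hypothetical: it only knows the two architecture names used in the example above (amd64 and arm64) and reuses the same NRDOT download URL pattern, so check the NRDOT Collector releases for the archives actually published for your platform.

```shell
# Hypothetical helpers for picking the NRDOT Collector archive on Linux.

# Map a `uname -m` value (default: this host) to the suffix used in the archive name.
# Only the amd64 and arm64 suffixes from the example above are handled.
release_arch() {
  case "${1:-$(uname -m)}" in
    x86_64|amd64) echo amd64 ;;
    aarch64|arm64) echo arm64 ;;
    *) echo "unsupported architecture: ${1:-$(uname -m)}" >&2; return 1 ;;
  esac
}

# Build the download URL for version $1 and architecture $2,
# following the URL pattern from the example above.
nrdot_url() {
  printf 'https://github.com/newrelic/nrdot-collector-releases/releases/download/%s/nrdot-collector_%s_linux_%s.tar.gz\n' "$1" "$1" "$2"
}

# Usage:
#   ARCH=$(release_arch) && \
#     curl --location --output collector.tar.gz "$(nrdot_url "$NRDOT_VERSION" "$ARCH")"
```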
Start the collector
Run the collector with your configuration file to start monitoring:
```shell
nrdot-collector --config ~/opentelemetry/kafka-config.yaml
```
The collector starts sending Kafka metrics to New Relic within a few minutes.
Download and install the binary
Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The example below is for the linux_amd64 architecture:
```shell
# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version
```
For other operating systems, see the OpenTelemetry Collector releases page.
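To fill in OTEL_VERSION without browsing the releases page, you can follow GitHub's /releases/latest redirect, which ends at a .../releases/tag/v<version> URL. A minimal sketch, assuming curl can reach github.com from the host; the function names are hypothetical.

```shell
# Hypothetical helpers for resolving the latest OpenTelemetry Collector release.

# Extract the version from a GitHub release tag URL by taking the last path
# segment and dropping a leading "v" (".../tag/v0.115.0" -> "0.115.0").
version_from_tag_url() {
  local tag=${1##*/}
  printf '%s\n' "${tag#v}"
}

# Follow the /releases/latest redirect and print the resolved version.
# Requires network access; returns non-zero if curl fails.
latest_otelcol_version() {
  local url
  url=$(curl -fsSL -o /dev/null -w '%{url_effective}' \
    https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest) || return 1
  version_from_tag_url "$url"
}

# Usage, in place of the placeholder above:
#   OTEL_VERSION=$(latest_otelcol_version)
```

The printed value has no v prefix, which matches the download URL above, where the v is added in front of ${OTEL_VERSION}.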
Start the collector
Run the collector with your configuration file to start monitoring:
```shell
otelcol-contrib --config ~/opentelemetry/kafka-config.yaml
```
The collector starts sending Kafka metrics to New Relic within a few minutes.
(Optional) Instrument producer or consumer applications
Important
Language support: Currently, only Java applications are supported for Kafka client instrumentation with the OpenTelemetry Java agent.
To collect application telemetry from your Kafka producer and consumer applications, use the same OpenTelemetry Java agent you downloaded earlier.
Start your application with the agent:
```shell
java \
  -javaagent:$HOME/opentelemetry/opentelemetry-javaagent.jar \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -jar your-kafka-application.jar
```
Replace:
- order-process-service with a unique name for your producer or consumer application
- my-kafka-cluster with the same cluster name used in your collector configuration
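If several producer and consumer services report to the same collector, a small wrapper keeps their agent flags identical so only the service name changes. The sketch below uses a hypothetical function name; it prints the JVM flags from the command above one per line, and takes the agent path from OTEL_AGENT if that variable is set.

```shell
# Hypothetical wrapper: print the JVM flags from the command above, one per line.
# $1 = service name, $2 = cluster name (same value as KAFKA_CLUSTER_NAME)
otel_java_flags() {
  local service=$1 cluster=$2
  local agent=${OTEL_AGENT:-$HOME/opentelemetry/opentelemetry-javaagent.jar}
  printf '%s\n' \
    "-javaagent:$agent" \
    "-Dotel.service.name=$service" \
    "-Dotel.resource.attributes=kafka.cluster.name=$cluster" \
    "-Dotel.exporter.otlp.endpoint=http://localhost:4317" \
    "-Dotel.exporter.otlp.protocol=grpc" \
    "-Dotel.metrics.exporter=otlp" \
    "-Dotel.traces.exporter=otlp" \
    "-Dotel.logs.exporter=otlp" \
    "-Dotel.instrumentation.kafka.experimental-span-attributes=true" \
    "-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true" \
    "-Dotel.instrumentation.kafka.producer-propagation.enabled=true" \
    "-Dotel.instrumentation.kafka.enabled=true"
}

# Usage (bash 4+, keeps paths containing spaces intact):
#   mapfile -t flags < <(otel_java_flags order-process-service my-kafka-cluster)
#   java "${flags[@]}" -jar your-kafka-application.jar
```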
Tip
The configuration above sends telemetry to an OpenTelemetry Collector running on localhost:4317.
This lets you customize processing, add filters, or route data to multiple backends. The collector configuration in this guide defines only metrics pipelines, so to forward these applications' traces and logs as well, add traces and logs pipelines that use the otlp receiver and the otlp/newrelic exporter. For other endpoint configurations, see Configure your OTLP endpoint.
The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing:
- Request latencies
- Throughput metrics
- Error rates
- Distributed traces
For advanced configuration, see the Kafka instrumentation documentation.
(Optional) Forward Kafka broker logs
To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
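The filelog receiver needs the paths of the broker log files in its include setting. Kafka's start scripts write these files to $LOG_DIR, which kafka-run-class.sh defaults to the logs directory of the Kafka installation. A minimal sketch with hypothetical function names for locating candidate files such as server.log:

```shell
# Hypothetical helpers for locating Kafka broker log files.

# Print the broker log directory: $LOG_DIR if set, otherwise <kafka-install-dir>/logs.
# $1 = Kafka installation directory
kafka_log_dir() {
  printf '%s\n' "${LOG_DIR:-$1/logs}"
}

# List the *.log files currently in that directory (for example server.log),
# which are candidates for the filelog receiver's include list.
kafka_log_files() {
  local dir f
  dir=$(kafka_log_dir "$1")
  for f in "$dir"/*.log; do
    [ -f "$f" ] && printf '%s\n' "$f"
  done
  return 0
}
```

For example, kafka_log_files /opt/kafka prints one path per line; the directory it reports (such as /opt/kafka/logs) is what you would point the receiver's include patterns at.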
Advanced: Customize metric collection
You can collect more Kafka metrics by extending the rules in jmx-custom-config.yaml:
- Learn more about the OpenTelemetry JMX metrics configuration syntax
- Find the available MBean names in the Kafka monitoring documentation
This lets you collect any JMX metric that Kafka brokers expose, to match your specific monitoring needs.
Find your data
After a few minutes, your Kafka metrics should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka metrics in the different views of the New Relic UI.
You can also query your data with NRQL:
```
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'
```
Troubleshooting
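If no broker metrics arrive, first check that each broker JVM was actually started with the Java agent: a broker launched without KAFKA_OPTS set (for example by a service manager) runs without it. The sketch below is Linux-only and uses hypothetical function names; it assumes pgrep is installed and that brokers run the kafka.Kafka main class, as bin/kafka-server-start.sh does.

```shell
# Hypothetical troubleshooting helpers (Linux only).

# Return 0 if a JVM command line (as one string) includes the OpenTelemetry Java agent.
cmdline_has_otel_agent() {
  case "$1" in
    *-javaagent:*opentelemetry-javaagent.jar*) return 0 ;;
    *) return 1 ;;
  esac
}

# For each running broker JVM (main class kafka.Kafka), report whether the agent is attached.
# Reads the NUL-separated /proc/<pid>/cmdline and joins the arguments with spaces.
check_broker_agents() {
  local pid cmdline
  for pid in $(pgrep -f 'kafka\.Kafka'); do
    cmdline=$(tr '\0' ' ' 2>/dev/null < "/proc/$pid/cmdline") || continue
    if cmdline_has_otel_agent "$cmdline"; then
      echo "PID $pid: OpenTelemetry Java agent attached"
    else
      echo "PID $pid: agent NOT attached (check KAFKA_OPTS)"
    fi
  done
}
```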
Next steps
- Explore Kafka metrics: view the complete metrics reference
- Create custom dashboards: build visualizations for your Kafka data
- Set up alerts: monitor critical metrics such as consumer lag and under-replicated partitions