With the launch of MirrorMaker 2 (MM2), Kafka can operate at a global scale with increased resiliency, since data can be replicated across clusters around the world.
While MM2 is mainly used for cloud migrations, where organizations move from on-premises installations to the cloud, it is increasingly adopted where data is produced and consumed in geographically distributed locations. Replicating data between clusters keeps latency low and throughput high for each region.
Prometheus is a popular free, open-source tool for metric instrumentation, collection, and monitoring, and it can be set up easily on the client machine where MM2 is installed.
The following steps illustrate how Prometheus can be used to monitor MM2:
1. Download the Prometheus JMX exporter agent jar file
cd /home/ec2-user
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.13.0/jmx_prometheus_javaagent-0.13.0.jar
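(Optional) Verify that the jar landed where expected before wiring it up:
ls -lh /home/ec2-user/jmx_prometheus_javaagent-0.13.0.jar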
2. Create a kafka-connect.yml file with the following contents:
lowercaseOutputName: true
rules:
  #kafka.connect:type=app-info,client-id="{clientid}"
  #kafka.consumer:type=app-info,client-id="{clientid}"
  #kafka.producer:type=app-info,client-id="{clientid}"
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
    name: kafka_$1_start_time_seconds
    labels:
      clientId: "$2"
    help: "Kafka $1 JMX metric start time seconds"
    type: GAUGE
    valueFactor: 0.001
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
    name: kafka_$1_$3_info
    value: 1
    labels:
      clientId: "$2"
      $3: "$4"
    help: "Kafka $1 JMX metric info version and commit-id"
    type: GAUGE
  #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
    name: kafka_$2_$6
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      topic: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
  #kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      nodeId: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: UNTYPED
  #kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
  #kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
  #kafka.consumer:type=consumer-metrics,client-id="{clientid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
    name: kafka_$2_$4
    labels:
      clientId: "$3"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"<>status
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
    name: kafka_connect_connector_status
    value: 1
    labels:
      connector: "$1"
      task: "$2"
      status: "$3"
    help: "Kafka Connect JMX Connector status"
    type: GAUGE
  #kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
  - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
    name: kafka_connect_$1_$4
    labels:
      connector: "$2"
      task: "$3"
    help: "Kafka Connect JMX metric type $1"
    type: GAUGE
  #kafka.connect:type=connector-metrics,connector="{connector}"
  #kafka.connect:type=connect-worker-metrics,connector="{connector}"
  - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
    name: kafka_connect_worker_$2
    labels:
      connector: "$1"
    help: "Kafka Connect JMX metric $1"
    type: GAUGE
  #kafka.connect:type=connect-worker-metrics
  - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
    name: kafka_connect_worker_$1
    help: "Kafka Connect JMX metric worker"
    type: GAUGE
  #kafka.connect:type=connect-worker-rebalance-metrics
  - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
    name: kafka_connect_worker_rebalance_$1
    help: "Kafka Connect JMX metric rebalance information"
    type: GAUGE
  #kafka.connect.mirror:type=MirrorSourceConnector
  - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=([0-9]+)><>([a-z-]+)
    name: kafka_connect_mirror_source_connector_$4
    help: Kafka Connect MM2 Source Connector Information
    labels:
      destination: "$1"
      topic: "$2"
      partition: "$3"
    type: GAUGE
  #kafka.connect.mirror:type=MirrorCheckpointConnector
  - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+)><>([a-z-]+)
    name: kafka_connect_mirror_checkpoint_connector_$3
    help: Kafka Connect MM2 Checkpoint Connector Information
    labels:
      source: "$1"
      target: "$2"
    type: GAUGE
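To illustrate how these rules turn JMX MBeans into Prometheus metrics, the MirrorSourceConnector rule above maps the replication-latency-ms-max attribute to the metric queried in step 5 (the target, topic and partition values below are placeholders):
JMX MBean and attribute exposed by MM2:
kafka.connect.mirror:type=MirrorSourceConnector,target=destination-name,topic=test-topic,partition=7, attribute replication-latency-ms-max
Prometheus metric produced by the rule:
kafka_connect_mirror_source_connector_replication_latency_ms_max{destination="destination-name",topic="test-topic",partition="7"}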
3. Set the KAFKA_OPTS environment variable to point to the jar file and the kafka-connect.yml created above (adjust the paths to where you placed them)
export KAFKA_OPTS=-javaagent:/root/prometheus/jmx_prometheus_javaagent-0.13.0.jar=3600:/root/prometheus/kafka-connect.yml
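The agent argument has the general form
-javaagent:<path to jmx_prometheus_javaagent jar>=<listen port>:<path to rules yml>
so 3600 is the HTTP port on which the exporter serves metrics; any free port works, as long as the same port is used in the curl commands below.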
4. Start MM2 in the same terminal (so that KAFKA_OPTS is picked up)
$KAFKA/connect-mirror-maker.sh $CONFIG/connect-mirror-maker.properties
where $KAFKA points to /root/kafka/bin and $CONFIG points to /root/kafka/config.
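Once MM2 is running, you can quickly confirm that the exporter agent attached and is serving metrics on the configured port:
curl -s localhost:3600 | head
If this prints Prometheus-formatted metric lines, the agent is working.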
5. You can now verify the replication latency via:
curl localhost:3600 | grep -i kafka_connect_mirror_source_connector_replication_latency_ms_max
When MM2 is in sync, the relevant output lines will look as follows:
# HELP kafka_connect_mirror_source_connector_replication_latency_ms_max Kafka Connect MM2 Source Connector Information
# TYPE kafka_connect_mirror_source_connector_replication_latency_ms_max gauge
kafka_connect_mirror_source_connector_replication_latency_ms_max{destination="destination-name",partition="7",topic="test-topic",} NaN
There will be a row for every partition of every topic that MM2 is replicating; when a row ends with NaN, that partition is in sync.
Note: if you don’t start MM2 in the same session where the KAFKA_OPTS environment variable is set, the curl command above will fail with the following error:
curl: (7) Failed to connect to localhost port 3600: Connection refused
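While replication is still catching up, the same metric reports a numeric latency instead of NaN, so a simple way to watch progress is to poll the endpoint in a loop (a rough sketch; the 5-second interval is arbitrary):
while true; do
  curl -s localhost:3600 | grep kafka_connect_mirror_source_connector_replication_latency_ms_max
  sleep 5
done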
6. Other metrics that can be viewed are as follows:
curl localhost:3600 | grep -i <topic-name>
The output will contain several metric points for the <topic-name> being synced by MM2. The ones that need attention are the following:
kafka_connect_mirror_source_connector_record_count tells us how many records have been migrated/synced by MM2:
kafka_connect_mirror_source_connector_record_count{destination="destination-name",partition="7",topic="test-topic",} 32.0
kafka_connect_mirror_source_connector_replication_latency_ms_avg shows the average replication latency, which in the following case is 7 ms:
kafka_connect_mirror_source_connector_replication_latency_ms_avg{destination="destination-name",partition="7",topic="test-topic",} 7.0
MM2 is in sync when both of the above metrics report NaN.
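The curl commands above are manual spot checks; to have Prometheus scrape these metrics continuously, a minimal prometheus.yml along the following lines can be used (a sketch assuming Prometheus runs on the same machine and the exporter listens on port 3600 as configured above; the job name is arbitrary):
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'mirror-maker-2'
    static_configs:
      - targets: ['localhost:3600']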