AMQ Streams simplifies the process of running Apache Kafka in an OpenShift cluster.
— AMQ Streams Product docs
All references to the source cluster and the target cluster mean AMQ Streams v1.4 and AMQ Streams v1.7 respectively, unless explicitly stated otherwise. I have attempted to define commonly used terms in the Glossary section.
Furthermore, the post assumes the reader has a basic understanding of Apache Kafka.
There is a Kafka CLI command cheat sheet towards the end of the article; all credit goes to the team who put it together. Chapeau à vous!
This guide discusses lessons learned from a migration project for a large freight company. We helped migrate an AMQ Streams cluster from OpenShift 3.11 to 4.7, and we also migrated over 100 Java event-driven microservices to the new cluster (AMQ Streams v1.7).
In the section titled Technical Implementation, I will show how to migrate an AMQ Streams cluster between OpenShift environments (example: OCP 3.11 to OCP 4.8) using MirrorMaker2, a Kafka component used to mirror data between two or more active Kafka clusters, within or across Kubernetes clusters.
We will begin with an overview of the two clusters (kafka-source and kafka-target) and the components deployed, then lay out the steps to deploy AMQ Streams v1.8, Prometheus (metrics collection), and Grafana (dashboards and alerts). All these components are set up via a set of Helm charts, with the option of selectively installing specific resources (amqs, mirrormaker, grafana).
In short, we needed to migrate a pre-existing AMQ Streams (v1.4) cluster on OpenShift (v3.11) to AMQ Streams (v1.7) on OpenShift (v4.7).
These are the primary goals:
- minimal downtime
- minimal to no data loss
- zero message duplication
- the new cluster (AMQ Streams v1.7) should be able to handle more throughput than the old one
- monitor and alert on the state and behavior of the Kafka cluster and the applications.
To successfully carry out this migration, we adopted the Think-Do-Check mindset; in other words, we followed the concept of Dry-Run to build a repeatable and predictable migration strategy.
This strategy involved the following points:
- Make sure the source and target clusters are monitored, and that all key performance indicators (KPIs) needed to track mirroring progress are displayed on easy-to-access dashboards,
- Prometheus and Grafana were used in this case,
- the Strimzi Kafka Exporter dashboard was used to gauge Consumer Group Lag, Consumer Group Current/Log-End Offsets, Incoming Messages/Second, Outgoing Messages/Second,
- Messages Count/Topic to make rough estimates about topic/partition size.
- Validate data retention periods match for source and target clusters,
- spec.kafka.config.log.retention.hours for time based TTL,
- spec.kafka.config.log.retention.bytes for byte size based retention,
- Having these two configurations match ensures messages have the same TTL across the two clusters.
- For this case, we had one KafkaUser resource per application.
- once all required producers and consumers are healthy, shut them down until ready for migration,
- the above action creates all required consumer groups;
- it also sets each new Consumer Group's CURRENT-OFFSET to zero, which means the next time a consumer spins up it starts reading at offset zero rather than at the latest offset;
- hence preventing apps from skipping messages when they are launched during Cut Over.
- depending on data size, data ingestion rate, and replication speed, it might take anywhere from minutes to days or even weeks before MirrorMaker2 has caught up with the source cluster.
- use the monitoring dashboards (source and target), kcat, and the Kafka CLI programs for validation.
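The retention check in the list above is easy to automate. Here is a minimal sketch, assuming both Kafka custom resources have already been exported and loaded into dictionaries (for instance from the JSON output of `oc get kafka`). The function name `retention_mismatches` and the sample values are hypothetical and not taken from the real clusters.

```python
from typing import Any, Dict, List, Tuple

# Retention keys named in the checklist above (under spec.kafka.config).
RETENTION_KEYS = ("log.retention.hours", "log.retention.bytes")


def retention_mismatches(
    source_cr: Dict[str, Any], target_cr: Dict[str, Any]
) -> List[Tuple[str, Any, Any]]:
    """Return (key, source_value, target_value) for every retention key that differs."""
    src_cfg = source_cr.get("spec", {}).get("kafka", {}).get("config", {})
    tgt_cfg = target_cr.get("spec", {}).get("kafka", {}).get("config", {})
    mismatches = []
    for key in RETENTION_KEYS:
        # A key missing on only one side shows up as None on that side,
        # meaning that cluster falls back to its broker default.
        if src_cfg.get(key) != tgt_cfg.get(key):
            mismatches.append((key, src_cfg.get(key), tgt_cfg.get(key)))
    return mismatches


# Hypothetical example values.
source = {"spec": {"kafka": {"config": {"log.retention.hours": 168, "log.retention.bytes": -1}}}}
target = {"spec": {"kafka": {"config": {"log.retention.hours": 72, "log.retention.bytes": -1}}}}

for key, src, tgt in retention_mismatches(source, target):
    print(f"{key}: source={src} target={tgt}")
```

An empty result means the two clusters agree on both retention settings.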
Cut Over Plan
The plan describes steps involved in switching from the source cluster over to the target cluster.
MirrorMaker2 must be running ahead of cut over time; this can be hours, days, or weeks. Use data size, data ingestion rate, and replication speed to estimate how long you need MirrorMaker2 running.
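One rough way to turn those three numbers into a lead time: MirrorMaker2 only gains on the source cluster when it replicates faster than producers write, so the backlog shrinks at the replication rate minus the ingestion rate. The sketch below assumes constant rates, which real clusters rarely have, so treat the result as an order-of-magnitude figure. The function name and the sample figures are hypothetical.

```python
def estimate_catch_up_hours(backlog_gib: float, ingest_mib_s: float, replicate_mib_s: float) -> float:
    """Estimate the hours until MirrorMaker2 catches up, assuming constant rates."""
    net_rate = replicate_mib_s - ingest_mib_s
    if net_rate <= 0:
        raise ValueError("replication must be faster than ingestion to ever catch up")
    backlog_mib = backlog_gib * 1024
    return backlog_mib / net_rate / 3600


# Hypothetical figures: 2048 GiB already on the source,
# 5 MiB/s of new data coming in, 25 MiB/s mirrored to the target.
hours = estimate_catch_up_hours(backlog_gib=2048, ingest_mib_s=5, replicate_mib_s=25)
print(f"{hours:.1f} hours")
```

With these made-up figures the estimate comes out at a little over a day, which is the kind of number to compare against the retention period and the planned cut over date.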
- Stop MM2 instance
- This action is performed after confirming the target cluster has caught up with the source cluster.
- use monitoring, kcat, and the Kafka CLI tools to help with this.
- hint: using kcat program, grab last n(100, 5000,…) messages from each cluster, encode each message to base64, write the output to files, execute the diff command on the files,
- in our case we could not compare offsets because for source and target cluster, Consumer Group Offsets were always different regardless of mirroring progress,
- I hope this gets resolved in the near future.
- This may be KafkaConnect, Debezium for Change Data Capture, or some other system that pumps data into the AMQ Streams cluster.
- a consumer topic is called drained when CURRENT OFFSET (read position) and LOG-END OFFSET (write position) match for the associated Consumer Group;
- put differently, a topic is drained when its Consumer Group Lag is zero.
- in some cases applications behave as both producers and consumers; you need to be very diligent in this case;
- I would also recommend that you partition your applications into tiers of Upstream, Dam, and Downstream based on data flow and dependency chain.
- this ensures all messages on the source cluster have been processed.
- Doing this creates a choke point in the data pipeline; it gives you the ability to dial up/down data flow to Downstream applications.
- it also provides us the ability to sample and verify data coming in before it reaches Downstream.
- These may be applications on the left side of the Dam.
- being the choke point of the data pipeline, spinning up these apps will open up the floodgate.
- look at data ingestion volume; it should be nearly or exactly the same as it was on the source cluster;
- for your apps writing to data stores, validate there is similar read/write rate or traffic patterns.
- check the application logs and confirm there are no errors or abnormal messages.
- watch for resource consumption spikes and network traffic patterns in the cluster.
- and last but not least, verify applications are still able to handle your business transactions.
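The kcat hint above (grab the last n messages from each cluster, base64-encode them, and diff the result) can be sketched as follows. This assumes the raw message payloads have already been fetched from each cluster, for example with kcat, and are available as lists of bytes. The function name `diff_tail` and the sample payloads are hypothetical.

```python
import base64
import difflib
from typing import List


def diff_tail(source_msgs: List[bytes], target_msgs: List[bytes]) -> List[str]:
    """Base64-encode each payload and return unified-diff lines (empty when identical)."""
    src = [base64.b64encode(m).decode("ascii") for m in source_msgs]
    tgt = [base64.b64encode(m).decode("ascii") for m in target_msgs]
    return list(difflib.unified_diff(src, tgt, fromfile="source", tofile="target", lineterm=""))


# Hypothetical payloads: the target is missing the last message.
source_tail = [b'{"id": 1}', b'{"id": 2}', b'{"id": 3}']
target_tail = [b'{"id": 1}', b'{"id": 2}']

delta = diff_tail(source_tail, target_tail)
print("in sync" if not delta else "\n".join(delta))
```

Kafka only guarantees ordering within a partition, so a comparison like this is most meaningful when done partition by partition.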
This plan may vary from one use case to another, but in distilled form it is as follows:
- Switch cluster data ingestion system to source cluster.
- On source cluster:
- spin down producers;
- wait for consumer apps to drain their topics and shut them down.
- turn on producer/upstream apps,
- turn on consumers.
- you may use the monitoring dashboards for this; for instance you may look at:
- incoming and outgoing messages per second
- data ingestion rate in the data stores if any
- application health
- Consumer Group CURRENT OFFSET, LOG-END OFFSET, LAG
- bytes in/out per second, and much more.
Issues faced and how we solved them
MirrorMaker2 replication strategy
In simple terms when mirroring an existing cluster:
- set mirrors.sourceConnector.config.auto.offset.reset: latest
- when replicating historical data is not a concern
- an example use case might be when MM2 is set up at the same time as the AMQ Streams cluster
- or for some requirement, you just want to begin mirroring at the end of the topic; no concerns for data loss.
- set mirrors.sourceConnector.config.auto.offset.reset: earliest
- when replicating historical data is a requirement
- when you want all available historical data in the source cluster copied over to the target cluster,
- An example use case might be when migrating an existing AMQ Streams cluster to another with minimal or zero data loss.
- or you want an active/passive setup whereby the passive cluster is installed at a moment when the active cluster already holds data.
- For more on SourceConnector configs, read here.
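The choice above comes down to a single property. As a sketch, the helper below builds only the mirror fragment of a KafkaMirrorMaker2 resource as a Python dictionary, using the property path quoted in this section. The cluster aliases kafka-source and kafka-target come from the overview; the function name and the `replicate_history` flag are hypothetical. Check the property against the SourceConnector documentation for your AMQ Streams version before relying on it.

```python
import json
from typing import Any, Dict


def mirror_fragment(replicate_history: bool) -> Dict[str, Any]:
    """Build one mirrors[] entry with auto.offset.reset chosen per the rules above."""
    return {
        "sourceCluster": "kafka-source",
        "targetCluster": "kafka-target",
        "sourceConnector": {
            "config": {
                # earliest: copy all retained history;
                # latest: start mirroring at the end of each topic.
                "auto.offset.reset": "earliest" if replicate_history else "latest",
            }
        },
    }


# A migration with minimal data loss needs the historical data, hence earliest.
print(json.dumps({"mirrors": [mirror_fragment(replicate_history=True)]}, indent=2))
```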
Pods killed due to Out Of Memory error in new cluster
Pods were being terminated in the target cluster. At first we thought the JVM was eating up all the pod memory, so we set JVM boundaries by passing -Xms and -Xmx values as arguments to the ENTRYPOINT command of the container image (java -jar in this case). Despite this change, pods were still being terminated with OOMKilled errors.
After a few days of troubleshooting and head scratching, we found the root cause: some of the apps were creating more processes in their containers than the PID limit, which is set to 1024 by default. This led to OOMKilled errors after a pod had been running for a few hours, despite a HorizontalPodAutoscaler resource monitoring this particular application for auto-scaling needs.
- Click here for more on cluster memory configuration
- Read on Kubernetes docs about PID limit
- Click here for how to change PID limit in OpenShift
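To spot this kind of problem earlier, PID usage can be compared with the limit from inside a container. Here is a minimal sketch, assuming the container exposes the cgroup `pids.current` and `pids.max` files; their location differs between cgroup v1 and v2, so reading them is left to the caller. The function name `pid_usage`, the 90% threshold, and the sample numbers are hypothetical.

```python
from typing import Optional


def pid_usage(current_text: str, max_text: str) -> Optional[float]:
    """Return the fraction of the PID limit in use, or None when the limit is 'max' (unlimited)."""
    limit = max_text.strip()
    if limit == "max":
        return None
    return int(current_text.strip()) / int(limit)


# Hypothetical file contents, as they would be read from pids.current and pids.max.
usage = pid_usage("987\n", "1024\n")
if usage is not None and usage > 0.9:
    print(f"warning: {usage:.0%} of the PID limit is in use")
```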
AMQ-Streams commands Cheat-Sheet
In this guide, we have explained the strategies and plans involved in the migration of an AMQ Streams cluster in an Active/Passive setting between two OpenShift installations. Have a look at the Technical Implementation section for a walk-through demo.
- Apache Kafka: Software product for building large scale messaging networks.
- AMQ Streams: Red Hat product for simplifying Apache Kafka deployment in an OpenShift cluster.
- Source Cluster: AMQ Streams v1.4
- Target Cluster: AMQ Streams v1.7
- Producer: Any application that publishes to AMQ Streams.
- Consumer: Any application that consumes messages or subscribes to topics.
- Consumer Group: a collection of consumers who collaborate to consume data from topics.
- Consumer Group Current Offset: Read position of the consumer subscribed to a topic via a Consumer Group.
- Consumer Group Log End Offset: Write position of the producers on a topic, as reported for the Consumer Group consuming that topic.
- Consumer Group Lag: the difference between LOG-END OFFSET and CURRENT-OFFSET, that is, how far the consumers in a consumer group are behind the producers writing to the same topic.
- Drained Topic: a topic is drained when the associated consumer group lag is zero.
- Topic: it has a unique name across the Kafka cluster; it helps with message categorization.
- Partition: breaks topics into multiple logs each of which can live on a different node; messages are written at the partition level; moreover, partitions provide means of concurrency for the Kafka cluster.
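The offset terms above relate to each other by simple arithmetic. Here is a minimal sketch, assuming the per-partition offsets of a consumer group have already been collected (for example from the Kafka CLI or the monitoring dashboards). The function names and the sample offsets are hypothetical.

```python
from typing import Dict, Tuple

# partition -> (current_offset, log_end_offset)
Offsets = Dict[int, Tuple[int, int]]


def group_lag(offsets: Offsets) -> int:
    """Total lag: sum over partitions of LOG-END OFFSET minus CURRENT-OFFSET."""
    return sum(log_end - current for current, log_end in offsets.values())


def is_drained(offsets: Offsets) -> bool:
    """A topic is drained for a group when no partition has unread messages."""
    return all(log_end == current for current, log_end in offsets.values())


# Hypothetical offsets for a three-partition topic; partition 1 is two messages behind.
sample = {0: (120, 120), 1: (98, 100), 2: (75, 75)}
print(group_lag(sample), is_drained(sample))
```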