Dancing in Shackles Caused by an Unupgradable Apache Kafka
This is the story of how we hacked our way around an Apache Kafka bug when upgrading Kafka was blocked by our Java version, which put the official fix out of reach.
Root Cause
A call to KafkaConsumer#poll may block forever even though it takes a timeout parameter, because that timeout applies only to ConsumerNetworkClient#poll and not to ConsumerCoordinator#poll. The call can get trapped in a loop inside ConsumerCoordinator#poll indefinitely.
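The failure mode can be reproduced in miniature without Kafka. The following is a simplified sketch, not Kafka's actual code: Coordinator, waitOnce, retryWithoutDeadline, and retryWithDeadline are hypothetical names, and safetyCap exists only so that the demo terminates. It shows that bounding each individual wait does not bound the loop around it, whereas tracking one deadline for the whole loop does.

```java
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {

    /** Hypothetical stand-in for "is the group coordinator known yet?". */
    interface Coordinator {
        boolean isKnown();
    }

    /** A coordinator that is never discovered. */
    static final Coordinator NEVER_KNOWN = new Coordinator() {
        @Override
        public boolean isKnown() {
            return false;
        }
    };

    /** Stand-in for one network-level wait that does honor its timeout. */
    static void waitOnce(long timeoutMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Every single wait is bounded, but the loop itself has no deadline.
     * safetyCap is an assumption of this demo so that it terminates.
     * Returns the number of waits performed.
     */
    static int retryWithoutDeadline(Coordinator c, long timeoutMs, int safetyCap) {
        int waits = 0;
        while (!c.isKnown() && waits < safetyCap) {
            waitOnce(timeoutMs);
            waits++;
        }
        return waits;
    }

    /** One deadline covers the whole loop. Returns true if the coordinator was found. */
    static boolean retryWithDeadline(Coordinator c, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!c.isKnown()) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                break; // ran out of time
            }
            waitOnce(Math.min(remainingMs, 10L));
        }
        return c.isKnown();
    }

    public static void main(String[] args) {
        // The 20 ms timeout limits each wait, yet the loop runs until the cap stops it.
        System.out.println("waits without deadline: " + retryWithoutDeadline(NEVER_KNOWN, 20L, 5));
        // With a deadline the loop gives up after roughly 50 ms.
        System.out.println("found with deadline: " + retryWithDeadline(NEVER_KNOWN, 50L));
    }
}
```

Without the cap, the first loop would never return for a coordinator that stays unknown, no matter how small the per-wait timeout is.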
Official Solution
The Kafka team has fixed this issue upstream. The key change is as follows:
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2daadddee6..53834fb81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -198,46 +214,44 @@ public abstract class AbstractCoordinator implements Closeable {
ByteBuffer memberAssignment);
/**
- * Block until the coordinator for this group is known and is ready to receive requests.
- */
- public synchronized void ensureCoordinatorReady() {
- // Using zero as current time since timeout is effectively infinite
- ensureCoordinatorReady(0, Long.MAX_VALUE);
- }
-
- /**
+ * Visible for testing.
+ *
* Ensure that the coordinator is ready to receive requests.
- * @param startTimeMs Current time in milliseconds
+ *
* @param timeoutMs Maximum time to wait to discover the coordinator
* @return true If coordinator discovery and initial connection succeeded, false otherwise
*/
- protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
- long remainingMs = timeoutMs;
+ protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
+ final long startTimeMs = time.milliseconds();
+ long elapsedTime = 0L;
while (coordinatorUnknown()) {
- RequestFuture<Void> future = lookupCoordinator();
- client.poll(future, remainingMs);
+ final RequestFuture<Void> future = lookupCoordinator();
+ client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+ if (!future.isDone()) {
+ // ran out of time
+ break;
+ }
if (future.failed()) {
if (future.isRetriable()) {
- remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
- if (remainingMs <= 0)
- break;
+ elapsedTime = time.milliseconds() - startTimeMs;
+
+ if (elapsedTime >= timeoutMs) break;
log.debug("Coordinator discovery failed, refreshing metadata");
- client.awaitMetadataUpdate(remainingMs);
+ client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+ elapsedTime = time.milliseconds() - startTimeMs;
} else
throw future.exception();
- } else if (coordinator != null && client.connectionFailed(coordinator)) {
+ } else if (coordinator != null && client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
- time.sleep(retryBackoffMs);
+ final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+ time.sleep(sleepTime);
+ elapsedTime += sleepTime;
}
-
- remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
- if (remainingMs <= 0)
- break;
}
return !coordinatorUnknown();
Our Blocker
We could not adopt the official solution for the following reasons:
- Our application runs on Java 7 and cannot be migrated to Java 8, because we cannot change the JVM version.
- The official fix went into Kafka 2.0, which requires Java 8 or higher.
- Kafka 1.1.1 does not include the official fix.
Hack Solution
We had to come up with a hack to work around the blocker. It has three parts:
- The main call is wrapped in a Java Future to enforce the timeout. A poll should normally finish within 100 ms; if it does not, the call is abandoned once the 60_000 ms (1 min) limit is exceeded.

    KafkaTimeoutEnsuredPollWorker<K, V> worker = new KafkaTimeoutEnsuredPollWorker<K, V>(kafkaConsumer, 100);
    Future<ConsumerRecords<K, V>> future = this.executor.submit(worker);
    try {
        // Hard limit: wait at most 1 min for a poll that should take about 100 ms.
        return future.get(60_000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        exitPoll(future);
        Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
        String message = "interrupted while polling Kafka data, poll timeout = 100 ms";
        throw new KafkaException(message, e);
    } catch (TimeoutException e) {
        exitPoll(future);
        String message = "timed out polling Kafka data, poll timeout = 100 ms, hard limit = 60000 ms";
        throw new KafkaException(message, e);
    } catch (ExecutionException e) {
        exitPoll(future);
        // Rethrow Kafka's own exceptions unchanged and wrap everything else.
        if (e.getCause() instanceof KafkaException) {
            throw (KafkaException) e.getCause();
        } else {
            throw new KafkaException(e.getMessage(), e.getCause());
        }
    }
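- The exitPoll method is used to make sure the future is done before control returns to the caller.

    private void exitPoll(Future<ConsumerRecords<K, V>> future) {
        // Note: Future#isDone returns true once cancel(true) has returned,
        // so in practice this loop cancels once and then pauses for 100 ms.
        while (!future.isDone()) {
            future.cancel(true); // interrupts the worker thread if it is still polling
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                // ignore and keep going until the future is done
            }
        }
    }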
- The class KafkaTimeoutEnsuredPollWorker implements the Callable interface.

    private static class KafkaTimeoutEnsuredPollWorker<K, V> implements Callable<ConsumerRecords<K, V>> {

        private final KafkaConsumer<K, V> kafkaConsumer;
        private final long timeoutMillisecond;

        private KafkaTimeoutEnsuredPollWorker(KafkaConsumer<K, V> kafkaConsumer, long timeoutMillisecond) {
            this.kafkaConsumer = kafkaConsumer;
            this.timeoutMillisecond = timeoutMillisecond;
        }

        // Runs on the executor thread; delegates to the poll call that may block.
        @Override
        public ConsumerRecords<K, V> call() {
            return this.kafkaConsumer.poll(this.timeoutMillisecond);
        }
    }
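The same pattern can be tried without a Kafka broker. The sketch below is a minimal illustration under stated assumptions, not our production code: StuckPoll, guardedCall, awaitExit, and demo are hypothetical names, and an hour-long sleep stands in for the poll that never returns. It also uses a CountDownLatch to confirm that the worker thread has really left the call, because Future#isDone already returns true as soon as cancel has returned.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutGuardDemo {

    /** Hypothetical stand-in for a poll that never returns on its own but reacts to interruption. */
    static final class StuckPoll implements Callable<String> {
        final CountDownLatch exited = new CountDownLatch(1);

        @Override
        public String call() throws InterruptedException {
            try {
                TimeUnit.HOURS.sleep(1); // simulates the endless loop
                return "records";
            } finally {
                exited.countDown(); // the worker thread has left the call
            }
        }
    }

    /** Waits up to waitMs for the worker to leave the call; returns true if it did. */
    static boolean awaitExit(StuckPoll poll, long waitMs) {
        try {
            return poll.exited.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs the call under a hard limit and reports how it ended. */
    static String guardedCall(ExecutorService executor, StuckPoll poll, long limitMs) {
        Future<String> future = executor.submit(poll);
        try {
            return future.get(limitMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            // future.isDone() is already true here, so wait on the latch instead.
            return "timeout, worker exited=" + awaitExit(poll, 1000L);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    /** Creates its own single-thread executor so the example is self-contained. */
    static String demo(long limitMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return guardedCall(executor, new StuckPoll(), limitMs);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(100L));
    }
}
```

Waiting for the worker to exit matters for a real consumer as well, since KafkaConsumer is not safe for concurrent use from multiple threads and the calling thread should not touch it while the worker is still inside poll.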