This is a story about hacking around an Apache Kafka bug when upgrading Kafka is blocked by our Java version, so the official fix is not available to us.
A call to KafkaConsumer#poll may block forever despite its timeout parameter. That timeout is only applied to ConsumerNetworkClient#poll, not to ConsumerCoordinator#poll, so the consumer can get trapped forever in the coordinator-discovery loop inside ConsumerCoordinator#poll.
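To see why a per-call timeout does not bound the whole call, here is a deliberately simplified model with a simulated clock. It is not Kafka code: UnboundedCoordinatorWait, networkPoll and attemptsUntilKnown are hypothetical names, and the sketch avoids lambdas on the assumption that the code base may still be on Java 7.

```java
/**
 * Simplified, hypothetical model of the hang (not Kafka code): each network
 * poll honours its timeout, but the outer loop never checks a deadline.
 */
final class UnboundedCoordinatorWait {
    private long nowMs = 0L;              // simulated clock
    private final int attemptsUntilKnown; // lookups needed before the coordinator is known
    private int attempts = 0;

    UnboundedCoordinatorWait(int attemptsUntilKnown) {
        this.attemptsUntilKnown = attemptsUntilKnown;
    }

    private boolean coordinatorUnknown() {
        return attempts < attemptsUntilKnown;
    }

    // Stand-in for a network poll that gives up after timeoutMs.
    private void networkPoll(long timeoutMs) {
        nowMs += timeoutMs;
        attempts++;
    }

    /** Returns the total simulated time spent; never returns if the coordinator never appears. */
    long waitForCoordinator(long timeoutMs) {
        final long startMs = nowMs;
        while (coordinatorUnknown()) {
            networkPoll(timeoutMs); // a bounded call ...
        }                           // ... inside an unbounded loop
        return nowMs - startMs;
    }
}
```

If the coordinator only appears on the fifth lookup, `new UnboundedCoordinatorWait(5).waitForCoordinator(100L)` spends 500 ms in total despite the 100 ms timeout; if it never appears, the loop never returns.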
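The Kafka maintainers have since fixed this issue upstream. The key change, in AbstractCoordinator#ensureCoordinatorReady, drops the unbounded no-argument overload and tracks the elapsed time across the whole coordinator-discovery loop: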
```diff
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2daadddee6..53834fb81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -198,46 +214,44 @@ public abstract class AbstractCoordinator implements Closeable {
             ByteBuffer memberAssignment);
     /**
-     * Block until the coordinator for this group is known and is ready to receive requests.
-     */
-    public synchronized void ensureCoordinatorReady() {
-        // Using zero as current time since timeout is effectively infinite
-        ensureCoordinatorReady(0, Long.MAX_VALUE);
-    }
-
-    /**
+     * Visible for testing.
+     *
      * Ensure that the coordinator is ready to receive requests.
-     * @param startTimeMs Current time in milliseconds
+     *
      * @param timeoutMs Maximum time to wait to discover the coordinator
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
-    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
-        long remainingMs = timeoutMs;
+    protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
+        final long startTimeMs = time.milliseconds();
+        long elapsedTime = 0L;
         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, remainingMs);
+            final RequestFuture<Void> future = lookupCoordinator();
+            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+            if (!future.isDone()) {
+                // ran out of time
+                break;
+            }
             if (future.failed()) {
                 if (future.isRetriable()) {
-                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-                    if (remainingMs <= 0)
-                        break;
+                    elapsedTime = time.milliseconds() - startTimeMs;
+
+                    if (elapsedTime >= timeoutMs) break;
                     log.debug("Coordinator discovery failed, refreshing metadata");
-                    client.awaitMetadataUpdate(remainingMs);
+                    client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                    elapsedTime = time.milliseconds() - startTimeMs;
                 } else
                     throw future.exception();
-            } else if (coordinator != null && client.connectionFailed(coordinator)) {
+            } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
-                time.sleep(retryBackoffMs);
+                final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                time.sleep(sleepTime);
+                elapsedTime += sleepTime;
             }
-
-            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-            if (remainingMs <= 0)
-                break;
         }
         return !coordinatorUnknown();
```
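The essence of the fix is to measure elapsed time across the whole discovery loop instead of per network call, and to stop when a lookup could not finish within the remaining budget. Below is a minimal sketch of that pattern on a simulated clock. BoundedCoordinatorWait and lookupCostMs are hypothetical; only the name remainingTimeAtLeastZero is borrowed from the patch, and its body here (clamping the remaining time at zero) is an assumption.

```java
/**
 * Simplified, hypothetical model of the fixed loop (not Kafka code): the
 * whole discovery loop shares one time budget, tracked as elapsed time.
 */
final class BoundedCoordinatorWait {
    private long nowMs = 0L;              // simulated clock
    private final int attemptsUntilKnown; // completed lookups needed before the coordinator is known
    private int attempts = 0;

    BoundedCoordinatorWait(int attemptsUntilKnown) {
        this.attemptsUntilKnown = attemptsUntilKnown;
    }

    long nowMs() {
        return nowMs;
    }

    private boolean coordinatorUnknown() {
        return attempts < attemptsUntilKnown;
    }

    // Clamp the remaining budget at zero so a negative timeout is never passed on.
    private static long remainingTimeAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    // Simulated lookup: completes after lookupCostMs, or returns early
    // (not done) when the remaining budget maxWaitMs is smaller.
    private boolean networkPoll(long maxWaitMs, long lookupCostMs) {
        if (maxWaitMs < lookupCostMs) {
            nowMs += maxWaitMs;
            return false;
        }
        nowMs += lookupCostMs;
        attempts++;
        return true;
    }

    /** Returns true if the coordinator became known within timeoutMs. */
    boolean ensureCoordinatorReady(long timeoutMs, long lookupCostMs) {
        final long startMs = nowMs;
        long elapsedMs = 0L;
        while (coordinatorUnknown()) {
            boolean done = networkPoll(remainingTimeAtLeastZero(timeoutMs, elapsedMs), lookupCostMs);
            elapsedMs = nowMs - startMs;
            if (!done || elapsedMs >= timeoutMs) {
                break; // ran out of time: give up instead of looping forever
            }
        }
        return !coordinatorUnknown();
    }
}
```

With a 250 ms budget and 100 ms lookups, a coordinator that never appears makes `ensureCoordinatorReady(250L, 100L)` return false after exactly 250 ms of simulated time: the third lookup only gets the remaining 50 ms.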
We could not adopt the official fix for the following reasons:
Instead, we had to come up with a hack to work around the problem. It works as follows:
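The blocking KafkaConsumer#poll call is submitted to an executor and wrapped in a Java Future, which lets the caller enforce its own hard timeout. A poll normally finishes within its 100 ms timeout; if it has not returned after 60,000 ms (1 minute), the caller stops waiting, cancels the future, and throws a KafkaException.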
```java
// Run the poll on the executor thread so that the caller can enforce a hard deadline.
// TimeoutException here is java.util.concurrent.TimeoutException, not Kafka's.
KafkaTimeoutEnsuredPollWorker<K, V> worker =
        new KafkaTimeoutEnsuredPollWorker<K, V>(kafkaConsumer, 100);
Future<ConsumerRecords<K, V>> future = this.executor.submit(worker);
try {
    // A healthy poll returns within ~100 ms; give up after 1 minute.
    return future.get(60_000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    exitPoll(future);
    Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    throw new KafkaException("interrupted when polling Kafka data", e);
} catch (TimeoutException e) {
    exitPoll(future);
    throw new KafkaException("timeout when polling Kafka data, hard timeout = 60000 ms", e);
} catch (ExecutionException e) {
    exitPoll(future);
    // Rethrow Kafka errors from the worker unchanged; wrap anything else.
    if (e.getCause() instanceof KafkaException) {
        throw (KafkaException) e.getCause();
    } else {
        throw new KafkaException(e.getMessage(), e.getCause());
    }
}
```
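Stripped of the Kafka types, the wrapper above is a general hard-timeout pattern around any blocking call. A minimal sketch, assuming Java 7 (no lambdas) and a hypothetical helper named HardTimeout; unlike the original, it rethrows the checked exceptions from Future#get instead of converting them to KafkaException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: run a possibly hanging call with a hard deadline. */
final class HardTimeout {
    private HardTimeout() {
    }

    /**
     * Runs task on executor and waits at most timeoutMs for its result.
     * On timeout or interruption the task is cancelled with cancel(true),
     * which interrupts the worker thread if the task is still running.
     */
    static <T> T call(ExecutorService executor, Callable<T> task, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // the task is stuck: interrupt it
            throw e;
        } catch (InterruptedException e) {
            future.cancel(true); // the caller gave up: do not leave the task running
            throw e;             // the exception itself reports the interruption
        }
    }
}
```

Whether the interrupt actually frees the worker depends on the blocking call responding to interruption; a call that ignores it keeps the worker thread busy even after the caller has moved on.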
The exitPoll method cancels the future with `cancel(true)`, which interrupts the worker thread if it is still inside poll and marks the future as done.
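```java
private void exitPoll(Future<ConsumerRecords<K, V>> future) {
    while (!future.isDone()) {
        // Interrupt the worker thread if it is still running the poll.
        future.cancel(true);
        // Note: after cancel() returns, isDone() is always true, so this loop
        // runs at most once and does not wait for the worker to leave poll().
        try {
            TimeUnit.MILLISECONDS.sleep(100L);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it, and stop waiting.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
```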
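Because Future#isDone() returns true as soon as cancel() returns, exitPoll cannot tell whether the worker thread has actually left poll. If that matters, one option is to let the task itself signal its exit. The sketch below is hypothetical (ExitSignallingCallable is not part of the original code or of any library): it counts down a CountDownLatch in a finally block. If the task is cancelled before it ever starts, call() never runs, so awaitExit simply times out and returns false.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical wrapper: signals when the wrapped call has really returned,
 * whether normally, by exception, or because the worker thread was
 * interrupted by Future#cancel(true).
 */
final class ExitSignallingCallable<T> implements Callable<T> {
    private final Callable<T> delegate;
    private final CountDownLatch exited = new CountDownLatch(1);

    ExitSignallingCallable(Callable<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T call() throws Exception {
        try {
            return delegate.call();
        } finally {
            exited.countDown(); // runs on every exit path of the delegate
        }
    }

    /**
     * Waits up to maxWaitMs for the worker thread to leave call().
     * Returns false on timeout, including when call() never started.
     */
    boolean awaitExit(long maxWaitMs) throws InterruptedException {
        return exited.await(maxWaitMs, TimeUnit.MILLISECONDS);
    }
}
```

A caller could wrap the poll worker in it, call `future.cancel(true)`, and then use a bounded `awaitExit(...)` instead of a fixed sleep, logging a warning if the worker is still stuck.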
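The KafkaTimeoutEnsuredPollWorker class implements the Callable interface; its call method simply runs KafkaConsumer#poll with the timeout it was given (100 ms above) on the executor thread.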
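```java
// Callable that performs a single KafkaConsumer#poll on the executor thread.
private static class KafkaTimeoutEnsuredPollWorker<K, V> implements Callable<ConsumerRecords<K, V>> {
    private final KafkaConsumer<K, V> kafkaConsumer;
    private final long timeoutMillisecond;

    private KafkaTimeoutEnsuredPollWorker(KafkaConsumer<K, V> kafkaConsumer, long timeoutMillisecond) {
        this.kafkaConsumer = kafkaConsumer;
        this.timeoutMillisecond = timeoutMillisecond;
    }

    @Override
    public ConsumerRecords<K, V> call() {
        // The timeout only bounds the network wait, so this call can still hang
        // in the coordinator loop; the caller's Future#get enforces the real deadline.
        return this.kafkaConsumer.poll(this.timeoutMillisecond);
    }
}
```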
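The snippets use `this.executor` without showing how it is created. One plausible setup, which is an assumption rather than the original code, is a single-threaded executor with a daemon thread. A single thread means at most one poll touches the (non-thread-safe) KafkaConsumer at a time, and a new poll queues behind a cancelled one that is still unwinding; a daemon thread means a poll that never returns does not keep the JVM alive. PollExecutors and the thread-name format are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory for the executor that runs KafkaConsumer#poll. */
final class PollExecutors {
    private PollExecutors() {
    }

    static ExecutorService newPollExecutor(final String consumerName) {
        // One worker thread: polls run strictly one after another on the same thread.
        return Executors.newSingleThreadExecutor(new ThreadFactory() {
            private final AtomicInteger count = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "kafka-poll-" + consumerName + "-" + count.incrementAndGet());
                t.setDaemon(true); // a stuck poll must not block JVM shutdown
                return t;
            }
        });
    }
}
```

This does not protect against the caller thread using the same consumer directly (for example to commit offsets) while a stuck poll is still running on the worker thread; all consumer access would have to go through the executor for that.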