Dancing in Shackles Caused by an Unupgradable Apache Kafka

This is the story of a hack we used to work around an Apache Kafka bug when upgrading Kafka was blocked by our Java version, which left the official fix out of reach.

Root Cause

A call to KafkaConsumer#poll may block forever even though it takes a timeout parameter, because that timeout applies only to ConsumerNetworkClient#poll and not to ConsumerCoordinator#poll. The consumer can get trapped in a loop inside ConsumerCoordinator#poll indefinitely.
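
To make the failure mode concrete, here is a minimal simulation of it. It is not Kafka code: the class, the method names, and the simulated clock are all hypothetical stand-ins, and the only thing it models is an outer loop that passes a timeout to each inner call without ever checking the total elapsed time.

```java
class InnerTimeoutOnlyDemo {
    // Simulated clock in milliseconds, so the demo finishes instantly.
    static long nowMs = 0L;

    // Hypothetical stand-in for a network-level poll: it honors its timeout,
    // uses all of it, and comes back without finding the coordinator.
    static void networkPoll(long timeoutMs) {
        nowMs += timeoutMs;
    }

    // Models the outer loop: it hands the timeout to each inner call but
    // never compares the total elapsed time against it.
    // Returns the simulated time spent inside the loop.
    static long pollUntilReady(long timeoutMs, int attemptsUntilReady) {
        long startMs = nowMs;
        for (int attempt = 0; attempt < attemptsUntilReady; attempt++) {
            networkPoll(timeoutMs);
        }
        return nowMs - startMs;
    }

    public static void main(String[] args) {
        // 100 ms requested, coordinator reachable only on the 600th attempt.
        System.out.println(pollUntilReady(100L, 600) + " ms"); // prints "60000 ms"
    }
}
```

In this model a caller who asked for 100 ms waits a full simulated minute, and if the coordinator never becomes reachable the loop never ends at all.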

Official Solution

The Kafka team has provided a fix for this issue. The key change is as follows:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2daadddee6..53834fb81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -198,46 +214,44 @@ public abstract class AbstractCoordinator implements Closeable {
                                            ByteBuffer memberAssignment);

     /**
-     * Block until the coordinator for this group is known and is ready to receive requests.
-     */
-    public synchronized void ensureCoordinatorReady() {
-        // Using zero as current time since timeout is effectively infinite
-        ensureCoordinatorReady(0, Long.MAX_VALUE);
-    }
-
-    /**
+     * Visible for testing.
+     *
      * Ensure that the coordinator is ready to receive requests.
-     * @param startTimeMs Current time in milliseconds
+     *
      * @param timeoutMs Maximum time to wait to discover the coordinator
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
-    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
-        long remainingMs = timeoutMs;
+    protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
+        final long startTimeMs = time.milliseconds();
+        long elapsedTime = 0L;

         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, remainingMs);
+            final RequestFuture<Void> future = lookupCoordinator();
+            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+            if (!future.isDone()) {
+                // ran out of time
+                break;
+            }

             if (future.failed()) {
                 if (future.isRetriable()) {
-                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-                    if (remainingMs <= 0)
-                        break;
+                    elapsedTime = time.milliseconds() - startTimeMs;
+
+                    if (elapsedTime >= timeoutMs) break;

                     log.debug("Coordinator discovery failed, refreshing metadata");
-                    client.awaitMetadataUpdate(remainingMs);
+                    client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                    elapsedTime = time.milliseconds() - startTimeMs;
                 } else
                     throw future.exception();
-            } else if (coordinator != null && client.connectionFailed(coordinator)) {
+            } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
-                time.sleep(retryBackoffMs);
+                final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
+                time.sleep(sleepTime);
+                elapsedTime += sleepTime;
             }
-
-            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
-            if (remainingMs <= 0)
-                break;
         }

         return !coordinatorUnknown();
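
The diff calls remainingTimeAtLeastZero without showing its body. A plausible reading, stated here as an assumption rather than a quote from the Kafka source, is a subtraction clamped at zero, so that no wait is ever started with a negative timeout. The class and the backoffSleepMs helper below are hypothetical names used only for illustration.

```java
class RemainingTimeDemo {
    // Assumed behavior of the helper referenced in the diff:
    // the time left before the deadline, never below zero.
    static long remainingTimeAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    // Mirrors the patched backoff line: sleep for the retry backoff,
    // but never longer than the time that is still left.
    static long backoffSleepMs(long retryBackoffMs, long timeoutMs, long elapsedMs) {
        return Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedMs));
    }

    public static void main(String[] args) {
        System.out.println(remainingTimeAtLeastZero(100L, 30L));  // prints 70
        System.out.println(remainingTimeAtLeastZero(100L, 250L)); // prints 0
        System.out.println(backoffSleepMs(100L, 1000L, 950L));    // prints 50
    }
}
```

With this clamp in place, every wait inside the loop (the network poll, the metadata refresh, and the backoff sleep) is bounded by what remains of the caller's timeout.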

Our Blocker

We could not adopt the official solution for the following reasons:

  • our application runs on Java 7 and cannot be migrated to Java 8, because we cannot change the JVM version
  • the official fix was released in Kafka 2.0, which requires Java 8 or higher
  • Kafka 1.1.1 does not include the official fix

Hack Solution

We had to come up with a hack to work around the blocker. It works as follows:

  • The poll call is wrapped in a Java Future to enforce a timeout. A poll should normally finish within 100 ms; if it has not returned by the time the 60_000 ms (1 min) limit is exceeded, the poll is abandoned.

    KafkaTimeoutEnsuredPollWorker<K, V> worker =
        new KafkaTimeoutEnsuredPollWorker<K, V>(kafkaConsumer, 100);
    Future<ConsumerRecords<K, V>> future = this.executor.submit(worker);
    try {
      return future.get(60_000, TimeUnit.MILLISECONDS);
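    } catch (InterruptedException e) {
      exitPoll(future);
      // restore the interrupt flag so callers can still observe the interruption
      Thread.currentThread().interrupt();
      String message = "interrupted while polling Kafka data, poll timeout = 100 ms";
      throw new KafkaException(message, e);
    } catch (TimeoutException e) {
      exitPoll(future);
      // report the 60_000 ms limit that actually expired, not only the poll timeout
      String message = "timed out while polling Kafka data, poll timeout = 100 ms, limit = 60000 ms";
      throw new KafkaException(message, e);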
    } catch (ExecutionException e) {
      exitPoll(future);
      if (e.getCause() instanceof KafkaException) {
        throw (KafkaException) e.getCause();
      } else {
        throw new KafkaException(e.getMessage(), e.getCause());
      }
    }
  • The exitPoll method cancels the future, which interrupts the worker thread, and waits until the future reports that it is done.

    private void exitPoll(Future<ConsumerRecords<K, V>> future) {
      while (!future.isDone()) {
        future.cancel(true);
        try {
          TimeUnit.MILLISECONDS.sleep(100L);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    }
  • The class KafkaTimeoutEnsuredPollWorker implements the Callable interface and simply delegates to KafkaConsumer#poll with the short timeout.

    private static class KafkaTimeoutEnsuredPollWorker<K, V> implements Callable<ConsumerRecords<K, V>> {
      private final KafkaConsumer<K, V> kafkaConsumer;
      private final long timeoutMillisecond;
    
      private KafkaTimeoutEnsuredPollWorker(KafkaConsumer<K, V> kafkaConsumer, long timeoutMillisecond) {
        this.kafkaConsumer = kafkaConsumer;
        this.timeoutMillisecond = timeoutMillisecond;
      }
    
      @Override
      public ConsumerRecords<K, V> call() {
        return this.kafkaConsumer.poll(this.timeoutMillisecond);
      }
    }
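
The same pattern can be tried without a Kafka broker. The sketch below is a self-contained illustration, written without lambdas so that it stays within Java 7; TimeoutGuardDemo, callWithLimit, stuckCall, fastCall, and run are hypothetical names, and the stuck task is only a stand-in for a poll that never returns.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutGuardDemo {
    // Runs the task on the executor; if it has not finished within limitMs,
    // interrupts the worker via cancel(true) and returns the fallback value.
    static <T> T callWithLimit(ExecutorService executor, Callable<T> task,
                               long limitMs, T fallback) {
        Future<T> future = executor.submit(task);
        try {
            return future.get(limitMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            return fallback;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    // Hypothetical stand-in for a poll that never returns on its own.
    static Callable<String> stuckCall() {
        return new Callable<String>() {
            @Override
            public String call() throws InterruptedException {
                while (true) {
                    Thread.sleep(10L); // throws once the worker is interrupted
                }
            }
        };
    }

    // Hypothetical stand-in for a healthy poll that returns immediately.
    static Callable<String> fastCall() {
        return new Callable<String>() {
            @Override
            public String call() {
                return "records";
            }
        };
    }

    // Runs one task with its own single-thread executor and cleans up afterwards.
    static String run(Callable<String> task, long limitMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return callWithLimit(executor, task, limitMs, "timed out");
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(fastCall(), 200L));  // prints "records"
        System.out.println(run(stuckCall(), 200L)); // prints "timed out"
    }
}
```

Two caveats apply when carrying this over to a real consumer. Future#isDone returns true as soon as cancel succeeds, which does not by itself prove that the worker thread has left poll. And KafkaConsumer is documented as not safe for multi-threaded access, so the next poll should not start until the interrupted one has really exited.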

Published Sep 3, 2020

Flying code monkey