
Added timeouts on send and flush calls in KafkaProducerWrapper #696

Open
jzakaryan wants to merge 6 commits into linkedin:master from jzakaryan:FTR-KafkaClientTimeouts

Conversation

@jzakaryan
Collaborator

@jzakaryan jzakaryan commented Mar 24, 2020

The changes in this PR address the issue of the Kafka client getting stuck on send and flush when the destination topic is dropped. Since Kafka treats missing topic metadata as an eventual-consistency issue and keeps retrying the send in the hope that the topic becomes available again, we have to make our calls to those methods bounded (i.e. have them time out after a certain amount of time).
I use CompletableFutures to put non-blocking timeouts on those calls. That seems to be the best way to do it in Java without resorting to third-party libraries.
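A rough sketch of the idea (illustrative only; the class, method, and timeout names below are assumptions, not the PR's actual code): a CompletableFuture can be failed with a TimeoutException from a scheduler if it has not completed in time, without blocking any caller thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutFutureSketch {
    // Daemon thread so the scheduler never keeps the JVM alive on its own.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    // Fails the future with a TimeoutException if it is still pending
    // when the timeout elapses; a no-op if it already completed.
    static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeoutMs) {
        SCHEDULER.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally(
                        new TimeoutException("Timed out after " + timeoutMs + " ms"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }

    public static void main(String[] args) {
        // A future that is never completed, standing in for a send()
        // whose callback never fires because the topic is gone.
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            within(pending, 100).get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause() instanceof TimeoutException); // prints true
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```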

@jzakaryan jzakaryan changed the title [WIP] Added timeouts on send and flush calls in KafkaProducerWrapper Added timeouts on send and flush calls in KafkaProducerWrapper Mar 30, 2020
Contributor

@somandal somandal left a comment


This is only a partial review, let's address these and I'll look deeper into the other parts of the newer code.

Comment on lines 190 to 195
Contributor


Any reason why you moved these comments out? Also, I see that you've made the last line into (3), whereas it just talks about the above two points?

Contributor


+1

To Sonam's first point: I think those comments are closely related to the logic within the method since they discuss impl details like exception types.

Contributor


Was there some reason why you had to reorder the catches here? In the previous code, we catch IllegalStateException, and then catch TimeoutException, and then catch KafkaException. Let's not reorder these unless there is a good reason to. It also becomes harder to review since I can't easily see the actual changes vs. reordering.

Also, you renamed all the exception variables from 'e' to 'ex'; is that necessary?

Contributor


Looks like the earlier IllegalStateException block would sleep, and you've removed that. Why?
Thread.sleep(_sendFailureRetryWaitTimeMs);

Collaborator Author


Good catch. This is definitely a bug I introduced. Will revert the entire exception-handling piece in send(). Better to leave it as it is.

Contributor


This comment doesn't make sense here, move it back to outside the if condition?

Contributor


change this back to ++numberOfAttempts?

Contributor


+1

I like the rename (added s) but prefer the prefix form.

In fact, it wouldn't be such a bad idea to turn that loop into a for loop:

for (int numberOfAttempts = 1; retry; ++numberOfAttempts) {
  ...
}

or even eliminate retry entirely:

for (int numberOfAttempts = 1;; ++numberOfAttempts) {
    try {
        maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
        return;
    } catch (...) {
        ...
    }
}


private synchronized void shutdownProducer() {
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
Contributor


is this intended to be package private? If so, add a comment, otherwise add the appropriate public/protected/private

Contributor


@somandal
I think that's okay; this method's intended to be overridden by another class in the same package. If it's marked protected, it'll be more visible than it needs to be; private wouldn't allow it to be overridden, and public is too permissive.

Contributor


Sure, makes sense. Just keep things consistent, i.e. if you need a more visible scope for multiple methods, declare them all as package private, and the rest as private. Don't mix protected and package private without having a very good reason to.

Comment on lines 36 to 37
Contributor


This wouldn't be the right way to pass configs to your KafkaProducerWrapper. Let's discuss how to do this offline.

Comment on lines 30 to 31
Contributor


This isn't the right way to pass configs. Any configs you want to pass here need to be scoped under the transport provider configs. The KafkaProducerWrapper receives all of the transport provider configs, so you'd want to add these under that scope. We don't need to access the full property name, because as the configs pass through the layers, the relevant prefixes are removed. You can see how other configs are accessed in KafkaProducerWrapper, you'll see they don't have that whole "brooklin.server" prefix. You can access these the same way.

Collaborator Author


@somandal I've discussed this with Ahmed. Will push the fix soon.

Contributor


I don't see you overriding this in your Bounded implementation, why make this protected?

Contributor


I don't see you overriding this in your Bounded implementation, why make this protected?

Contributor


Move this with other protected member variables

Contributor


It's not necessary to mark member fields/methods protected if the extenders of this class live in the same package. This is only useful if you want to make them accessible to extenders in different packages.

Since KafkaProducerWrapper and BoundedKafkaProducerWrapper both live in the same package (com.linkedin.datastream.kafka), all package-private (no modifier) fields/methods in the former are accessible/overridable to/by the latter.
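A tiny illustration of that point (hypothetical class names, not the actual wrappers): a method declared with no modifier is package-private, so a subclass in the same package can see and override it without any widening of visibility.

```java
// Hypothetical stand-ins for two classes in the same package
// (e.g. com.linkedin.datastream.kafka); names are illustrative.
class BaseWrapper {
    // No modifier: package-private, visible and overridable within the package.
    String sendMode() {
        return "unbounded";
    }
}

class BoundedWrapper extends BaseWrapper {
    @Override
    String sendMode() {
        return "bounded";
    }
}

public class VisibilityDemo {
    public static void main(String[] args) {
        BaseWrapper wrapper = new BoundedWrapper();
        System.out.println(wrapper.sendMode()); // prints "bounded"
    }
}
```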



Contributor


Nothing wrong with this but I'd recommend lang3; it generally offers more modern facilities (better support for more recent versions of Java) and it's the one we have an explicit dependency on.

This would also entail adding an explicit dependency for this module on it in build.gradle:

project(':datastream-kafka') {
  dependencies {
    ...
    compile "org.apache.commons:commons-lang3:$commonslang3Version"
  }
}


Comment on lines 61 to 73
private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
  CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

  producer.send(record, (metadata, exception) -> {
    if (exception == null) {
      future.complete(metadata);
    } else {
      future.completeExceptionally(new KafkaClientException(metadata, exception));
    }
  });

  return future;
}
Contributor

@ahmedahamid ahmedahamid Apr 2, 2020


Thinking out loud:

  • This method is doing almost everything we need: it creates a CompletableFuture that is completed if the callback is called (send success/failure).
  • The only missing bit is canceling future after timeout elapses if future.isDone() is false, which can be accomplished with a ScheduledExecutorService. I know this is exactly what CompletableFutureUtils.failAfter() is doing but I think the logic over there is more than what's absolutely necessary; we don't really need the other CompletableFuture failAfter() creates or the additional logic in within(). We can just cancel this same future if it isn't done when timeout elapses.
     scheduler.schedule(() -> {
         if (!future.isDone()) {
             future.cancel(true);
         }
     }, _sendTimeout, TimeUnit.MILLISECONDS);
  • future.cancel(true) causes a java.util.concurrent.CancellationException to be thrown, which means we don't have to construct a TimeoutException ourselves, because a CancellationException can only mean we cancelled it after the timeout elapsed.

This seems like something this method can do with a private executor service. I am not sure we really need a utils class just for this purpose.

Collaborator Author


@ahmedahamid I just think that cancellation and timeout are semantically different. We may want to cancel the future after the timeout in our case, but that's not necessarily true in general. Also, something may be cancelled without waiting for the timeout (based on user input or other external factors). Just thinking out loud. Will see whether I can get rid of the utils.

Contributor


I just think that cancellation and timeout are semantically different.

I didn't mean to suggest we should propagate CancellationException to callback. I was assuming doSend() will still construct a TimeoutException in case future is cancelled. That does sound a bit roundabout though; it would certainly be better to do future.completeExceptionally(new TimeoutException(...)) instead of future.cancel() if future.isDone() is false after timeout.

Contributor


I don't think we need to propagate metadata if exception is null. The javadocs on Callback.onCompletion() state that metadata is null if an error occurred. This would also spare us having to introduce KafkaClientException.

Collaborator Author


There were tons of checks up the callback chain that are dealing with the metadata and exception. You're probably right, I need to see if it's safe to do so and remove KafkaClientException. I was kind of forced to introduce it in the first place.

Contributor

@ahmedahamid ahmedahamid Apr 2, 2020


CompletableFuture.runAsync() uses the common pool, which isn't the best option for a potentially long blocking call like Producer.flush(). Even if we provide our own thread pool to runAsync(), the CompletableFuture we get won't give us a way to interrupt a Producer.flush() call that exceeds the allowed timeout, which is necessary to free up the thread-pool thread in question. This is because calling cancel(true) on a CompletableFuture returned by runAsync() only causes a cancellation exception to be propagated without interrupting the blocked thread-pool thread.

I'm afraid our only option here seems to be using an ExecutorService directly

    // It's okay to use a single thread executor since flush() is synchronized
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<?> future = executor.submit(() -> super.flush());
    try {
        // Block until timeout elapses
        future.get(_flushTimeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        ...
        // Interrupt the Producer.flush() call to free up the blocked thread
        future.cancel(true);
        ...
    }
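As a self-contained illustration of that point (hypothetical names; an interruptible sleep stands in for a blocked Producer.flush()), cancelling a plain ExecutorService Future with mayInterruptIfRunning=true does interrupt the worker thread:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleTimeoutSketch {
    // Returns true if the simulated long-running "flush" was interrupted
    // after the bounded wait timed out.
    static boolean runWithTimeout(long timeoutMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        // Stand-in for a Producer.flush() call that blocks for a long time.
        Future<?> future = executor.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.set(true); // the worker thread was freed
            }
        });
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS); // block until timeout elapses
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the blocked task
        } catch (ExecutionException e) {
            // not expected in this sketch
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWithTimeout(100)); // prints true
    }
}
```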

Comment on lines 27 to 28
Contributor

@ahmedahamid ahmedahamid Apr 2, 2020


  1. Please encode time units into variable/config names, e.g. DEFAULT_SEND_TIME_OUT_MS
  2. Use longs for timeout configs (e.g. see existing timeout configs)
  3. If you like, you can use Duration methods to initialize (e.g. Duration.ofSeconds(5).toMillis())
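For example (the constant name is just the one suggested above; the 5-second default is illustrative):

```java
import java.time.Duration;

public class TimeoutConfigDefaults {
    // A long milliseconds default, written readably via Duration.
    static final long DEFAULT_SEND_TIME_OUT_MS = Duration.ofSeconds(5).toMillis();

    public static void main(String[] args) {
        System.out.println(DEFAULT_SEND_TIME_OUT_MS); // prints 5000
    }
}
```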

Comment on lines 33 to 34
Contributor

@ahmedahamid ahmedahamid Apr 2, 2020


  1. final
  2. Add time unit suffixes (e.g. _sendTimeoutMs)

Comment on lines 23 to 26
/**
* An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send
*/
class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
Contributor

@ahmedahamid ahmedahamid Apr 2, 2020


Bounded is a little vague because it's easy to confuse with buffering. I realize a better name won't be easy. If you can't think of one, just make sure the Javadoc is unambiguous (e.g. with timeouts for flush and send).

@vmaheshw
Collaborator

vmaheshw commented Feb 1, 2022

@jzakaryan Do we still need this PR?
