Description
Describe the bug
Hi,
We use SnapStart on our Quarkus Lambdas. Some of them use SmallRye Reactive Messaging to send messages to or receive messages from Kafka. This works as expected, but unfortunately our logs contain warnings that the connection to a Kafka node was lost, citing an authentication error or a firewall as possible causes:
```
"loggerClassName": "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger",
"loggerName": "org.apache.kafka.clients.NetworkClient",
"level": "WARN",
"message": "[Producer clientId=kafka-producer-event-xxxx] Connection to node xx (hxxxx.amazonaws.com/xxx:9096) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
```
As far as I know, SnapStart stores the whole memory of the started Quarkus Lambda after the init phase. When a new execution environment is started, that snapshot is loaded back into memory so the init phase can be skipped. This also means that pooled connections are "stored" in the snapshot, but in reality they are already closed after the restore.
So I thought I simply needed to close all open Kafka connections before the snapshot is created. I did this with an org.crac.Resource and its beforeCheckpoint method. Now the warnings are gone from the log, but it looks like no new connections are initiated afterwards, so every message sent via a channel fails. I also tried KafkaProducer::flush, but that didn't help.
Any ideas?
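```java
// Imports assume Quarkus 3 (jakarta.*) and the SmallRye Reactive Messaging Kafka connector
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.crac.Core;
import org.crac.Resource;

import java.time.Duration;

@ApplicationScoped
@Slf4j
public class KafkaHelper implements Resource {

    @Inject
    KafkaClientService kafkaClientService;

    void onStart(@Observes StartupEvent ev) {
        // Register this bean so CRaC calls beforeCheckpoint/afterRestore around the snapshot
        Core.getGlobalContext().register(this);
    }

    @Override
    public void beforeCheckpoint(org.crac.Context<? extends Resource> context)
            throws Exception {
        log.info("kafkaproducer {}", kafkaClientService.getProducerChannels());
        log.info("kafkaconsumer {}", kafkaClientService.getConsumerChannels());
        log.info("going to sleep");
        var listOfProducer = kafkaClientService.getProducerChannels().stream()
                .map(kafkaClientService::getProducer)
                .map(KafkaProducer::flush) // with KafkaProducer::close log warnings are gone but all future messages fail
                .toList();
        // Wait (at most 10 s) until every producer has flushed
        Uni.combine().all().unis(listOfProducer)
                .combinedWith(unused -> null)
                .await().atMost(Duration.ofSeconds(10));
        log.info("going to sleep 2");
    }

    @Override
    public void afterRestore(org.crac.Context<? extends Resource> context)
            throws Exception {
        // is there a 'init connection' method?
        log.info("i am back");
    }
}
```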
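For illustration, the lifecycle expected here (close the connection in beforeCheckpoint, then get a fresh connection on the first use after restore) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Quarkus, SmallRye or Kafka code: `FakeConnection` and `ReopeningHolder` are hypothetical stand-ins, and whether the SmallRye Kafka connector offers an equivalent re-init hook is exactly the open question of this issue.

```java
import java.util.function.Supplier;

/**
 * Hypothetical illustration of "close before checkpoint, reopen lazily after restore".
 * FakeConnection stands in for a real client (e.g. a Kafka producer); it is NOT a Kafka API.
 */
public class LazyReopenDemo {

    /** Stand-in for a network client; it only tracks whether it is open. */
    static final class FakeConnection implements AutoCloseable {
        private final int id;
        private boolean open = true;

        FakeConnection(int id) { this.id = id; }

        int id() { return id; }

        boolean isOpen() { return open; }

        String send(String msg) {
            if (!open) throw new IllegalStateException("connection " + id + " is closed");
            return "sent '" + msg + "' via connection " + id;
        }

        @Override
        public void close() { open = false; }
    }

    /** Holds one connection; closing drops the instance so the next get() builds a new one. */
    static final class ReopeningHolder<T extends AutoCloseable> {
        private final Supplier<T> factory;
        private T current;

        ReopeningHolder(Supplier<T> factory) { this.factory = factory; }

        synchronized T get() {
            if (current == null) {
                current = factory.get(); // lazily (re)create on first use
            }
            return current;
        }

        /** What a beforeCheckpoint hook would call in this hypothetical design. */
        synchronized void closeForCheckpoint() throws Exception {
            if (current != null) {
                current.close();
                current = null; // forget the closed instance so get() creates a fresh one
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] created = {0};
        ReopeningHolder<FakeConnection> holder =
                new ReopeningHolder<>(() -> new FakeConnection(++created[0]));

        FakeConnection before = holder.get();
        System.out.println(before.send("a"));        // sent 'a' via connection 1

        holder.closeForCheckpoint();                 // beforeCheckpoint
        // ... snapshot is taken and later restored here ...
        System.out.println(before.isOpen());         // false: reusing this reference would fail
        System.out.println(holder.get().send("b"));  // sent 'b' via connection 2
    }
}
```

The point of the sketch is that closing alone is not enough: something also has to drop the closed client so that the next send creates a new one. Without that second step you get the behaviour described above, where the warnings disappear but every later message fails.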
I found #31401, which describes the same issue but with database connections.
Expected behavior
No response
Actual behavior
No response
How to Reproduce?
No response
Output of uname -a or ver
No response
Output of java -version
No response
Quarkus version or git rev
No response
Build tool (ie. output of mvnw --version or gradlew --version)
No response
Additional information
No response