Skip to content

cloudurable/reakt-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Reakt Kafka

This libray adapts Kafka to Reakt promises and streams.

Reakt has promise libraries for Vert.x, Netty, Guava, and Cassandra.

Using Promises with Kafka Producers

final AsyncProducer<Long, String> producer = new AsyncProducer<>(createProducer());
...
producer.send(TOPIC, key, value)
    .catchError(throwable -> {
                System.err.println("Trouble sending record " + throwable.getLocalizedMessage());
                     throwable.printStackTrace(System.err);
    })
    .then(recordMetadata -> {
             System.out.printf("%d %d %s \n", recordMetadata.offset(),
                recordMetadata.partition(), recordMetadata.topic());
    }).invoke();

Using Streams with Kafka Consumers

final StreamConsumer<Long, String> stream = StreamConsumer.subscribe(createConsumer(), TOPIC, result -> {
    result.then(consumerRecords -> {
        System.out.println("Got message " + consumerRecords.count());
        consumerRecords.forEach(record -> {
            countDownLatch.countDown();
        });
        result.request(1); //calls commitAsync
    }).catchError(throwable -> {
        System.err.println("Trouble Getting record " + throwable.getLocalizedMessage());
        throwable.printStackTrace(System.err);
        result.cancel();
    });
});

stream.close();

About

Reakt Kafka library

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •  

Languages