Skip to content

Initial Reactive Client Support #882

Open
@garyrussell

Description

@garyrussell
  • Map to/from Spring AMQP Message abstractions
  • Add support for existing payload conversion

PoC based on SampleSender and SampleReceiver

/**
 * Proof-of-concept publisher: converts payloads with a
 * {@link SpringReactorMessageConverter} and sends them through a Reactor RabbitMQ
 * {@link Sender} using publisher confirms.
 */
public class SampleSender {

	private static final Logger LOGGER = LoggerFactory.getLogger(SampleSender.class);

	private final Sender sender;

	private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();

	/**
	 * Create a sample sender backed by {@code RabbitFlux.createSender()}.
	 */
	public SampleSender() {
		this.sender = RabbitFlux.createSender();
	}

	/**
	 * Declare {@code queue}, then publish {@code count} messages with bodies
	 * {@code "Message_1"} ... {@code "Message_<count>"} to the default exchange
	 * ({@code ""}) using {@code queue} as the routing key.
	 * <p>
	 * This method only assembles and subscribes to the pipeline; results arrive in the
	 * subscriber callbacks.
	 * @param queue the queue to declare and route to.
	 * @param count the number of messages to publish.
	 * @param latch counted down once for every positively confirmed message; it is
	 * NOT counted down for negative confirms or when the pipeline fails, so callers
	 * should await it with a timeout.
	 */
	public void send(String queue, int count, CountDownLatch latch) {
		// The payload conversion is the point of this PoC: plain objects in, OutboundMessage out.
		Flux<OutboundMessageResult> confirmations = this.sender.sendWithPublishConfirms(Flux.range(1, count)
				.map(i -> this.converter.toOutbound("", queue, "Message_" + i)));

		this.sender.declareQueue(QueueSpecification.queue(queue))
				.thenMany(confirmations)
				.subscribe(r -> {
					// Decode with an explicit charset instead of the platform default; UTF-8 matches
					// the charset name used by SpringReactorMessageConverter.
					String body = new String(r.getOutboundMessage().getBody(),
							java.nio.charset.StandardCharsets.UTF_8);
					if (r.isAck()) {
						LOGGER.info("Message {} sent successfully", body);
						latch.countDown();
					}
					else {
						// Previously a negative confirm was dropped without any trace.
						LOGGER.warn("Message {} was not acknowledged", body);
					}
				},
				// Registered as the subscriber's error consumer (rather than doOnError) so that a
				// failure is handled here and not also reported as an unhandled error.
				e -> LOGGER.error("Send failed", e));
	}

	/**
	 * Close the underlying {@link Sender}.
	 */
	public void close() {
		this.sender.close();
	}

}
/**
 * Proof-of-concept consumer: receives auto-acknowledged deliveries through a Reactor
 * RabbitMQ {@link Receiver} and converts each one to an object with a
 * {@link SpringReactorMessageConverter}.
 */
public class SampleReceiver {

	private static final Logger LOGGER = LoggerFactory.getLogger(SampleReceiver.class);

	private final Receiver receiver;

	private final Sender sender;

	private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();

	/**
	 * Create a sample receiver; a {@link Sender} is created as well because it is used
	 * to declare the queue before consuming.
	 */
	public SampleReceiver() {
		this.receiver = RabbitFlux.createReceiver();
		this.sender = RabbitFlux.createSender();
	}

	/**
	 * Declare {@code queue} and then consume from it with auto-ack, logging every
	 * converted payload.
	 * @param queue the queue to declare and consume from.
	 * @param latch counted down once per received message.
	 * @return the {@link Disposable} returned by the subscription.
	 */
	public Disposable consume(String queue, CountDownLatch latch) {
		Mono<AMQP.Queue.DeclareOk> declared = this.sender.declareQueue(QueueSpecification.queue(queue));
		Flux<Delivery> deliveries = this.receiver.consumeAutoAck(queue);
		// The payload conversion is the point of this PoC: raw Delivery in, plain object out.
		Flux<Object> payloads = declared.thenMany(deliveries)
				.map(this.converter::deliveryToObject);
		return payloads.subscribe(payload -> onPayload(payload, latch));
	}

	/**
	 * Log one converted payload and record its arrival on the latch.
	 */
	private void onPayload(Object payload, CountDownLatch latch) {
		LOGGER.info("Received message {}", payload.toString());
		latch.countDown();
	}

	/**
	 * Close the {@link Sender} and then the {@link Receiver}.
	 */
	public void close() {
		this.sender.close();
		this.receiver.close();
	}

}
/**
 * Bridges Reactor RabbitMQ types ({@link Delivery}, {@link OutboundMessage}) and the
 * Spring AMQP {@link Message} abstraction, delegating to a
 * {@link MessagePropertiesConverter} for properties and a {@link MessageConverter}
 * for payloads.
 */
public class SpringReactorMessageConverter {

	/** Charset name handed to the properties converter in both directions. */
	private static final String CHARSET = StandardCharsets.UTF_8.name();

	private final MessagePropertiesConverter propertiesConverter;

	private final MessageConverter converter;

	/**
	 * Create an instance using a {@link DefaultMessagePropertiesConverter} and a
	 * {@link SimpleMessageConverter}.
	 */
	public SpringReactorMessageConverter() {
		this(new DefaultMessagePropertiesConverter(), new SimpleMessageConverter());
	}

	/**
	 * Create an instance with the supplied converters.
	 * @param propertiesConverter converts between AMQP properties and {@link MessageProperties}.
	 * @param converter converts between payload objects and {@link Message}s.
	 */
	public SpringReactorMessageConverter(MessagePropertiesConverter propertiesConverter, MessageConverter converter) {
		this.propertiesConverter = propertiesConverter;
		this.converter = converter;
	}

	/**
	 * Build a Spring AMQP {@link Message} from a delivery's body, properties and envelope.
	 * @param delivery the incoming delivery.
	 * @return the message.
	 */
	public Message deliveryToMessage(Delivery delivery) {
		MessageProperties springProperties = this.propertiesConverter.toMessageProperties(
				delivery.getProperties(), delivery.getEnvelope(), CHARSET);
		byte[] body = delivery.getBody();
		return new Message(body, springProperties);
	}

	/**
	 * Convert a delivery to a payload object using the configured {@link MessageConverter}.
	 * @param delivery the incoming delivery.
	 * @return the converted payload.
	 */
	public Object deliveryToObject(Delivery delivery) {
		Message message = deliveryToMessage(delivery);
		return this.converter.fromMessage(message);
	}

	/**
	 * Convert a payload object to an {@link OutboundMessage}, starting from fresh
	 * {@link MessageProperties}.
	 * @param exchange the exchange to publish to.
	 * @param routingKey the routing key.
	 * @param object the payload to convert.
	 * @return the outbound message.
	 */
	public OutboundMessage toOutbound(String exchange, String routingKey, Object object) {
		Message converted = this.converter.toMessage(object, new MessageProperties());
		return new OutboundMessage(exchange, routingKey,
				this.propertiesConverter.fromMessageProperties(converted.getMessageProperties(), CHARSET),
				converted.getBody());
	}

}

cc/ @artembilan @acogoluegnes - very raw at this stage.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions