diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java index 838ed6be9d5..506e68e00b1 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.integration.zeromq; /** - * The message headers constants to repsent ZeroMq message attributes. + * The message headers constants to represent ZeroMq message attributes. * * @author Artem Bilan * diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java new file mode 100644 index 00000000000..4863f7329b1 --- /dev/null +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.zeromq; + +import org.zeromq.ZMQ; + +/** + * Module that wraps common methods of ZeroMq integration classes + * + * @author Alessio Matricardi + * + * @since 6.4 + * + */ +public final class ZeroMqUtils { + + /** + * Bind the ZeroMq socket to the given port over the TCP transport protocol. + * @param socket the ZeroMq socket + * @param port the port to bind ZeroMq socket to over TCP. If equal to 0, the socket will bind to a random port. + * @return the effectively bound port + */ + public static int bindSocket(ZMQ.Socket socket, int port) { + if (port == 0) { + return socket.bindToRandomPort("tcp://*"); + } + else { + boolean bound = socket.bind("tcp://*:" + port); + if (!bound) { + throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); + } + return port; + } + } + + private ZeroMqUtils() { + } + +} diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java index 74a2c530f9f..8aa72615f47 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java @@ -25,6 +25,7 @@ * Factory class for ZeroMq components DSL. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -58,6 +59,17 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, return outboundChannelAdapter(context, () -> connectUrl); } + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use. + * @param port the port to bind ZeroMq socket to over TCP. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port) { + return new ZeroMqMessageHandlerSpec(context, port); + } + /** * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} * and connection URL supplier. @@ -84,6 +96,43 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType); } + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context) { + return new ZeroMqMessageHandlerSpec(context); + } + + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and {@link SocketType}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use. + * @param socketType the {@link SocketType} for ZeroMq socket. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, SocketType socketType) { + return new ZeroMqMessageHandlerSpec(context, socketType); + } + + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, binding port + * and {@link SocketType}. + * @param context the {@link ZContext} to use. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} for ZeroMq socket. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port, + SocketType socketType) { + return new ZeroMqMessageHandlerSpec(context, port, socketType); + } + /** * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, * connection URL supplier and {@link SocketType}. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index b99fe26996c..393d1ca3bc4 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -52,6 +52,26 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) { this(context, () -> connectUrl); } + /** + * Create an instance based on the provided {@link ZContext}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use for creating sockets. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context) { + this(context, SocketType.PAIR); + } + + /** + * Create an instance based on the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context, int port) { + this(context, port, SocketType.PAIR); + } + /** * Create an instance based on the provided {@link ZContext} and connection string supplier. * @param context the {@link ZContext} to use for creating sockets. @@ -73,6 +93,30 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy this(context, () -> connectUrl, socketType); } + /** + * Create an instance based on the provided {@link ZContext} and {@link SocketType}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use for creating sockets. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context, SocketType socketType) { + super(new ZeroMqMessageHandler(context, socketType)); + } + + /** + * Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context, int port, SocketType socketType) { + super(new ZeroMqMessageHandler(context, port, socketType)); + } + /** * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}. * @param context the {@link ZContext} to use for creating sockets. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index d2c484c3e19..1d203315916 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -40,6 +40,7 @@ import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.integration.zeromq.ZeroMqHeaders; +import org.springframework.integration.zeromq.ZeroMqUtils; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.lang.Nullable; @@ -263,7 +264,7 @@ protected void doStart() { socket.connect(this.connectUrl); } else { - this.bindPort.set(bindSocket(socket, this.bindPort.get())); + this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get())); } }) .cache() @@ -319,17 +320,4 @@ public void destroy() { this.socketMono.doOnNext(ZMQ.Socket::close).block(); } - private static int bindSocket(ZMQ.Socket socket, int port) { - if (port == 0) { - return socket.bindToRandomPort("tcp://*"); - } - else { - boolean bound = socket.bind("tcp://*:" + port); - if (!bound) { - throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); - } - return port; - } - } - } diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 2a668148318..8074d860d4a 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -43,6 +44,8 @@ import org.springframework.integration.mapping.OutboundMessageMapper; import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; +import org.springframework.integration.zeromq.ZeroMqUtils; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.Assert; @@ -50,14 +53,14 @@ /** * The {@link AbstractReactiveMessageHandler} implementation for publishing messages over ZeroMq socket. * Only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. - * This component is only connecting (no Binding) to another side, e.g. ZeroMq proxy. + * This component can bind or connect the socket. *

* When the {@link SocketType#PUB} is used, the {@link #topicExpression} is evaluated against a * request message to inject a topic frame into a ZeroMq message if it is not {@code null}. * The subscriber side must receive the topic frame first before parsing the actual data. *

* When the payload of the request message is a {@link ZMsg}, no any conversion and topic extraction happen: - * the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing. + * the {@link ZMsg} is sent into a socket as is, and it is not destroyed for possible further reusing. * * @author Artem Bilan * @author Alessio Matricardi @@ -74,7 +77,7 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private final Scheduler publisherScheduler = Schedulers.newSingle("zeroMqMessageHandlerScheduler"); - private final Mono socketMono; + private volatile Mono socketMono; private OutboundMessageMapper messageMapper; @@ -91,6 +94,38 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private volatile boolean wrapTopic = true; + private final ZContext context; + + private final SocketType socketType; + + private final AtomicInteger bindPort = new AtomicInteger(); + + @Nullable + private Supplier connectUrl; + + /** + * Create an instance based on the provided {@link ZContext}. + * @param context the {@link ZContext} to use for creating sockets. + * @since 6.4 + */ + public ZeroMqMessageHandler(ZContext context) { + this(context, SocketType.PAIR); + } + + /** + * Create an instance based on the provided {@link ZContext} and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + */ + public ZeroMqMessageHandler(ZContext context, SocketType socketType) { + Assert.notNull(context, "'context' must not be null"); + Assert.state(VALID_SOCKET_TYPES.contains(socketType), + () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES); + this.context = context; + this.socketType = socketType; + } + /** * Create an instance based on the provided {@link ZContext} and connection string. * @param context the {@link ZContext} to use for creating sockets. @@ -100,6 +135,16 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl) { this(context, connectUrl, SocketType.PAIR); } + /** + * Create an instance based on the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @since 6.4 + */ + public ZeroMqMessageHandler(ZContext context, int port) { + this(context, port, SocketType.PAIR); + } + /** * Create an instance based on the provided {@link ZContext} and connection string supplier. * @param context the {@link ZContext} to use for creating sockets. @@ -122,6 +167,20 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock Assert.hasText(connectUrl, "'connectUrl' must not be empty"); } + /** + * Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.4 + */ + public ZeroMqMessageHandler(ZContext context, int port, SocketType socketType) { + this(context, socketType); + Assert.isTrue(port > 0, "'port' must not be zero or negative"); + this.bindPort.set(port); + } + /** * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}. * @param context the {@link ZContext} to use for creating sockets. @@ -131,17 +190,9 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock * @since 5.5.9 */ public ZeroMqMessageHandler(ZContext context, Supplier connectUrl, SocketType socketType) { - Assert.notNull(context, "'context' must not be null"); + this(context, socketType); Assert.notNull(connectUrl, "'connectUrl' must not be null"); - Assert.state(VALID_SOCKET_TYPES.contains(socketType), - () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES); - this.socketMono = - Mono.just(context.createSocket(socketType)) - .publishOn(this.publisherScheduler) - .doOnNext((socket) -> this.socketConfigurer.accept(socket)) - .doOnNext((socket) -> socket.connect(connectUrl.get())) - .cache() - .publishOn(this.publisherScheduler); + this.connectUrl = connectUrl; } /** @@ -206,6 +257,16 @@ public void wrapTopic(boolean wrapTopic) { this.wrapTopic = wrapTopic; } + /** + * Return the port a socket is bound or 0 if this message producer has not been started yet + * or the socket is connected - not bound. + * @return the port for a socket or 0. + * @since 6.4 + */ + public int getBoundPort() { + return this.bindPort.get(); + } + @Override public String getComponentType() { return "zeromq:outbound-channel-adapter"; @@ -228,6 +289,20 @@ protected void onInit() { @Override public void start() { if (!this.running.getAndSet(true)) { + this.socketMono = + Mono.just(this.context.createSocket(this.socketType)) + .publishOn(this.publisherScheduler) + .doOnNext((socket) -> this.socketConfigurer.accept(socket)) + .doOnNext((socket) -> { + if (this.connectUrl != null) { + socket.connect(this.connectUrl.get()); + } + else { + this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get())); + } + }) + .cache() + .publishOn(this.publisherScheduler); this.socketMonoSubscriber = this.socketMono.subscribe(); } } diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java index 1431047c437..79920c8b5aa 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java @@ -35,6 +35,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.util.TestSocketUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -65,12 +66,12 @@ void testMessageHandlerForPair() { messageHandler.setBeanFactory(mock(BeanFactory.class)); messageHandler.setSocketConfigurer(s -> s.setZapDomain("global")); messageHandler.afterPropertiesSet(); + messageHandler.start(); @SuppressWarnings("unchecked") Mono socketMono = TestUtils.getPropertyValue(messageHandler, "socketMono", Mono.class); ZMQ.Socket socketInUse = socketMono.block(Duration.ofSeconds(10)); assertThat(socketInUse.getZapDomain()).isEqualTo("global"); - messageHandler.start(); Message testMessage = new GenericMessage<>("test"); messageHandler.handleMessage(testMessage).subscribe(); @@ -187,4 +188,41 @@ void testMessageHandlerForPubSubDisabledWrapTopic() { subSocket.close(); } + @Test + void testMessageHandlerForPubSubWithBind() { + int boundPort = TestSocketUtils.findAvailableTcpPort(); + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(CONTEXT, boundPort, SocketType.PUB); + messageHandler.setBeanFactory(mock(BeanFactory.class)); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); + messageHandler.wrapTopic(false); + messageHandler.afterPropertiesSet(); + messageHandler.start(); + + ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB); + subSocket.setReceiveTimeOut(0); + subSocket.connect("tcp://localhost:" + boundPort); + subSocket.subscribe("test"); + + Message testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build(); + + await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> { + subSocket.subscribe("test"); + messageHandler.handleMessage(testMessage).subscribe(); + ZMsg msg = ZMsg.recvMsg(subSocket); + assertThat(msg).isNotNull(); + assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic"); + Message capturedMessage = + new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData()); + assertThat(capturedMessage).isEqualTo(testMessage); + msg.destroy(); + }); + + messageHandler.destroy(); + subSocket.close(); + } + } diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 51e10351291..9328aec8297 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -30,4 +30,9 @@ The references stay in cache because polling configuration does not allow to pro The `LobHandler` (and respective API) has been deprecated for removal in Spring Framework `6.2`. Respective option on `JdbcMessageStore` (and similar) have been deprecated as well. -The byte array handling for serialized message is fully deferred to JDBC driver. \ No newline at end of file +The byte array handling for serialized message is fully deferred to JDBC driver. + +[[x6.4-zeromq-changes]] +=== ZeroMQ Changes + +The outbound component `ZeroMqMessageHandler` (and respective API) can now bind a TCP port instead of connecting to a given URL. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/zeromq.adoc b/src/reference/antora/modules/ROOT/pages/zeromq.adoc index a0b85efdfee..8d9d8442719 100644 --- a/src/reference/antora/modules/ROOT/pages/zeromq.adoc +++ b/src/reference/antora/modules/ROOT/pages/zeromq.adoc @@ -146,7 +146,9 @@ ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel out The `ZeroMqMessageHandler` is a `ReactiveMessageHandler` implementation to produce publish messages into a ZeroMQ socket. Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported. -The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported. +This component can connect to the remote socket or bind onto TCP protocol with the provided or random port. +The actual port can be obtained via `getBoundPort()` after this component is started and ZeroMQ socket is bound. + When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. @@ -158,7 +160,7 @@ Otherwise, an `OutboundMessageMapper` is used to convert a request messa By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`. The socket options (e.g. security or write timeout) can be configured via `setSocketConfigurer(Consumer socketConfigurer)` callback. -Here is a sample of `ZeroMqMessageHandler` configuration: +Here is a sample of `ZeroMqMessageHandler` configuration which connect to a socket: [source,java] ---- @@ -173,6 +175,21 @@ ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { } ---- +Here is a sample of `ZeroMqMessageHandler` configuration which bind to a provided port: + +[source,java] +---- +@Bean +@ServiceActivator(inputChannel = "zeroMqPublisherChannel") +ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(context, 7070, SocketType.PUB); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); +} +---- + [[zeromq-dsl]] == ZeroMQ Java DSL Support