-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-9228: ZeroMQ - provide binding to ZeroMqMessageHandler #9229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-9228: ZeroMQ - provide binding to ZeroMqMessageHandler #9229
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, also add your name to the @author
list of all the affected classes.
And verify changes locally like ./gradlew :spring-integration-zeromq:check
.
* Create an instance based on the provided {@link ZContext} and binding port. | ||
* @param context the {@link ZContext} to use for creating sockets. | ||
* @param port the port to bind ZeroMq socket to over TCP. | ||
* @since 6.2.6 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I said in the issue it is something new and will be available only starting with 6.4
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
* @param port the port to bind ZeroMq socket to over TCP. | ||
* @since 6.2.6 | ||
*/ | ||
public ZeroMqMessageHandlerSpec(ZContext context, int port) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These constructors have to be protected
and respective factories have to be introduced into org.springframework.integration.zeromq.dsl.ZeroMq
class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See cecc3c5
Mono.just(this.context.createSocket(this.socketType)) | ||
.publishOn(this.publisherScheduler) | ||
.doOnNext((socket) -> this.socketConfigurer.accept(socket)) | ||
.doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The binding has to happen in the start()
.
If there are some failing tests according to a new behavior, then exactly they have to be fixed.
Probably just calling that start()
in their logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, actually we do this in the start()
:
this.socketMonoSubscriber = this.socketMono.subscribe();
So, all good, however looking into this one more time I believe that we have to always renew this socketMono
in the start()
. After the stop()
the environment might be changed, so or now way to connect or bind.
Therefore every time when we start this component we have to renew socket.
I'm also thinking about DRY principle here.
Looks like we now have two socket Mono
which has difference only in these lines:
.doOnNext((socket) -> socket.connect(connectUrl.get()))
in opposed to:
.doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get())))
Essentially if..else
according to the server/client preferences.
This could be just extracted into a private
method where we would do connect
or bind
.
And monoSocket
would be just a single instance in the start
without any duplication.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completely agree with you. I was trying to touch less code as possible, but as you said this is the best approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also this new functionality has to be mention in the whats-new.adoc
. Like a new section in the end:
[[x6.4-zeromq-changes]]
=== ZeroMQ Changes
Mono.just(this.context.createSocket(this.socketType)) | ||
.publishOn(this.publisherScheduler) | ||
.doOnNext((socket) -> this.socketConfigurer.accept(socket)) | ||
.doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, actually we do this in the start()
:
this.socketMonoSubscriber = this.socketMono.subscribe();
So, all good, however looking into this one more time I believe that we have to always renew this socketMono
in the start()
. After the stop()
the environment might be changed, so or now way to connect or bind.
Therefore every time when we start this component we have to renew socket.
I'm also thinking about DRY principle here.
Looks like we now have two socket Mono
which has difference only in these lines:
.doOnNext((socket) -> socket.connect(connectUrl.get()))
in opposed to:
.doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get())))
Essentially if..else
according to the server/client preferences.
This could be just extracted into a private
method where we would do connect
or bind
.
And monoSocket
would be just a single instance in the start
without any duplication.
WDYT?
…rs and simple constructors, following the same logic used for ZeroMqMessageProvider
Addressed almost every comment. I also added a typo in |
Thank you for update! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great!
Just couple follow up suggestions.
Thank you!
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java
Outdated
Show resolved
Hide resolved
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java
Outdated
Show resolved
Hide resolved
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java
Show resolved
Hide resolved
...romq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
Outdated
Show resolved
Hide resolved
...romq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
Show resolved
Hide resolved
...romq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java
Outdated
Show resolved
Hide resolved
* @return the spec. | ||
* @since 6.4 | ||
*/ | ||
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if DSL has to expose extra factories to explicitly bind to random port.
Something similar what you have with those new ctors in the ZeroMqMessageHandler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or you have missed to push the change, or just didn't implement yet.
Hence the conversion is not resolved yet 😄
Addressed your comments! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice progress!
I believe we are very close for merge.
Please, take a look to the latest review.
Honestly, nothing critical and I can address them on merge.
Just let me know your preferences!
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java
Show resolved
Hide resolved
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java
Show resolved
Hide resolved
...eromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java
Show resolved
Hide resolved
...eromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java
Show resolved
Hide resolved
* @return the spec. | ||
* @since 6.4 | ||
*/ | ||
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or you have missed to push the change, or just didn't implement yet.
Hence the conversion is not resolved yet 😄
…et will be bound to a random port
62934de
to
920c127
Compare
Hi, sorry for the delay but I'm very busy with work.. hope now looks good for you :) |
thank you very much for the contribution; looking forward for more! |
Closes #9228
This PR provides the
bind
capability to the outbound channel adapterZeroMqMessageHandler
.I tried, but the current implementation instantiate the socket within the class constructor, and some old tests relies on this mechanism.