File tree Expand file tree Collapse file tree 1 file changed +5
-7
lines changed
spring-integration-core/src/main/java/org/springframework/integration/channel Expand file tree Collapse file tree 1 file changed +5
-7
lines changed Original file line number Diff line number Diff line change 16
16
17
17
package org .springframework .integration .channel ;
18
18
19
- import java .util .concurrent .atomic .AtomicBoolean ;
20
-
21
19
import org .reactivestreams .Publisher ;
22
20
import org .reactivestreams .Subscriber ;
23
21
@@ -47,29 +45,29 @@ public class FluxMessageChannel extends AbstractMessageChannel
47
45
48
46
private final FluxSink <Message <?>> sink ;
49
47
50
- private final AtomicBoolean subscribed = new AtomicBoolean ();
51
-
52
48
private final ReplayProcessor <Boolean > subscribedSignal = ReplayProcessor .create (1 );
53
49
54
50
public FluxMessageChannel () {
55
51
this .processor = EmitterProcessor .create (1 , false );
56
52
this .sink = this .processor .sink (FluxSink .OverflowStrategy .BUFFER );
57
- this .subscribedSignal .subscribe (this .subscribed ::set );
58
53
}
59
54
60
55
@ Override
61
56
protected boolean doSend (Message <?> message , long timeout ) {
62
- Assert .state (this .subscribed . get (),
57
+ Assert .state (this .processor . hasDownstreams (),
63
58
() -> "The [" + this + "] doesn't have subscribers to accept messages" );
64
59
this .sink .next (message );
65
60
return true ;
66
61
}
67
62
68
63
@ Override
69
64
public void subscribe (Subscriber <? super Message <?>> subscriber ) {
70
- this .processor . doOnSubscribe (( s ) -> this . subscribedSignal . onNext ( true ))
65
+ this .processor
71
66
.doFinally ((s ) -> this .subscribedSignal .onNext (this .processor .hasDownstreams ()))
72
67
.subscribe (subscriber );
68
+ if (this .processor .hasDownstreams ()) {
69
+ this .subscribedSignal .onNext (true );
70
+ }
73
71
}
74
72
75
73
@ Override
You can’t perform that action at this time.
0 commit comments