41
41
import io .openmessaging .storage .dledger .protocol .VoteResponse ;
42
42
import io .openmessaging .storage .dledger .utils .DLedgerUtils ;
43
43
44
- import java .util .concurrent .*;
44
+ import java .util .concurrent .CompletableFuture ;
45
+ import java .util .concurrent .ExecutorService ;
46
+ import java .util .concurrent .Executors ;
47
+ import java .util .concurrent .ThreadFactory ;
45
48
import java .util .concurrent .atomic .AtomicInteger ;
46
49
50
+ import org .apache .rocketmq .remoting .ChannelEventListener ;
47
51
import org .apache .rocketmq .remoting .netty .NettyClientConfig ;
48
52
import org .apache .rocketmq .remoting .netty .NettyRemotingClient ;
49
53
import org .apache .rocketmq .remoting .netty .NettyRemotingServer ;
@@ -65,13 +69,9 @@ public class DLedgerRpcNettyService extends DLedgerRpcService {
65
69
private NettyRemotingServer remotingServer ;
66
70
private NettyRemotingClient remotingClient ;
67
71
68
- // selfId -> remotingClient
69
- private ConcurrentHashMap <String , NettyRemotingClient > clientMap ;
70
-
71
72
private DLedgerProxy dLedgerProxy ;
72
73
private MemberState memberState ;
73
74
74
-
75
75
private ExecutorService futureExecutor = Executors .newFixedThreadPool (4 , new ThreadFactory () {
76
76
private AtomicInteger threadIndex = new AtomicInteger (0 );
77
77
@@ -100,8 +100,15 @@ public Thread newThread(Runnable r) {
100
100
});
101
101
102
102
public DLedgerRpcNettyService (DLedgerProxy dLedgerProxy ) {
103
+ this (dLedgerProxy , null , null , null );
104
+ }
105
+
106
+ public DLedgerRpcNettyService (DLedgerProxy dLedgerProxy , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig ) {
107
+ this (dLedgerProxy , nettyServerConfig , nettyClientConfig , null );
108
+ }
109
+
110
+ public DLedgerRpcNettyService (DLedgerProxy dLedgerProxy , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig , ChannelEventListener channelEventListener ) {
103
111
this .dLedgerProxy = dLedgerProxy ;
104
- this .clientMap = new ConcurrentHashMap <>();
105
112
DLedgerProxyConfig dLedgerProxyConfig = this .dLedgerProxy .getConfigManager ().getdLedgerProxyConfig ();
106
113
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor () {
107
114
@ Override
@@ -114,20 +121,25 @@ public boolean rejectRequest() {
114
121
return false ;
115
122
}
116
123
};
124
+
117
125
//register remoting server(We will only listen to one port. Limit in the configuration file)
118
126
//check if the config has more than one port to bind
119
127
if (!checkOnePort (dLedgerProxyConfig )) {
120
128
logger .error ("Bind the port error, because of more than one port in config" );
121
129
throw new RuntimeException ("Bind the port error, because of more than one port in config" );
122
130
}
123
131
DLedgerConfig dLedgerConfig = dLedgerProxyConfig .getConfigs ().get (0 );
124
- NettyRemotingServer nettyRemotingServer = registerRemotingServer (dLedgerConfig .getSelfAddress (), protocolProcessor );
125
- this .remotingServer = nettyRemotingServer ;
132
+ String address = dLedgerConfig .getSelfAddress ();
133
+ if (nettyServerConfig == null ) {
134
+ nettyServerConfig = new NettyServerConfig ();
135
+ }
136
+ nettyServerConfig .setListenPort (Integer .valueOf (address .split (":" )[1 ]));
137
+ this .remotingServer = registerRemotingServer (nettyServerConfig , channelEventListener , protocolProcessor );
126
138
//start the remoting client
127
- for ( DLedgerConfig config : dLedgerProxyConfig . getConfigs () ) {
128
- this . clientMap . put ( config . getSelfId (), new NettyRemotingClient ( new NettyClientConfig (), null ) );
139
+ if ( nettyClientConfig == null ) {
140
+ nettyClientConfig = new NettyClientConfig ();
129
141
}
130
- this .remotingClient = new NettyRemotingClient (new NettyClientConfig () , null );
142
+ this .remotingClient = new NettyRemotingClient (nettyClientConfig , null );
131
143
}
132
144
133
145
private boolean checkOnePort (DLedgerProxyConfig dLedgerProxyConfig ) {
@@ -156,10 +168,8 @@ private void registerProcessor(NettyRemotingServer remotingServer, NettyRequestP
156
168
remotingServer .registerProcessor (DLedgerRequestCode .LEADERSHIP_TRANSFER .getCode (), protocolProcessor , null );
157
169
}
158
170
159
- private NettyRemotingServer registerRemotingServer (String address , NettyRequestProcessor protocolProcessor ) {
160
- NettyServerConfig nettyServerConfig = new NettyServerConfig ();
161
- nettyServerConfig .setListenPort (Integer .valueOf (address .split (":" )[1 ]));
162
- NettyRemotingServer remotingServer = new NettyRemotingServer (nettyServerConfig , null );
171
+ private NettyRemotingServer registerRemotingServer (NettyServerConfig nettyServerConfig , ChannelEventListener channelEventListener , NettyRequestProcessor protocolProcessor ) {
172
+ NettyRemotingServer remotingServer = new NettyRemotingServer (nettyServerConfig , channelEventListener );
163
173
registerProcessor (remotingServer , protocolProcessor );
164
174
return remotingServer ;
165
175
}
@@ -514,4 +524,8 @@ public DLedgerProxy getdLedgerProxy() {
514
524
public void setdLedgerProxy (DLedgerProxy dLedgerProxy ) {
515
525
this .dLedgerProxy = dLedgerProxy ;
516
526
}
527
+
528
+ public NettyRemotingServer getRemotingServer () {
529
+ return remotingServer ;
530
+ }
517
531
}
0 commit comments