Skip to content

Commit ac12deb

Browse files
authored
Support multi-raft (#150)
* feat(support Multi-DLedger): support Multi-DLedger 1.add DLedgerProxy to be the container of DLedgerServer. 2.add ConfigManager to manage the config about DLedgerProxy and DLedgerServer. 3.add DLedgerManager to manage the DLedgerServers. 4.support configuration file startup. 5.start parameters are compatible with the old version. 6.refactor the tests. * test(fix some tests for new model): fix some tests for new model 1.fix some tests for new model * style(multi-raft): code format 1.code format * refactor(multi-raft): refactor the code and test 1.refactor the code and test * test(multi-raft): fix some test and add a new test 1.fix some test and add a new test * style(multi-raft): delete the print 1.delete the print * fix(multi-raft): fix some bugs and add some tests about leaderElector 1.fix some bugs and add some tests about leaderElector * test(multi-raft): add more tests about leaderElector 1.add more tests about leaderElector * feat(multi-raft): update conflict code 1.update conflict code * refactor(multi-raft): Refactor the startup args 1.add startup args "server -c" to start with config file. 2.remove DLedgerRpcNettyServer#memstate. 3.check config before initialize DLedgerRpcNettyService. * refactor(multi-raft): refactor some code 1.remove DLedgerRpcNettyService#checkOnePort 2.remove printStack * fix(multi-raft): fix some code 1.use "groupId#selfId" to identify different DLedgerServer. * style(multi-raft): remove unused imports 1.remove unused imports * feat(support Multi-DLedger): make ConfigManager more flexible 1. make ConfigManager more flexible 2. refactor structure about multi-raft 3. more friendly for updating version * fix(support Multi-DLedger): fix incompatible code after merge master branch 1. fix incompatible code after merge master branch * refactor(main): refactor the main method in DLedger 1. refactor the main method in DLedger Co-authored-by: TheR1sing3un <[email protected]>
1 parent 2214e5c commit ac12deb

37 files changed

+2229
-273
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@
9898
<version>${mockito.version}</version>
9999
<scope>test</scope>
100100
</dependency>
101+
<dependency>
102+
<groupId>org.yaml</groupId>
103+
<artifactId>snakeyaml</artifactId>
104+
<version>1.30</version>
105+
</dependency>
101106

102107
</dependencies>
103108

src/main/java/io/openmessaging/storage/dledger/DLedger.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,47 @@
1818

1919
import com.alibaba.fastjson.JSON;
2020
import com.beust.jcommander.JCommander;
21+
import io.openmessaging.storage.dledger.cmdline.ConfigCommand;
22+
import io.openmessaging.storage.dledger.dledger.DLedgerProxy;
23+
import io.openmessaging.storage.dledger.dledger.DLedgerProxyConfig;
24+
import io.openmessaging.storage.dledger.utils.ConfigUtils;
25+
import java.util.Collections;
26+
import java.util.LinkedList;
27+
import java.util.List;
2128
import org.slf4j.Logger;
2229
import org.slf4j.LoggerFactory;
2330

2431
public class DLedger {
2532

2633
private static Logger logger = LoggerFactory.getLogger(DLedger.class);
27-
28-
@Deprecated
29-
public static void main(String args[]) {
30-
DLedgerConfig dLedgerConfig = new DLedgerConfig();
31-
JCommander.newBuilder().addObject(dLedgerConfig).build().parse(args);
32-
bootstrapDLedger(dLedgerConfig);
33-
}
34-
35-
public static void bootstrapDLedger(DLedgerConfig dLedgerConfig) {
3634

37-
if (null == dLedgerConfig) {
38-
logger.error("Bootstrap DLedger server error", new IllegalArgumentException("DLedgerConfig is null"));
39-
System.exit(-1);
35+
public static void main(String[] args) {
36+
List<DLedgerConfig> dLedgerConfigs = new LinkedList<>();
37+
if ("--config".equals(args[0]) || "-c".equals(args[0])) {
38+
ConfigCommand configCommand = new ConfigCommand();
39+
JCommander.newBuilder().addObject(configCommand).build().parse(args);
40+
try {
41+
DLedgerProxyConfig dLedgerProxyConfig = ConfigUtils.parseDLedgerProxyConfig(configCommand.getConfigPath());
42+
dLedgerConfigs.addAll(dLedgerProxyConfig.getConfigs());
43+
} catch (Exception e) {
44+
logger.error("Create DLedgerProxyConfig error", e);
45+
System.exit(-1);
46+
}
47+
} else {
48+
DLedgerConfig dLedgerConfig = new DLedgerConfig();
49+
JCommander.newBuilder().addObject(dLedgerConfig).build().parse(args);
50+
dLedgerConfigs.add(dLedgerConfig);
4051
}
52+
bootstrapDLedger(dLedgerConfigs);
53+
}
4154

42-
DLedgerServer dLedgerServer = new DLedgerServer(dLedgerConfig);
43-
dLedgerServer.startup();
44-
logger.info("[{}] group {} start ok with config {}", dLedgerConfig.getSelfId(), dLedgerConfig.getGroup(), JSON.toJSONString(dLedgerConfig));
55+
public static void bootstrapDLedger(List<DLedgerConfig> dLedgerConfigs) {
56+
if (dLedgerConfigs == null || dLedgerConfigs.isEmpty()) {
57+
logger.error("Bootstrap DLedger server error", new IllegalArgumentException("DLedgerConfigs is null or empty"));
58+
}
59+
DLedgerProxy dLedgerProxy = new DLedgerProxy(dLedgerConfigs);
60+
dLedgerProxy.startup();
61+
logger.info("DLedgers start ok with config {}", JSON.toJSONString(dLedgerConfigs));
4562
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
4663
private volatile boolean hasShutdown = false;
4764

@@ -52,12 +69,16 @@ public void run() {
5269
if (!this.hasShutdown) {
5370
this.hasShutdown = true;
5471
long beginTime = System.currentTimeMillis();
55-
dLedgerServer.shutdown();
72+
dLedgerProxy.shutdown();
5673
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
5774
logger.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
5875
}
5976
}
6077
}
6178
}, "ShutdownHook"));
6279
}
80+
81+
public static void bootstrapDLedger(DLedgerConfig dLedgerConfig) {
82+
bootstrapDLedger(Collections.singletonList(dLedgerConfig));
83+
}
6384
}

src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,22 @@
1818

1919
import com.beust.jcommander.Parameter;
2020
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
21+
22+
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
2123
import java.io.File;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2227

2328
public class DLedgerConfig {
2429

2530
public static final String MEMORY = "MEMORY";
2631
public static final String FILE = "FILE";
2732
public static final String MULTI_PATH_SPLITTER = System.getProperty("dLedger.multiPath.Splitter", ",");
2833

34+
@Parameter(names = {"--config", "-c"}, description = "Config path of DLedger")
35+
private String configFilePath;
36+
2937
@Parameter(names = {"--group", "-g"}, description = "Group of this server")
3038
private String group = "default";
3139

@@ -85,8 +93,8 @@ public class DLedgerConfig {
8593
@Parameter(names = {"--preferred-leader-id"}, description = "Preferred LeaderId")
8694
private String preferredLeaderIds;
8795
private long maxLeadershipTransferWaitIndex = 1000;
88-
private int minTakeLeadershipVoteIntervalMs = 30;
89-
private int maxTakeLeadershipVoteIntervalMs = 100;
96+
private int minTakeLeadershipVoteIntervalMs = 30;
97+
private int maxTakeLeadershipVoteIntervalMs = 100;
9098

9199
private boolean isEnableBatchPush = false;
92100
private int maxBatchPushSize = 4 * 1024;
@@ -418,4 +426,50 @@ public String getReadOnlyDataStoreDirs() {
418426
public void setReadOnlyDataStoreDirs(String readOnlyDataStoreDirs) {
419427
this.readOnlyDataStoreDirs = readOnlyDataStoreDirs;
420428
}
429+
430+
private String selfAddress;
431+
432+
// groupId#selfIf -> address
433+
private Map<String, String> peerAddressMap;
434+
435+
private final AtomicBoolean inited = new AtomicBoolean(false);
436+
437+
public void init() {
438+
if (inited.compareAndSet(false, true)) {
439+
initSelfAddress();
440+
initPeerAddressMap();
441+
}
442+
}
443+
444+
private void initSelfAddress() {
445+
for (String peerInfo : this.peers.split(";")) {
446+
String peerSelfId = peerInfo.split("-")[0];
447+
String peerAddress = peerInfo.substring(peerSelfId.length() + 1);
448+
if (this.selfId.equals(peerSelfId)) {
449+
this.selfAddress = peerAddress;
450+
return;
451+
}
452+
}
453+
// can't find itself
454+
throw new IllegalArgumentException("[DLedgerConfig] fail to init self address, config: " + this);
455+
}
456+
457+
private void initPeerAddressMap() {
458+
Map<String, String> peerMap = new HashMap<>();
459+
for (String peerInfo : this.peers.split(";")) {
460+
String peerSelfId = peerInfo.split("-")[0];
461+
String peerAddress = peerInfo.substring(peerSelfId.length() + 1);
462+
peerMap.put(DLedgerUtils.generateDLedgerId(this.group, peerSelfId), peerAddress);
463+
}
464+
this.peerAddressMap = peerMap;
465+
}
466+
467+
public String getSelfAddress() {
468+
return this.selfAddress;
469+
}
470+
471+
472+
public Map<String, String> getPeerAddressMap() {
473+
return this.peerAddressMap;
474+
}
421475
}

0 commit comments

Comments
 (0)