Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
</dependency>

</dependencies>

Expand Down
53 changes: 37 additions & 16 deletions src/main/java/io/openmessaging/storage/dledger/DLedger.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,47 @@

import com.alibaba.fastjson.JSON;
import com.beust.jcommander.JCommander;
import io.openmessaging.storage.dledger.cmdline.ConfigCommand;
import io.openmessaging.storage.dledger.dledger.DLedgerProxy;
import io.openmessaging.storage.dledger.dledger.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.utils.ConfigUtils;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DLedger {

private static Logger logger = LoggerFactory.getLogger(DLedger.class);

@Deprecated
public static void main(String args[]) {
DLedgerConfig dLedgerConfig = new DLedgerConfig();
JCommander.newBuilder().addObject(dLedgerConfig).build().parse(args);
bootstrapDLedger(dLedgerConfig);
}

public static void bootstrapDLedger(DLedgerConfig dLedgerConfig) {

if (null == dLedgerConfig) {
logger.error("Bootstrap DLedger server error", new IllegalArgumentException("DLedgerConfig is null"));
System.exit(-1);
public static void main(String[] args) {
List<DLedgerConfig> dLedgerConfigs = new LinkedList<>();
if ("--config".equals(args[0]) || "-c".equals(args[0])) {
ConfigCommand configCommand = new ConfigCommand();
JCommander.newBuilder().addObject(configCommand).build().parse(args);
try {
DLedgerProxyConfig dLedgerProxyConfig = ConfigUtils.parseDLedgerProxyConfig(configCommand.getConfigPath());
dLedgerConfigs.addAll(dLedgerProxyConfig.getConfigs());
} catch (Exception e) {
logger.error("Create DLedgerProxyConfig error", e);
System.exit(-1);
}
} else {
DLedgerConfig dLedgerConfig = new DLedgerConfig();
JCommander.newBuilder().addObject(dLedgerConfig).build().parse(args);
dLedgerConfigs.add(dLedgerConfig);
}
bootstrapDLedger(dLedgerConfigs);
}

DLedgerServer dLedgerServer = new DLedgerServer(dLedgerConfig);
dLedgerServer.startup();
logger.info("[{}] group {} start ok with config {}", dLedgerConfig.getSelfId(), dLedgerConfig.getGroup(), JSON.toJSONString(dLedgerConfig));
public static void bootstrapDLedger(List<DLedgerConfig> dLedgerConfigs) {
if (dLedgerConfigs == null || dLedgerConfigs.isEmpty()) {
logger.error("Bootstrap DLedger server error", new IllegalArgumentException("DLedgerConfigs is null or empty"));
}
DLedgerProxy dLedgerProxy = new DLedgerProxy(dLedgerConfigs);
dLedgerProxy.startup();
logger.info("DLedgers start ok with config {}", JSON.toJSONString(dLedgerConfigs));
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;

Expand All @@ -52,12 +69,16 @@ public void run() {
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
dLedgerServer.shutdown();
dLedgerProxy.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
logger.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
}

public static void bootstrapDLedger(DLedgerConfig dLedgerConfig) {
bootstrapDLedger(Collections.singletonList(dLedgerConfig));
}
}
58 changes: 56 additions & 2 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@

import com.beust.jcommander.Parameter;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;

import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class DLedgerConfig {

public static final String MEMORY = "MEMORY";
public static final String FILE = "FILE";
public static final String MULTI_PATH_SPLITTER = System.getProperty("dLedger.multiPath.Splitter", ",");

@Parameter(names = {"--config", "-c"}, description = "Config path of DLedger")
private String configFilePath;

@Parameter(names = {"--group", "-g"}, description = "Group of this server")
private String group = "default";

Expand Down Expand Up @@ -85,8 +93,8 @@ public class DLedgerConfig {
@Parameter(names = {"--preferred-leader-id"}, description = "Preferred LeaderId")
private String preferredLeaderIds;
private long maxLeadershipTransferWaitIndex = 1000;
private int minTakeLeadershipVoteIntervalMs = 30;
private int maxTakeLeadershipVoteIntervalMs = 100;
private int minTakeLeadershipVoteIntervalMs = 30;
private int maxTakeLeadershipVoteIntervalMs = 100;

private boolean isEnableBatchPush = false;
private int maxBatchPushSize = 4 * 1024;
Expand Down Expand Up @@ -418,4 +426,50 @@ public String getReadOnlyDataStoreDirs() {
public void setReadOnlyDataStoreDirs(String readOnlyDataStoreDirs) {
this.readOnlyDataStoreDirs = readOnlyDataStoreDirs;
}

private String selfAddress;

// groupId#selfIf -> address
private Map<String, String> peerAddressMap;

private final AtomicBoolean inited = new AtomicBoolean(false);

public void init() {
if (inited.compareAndSet(false, true)) {
initSelfAddress();
initPeerAddressMap();
}
}

private void initSelfAddress() {
for (String peerInfo : this.peers.split(";")) {
String peerSelfId = peerInfo.split("-")[0];
String peerAddress = peerInfo.substring(peerSelfId.length() + 1);
if (this.selfId.equals(peerSelfId)) {
this.selfAddress = peerAddress;
return;
}
}
// can't find itself
throw new IllegalArgumentException("[DLedgerConfig] fail to init self address, config: " + this);
}

private void initPeerAddressMap() {
Map<String, String> peerMap = new HashMap<>();
for (String peerInfo : this.peers.split(";")) {
String peerSelfId = peerInfo.split("-")[0];
String peerAddress = peerInfo.substring(peerSelfId.length() + 1);
peerMap.put(DLedgerUtils.generateDLedgerId(this.group, peerSelfId), peerAddress);
}
this.peerAddressMap = peerMap;
}

public String getSelfAddress() {
return this.selfAddress;
}


public Map<String, String> getPeerAddressMap() {
return this.peerAddressMap;
}
}
Loading