Skip to content

Commit c21c183

Browse files
authored
[ISSUE #118] Support multi-dirs storage
1 parent 5c24c84 commit c21c183

File tree

5 files changed

+221
-38
lines changed

5 files changed

+221
-38
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class DLedgerConfig {
2424

2525
public static final String MEMORY = "MEMORY";
2626
public static final String FILE = "FILE";
27+
public static final String MULTI_PATH_SPLITTER = System.getProperty("dLedger.multiPath.Splitter", ",");
2728

2829
@Parameter(names = {"--group", "-g"}, description = "Group of this server")
2930
private String group = "default";
@@ -37,6 +38,8 @@ public class DLedgerConfig {
3738
@Parameter(names = {"--store-base-dir", "-s"}, description = "The base store dir of this server")
3839
private String storeBaseDir = File.separator + "tmp" + File.separator + "dledgerstore";
3940

41+
@Parameter(names = {"--read-only-data-store-dirs"}, description = "The dirs of this server to be read only")
42+
private String readOnlyDataStoreDirs = null;
4043

4144
@Parameter(names = {"--peer-push-throttle-point"}, description = "When the follower is behind the leader more than this value, it will trigger the throttle")
4245
private int peerPushThrottlePoint = 300 * 1024 * 1024;
@@ -407,4 +410,12 @@ public long getLeadershipTransferWaitTimeout() {
407410
public void setLeadershipTransferWaitTimeout(long leadershipTransferWaitTimeout) {
408411
this.leadershipTransferWaitTimeout = leadershipTransferWaitTimeout;
409412
}
413+
414+
public String getReadOnlyDataStoreDirs() {
415+
return readOnlyDataStoreDirs;
416+
}
417+
418+
public void setReadOnlyDataStoreDirs(String readOnlyDataStoreDirs) {
419+
this.readOnlyDataStoreDirs = readOnlyDataStoreDirs;
420+
}
410421
}

src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
import java.io.File;
3131
import java.nio.ByteBuffer;
3232
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.HashSet;
3335
import java.util.List;
3436
import java.util.Properties;
37+
import java.util.Set;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
@@ -67,10 +70,17 @@ public class DLedgerMmapFileStore extends DLedgerStore {
6770
private AtomicBoolean hasLoaded = new AtomicBoolean(false);
6871
private AtomicBoolean hasRecovered = new AtomicBoolean(false);
6972

73+
private volatile Set<String> fullStorePaths = Collections.emptySet();
74+
7075
public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) {
7176
this.dLedgerConfig = dLedgerConfig;
7277
this.memberState = memberState;
73-
this.dataFileList = new MmapFileList(dLedgerConfig.getDataStorePath(), dLedgerConfig.getMappedFileSizeForEntryData());
78+
if (dLedgerConfig.getDataStorePath().contains(DLedgerConfig.MULTI_PATH_SPLITTER)) {
79+
this.dataFileList = new MultiPathMmapFileList(dLedgerConfig, dLedgerConfig.getMappedFileSizeForEntryData(),
80+
this::getFullStorePaths);
81+
} else {
82+
this.dataFileList = new MmapFileList(dLedgerConfig.getDataStorePath(), dLedgerConfig.getMappedFileSizeForEntryData());
83+
}
7484
this.indexFileList = new MmapFileList(dLedgerConfig.getIndexStorePath(), dLedgerConfig.getMappedFileSizeForEntryIndex());
7585
localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));
7686
localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2));
@@ -615,6 +625,14 @@ public MmapFileList getIndexFileList() {
615625
return indexFileList;
616626
}
617627

628+
public Set<String> getFullStorePaths() {
629+
return fullStorePaths;
630+
}
631+
632+
public void setFullStorePaths(Set<String> fullStorePaths) {
633+
this.fullStorePaths = fullStorePaths;
634+
}
635+
618636
public interface AppendHook {
619637
void doHook(DLedgerEntry entry, ByteBuffer buffer, int bodyOffset);
620638
}
@@ -656,7 +674,7 @@ public FlushDataService(String name, Logger logger) {
656674
class CleanSpaceService extends ShutdownAbleThread {
657675

658676
double storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
659-
double dataRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getDataStorePath());
677+
double dataRatio = calcDataStorePathPhysicRatio();
660678

661679
public CleanSpaceService(String name, Logger logger) {
662680
super(name, logger);
@@ -665,7 +683,7 @@ public CleanSpaceService(String name, Logger logger) {
665683
@Override public void doWork() {
666684
try {
667685
storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());
668-
dataRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getDataStorePath());
686+
dataRatio = calcDataStorePathPhysicRatio();
669687
long hourOfMs = 3600L * 1000L;
670688
long fileReservedTimeMs = dLedgerConfig.getFileReservedHours() * hourOfMs;
671689
if (fileReservedTimeMs < hourOfMs) {
@@ -727,5 +745,21 @@ private boolean isNeedForbiddenWrite() {
727745
}
728746
return false;
729747
}
748+
749+
public double calcDataStorePathPhysicRatio() {
750+
Set<String> fullStorePath = new HashSet<>();
751+
String storePath = dLedgerConfig.getDataStorePath();
752+
String[] paths = storePath.trim().split(DLedgerConfig.MULTI_PATH_SPLITTER);
753+
double minPhysicRatio = 100;
754+
for (String path : paths) {
755+
double physicRatio = DLedgerUtils.isPathExists(path) ? DLedgerUtils.getDiskPartitionSpaceUsedPercent(path) : -1;
756+
minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
757+
if (physicRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()) {
758+
fullStorePath.add(path);
759+
}
760+
}
761+
DLedgerMmapFileStore.this.setFullStorePaths(fullStorePath);
762+
return minPhysicRatio;
763+
}
730764
}
731765
}

src/main/java/io/openmessaging/storage/dledger/store/file/MmapFileList.java

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -282,30 +282,34 @@ public boolean load() {
282282
File dir = new File(this.storePath);
283283
File[] files = dir.listFiles();
284284
if (files != null) {
285-
// ascending order
286-
Arrays.sort(files);
287-
for (File file : files) {
285+
return doLoad(Arrays.asList(files));
286+
}
287+
return true;
288+
}
289+
290+
public boolean doLoad(List<File> files) {
291+
// ascending order
292+
files.sort(Comparator.comparing(File::getName));
293+
for (File file : files) {
288294

289-
if (file.length() != this.mappedFileSize) {
290-
logger.warn(file + "\t" + file.length()
295+
if (file.length() != this.mappedFileSize) {
296+
logger.warn(file + "\t" + file.length()
291297
+ " length not matched message store config value, please check it manually. You should delete old files before changing mapped file size");
292-
return false;
293-
}
294-
try {
295-
MmapFile mappedFile = new DefaultMmapFile(file.getPath(), mappedFileSize);
296-
297-
mappedFile.setWrotePosition(this.mappedFileSize);
298-
mappedFile.setFlushedPosition(this.mappedFileSize);
299-
mappedFile.setCommittedPosition(this.mappedFileSize);
300-
this.mappedFiles.add(mappedFile);
301-
logger.info("load " + file.getPath() + " OK");
302-
} catch (IOException e) {
303-
logger.error("load file " + file + " error", e);
304-
return false;
305-
}
298+
return false;
306299
}
307-
}
300+
try {
301+
MmapFile mappedFile = new DefaultMmapFile(file.getPath(), mappedFileSize);
308302

303+
mappedFile.setWrotePosition(this.mappedFileSize);
304+
mappedFile.setFlushedPosition(this.mappedFileSize);
305+
mappedFile.setCommittedPosition(this.mappedFileSize);
306+
this.mappedFiles.add(mappedFile);
307+
logger.info("load " + file.getPath() + " OK");
308+
} catch (IOException e) {
309+
logger.error("load file " + file + " error", e);
310+
return false;
311+
}
312+
}
309313
return true;
310314
}
311315

@@ -320,25 +324,33 @@ public MmapFile getLastMappedFile(final long startOffset, boolean needCreate) {
320324
}
321325

322326
if (createOffset != -1 && needCreate) {
323-
String nextFilePath = this.storePath + File.separator + DLedgerUtils.offset2FileName(createOffset);
324-
MmapFile mappedFile = null;
325-
try {
326-
mappedFile = new DefaultMmapFile(nextFilePath, this.mappedFileSize);
327-
} catch (IOException e) {
328-
logger.error("create mappedFile exception", e);
329-
}
327+
return tryCreateMappedFile(createOffset);
328+
}
330329

331-
if (mappedFile != null) {
332-
if (this.mappedFiles.isEmpty()) {
333-
mappedFile.setFirstCreateInQueue(true);
334-
}
335-
this.mappedFiles.add(mappedFile);
336-
}
330+
return mappedFileLast;
331+
}
332+
333+
protected MmapFile tryCreateMappedFile(long createOffset) {
334+
String nextFilePath = this.storePath + File.separator + DLedgerUtils.offset2FileName(createOffset);
335+
return doCreateMappedFile(nextFilePath);
336+
}
337337

338-
return mappedFile;
338+
protected MmapFile doCreateMappedFile(String nextFilePath) {
339+
MmapFile mappedFile = null;
340+
try {
341+
mappedFile = new DefaultMmapFile(nextFilePath, this.mappedFileSize);
342+
} catch (IOException e) {
343+
logger.error("create mappedFile exception", e);
339344
}
340345

341-
return mappedFileLast;
346+
if (mappedFile != null) {
347+
if (this.mappedFiles.isEmpty()) {
348+
mappedFile.setFirstCreateInQueue(true);
349+
}
350+
this.mappedFiles.add(mappedFile);
351+
}
352+
353+
return mappedFile;
342354
}
343355

344356
public MmapFile getLastMappedFile(final long startOffset) {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.store.file;
18+
19+
import io.netty.util.internal.StringUtil;
20+
import io.openmessaging.storage.dledger.DLedgerConfig;
21+
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
22+
23+
import java.io.File;
24+
import java.util.ArrayList;
25+
import java.util.Arrays;
26+
import java.util.Collections;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Set;
30+
import java.util.function.Supplier;
31+
32+
public class MultiPathMmapFileList extends MmapFileList {
33+
34+
private final Supplier<Set<String>> fullStorePathsSupplier;
35+
private final DLedgerConfig config;
36+
37+
public MultiPathMmapFileList(DLedgerConfig config, int mappedFileSize, Supplier<Set<String>> fullStorePathsSupplier) {
38+
super(config.getDataStorePath(), mappedFileSize);
39+
this.config = config;
40+
this.fullStorePathsSupplier = fullStorePathsSupplier;
41+
}
42+
43+
private Set<String> getPaths() {
44+
String[] paths = this.config.getDataStorePath().trim().split(DLedgerConfig.MULTI_PATH_SPLITTER);
45+
return new HashSet<>(Arrays.asList(paths));
46+
}
47+
48+
private Set<String> getReadonlyPaths() {
49+
String pathStr = config.getReadOnlyDataStoreDirs();
50+
if (StringUtil.isNullOrEmpty(pathStr)) {
51+
return Collections.emptySet();
52+
}
53+
String[] paths = pathStr.trim().split(DLedgerConfig.MULTI_PATH_SPLITTER);
54+
return new HashSet<>(Arrays.asList(paths));
55+
}
56+
57+
@Override
58+
public boolean load() {
59+
Set<String> storePathSet = getPaths();
60+
storePathSet.addAll(getReadonlyPaths());
61+
62+
List<File> files = new ArrayList<>();
63+
for (String path : storePathSet) {
64+
File dir = new File(path);
65+
File[] ls = dir.listFiles();
66+
if (ls != null) {
67+
Collections.addAll(files, ls);
68+
}
69+
}
70+
71+
return doLoad(files);
72+
}
73+
74+
@Override
75+
protected MmapFile tryCreateMappedFile(long createOffset) {
76+
long fileIdx = createOffset / this.getMappedFileSize();
77+
Set<String> storePath = getPaths();
78+
Set<String> readonlyPathSet = getReadonlyPaths();
79+
Set<String> fullStorePaths =
80+
fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get();
81+
82+
83+
HashSet<String> availableStorePath = new HashSet<>(storePath);
84+
//do not create file in readonly store path.
85+
availableStorePath.removeAll(readonlyPathSet);
86+
87+
//do not create file is space is nearly full.
88+
availableStorePath.removeAll(fullStorePaths);
89+
90+
//if no store path left, fall back to writable store path.
91+
if (availableStorePath.isEmpty()) {
92+
availableStorePath = new HashSet<>(storePath);
93+
availableStorePath.removeAll(readonlyPathSet);
94+
}
95+
96+
String[] paths = availableStorePath.toArray(new String[]{});
97+
Arrays.sort(paths);
98+
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
99+
+ DLedgerUtils.offset2FileName(createOffset);
100+
return doCreateMappedFile(nextFilePath);
101+
}
102+
103+
@Override
104+
public void destroy() {
105+
for (MmapFile mf : this.getMappedFiles()) {
106+
mf.destroy(1000 * 3);
107+
}
108+
this.getMappedFiles().clear();
109+
this.setFlushedWhere(0);
110+
111+
Set<String> storePathSet = getPaths();
112+
storePathSet.addAll(getReadonlyPaths());
113+
114+
for (String path : storePathSet) {
115+
File file = new File(path);
116+
if (file.isDirectory()) {
117+
file.delete();
118+
}
119+
}
120+
}
121+
}

src/main/java/io/openmessaging/storage/dledger/utils/DLedgerUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,9 @@ public static double getDiskPartitionSpaceUsedPercent(final String path) {
9090
}
9191
return -1;
9292
}
93+
94+
public static boolean isPathExists(final String path) {
95+
File file = new File(path);
96+
return file.exists();
97+
}
9398
}

0 commit comments

Comments
 (0)