Description
Background
The database currently uses a multi-instance model with a checkpoint mechanism to guarantee atomic writes. The checkpoint mechanism has gone through two versions:
- v1: Retains all data changes of the latest block and writes them atomically to the tmp database. When the service stops abnormally, the complete data of the latest block is restored from the tmp database. However, tmp only retains the data changes of the latest block, so data inconsistencies are still possible in a crash scenario.
- v2: Retains all data changes of the blocks within the last 2 minutes, which provides WAL-like behaviour and solves the data inconsistency in the crash scenario.
When the Toolkit splits a snapshot, the data in the Checkpoint and the DbStore must be merged to obtain the complete snapshot data.
However, when a snapshot is generated with Checkpoint v2, only the data of the latest block is read. Because a v2 checkpoint consists of multiple consecutive blocks, this can miss data.
Example: A v2 checkpoint retains the data of three blocks: block1, block2, and block3. The block body for blocknumber = 2 exists only in block2 (because of a crash, DbStore did not persist it in time). If only the data of block3 is retrieved, the result is null. To obtain the data, all blocks must be traversed in reverse order.
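The example can be reproduced with a minimal Java sketch. It models the three blocks of a v2 checkpoint as in-memory maps (oldest first) instead of real checkpoint databases. The class name, method names, and key strings are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: each map stands in for the change set of one block. */
final class CheckpointLookupSketch {

  /** Mirrors the problematic behaviour: only the newest change set is consulted. */
  static String latestOnly(List<Map<String, String>> checkpoints, String key) {
    if (checkpoints.isEmpty()) {
      return null;
    }
    return checkpoints.get(checkpoints.size() - 1).get(key);
  }

  /** Walks the change sets from newest to oldest and returns the first hit. */
  static String newestFirst(List<Map<String, String>> checkpoints, String key) {
    for (int i = checkpoints.size() - 1; i >= 0; i--) {
      String value = checkpoints.get(i).get(key);
      if (value != null) {
        return value;
      }
    }
    return null;
  }

  /** Builds block1, block2, block3, each holding only its own block body. */
  static List<Map<String, String>> threeBlocks() {
    List<Map<String, String>> checkpoints = new ArrayList<>();
    for (int n = 1; n <= 3; n++) {
      Map<String, String> changeSet = new HashMap<>();
      changeSet.put("block-body:" + n, "body-of-block" + n);
      checkpoints.add(changeSet);
    }
    return checkpoints;
  }

  public static void main(String[] args) {
    List<Map<String, String>> checkpoints = threeBlocks();
    System.out.println(latestOnly(checkpoints, "block-body:2"));  // null
    System.out.println(newestFirst(checkpoints, "block-body:2")); // body-of-block2
  }
}
```

Reading only the last change set misses the body of block 2, while the newest-to-oldest walk finds it.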
function getDataFromSourceDB(String sourceDir, String dbName, byte[] key) {
  step 1. get the checkpoint
  step 2. try to find the specified key in the checkpoint
  step 3. if nothing is found, try to read from the DbStores
}
function getCheckpointDb(String sourceDir) {
  version = getCheckpointVersion()
  if (version == 2) {
    // This action will miss some data:
    // the returned DBInterface only contains the latest block changeset.
    return the DBInterface
  }
  if (version == 1) { return tmp db }
}
Rationale
For security and convenience, a single external interface should be responsible for all database query operations. Accessing the database without going through this interface should be prohibited.
The interface should meet the following conditions:
- It identifies the checkpoint version of the current database and answers queries against the checkpoint correctly
- It merges the data in the Stores and the checkpoints and returns the correct data
Specification
- checkpointV2FlatMap: Stores the merged Checkpoint v2 data
- initFlatCheckpointV2(): Initializes checkpointV2FlatMap
- getDataFromSourceDB(): The query interface; all data is queried through this interface
Test Specification
- Build a data set from a service that stops normally (for example, with kill -15), split the data set with both the new and the old version of the tool, and compare the results for consistency.
- Build a data set from a service that stops abnormally (for example, with kill -9), split the data set with both versions of the tool, and compare the results for consistency.
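One way to picture the consistency comparison is a key-by-key diff of two split results. The sketch below is an assumption about how such a check could look, not the procedure used by the Toolkit: both results are assumed to be already loaded into in-memory maps, java.nio.ByteBuffer is used as a content-comparable key, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: compares two key/value dumps held in memory. */
final class SnapshotDiffSketch {

  /** Returns every key whose value differs or that exists on only one side. */
  static Set<ByteBuffer> diffKeys(Map<ByteBuffer, byte[]> left, Map<ByteBuffer, byte[]> right) {
    Set<ByteBuffer> diff = new HashSet<>();
    for (Map.Entry<ByteBuffer, byte[]> entry : left.entrySet()) {
      // Arrays.equals compares contents and returns false if only one side is null.
      if (!Arrays.equals(entry.getValue(), right.get(entry.getKey()))) {
        diff.add(entry.getKey());
      }
    }
    for (ByteBuffer key : right.keySet()) {
      if (!left.containsKey(key)) {
        diff.add(key);
      }
    }
    return diff;
  }
}
```

An empty result means the two dumps agree on every key and value.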
Scope Of Impact
This issue fixes a data inconsistency caused by splitting the DB with the Toolkit. It does not affect the fullnode.
Implementation
The changes:
- Add a new global HashMap, checkpointV2FlatMap, to store the data of checkpoint v2
- Check the current checkpoint version when the Toolkit starts
  - If the version is v2, obtain the checkpoint list and merge all data into checkpointV2FlatMap in order; this is what initFlatCheckpointV2() does. This logic must run as the first step of service startup so that any subsequent read operation gets the correct data.
- Encapsulate the query interface getDataFromSourceDB(), and route all queries against the original database through this interface
For getDataFromSourceDB():
- If the checkpoint version is v2: query the data from checkpointV2FlatMap. If the result is empty, try to read it from the DbStore.
- If the checkpoint version is v1: query the data from the tmp database. If the result is empty, try to read it from the DbStore.
getDataFromSourceDB() logic
public byte[] getDataFromSourceDB(String sourceDir, String dbName, byte[] key)
    throws IOException, RocksDBException {
  // Checkpoint keys are prefixed with the encoded db name.
  byte[] keyInCp = Bytes.concat(simpleEncode(dbName), key);
  byte[] valueInCp = null;
  DBInterface sourceDb = DbTool.getDB(sourceDir, dbName);
  // get data from checkpoint first.
  if (getCheckpointV2List(sourceDir).size() > 0) {
    valueInCp = checkpointV2FlatMap.get(WrappedByteArray.of(keyInCp));
  } else {
    valueInCp = DbTool.getDB(sourceDir, CHECKPOINT_DB).get(keyInCp);
  }
  byte[] value;
  if (isEmptyBytes(valueInCp)) {
    // Not in the checkpoint: fall back to the store.
    value = sourceDb.get(key);
  } else {
    // The first byte is the operator flag, the rest is the stored value.
    value = DBUtils.Operator.DELETE.getValue() == valueInCp[0]
        ? null : Arrays.copyOfRange(valueInCp, 1, valueInCp.length);
  }
  if (isEmptyBytes(value)) {
    throw new RuntimeException(String.format("data not found in store, dbName: %s, key: %s",
        dbName, Arrays.toString(key)));
  }
  return value;
}
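The way the method above interprets a checkpoint entry can be isolated in a small sketch. It assumes, as the code suggests, that the first byte of an entry is an operator flag and the remaining bytes are the stored value. The flag constants are placeholders rather than the real values of DBUtils.Operator, the class and method names are hypothetical, and a missing value is returned as null instead of raising an exception.

```java
import java.util.Arrays;

/** Illustrative only: resolves one lookup from a checkpoint entry and a store value. */
final class CheckpointValueSketch {

  // Placeholder flag values; the real ones are defined by DBUtils.Operator.
  static final byte OP_PUT = 1;
  static final byte OP_DELETE = 2;

  /**
   * @param checkpointEntry raw checkpoint entry (flag byte + value), or null if absent
   * @param storeValue value read from the store, or null if absent
   * @return the effective value, or null if the key does not exist
   */
  static byte[] resolve(byte[] checkpointEntry, byte[] storeValue) {
    if (checkpointEntry == null || checkpointEntry.length == 0) {
      // Nothing recorded in the checkpoint: the store is authoritative.
      return storeValue;
    }
    if (checkpointEntry[0] == OP_DELETE) {
      // A delete recorded in the checkpoint hides whatever the store still holds.
      return null;
    }
    // Any other flag: strip the flag byte and return the value.
    return Arrays.copyOfRange(checkpointEntry, 1, checkpointEntry.length);
  }
}
```

The point of the three branches is that a delete marker must not fall through to the store; otherwise a stale value that was deleted in a recent block would reappear.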
Init checkpointV2FlatMap
public void initFlatCheckpointV2(String path)
    throws IOException, RocksDBException {
  List<String> cpList = getCheckpointV2List(path);
  if (cpList.size() == 0) {
    return;
  }
  checkpointV2FlatMap = Maps.newHashMap();
  // Merge in list order: when a key appears in several checkpoints,
  // the entry from the checkpoint processed last wins.
  for (String cp : cpList) {
    DBInterface db = DbTool.getDB(path + "/" + DBUtils.CHECKPOINT_DB_V2, cp);
    DBIterator it = db.iterator();
    it.seekToFirst();
    while (it.hasNext()) {
      checkpointV2FlatMap.put(WrappedByteArray.of(it.getKey()), it.getValue());
      it.next();
    }
    it.close();
  }
}
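A plain byte[] compares by identity, which is why the flat map is keyed by WrappedByteArray rather than by the raw key. The sketch below shows the same flattening idea with java.nio.ByteBuffer standing in for WrappedByteArray and in-memory maps standing in for the checkpoint databases. Both substitutions and all names are assumptions for illustration, as is the oldest-first ordering of the input list.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: flattens several change sets into one lookup map. */
final class FlatMergeSketch {

  /**
   * Merges the change sets in list order. For a key present in several
   * change sets, the one merged last (the newest, if the list is oldest first) wins.
   */
  static Map<ByteBuffer, byte[]> flatten(List<Map<byte[], byte[]>> checkpointsOldestFirst) {
    Map<ByteBuffer, byte[]> flat = new HashMap<>();
    for (Map<byte[], byte[]> changeSet : checkpointsOldestFirst) {
      for (Map.Entry<byte[], byte[]> entry : changeSet.entrySet()) {
        // ByteBuffer.equals/hashCode compare contents, unlike byte[].
        flat.put(ByteBuffer.wrap(entry.getKey().clone()), entry.getValue());
      }
    }
    return flat;
  }

  /** Looks up a raw key by wrapping it the same way as during the merge. */
  static byte[] get(Map<ByteBuffer, byte[]> flat, byte[] key) {
    return flat.get(ByteBuffer.wrap(key));
  }
}
```

Because the merge is done once up front, every later lookup is a single map access instead of a walk over all checkpoints.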
public void generateSnapshot(String sourceDir, String snapshotDir) {
  ...
  snapshotDir = Paths.get(snapshotDir, SNAPSHOT_DIR_NAME).toString();
  try {
    initFlatCheckpointV2(sourceDir);
    hasEnoughBlock(sourceDir);
  }
  ......
}
public void generateHistory(String sourceDir, String historyDir) {
  ....
  historyDir = Paths.get(historyDir, HISTORY_DIR_NAME).toString();
  try {
    if (isLite(sourceDir)) {
      throw new IllegalStateException(
          String.format("Unavailable sourceDir: %s is not fullNode data.", sourceDir));
    }
    initFlatCheckpointV2(sourceDir);
    hasEnoughBlock(sourceDir);
  }
  .....
}
public void completeHistoryData(String historyDir, String liteDir) {
  ....
  try {
    // check historyDir is from lite data
    if (isLite(historyDir)) {
      throw new IllegalStateException(
          String.format("Unavailable history: %s is not generated by fullNode data.",
              historyDir));
    }
    initFlatCheckpointV2(liteDir);
    ....
}