Skip to content

Commit c6b3780

Browse files
committed
WIP
1 parent 7717f9f commit c6b3780

File tree

4 files changed

+57
-103
lines changed

4 files changed

+57
-103
lines changed

src/DB.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class DB {
3838
dbPath,
3939
crypto,
4040
fs = require('fs'),
41+
deadlock = false,
4142
logger = new Logger(this.name),
4243
fresh = false,
4344
...dbOptions
@@ -49,12 +50,14 @@ class DB {
4950
};
5051
fs?: FileSystem;
5152
logger?: Logger;
53+
deadlock?: boolean;
5254
fresh?: boolean;
5355
} & DBOptions): Promise<DB> {
5456
logger.info(`Creating ${this.name}`);
5557
const db = new this({
5658
dbPath,
5759
fs,
60+
deadlock,
5861
logger,
5962
});
6063
await db.start({
@@ -75,6 +78,7 @@ class DB {
7578
protected logger: Logger;
7679
protected workerManager?: DBWorkerManagerInterface;
7780
protected _lockBox: LockBox<RWLockWriter> = new LockBox();
81+
protected _locksPending?: Map<string, { count: number}>;
7882
protected _db: RocksDBDatabase;
7983
/**
8084
* References to iterators
@@ -110,15 +114,20 @@ class DB {
110114
constructor({
111115
dbPath,
112116
fs,
117+
deadlock,
113118
logger,
114119
}: {
115120
dbPath: string;
116121
fs: FileSystem;
122+
deadlock: boolean;
117123
logger: Logger;
118124
}) {
119125
this.logger = logger;
120126
this.dbPath = dbPath;
121127
this.fs = fs;
128+
if (deadlock) {
129+
this._locksPending = new Map();
130+
}
122131
}
123132

124133
public async start({
@@ -213,6 +222,7 @@ class DB {
213222
const tran = new DBTransaction({
214223
db: this,
215224
lockBox: this._lockBox,
225+
locksPending: this._locksPending,
216226
logger: this.logger,
217227
});
218228
return [

src/DBTransaction.ts

Lines changed: 32 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
import type { ResourceRelease } from '@matrixai/resources';
2-
import type {
3-
LockBox,
4-
MultiLockRequest as AsyncLocksMultiLockRequest,
5-
} from '@matrixai/async-locks';
2+
import type { LockBox } from '@matrixai/async-locks';
63
import type DB from './DB';
74
import type {
8-
ToString,
95
KeyPath,
106
LevelPath,
117
DBIteratorOptions,
128
DBClearOptions,
139
DBCountOptions,
14-
MultiLockRequest,
1510
} from './types';
1611
import type {
1712
RocksDBTransaction,
@@ -20,7 +15,12 @@ import type {
2015
} from './native/types';
2116
import Logger from '@matrixai/logger';
2217
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
23-
import { Lock, RWLockWriter } from '@matrixai/async-locks';
18+
import {
19+
Monitor,
20+
Lock,
21+
RWLockWriter,
22+
errors as asyncLocksErrors
23+
} from '@matrixai/async-locks';
2424
import DBIterator from './DBIterator';
2525
import { rocksdbP } from './native';
2626
import * as utils from './utils';
@@ -33,15 +33,7 @@ class DBTransaction {
3333

3434
protected _db: DB;
3535
protected logger: Logger;
36-
protected lockBox: LockBox<RWLockWriter>;
37-
protected _locks: Map<
38-
string,
39-
{
40-
lock: RWLockWriter;
41-
type: 'read' | 'write';
42-
release: ResourceRelease;
43-
}
44-
> = new Map();
36+
protected monitor: Monitor<RWLockWriter>;
4537
protected _options: RocksDBTransactionOptions;
4638
protected _transaction: RocksDBTransaction;
4739
protected _snapshot: RocksDBTransactionSnapshot;
@@ -58,18 +50,25 @@ class DBTransaction {
5850
public constructor({
5951
db,
6052
lockBox,
53+
locksPending,
6154
logger,
6255
...options
6356
}: {
6457
db: DB;
6558
lockBox: LockBox<RWLockWriter>;
59+
locksPending?: Map<string, { count: number }>;
6660
logger?: Logger;
6761
} & RocksDBTransactionOptions) {
6862
logger = logger ?? new Logger(this.constructor.name);
6963
logger.debug(`Constructing ${this.constructor.name}`);
7064
this.logger = logger;
7165
this._db = db;
72-
this.lockBox = lockBox;
66+
// This creates the monitor that I care about
67+
this.monitor = new Monitor(
68+
lockBox,
69+
RWLockWriter,
70+
locksPending
71+
);
7372
const options_ = {
7473
...options,
7574
// Transactions should be synchronous
@@ -96,9 +95,7 @@ class DBTransaction {
9695
// this then allows the destruction to proceed
9796
await this.commitOrRollbackLock.waitForUnlock();
9897
this._db.transactionRefs.delete(this);
99-
// Unlock all locked keys in reverse
100-
const lockedKeys = [...this._locks.keys()].reverse();
101-
await this.unlock(...lockedKeys);
98+
await this.monitor.unlockAll();
10299
this.logger.debug(`Destroyed ${this.constructor.name} ${this.id}`);
103100
}
104101

@@ -150,15 +147,8 @@ class DBTransaction {
150147
return this._rollbacked;
151148
}
152149

153-
get locks(): ReadonlyMap<
154-
string,
155-
{
156-
lock: RWLockWriter;
157-
type: 'read' | 'write';
158-
release: ResourceRelease;
159-
}
160-
> {
161-
return this._locks;
150+
get locks(): Monitor<RWLockWriter>['locks'] {
151+
return this.monitor.locks;
162152
}
163153

164154
/**
@@ -168,78 +158,26 @@ class DBTransaction {
168158
return this._iteratorRefs;
169159
}
170160

171-
/**
172-
* Lock a sequence of lock requests
173-
* If the lock request doesn't specify, it
174-
* defaults to using `RWLockWriter` with `write` type
175-
* Keys are locked in string sorted order
176-
* Even though keys can be arbitrary strings, by convention, you should use
177-
* keys that correspond to keys in the database
178-
* Locking with the same key is idempotent therefore lock re-entrancy is enabled
179-
* Keys are automatically unlocked in reverse sorted order
180-
* when the transaction is destroyed
181-
* There is no support for lock upgrading or downgrading
182-
* There is no deadlock detection
183-
*/
184161
public async lock(
185-
...requests: Array<MultiLockRequest | ToString>
162+
...params: Parameters<Monitor<RWLockWriter>['lock']>
186163
): Promise<void> {
187-
const requests_: Array<AsyncLocksMultiLockRequest<RWLockWriter>> = [];
188-
for (const request of requests) {
189-
if (Array.isArray(request)) {
190-
const [key, ...lockingParams] = request;
191-
const key_ = key.toString();
192-
const lock = this._locks.get(key_);
193-
// Default the lock type to `write`
194-
const lockType = (lockingParams[0] = lockingParams[0] ?? 'write');
195-
if (lock == null) {
196-
requests_.push([key_, RWLockWriter, ...lockingParams]);
197-
} else if (lock.type !== lockType) {
198-
throw new errors.ErrorDBTransactionLockType();
199-
}
200-
} else {
201-
const key_ = request.toString();
202-
const lock = this._locks.get(key_);
203-
if (lock == null) {
204-
// Default to using `RWLockWriter` write lock for just string keys
205-
requests_.push([key_, RWLockWriter, 'write']);
206-
} else if (lock.type !== 'write') {
207-
throw new errors.ErrorDBTransactionLockType();
208-
}
164+
try {
165+
await this.monitor.lock(...params)();
166+
} catch (e) {
167+
if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorLockType) {
168+
throw new errors.ErrorDBTransactionLockType(undefined, { cause: e });
209169
}
210-
}
211-
if (requests_.length > 0) {
212-
// Duplicates are eliminated, and the returned acquisitions are sorted
213-
const lockAcquires = this.lockBox.lockMulti(...requests_);
214-
for (const [key, lockAcquire, ...lockingParams] of lockAcquires) {
215-
const [lockRelease, lock] = await lockAcquire();
216-
// The `Map` will maintain insertion order
217-
// these must be unlocked in reverse order
218-
// when the transaction is destroyed
219-
this._locks.set(key as string, {
220-
lock: lock!,
221-
type: lockingParams[0]!, // The `type` is defaulted to `write`
222-
release: lockRelease,
223-
});
170+
if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorDeadlock) {
171+
throw new errors.ErrorDBTransactionDeadlock(undefined, { cause: e });
224172
}
173+
throw e;
225174
}
226175
}
227176

228-
/**
229-
* Unlock a sequence of lock keys
230-
* Unlocking will be done in the order of the keys
231-
* A transaction instance is only allowed to unlock keys that it previously
232-
* locked, all keys that are not part of the `this._locks` is ignored
233-
* Unlocking the same keys is idempotent
234-
*/
235-
public async unlock(...keys: Array<ToString>): Promise<void> {
236-
for (const key of keys) {
237-
const key_ = key.toString();
238-
const lock = this._locks.get(key_);
239-
if (lock == null) continue;
240-
this._locks.delete(key_);
241-
await lock.release();
242-
}
177+
public async unlock(
178+
...params: Parameters<Monitor<RWLockWriter>['unlock']>
179+
): Promise<void> {
180+
await this.monitor.unlock(...params);
243181
}
244182

245183
public async get<T>(

src/errors.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ class ErrorDBTransactionLockType<T> extends ErrorDBTransaction<T> {
8787
'DBTransaction does not support upgrading or downgrading the lock type';
8888
}
8989

90+
class ErrorDBTransactionDeadlock<T> extends ErrorDBTransaction<T> {
91+
static description =
92+
'DBTransaction encountered a pessimistic deadlock';
93+
}
94+
9095
export {
9196
ErrorDB,
9297
ErrorDBRunning,
@@ -109,4 +114,5 @@ export {
109114
ErrorDBTransactionNotCommittedNorRollbacked,
110115
ErrorDBTransactionConflict,
111116
ErrorDBTransactionLockType,
117+
ErrorDBTransactionDeadlock,
112118
};

src/types.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ type POJO = { [key: string]: any };
2020
/**
2121
* Any type that can be turned into a string
2222
*/
23-
interface ToString {
24-
toString(): string;
25-
}
23+
// interface ToString {
24+
// toString(): string;
25+
// }
2626

2727
/**
2828
* Opaque types are wrappers of existing types
@@ -159,14 +159,14 @@ type DBOp =
159159

160160
type DBOps = Array<DBOp>;
161161

162-
type MultiLockRequest = [
163-
key: ToString,
164-
...lockingParams: Parameters<RWLockWriter['lock']>,
165-
];
162+
// type MultiLockRequest = [
163+
// key: ToString,
164+
// ...lockingParams: Parameters<RWLockWriter['lock']>,
165+
// ];
166166

167167
export type {
168168
POJO,
169-
ToString,
169+
// ToString,
170170
Opaque,
171171
Callback,
172172
Merge,
@@ -182,5 +182,5 @@ export type {
182182
DBBatch,
183183
DBOp,
184184
DBOps,
185-
MultiLockRequest,
185+
// MultiLockRequest,
186186
};

0 commit comments

Comments
 (0)