Skip to content

Commit 6497d9a

Browse files
committed
WIP
1 parent 52866cf commit 6497d9a

File tree

4 files changed

+134
-26
lines changed

4 files changed

+134
-26
lines changed

src/DBTransaction.ts

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { ResourceRelease } from '@matrixai/resources';
2-
import type { LockBox, LockRequest as AsyncLocksLockRequest } from '@matrixai/async-locks';
2+
import type { LockBox, MultiLockRequest as AsyncLocksMultiLockRequest } from '@matrixai/async-locks';
33
import type DB from './DB';
44
import type {
55
ToString,
@@ -8,7 +8,7 @@ import type {
88
DBIteratorOptions,
99
DBClearOptions,
1010
DBCountOptions,
11-
LockRequest,
11+
MultiLockRequest,
1212
} from './types';
1313
import type {
1414
RocksDBTransaction,
@@ -31,7 +31,11 @@ class DBTransaction {
3131
protected _db: DB;
3232
protected logger: Logger;
3333
protected lockBox: LockBox<RWLockWriter>;
34-
protected _locks: Map<string, [ResourceRelease, RWLockWriter]> = new Map();
34+
protected _locks: Map<string, {
35+
lock: RWLockWriter,
36+
type: 'read' | 'write',
37+
release: ResourceRelease,
38+
}> = new Map();
3539
protected _options: RocksDBTransactionOptions;
3640
protected _transaction: RocksDBTransaction;
3741
protected _snapshot: RocksDBTransactionSnapshot;
@@ -114,7 +118,11 @@ class DBTransaction {
114118
return this._rollbacked;
115119
}
116120

117-
get locks(): ReadonlyMap<string, [ResourceRelease, RWLockWriter]> {
121+
get locks(): ReadonlyMap<string, {
122+
lock: RWLockWriter,
123+
type: 'read' | 'write',
124+
release: ResourceRelease,
125+
}> {
118126
return this._locks;
119127
}
120128

@@ -127,41 +135,55 @@ class DBTransaction {
127135

128136
/**
129137
* Lock a sequence of lock requests
130-
* If a lock request is not an array, it is converted to `[ToString, RWLockWriter, 'write']`
138+
* If the lock request doesn't specify, it
139+
* defaults to using `RWLockWriter` with `write` type
131140
* Keys are locked in string sorted order
132141
* Even though keys can be arbitrary strings, by convention, you should use
133142
* keys that correspond to keys in the database
134143
* Locking with the same key is idempotent therefore lock re-entrancy is enabled
135-
* There is no deadlock detection, so becareful!
136144
* Keys are automatically unlocked in reverse sorted order
137145
* when the transaction is destroyed
146+
* There is no support for lock upgrading or downgrading
147+
* There is no deadlock detection
138148
*/
139-
public async lock(...requests: Array<LockRequest | string>): Promise<void> {
140-
const requests_: Array<AsyncLocksLockRequest<RWLockWriter>> = [];
149+
public async lock(...requests: Array<MultiLockRequest | string>): Promise<void> {
150+
const requests_: Array<AsyncLocksMultiLockRequest<RWLockWriter>> = [];
141151
for (const request of requests) {
142152
if (Array.isArray(request)) {
143-
const [key, ...rest] = request;
153+
const [key, ...lockingParams] = request;
144154
const key_ = key.toString();
145-
if (!this._locks.has(key_)) {
146-
requests_.push([key_, RWLockWriter, ...rest]);
155+
const lock = this._locks.get(key_);
156+
// Default the lock type to `write`
157+
const lockType = lockingParams[0] = lockingParams[0] ?? 'write';
158+
if (lock == null) {
159+
requests_.push([key_, RWLockWriter, ...lockingParams]);
160+
} else if (lock.type !== lockType) {
161+
throw new errors.ErrorDBTransactionLockType();
147162
}
148163
} else {
149164
const key_ = request.toString();
150-
if (!this._locks.has(key_)) {
165+
const lock = this._locks.get(key_);
166+
if (lock == null) {
151167
// Default to using `RWLockWriter` write lock for just string keys
152168
requests_.push([key_, RWLockWriter, 'write']);
169+
} else if (lock.type !== 'write') {
170+
throw new errors.ErrorDBTransactionLockType();
153171
}
154172
}
155173
}
156174
if (requests_.length > 0) {
157175
// Duplicates are eliminated, and the returned acquisitions are sorted
158176
const lockAcquires = this.lockBox.lockMulti(...requests_);
159-
for (const [key, lockAcquire] of lockAcquires) {
177+
for (const [key, lockAcquire, ...lockingParams] of lockAcquires) {
160178
const [lockRelease, lock] = await lockAcquire();
161179
// The `Map` will maintain insertion order
162180
// these must be unlocked in reverse order
163181
// when the transaction is destroyed
164-
this._locks.set(key as string, [lockRelease, lock!]);
182+
this._locks.set(key as string, {
183+
lock: lock!,
184+
type: lockingParams[0]!, // The `type` is defaulted to `write`
185+
release: lockRelease,
186+
});
165187
}
166188
}
167189
}
@@ -178,9 +200,8 @@ class DBTransaction {
178200
const key_ = key.toString();
179201
const lock = this._locks.get(key_);
180202
if (lock == null) continue;
181-
const [lockRelease] = lock;
182203
this._locks.delete(key_);
183-
await lockRelease();
204+
await lock.release();
184205
}
185206
}
186207

@@ -426,7 +447,9 @@ class DBTransaction {
426447
this.logger.debug(
427448
`Failed Committing ${this.constructor.name} ${this.id} due to ${errors.ErrorDBTransactionConflict.name}`,
428449
);
429-
throw new errors.ErrorDBTransactionConflict(undefined, { cause: e });
450+
throw new errors.ErrorDBTransactionConflict(undefined, {
451+
cause: e,
452+
});
430453
} else {
431454
this.logger.debug(
432455
`Failed Committing ${this.constructor.name} ${this.id} due to ${e.message}`,

src/errors.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ class ErrorDBTransactionConflict<T> extends ErrorDBTransaction<T> {
8282
static description = 'DBTransaction cannot commit due to conflicting writes';
8383
}
8484

85+
class ErrorDBTransactionLockType<T> extends ErrorDBTransaction<T> {
86+
static description = 'DBTransaction does not support upgrading or downgrading the lock type';
87+
}
88+
8589
export {
8690
ErrorDB,
8791
ErrorDBRunning,
@@ -103,4 +107,5 @@ export {
103107
ErrorDBTransactionRollbacked,
104108
ErrorDBTransactionNotCommittedNorRollbacked,
105109
ErrorDBTransactionConflict,
110+
ErrorDBTransactionLockType,
106111
};

src/types.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ type DBOp =
159159

160160
type DBOps = Array<DBOp>;
161161

162-
type LockRequest = [key: ToString, ...lockingParams: Parameters<RWLockWriter['lock']>];
162+
type MultiLockRequest = [
163+
key: ToString,
164+
...lockingParams: Parameters<RWLockWriter['lock']>
165+
];
163166

164167
export type {
165168
POJO,
@@ -179,5 +182,5 @@ export type {
179182
DBBatch,
180183
DBOp,
181184
DBOps,
182-
LockRequest
185+
MultiLockRequest
183186
};

tests/DBTransaction.test.ts

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,74 @@ describe(DBTransaction.name, () => {
10061006
await tran.unlock('bar', 'foo');
10071007
expect(tran.locks.size).toBe(0);
10081008
});
1009+
// Duplicates are eliminated
1010+
await db.withTransactionF(async (tran) => {
1011+
await tran.lock('foo', 'foo');
1012+
expect(tran.locks.size).toBe(1);
1013+
await tran.unlock('foo', 'foo');
1014+
expect(tran.locks.size).toBe(0);
1015+
});
1016+
});
1017+
test('read and write locking', async () => {
1018+
await db.withTransactionF(async (tran1) => {
1019+
await tran1.lock(['foo', 'read']);
1020+
await tran1.lock(['bar', 'write']);
1021+
// There is no automatic lock upgrade or downgrade
1022+
await expect(tran1.lock(['foo', 'write'])).rejects.toThrow(errors.ErrorDBTransactionLockType);
1023+
await expect(tran1.lock(['bar', 'read'])).rejects.toThrow(errors.ErrorDBTransactionLockType);
1024+
await db.withTransactionF(async (tran2) => {
1025+
await tran2.lock(['foo', 'read']);
1026+
await expect(tran2.lock(['bar', 'write', 0])).rejects.toThrow(locksErrors.ErrorAsyncLocksTimeout);
1027+
expect(tran1.locks.size).toBe(2);
1028+
expect(tran1.locks.has('foo')).toBe(true);
1029+
expect(tran1.locks.get('foo')!.type).toBe('read');
1030+
expect(tran2.locks.size).toBe(1);
1031+
expect(tran2.locks.has('foo')).toBe(true);
1032+
expect(tran2.locks.get('foo')!.type).toBe('read');
1033+
});
1034+
expect(tran1.locks.size).toBe(2);
1035+
await tran1.unlock('bar');
1036+
await db.withTransactionF(async (tran2) => {
1037+
await tran2.lock(['foo', 'read']);
1038+
await tran2.lock(['bar', 'write']);
1039+
expect(tran1.locks.size).toBe(1);
1040+
expect(tran1.locks.has('foo')).toBe(true);
1041+
expect(tran1.locks.get('foo')!.type).toBe('read');
1042+
expect(tran2.locks.size).toBe(2);
1043+
expect(tran2.locks.has('foo')).toBe(true);
1044+
expect(tran2.locks.get('foo')!.type).toBe('read');
1045+
expect(tran2.locks.has('bar')).toBe(true);
1046+
expect(tran2.locks.get('bar')!.type).toBe('write');
1047+
});
1048+
});
1049+
});
1050+
test('locks are unlocked in reverse order', async () => {
1051+
let order: Array<string> = [];
1052+
let p1, p2;
1053+
await db.withTransactionF(async (tran) => {
1054+
// '1' and '2' are in sort order
1055+
await tran.lock('1');
1056+
await tran.lock('2');
1057+
p1 = db.withTransactionF(async (tran) => {
1058+
await tran.lock('1');
1059+
order.push('1');
1060+
});
1061+
p2 = db.withTransactionF(async (tran) => {
1062+
await tran.lock('2');
1063+
order.push('2');
1064+
});
1065+
});
1066+
await Promise.all([p2, p1]);
1067+
expect(order).toStrictEqual(['2', '1']);
1068+
});
1069+
test('lock re-entrancy', async () => {
1070+
await db.withTransactionF(async (tran) => {
1071+
// Locking with the same keys is idempotent
1072+
await tran.lock('key1', 'key2');
1073+
await tran.lock('key1', 'key2');
1074+
await tran.lock('key1');
1075+
await tran.lock('key2');
1076+
});
10091077
});
10101078
test('locks are isolated per transaction', async () => {
10111079
await db.withTransactionF(async (tran1) => {
@@ -1034,13 +1102,22 @@ describe(DBTransaction.name, () => {
10341102
expect(tran1.locks.has('key2')).toBe(true);
10351103
});
10361104
});
1037-
test('transactions support lock re-entrancy', async () => {
1038-
await db.withTransactionF(async (tran) => {
1039-
// Locking with the same keys is idempotent
1040-
await tran.lock('key1', 'key2');
1041-
await tran.lock('key1', 'key2');
1042-
await tran.lock('key1');
1043-
await tran.lock('key2');
1105+
test('deadlock', async () => {
1106+
await db.withTransactionF(async (tran1) => {
1107+
await db.withTransactionF(async (tran2) => {
1108+
await tran1.lock('foo');
1109+
await tran2.lock('bar');
1110+
// Currently a deadlock can happen, and the only way to avoid is to use timeouts
1111+
// In the future, we want to have `DBTransaction` detect deadlocks
1112+
// and automatically give us `ErrorDBTransactionDeadlock` exception
1113+
const p1 = tran1.lock(['bar', 'write', 50]);
1114+
const p2 = tran2.lock(['foo', 'write', 50]);
1115+
const results = await Promise.allSettled([p1, p2]);
1116+
expect(results.every(r =>
1117+
r.status === 'rejected' &&
1118+
r.reason instanceof locksErrors.ErrorAsyncLocksTimeout)
1119+
).toBe(true);
1120+
});
10441121
});
10451122
});
10461123
});

0 commit comments

Comments
 (0)