-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathsfdb.py
More file actions
150 lines (124 loc) · 4.88 KB
/
sfdb.py
File metadata and controls
150 lines (124 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# Single File Database
# Written by Lvmin Zhang
# 2022 Dec 23 at Stanford University
import sys
import json
import time
import sqlite3
import datetime
import threading
def now():
    """Return the current local time as a ``YYYY/MM/DD-HH:MM:SS`` string.

    The value comes from ``datetime.datetime.now()`` with no timezone
    argument, so it is a naive local timestamp.
    """
    return datetime.datetime.now().strftime("%Y/%m/%d-%H:%M:%S")
def log(*args, **kwargs):
    """Print to standard error instead of standard output.

    Positional and keyword arguments are forwarded unchanged to ``print``
    (e.g. ``sep`` and ``end``); ``file`` is always ``sys.stderr`` and must
    not be passed by the caller.
    """
    print(*args, file=sys.stderr, **kwargs)
class Database:
    """Dict-like key/value store persisted in a single SQLite file.

    Keys must be ``str``. Values are serialised with ``json.dumps`` and kept
    in the table ``DATA(ID, TIME, JSON)``, where ``TIME`` is the timestamp of
    the last write. Writes are not committed one by one: a commit happens
    automatically once more than 60 seconds have passed since the previous
    commit or more than 16384 writes are pending, and always on ``close()``.

    Every statement is executed while holding one ``threading.Lock``. While
    a generator obtained from ``__iter__`` is active, the lock stays held and
    every other access raises ``AssertionError``.
    """

    def __init__(self, filename, read_only=False):
        """Open (or create) the database stored at ``filename``.

        Args:
            filename: Path of the SQLite file.
            read_only: When true, open through a ``file:...?mode=ro`` URI and
                do not attempt to create the table.

        Raises:
            sqlite3.Error: If the file cannot be opened or queried.
        """
        self._filename = filename
        # Set up all bookkeeping state before connecting so that close() and
        # __del__ stay safe when the connection attempt itself raises.
        self._sqlite = None
        self._lock = threading.Lock()
        self._iterating = False
        self._commit_timer = time.time()
        self._commit_counter = 0
        if read_only:
            self._sqlite = sqlite3.connect(f'file:{self._filename}?mode=ro', check_same_thread=False, uri=True)
        else:
            self._sqlite = sqlite3.connect(self._filename, check_same_thread=False)
            # Schema creation is a write, so it only runs on writable connections.
            self._sqlite.execute("CREATE TABLE IF NOT EXISTS DATA(ID TEXT NOT NULL UNIQUE, TIME TEXT NOT NULL, JSON TEXT NOT NULL, PRIMARY KEY (ID))")
            self._sqlite.commit()
        log(f'SFDB[{self._filename}] Database ready with {len(self)} rows.')

    def _sanity_check(self):
        """Raise ``AssertionError`` if the database is closed or being iterated."""
        # Raised explicitly (not via ``assert``) so the checks survive
        # ``python -O``; the exception type is unchanged for callers.
        if self._sqlite is None:
            raise AssertionError(f'SFDB[{self._filename}] Database already closed.')
        if self._iterating:
            raise AssertionError(f'SFDB[{self._filename}] Database cannot be accessed inside an iterating loop.')

    def _key_is_str(self, key):
        """Raise ``AssertionError`` unless ``key`` is a ``str``."""
        if not isinstance(key, str):
            raise AssertionError(f'SFDB[{self._filename}] All keys must be str, get "{type(key).__name__}" instead.')

    def _select_json(self, key):
        """Return the ``(JSON,)`` row stored under ``key``, or ``None`` if absent."""
        self._sanity_check()
        self._key_is_str(key)
        with self._lock:
            return self._sqlite.execute("SELECT JSON FROM DATA WHERE ID = ?", (key,)).fetchone()

    def __len__(self):
        """Return the number of stored rows."""
        self._sanity_check()
        with self._lock:
            row = self._sqlite.execute("SELECT COUNT(ID) FROM DATA").fetchone()
        return row[0] if row is not None else 0

    def __getitem__(self, key):
        """Return the decoded value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present.
            AssertionError: If ``key`` is not a ``str`` or the database is
                closed or being iterated.
        """
        row = self._select_json(key)
        if row is None:
            raise KeyError(key)
        return json.loads(row[0])

    def get(self, key, default=None):
        """Return the decoded value stored under ``key``, or ``default`` if absent."""
        row = self._select_json(key)
        return json.loads(row[0]) if row is not None else default

    def __contains__(self, key):
        """Return whether a row with ID ``key`` exists."""
        self._sanity_check()
        self._key_is_str(key)
        with self._lock:
            return self._sqlite.execute("SELECT 1 FROM DATA WHERE ID = ?", (key,)).fetchone() is not None

    def __setitem__(self, key, value):
        """Insert or replace the row for ``key`` with the JSON form of ``value``."""
        self._sanity_check()
        self._key_is_str(key)
        feed = (key, now(), json.dumps(value))
        with self._lock:
            self._sqlite.execute("INSERT OR REPLACE INTO DATA(ID, TIME, JSON) VALUES(?, ?, ?)", feed)
            self._commit_counter += 1
        # Must run after the lock is released: commit() acquires the same
        # non-reentrant lock.
        self._auto_commit()

    def __delitem__(self, key):
        """Delete the row for ``key``; a missing key is silently ignored."""
        self._sanity_check()
        self._key_is_str(key)
        with self._lock:
            self._sqlite.execute("DELETE FROM DATA WHERE ID = ?", (key,))
            self._commit_counter += 1
        # Must run after the lock is released (see __setitem__).
        self._auto_commit()

    def __enter__(self):
        """Return ``self`` so the database can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the database on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def __del__(self):
        """Close the database when the object is finalised."""
        self.close()

    def _auto_commit(self):
        """Commit if over 60 seconds or over 16384 writes have accumulated."""
        if time.time() > self._commit_timer + 60 or self._commit_counter > 1024 * 16:
            self.commit()

    def commit(self):
        """Commit pending writes, log a summary and reset the commit bookkeeping."""
        self._sanity_check()
        with self._lock:
            self._sqlite.commit()
            elapsed = time.time() - self._commit_timer
            log(f'SFDB[{self._filename}] Committed {self._commit_counter} transactions during the last {elapsed:.2f} seconds.')
            self._commit_timer = time.time()
            self._commit_counter = 0

    def close(self):
        """Commit and close the connection; does nothing if already closed."""
        # getattr keeps this safe for objects whose __init__ never ran.
        if getattr(self, '_sqlite', None) is None:
            return
        self._sanity_check()
        self.commit()
        with self._lock:
            self._sqlite.close()
            self._sqlite = None
        log(f'SFDB[{self._filename}] Database connection closed.')

    def __iter__(self):
        """Yield ``(key, value)`` pairs for every row.

        The lock is held and the iterating flag is set until the generator
        is exhausted or closed, so the database must not be accessed from
        inside the loop body.
        """
        self._sanity_check()
        with self._lock:
            try:
                self._iterating = True
                for item in self._sqlite.execute('SELECT ID, JSON FROM DATA'):
                    yield item[0], json.loads(item[1])
            finally:
                self._iterating = False

    def keys(self):
        """Return a list of all keys."""
        self._sanity_check()
        with self._lock:
            return [row[0] for row in self._sqlite.execute('SELECT ID FROM DATA').fetchall()]

    def todict(self):
        """Return the whole table as a ``{key: value}`` dict."""
        self._sanity_check()
        with self._lock:
            return {row[0]: json.loads(row[1]) for row in self._sqlite.execute('SELECT ID, JSON FROM DATA').fetchall()}

    def tolist(self):
        """Return the whole table as a list of ``(key, value)`` tuples."""
        self._sanity_check()
        with self._lock:
            return [(row[0], json.loads(row[1])) for row in self._sqlite.execute('SELECT ID, JSON FROM DATA').fetchall()]