Skip to content

Commit f7c41b9

Browse files
committed
Async querying of types
1 parent c9ce9fd commit f7c41b9

File tree

1 file changed

+54
-29
lines changed

1 file changed

+54
-29
lines changed

src/Internal/PgSqlHandle.php

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,19 @@
1717
use Revolt\EventLoop;
1818
use function Amp\async;
1919

20-
/** @internal */
20+
/**
21+
* @internal
22+
*
23+
* @psalm-type PgSqlTypeMap = array<int, PgSqlType> Map of OID to corresponding PgSqlType.
24+
*/
2125
final class PgSqlHandle extends AbstractHandle
2226
{
27+
private const TYPE_QUERY = <<<SQL
28+
SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
29+
FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
30+
WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid
31+
SQL;
32+
2333
private const DIAGNOSTIC_CODES = [
2434
\PGSQL_DIAG_SEVERITY => "severity",
2535
\PGSQL_DIAG_SQLSTATE => "sqlstate",
@@ -35,16 +45,16 @@ final class PgSqlHandle extends AbstractHandle
3545
\PGSQL_DIAG_SOURCE_FUNCTION => "source_function",
3646
];
3747

38-
/** @var array<string, array<int, PgSqlType>> */
48+
/** @var array<string, Future<PgSqlTypeMap>> */
3949
private static array $typeCache;
4050

4151
private static ?\Closure $errorHandler = null;
4252

4353
/** @var \PgSql\Connection PostgreSQL connection handle. */
4454
private ?\PgSql\Connection $handle;
4555

46-
/** @var array<int, PgSqlType> */
47-
private readonly array $types;
56+
/** @var PgSqlTypeMap|null */
57+
private ?array $types = null;
4858

4959
/** @var array<non-empty-string, StatementStorage<string>> */
5060
private array $statements = [];
@@ -57,13 +67,11 @@ final class PgSqlHandle extends AbstractHandle
5767
public function __construct(
5868
\PgSql\Connection $handle,
5969
$socket,
60-
string $id,
70+
private readonly string $id,
6171
PostgresConfig $config,
6272
) {
6373
$this->handle = $handle;
6474

65-
$this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle));
66-
6775
$handle = &$this->handle;
6876
$lastUsedAt = &$this->lastUsedAt;
6977
$deferred = &$this->pendingOperation;
@@ -171,35 +179,50 @@ public function __construct(
171179
}
172180

173181
/**
174-
* @return array<int, PgSqlType>
182+
* @return Future<PgSqlTypeMap>
175183
*/
176-
private static function fetchTypes(\PgSql\Connection $handle): array
184+
private function fetchTypes(): Future
177185
{
178-
$result = \pg_query($handle, "SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
179-
FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
180-
WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid");
186+
if ($this->handle === null) {
187+
throw new \Error("The connection to the database has been closed");
188+
}
181189

190+
$result = \pg_send_query($this->handle, self::TYPE_QUERY);
182191
if ($result === false) {
183-
throw new SqlException(\pg_last_error($handle));
192+
$this->close();
193+
throw new SqlException(\pg_last_error($this->handle));
184194
}
185195

186-
$types = [];
187-
while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) {
188-
[$oid, $typeCategory, $typeName, $delimiter, $element] = $row;
196+
$this->pendingOperation = $queryDeferred = new DeferredFuture();
197+
$typesDeferred = new DeferredFuture();
189198

190-
\assert(
191-
\is_numeric($oid) && \is_numeric($element),
192-
"OID and element type expected to be integers",
193-
);
194-
\assert(
195-
\is_string($typeCategory) && \is_string($typeName) && \is_string($delimiter),
196-
"Unexpected types in type catalog query results",
197-
);
198-
199-
$types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element);
199+
EventLoop::reference($this->poll);
200+
if ($result === 0) {
201+
EventLoop::enable($this->await);
200202
}
201203

202-
return $types;
204+
EventLoop::queue(function () use ($queryDeferred, $typesDeferred): void {
205+
try {
206+
$result = $queryDeferred->getFuture()->await();
207+
if (\pg_result_status($result) !== \PGSQL_TUPLES_OK) {
208+
throw new SqlException(\pg_result_error($result));
209+
}
210+
211+
$types = [];
212+
while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) {
213+
[$oid, $typeCategory, $typeName, $delimiter, $element] = $row;
214+
$types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element);
215+
}
216+
217+
$typesDeferred->complete($types);
218+
} catch (\Throwable $exception) {
219+
$this->close();
220+
$typesDeferred->error($exception);
221+
unset(self::$typeCache[$this->id]);
222+
}
223+
});
224+
225+
return $typesDeferred->getFuture();
203226
}
204227

205228
private static function getErrorHandler(): \Closure
@@ -224,12 +247,12 @@ public function isClosed(): bool
224247
* @param \Closure $function Function to execute.
225248
* @param mixed ...$args Arguments to pass to function.
226249
*
227-
* @return \PgSql\Result
228-
*
229250
* @throws SqlException
230251
*/
231252
private function send(\Closure $function, mixed ...$args): mixed
232253
{
254+
$this->types ??= (self::$typeCache[$this->id] ??= $this->fetchTypes())->await();
255+
233256
while ($this->pendingOperation) {
234257
try {
235258
$this->pendingOperation->getFuture()->await();
@@ -275,6 +298,8 @@ private function createResult(\PgSql\Result $result, string $sql): PostgresResul
275298
throw new \Error("The connection to the database has been closed");
276299
}
277300

301+
\assert($this->types !== null, 'Expected type array to be populated before creating a result');
302+
278303
switch (\pg_result_status($result)) {
279304
case \PGSQL_EMPTY_QUERY:
280305
throw new SqlQueryError("Empty query string");

0 commit comments

Comments
 (0)