Skip to content

Commit 2b3f509

Browse files
committed
Add support for binding multiple topics to a queue with AMQP-style patterns in PgMqClient.
1 parent 61487d3 commit 2b3f509

File tree

1 file changed

+31
-1
lines changed
  • sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq

1 file changed

+31
-1
lines changed

sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq/PgMqClient.kt

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,41 @@ class PgMqClient(
513513
* @param queueName The name of the queue that will receive matching messages.
514514
* @return A [Result] containing [Unit] on success, or an error if validation fails.
515515
*/
516+
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit> = with(pg) { bindTopic(pattern, queueName) }
517+
518+
/**
519+
* Creates a topic binding between a pattern and a queue.
520+
*
521+
* This is the context version that can be used within a transaction or query executor.
522+
*
523+
* @param pattern The AMQP-style wildcard pattern for routing key matching.
524+
* @param queueName The name of the queue that will receive matching messages.
525+
* @return A [Result] containing [Unit] on success, or an error if validation fails.
526+
*/
527+
context(db: QueryExecutor)
516528
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit> {
517529
// language=SQL
518530
val sql = "SELECT pgmq.bind_topic(pattern := ?, queue_name := ?)"
519531
val statement = Statement.create(sql).bind(0, pattern).bind(1, queueName)
520-
return pg.fetchAll(statement, UnitRowMapper).toSingleUnitResult()
532+
return db.fetchAll(statement, UnitRowMapper).toSingleUnitResult()
533+
}
534+
535+
/**
536+
* Creates topic bindings for multiple patterns to a single queue.
537+
*
538+
* This operation is performed atomically within a transaction. Either all bindings
539+
* are created successfully, or none are.
540+
*
541+
* @param patterns A list of AMQP-style wildcard patterns for routing key matching.
542+
* Examples: listOf("logs.*", "logs.#", "*.error", "#.critical")
543+
* @param queueName The name of the queue that will receive matching messages.
544+
* @return A [Result] containing [Unit] on success, or an error if any binding fails.
545+
*/
546+
suspend fun bindTopic(patterns: List<String>, queueName: String): Result<Unit> {
547+
return pg.transaction {
548+
patterns.forEach { pattern -> bindTopic(pattern, queueName).getOrThrow() }
549+
Result.success(Unit)
550+
}
521551
}
522552

523553
/**

0 commit comments

Comments
 (0)