Skip to content

Commit 61487d3

Browse files
committed
Add topic-based message routing with bind/unbind methods and AMQP-style patterns in PgMqClient.
1 parent a7c236c commit 61487d3

File tree

2 files changed

+107
-6
lines changed

2 files changed

+107
-6
lines changed

sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq/PgMqClient.kt

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,9 @@ import io.github.smyrgeorge.sqlx4k.Statement
77
import io.github.smyrgeorge.sqlx4k.Transaction
88
import io.github.smyrgeorge.sqlx4k.impl.types.NoWrappingTuple
99
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.extensions.toJsonString
10-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.BooleanRowMapper
10+
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.*
1111
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.BooleanRowMapper.toSingleBooleanResult
12-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.LongRowMapper
1312
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.LongRowMapper.toSingleLongResult
14-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.MessageRowMapper
15-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.MetricsRowMapper
16-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.QueueRecordRowMapper
17-
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.UnitRowMapper
1813
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.mappers.UnitRowMapper.toSingleUnitResult
1914
import kotlinx.coroutines.runBlocking
2015
import kotlin.time.Duration
@@ -503,6 +498,94 @@ class PgMqClient(
503498
*/
504499
suspend fun nack(queue: String, id: Long, vt: Duration = Duration.ZERO): Result<Long> = setVt(queue, id, vt)
505500

501+
/**
502+
* Creates a topic binding between a pattern and a queue.
503+
*
504+
* Topic bindings use AMQP-style wildcard patterns to route messages:
505+
* - `*` (star) matches exactly ONE segment between dots
506+
* - `#` (hash) matches ZERO or MORE segments
507+
*
508+
* The pattern is automatically validated before insertion, and the operation
509+
* is idempotent (safe to call multiple times with the same arguments).
510+
*
511+
* @param pattern The AMQP-style wildcard pattern for routing key matching.
512+
* Examples: "logs.*", "logs.#", "*.error", "#.critical"
513+
* @param queueName The name of the queue that will receive matching messages.
514+
* @return A [Result] containing [Unit] on success, or an error if validation fails.
515+
*/
516+
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit> {
517+
// language=SQL
518+
val sql = "SELECT pgmq.bind_topic(pattern := ?, queue_name := ?)"
519+
val statement = Statement.create(sql).bind(0, pattern).bind(1, queueName)
520+
return pg.fetchAll(statement, UnitRowMapper).toSingleUnitResult()
521+
}
522+
523+
/**
524+
* Removes a topic binding between a pattern and a queue.
525+
*
526+
* This operation is idempotent (safe to call multiple times with the same arguments).
527+
*
528+
* @param pattern The pattern to unbind from the queue.
529+
* @param queueName The name of the queue to unbind from the pattern.
530+
* @return A [Result] containing `true` if a binding was removed, `false` if no matching binding was found.
531+
*/
532+
suspend fun unbindTopic(pattern: String, queueName: String): Result<Boolean> {
533+
// language=SQL
534+
val sql = "SELECT pgmq.unbind_topic(pattern := ?, queue_name := ?)"
535+
val statement = Statement.create(sql).bind(0, pattern).bind(1, queueName)
536+
return pg.fetchAll(statement, BooleanRowMapper).toSingleBooleanResult()
537+
}
538+
539+
/**
540+
* Sends a message to all queues that match the routing key pattern.
541+
*
542+
* Uses AMQP-style topic routing with wildcards:
543+
* - The routing_key is matched against all patterns in the topic_bindings table
544+
* - Messages are sent to ALL matching queues
545+
* - The operation is atomic: either all matching queues receive the message or none do
546+
*
547+
* @param routingKey The routing key for the message (e.g., "logs.error", "app.user.created").
548+
* Must not contain wildcards (* or #).
549+
* @param message The content of the message to be sent.
550+
* @param headers Optional metadata to include with the message. Defaults to an empty map.
551+
* @param delay The duration to delay the message delivery. Defaults to 0 seconds.
552+
* @return A [Result] containing the number of queues that received the message.
553+
*/
554+
suspend fun sendTopic(
555+
routingKey: String,
556+
message: String,
557+
headers: Map<String, String> = emptyMap(),
558+
delay: Duration = 0.seconds
559+
): Result<Long> = with(pg) { sendTopic(routingKey, message, headers, delay) }
560+
561+
/**
562+
* Sends a message to all queues that match the routing key pattern.
563+
*
564+
* This is the context version that can be used within a transaction or query executor.
565+
*
566+
* @param routingKey The routing key for the message.
567+
* @param message The content of the message to be sent.
568+
* @param headers Optional metadata to include with the message.
569+
* @param delay The duration to delay the message delivery.
570+
* @return A [Result] containing the number of queues that received the message.
571+
*/
572+
context(db: QueryExecutor)
573+
suspend fun sendTopic(
574+
routingKey: String,
575+
message: String,
576+
headers: Map<String, String> = emptyMap(),
577+
delay: Duration = 0.seconds
578+
): Result<Long> {
579+
// language=SQL
580+
val sql = "SELECT pgmq.send_topic(routing_key := ?, msg := ?, headers := ?, delay := ?)"
581+
val statement = Statement.create(sql)
582+
.bind(0, routingKey)
583+
.bind(1, message)
584+
.bind(2, headers.toJsonString())
585+
.bind(3, delay.inWholeSeconds)
586+
return db.fetchAll(statement, LongRowMapper).toSingleLongResult()
587+
}
588+
506589
/**
507590
* Represents a queue configuration with specific attributes.
508591
*
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.github.smyrgeorge.sqlx4k.postgres.pgmq
2+
3+
/**
4+
* Represents a topic binding that maps a routing pattern to a queue.
5+
*
6+
* Topic bindings use AMQP-style wildcard patterns to match routing keys:
7+
* - `*` (star) matches exactly ONE segment between dots
8+
* - `#` (hash) matches ZERO or MORE segments
9+
*
10+
* @property pattern The AMQP-style wildcard pattern for routing key matching.
11+
* @property queueName The name of the queue that receives messages when the pattern matches.
12+
* @property compiledRegex The pre-compiled regex pattern used internally for matching.
13+
*/
14+
data class TopicBinding(
15+
val pattern: String,
16+
val queueName: String,
17+
val compiledRegex: String,
18+
)

0 commit comments

Comments
 (0)