Skip to content

Commit 6b7fe17

Browse files
committed
Add lastReadAt property to Message and update logic for batch message headers
- Introduce the `lastReadAt` property in the `Message` class to track the last read timestamp. - Update `send_batch` SQL method to handle per-message headers correctly. - Adjust `MessageRowMapper` to include support for the new `lastReadAt` column.
1 parent 5a9295b commit 6b7fe17

File tree

3 files changed

+11
-5
lines changed

3 files changed

+11
-5
lines changed

sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq/Message.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlin.time.Instant
88
* @property msgId Unique identifier for the message.
99
* @property readCt Indicates how many times the message has been read.
1010
* @property enqueuedAt The timestamp when the message was added to the queue.
11+
* @property lastReadAt The timestamp when the message was last read, or null if never read.
1112
* @property vt The visibility timeout of the message, indicating when it becomes visible for processing.
1213
* @property message The content of the message, expected to be in valid JSON format.
1314
* @property headers A map containing metadata key-value pairs associated with the message,
@@ -17,6 +18,7 @@ data class Message(
1718
val msgId: Long,
1819
val readCt: Int,
1920
val enqueuedAt: Instant,
21+
val lastReadAt: Instant?,
2022
val vt: Instant,
2123
val message: String, // Should be a valid JSON.
2224
val headers: Map<String, String> // Will be converted to JSON.

sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq/PgmqClient.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,11 +352,12 @@ class PgmqClient(
352352
delay: Duration = 0.seconds
353353
): Result<List<Long>> {
354354
// language=PostgreSQL
355-
val sql = "SELECT pgmq.send_batch(queue_name := ?, msgs := ARRAY[?]::jsonb[], headers := ?, delay := ?)"
355+
// headers must be jsonb[] with one entry per message — replicate the shared headers map for each
356+
val sql = "SELECT pgmq.send_batch(queue_name := ?, msgs := ARRAY[?]::jsonb[], headers := ARRAY[?]::jsonb[], delay := ?)"
356357
val statement = Statement.create(sql)
357358
.bind(0, queue)
358359
.bind(1, NoWrappingTuple(messages))
359-
.bind(2, headers.toJsonString())
360+
.bind(2, NoWrappingTuple(messages.map { headers.toJsonString() }))
360361
.bind(3, delay.inWholeSeconds.toInt())
361362
return db.fetchAll(statement, LongRowMapper) // returns the message-ids.
362363
}

sqlx4k-postgres-pgmq/src/commonMain/kotlin/io/github/smyrgeorge/sqlx4k/postgres/pgmq/impl/mappers/MessageRowMapper.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,23 @@ import io.github.smyrgeorge.sqlx4k.ResultSet
44
import io.github.smyrgeorge.sqlx4k.RowMapper
55
import io.github.smyrgeorge.sqlx4k.ValueEncoderRegistry
66
import io.github.smyrgeorge.sqlx4k.impl.extensions.asInstant
7+
import io.github.smyrgeorge.sqlx4k.impl.extensions.asInstantOrNull
78
import io.github.smyrgeorge.sqlx4k.impl.extensions.asInt
89
import io.github.smyrgeorge.sqlx4k.impl.extensions.asLong
910
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.Message
1011
import io.github.smyrgeorge.sqlx4k.postgres.pgmq.impl.extensions.toStringMap
1112

1213
internal object MessageRowMapper : RowMapper<Message> {
14+
// Column order (pgmq >= 1.5): msg_id(0), read_ct(1), enqueued_at(2), last_read_at(3), vt(4), message(5), headers(6)
1315
override fun map(row: ResultSet.Row, converters: ValueEncoderRegistry): Message {
1416
return Message(
1517
msgId = row.get(0).asLong(),
1618
readCt = row.get(1).asInt(),
1719
enqueuedAt = row.get(2).asInstant(),
18-
vt = row.get(3).asInstant(),
19-
message = row.get(4).asString(),
20-
headers = row.get(5).asStringOrNull()?.toStringMap() ?: emptyMap(),
20+
lastReadAt = row.get(3).asInstantOrNull(),
21+
vt = row.get(4).asInstant(),
22+
message = row.get(5).asString(),
23+
headers = row.get(6).asStringOrNull()?.toStringMap() ?: emptyMap(),
2124
)
2225
}
2326
}

0 commit comments

Comments
 (0)