Skip to content

Commit 266b2e6

Browse files
authored
feat: Utility for passing debug correlation id to query implementations (#32851)
1 parent 83c43ff commit 266b2e6

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query
6+
7+
import java.util.Optional
8+
import scala.jdk.OptionConverters.RichOption
9+
import scala.jdk.OptionConverters.RichOptional
10+
11+
/**
12+
* (Optional) mechanism for query implementations to pick up a correlation id from the caller, to use in logging and
13+
* error messages. Used by akka-projections to make correlating projection logs with debug and trace logging from the
14+
* underlying akka persistence query implementations possible.
15+
*/
16+
object QueryCorrelationId {
17+
18+
private val threadLocal = new ThreadLocal[String]
19+
20+
/**
21+
* Scala API: Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
22+
* to make sure there is no leak between logic executed on shared threads.
23+
*/
24+
def withCorrelationId[T](correlationId: String)(f: () => T): T = {
25+
threadLocal.set(correlationId)
26+
try {
27+
f()
28+
} finally {
29+
threadLocal.remove()
30+
}
31+
}
32+
33+
/**
34+
* Scala API: Expected to be used "around" calls to plugin query method to pass along a previously extracted optional correlation id,
35+
* will clear the correlation id from thread local to make sure there is no leak between logic executed on shared threads.
36+
*/
37+
def withCorrelationId[T](correlationId: Option[String])(f: () => T): T = {
38+
correlationId match {
39+
case None => f()
40+
case Some(actualId) => withCorrelationId(actualId)(f)
41+
}
42+
}
43+
44+
/**
45+
* Scala API: Expected to be called by query plugins directly after receiving a query call, before starting any asynchronous tasks.
46+
* Calling code is responsible to clear it out after method returns. The value is stored in a thread local so is not available
47+
* across threads or streams. Further passing around of the uuid inside the query plugin implementation is up to the implementer.
48+
*/
49+
def get(): Option[String] =
50+
Option(threadLocal.get)
51+
52+
/**
53+
* Java API: Expected to be used "around" calls to plugin query method to pass along a previously extracted optional correlation id,
54+
* will clear the correlation id from thread local to make sure there is no leak between logic executed on shared threads.
55+
*/
56+
def callWithCorrelationId[T](correlationId: Optional[String], function: java.util.function.Supplier[T]): T =
57+
withCorrelationId(correlationId.toScala)(function.get _)
58+
59+
/**
60+
* Java API: Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
61+
* to make sure there is no leak between logic executed on shared threads.
62+
*/
63+
def callWithCorrelationId[T](correlationId: String, function: java.util.function.Supplier[T]): T =
64+
withCorrelationId(correlationId)(function.get _)
65+
66+
/**
67+
* Java API: Expected to be called by query plugins directly after receiving a query call, before starting any asynchronous tasks.
68+
* Calling code is responsible to clear it out after method returns. The value is stored in a thread local so is not available
69+
* across threads or streams. Further passing around of the uuid inside the query plugin implementation is up to the implementer.
70+
*/
71+
def getCorrelationId(): Optional[String] =
72+
get().toJava
73+
74+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query
6+
7+
import akka.testkit.TestException
8+
import org.scalatest.matchers.should.Matchers
9+
import org.scalatest.wordspec.AnyWordSpecLike
10+
11+
import java.util.UUID
12+
13+
class QueryCorrelationIdSpec extends AnyWordSpecLike with Matchers {
14+
15+
def pretendQueryMethod(): Option[String] =
16+
QueryCorrelationId.get()
17+
18+
"The query correlation id utility" should {
19+
20+
"pass and clear correlation id" in {
21+
val uuid = UUID.randomUUID().toString
22+
val observed =
23+
QueryCorrelationId.withCorrelationId(uuid) { () =>
24+
pretendQueryMethod()
25+
}
26+
observed shouldEqual Some(uuid)
27+
28+
// cleared after returning
29+
QueryCorrelationId.get() shouldBe None
30+
}
31+
32+
"pass along and clear correlation id if present" in {
33+
val uuid = UUID.randomUUID().toString
34+
val observed =
35+
QueryCorrelationId.withCorrelationId(Some(uuid)) { () =>
36+
pretendQueryMethod()
37+
}
38+
observed shouldEqual Some(uuid)
39+
40+
// cleared after returning
41+
QueryCorrelationId.get() shouldBe None
42+
}
43+
44+
"just invoke the block if correlation id not present" in {
45+
val observed =
46+
QueryCorrelationId.withCorrelationId(None) { () =>
47+
pretendQueryMethod()
48+
}
49+
observed shouldEqual None
50+
}
51+
52+
"clear correlation id when call fails" in {
53+
val uuid = UUID.randomUUID().toString
54+
intercept[TestException] {
55+
QueryCorrelationId.withCorrelationId(uuid) { () =>
56+
throw TestException("boom")
57+
}
58+
}
59+
60+
// cleared after throwing
61+
QueryCorrelationId.get() shouldBe None
62+
}
63+
64+
}
65+
66+
}

0 commit comments

Comments
 (0)