Description
- Neo4j version: Community 4.2.1
- Neo4j mode: Single instance (Docker)
- Driver version: Java driver 4.2.0 (used from Scala)
- Operating system: Ubuntu 20.04 on WSL2 / Amazon Linux 2 on AWS (t3.medium) / Ubuntu 18.04 on AWS (t3.small) / Ubuntu 18.04 on Azure (Standard_DS2_v2)
Actual behaviour
Calling beginTransaction on an RxSession returns a Publisher that is "empty": the Subscriber subscribed to that Publisher receives an onComplete before any onNext or onError.
Expected behaviour
Either onNext is called on the Subscriber with an instance of RxTransaction, or onError is called.
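To make the difference between the two behaviours concrete, here is a minimal sketch of the two signal sequences. It uses the JDK's java.util.concurrent.Flow interfaces (which mirror org.reactivestreams) so it runs without the driver; SinglePublisher and Signals.record are hypothetical helpers for illustration, not driver classes.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

// Hypothetical helper: a publisher that, once requested, emits `value` (if any) and then completes.
final class SinglePublisher[A](value: Option[A]) extends Flow.Publisher[A] {
  override def subscribe(sub: Flow.Subscriber[_ >: A]): Unit =
    sub.onSubscribe(new Flow.Subscription {
      private var done = false
      override def request(n: Long): Unit =
        if (!done && n > 0) {
          done = true
          value.foreach(v => sub.onNext(v))
          sub.onComplete()
        }
      override def cancel(): Unit = done = true
    })
}

object Signals {
  // Subscribes to `publisher`, requests one element and returns the signals received, in order.
  def record[A](publisher: Flow.Publisher[A]): List[String] = {
    val signals = ListBuffer.empty[String]
    publisher.subscribe(new Flow.Subscriber[A] {
      override def onSubscribe(s: Flow.Subscription): Unit = { signals += "onSubscribe"; s.request(1) }
      override def onNext(a: A): Unit = signals += s"onNext($a)"
      override def onError(t: Throwable): Unit = signals += s"onError(${t.getMessage})"
      override def onComplete(): Unit = signals += "onComplete"
    })
    signals.toList
  }
}
```

With Signals.record(new SinglePublisher(Some("tx"))) the sequence is onSubscribe, onNext(tx), onComplete (the expected behaviour); with new SinglePublisher[String](None) it is onSubscribe, onComplete, which is what beginTransaction produced here.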
Background
This bug was hard to isolate and reproduce, so I will try to share all the relevant information here; apologies in advance for the long post.
The context of this error is neotypes, a Scala wrapper over this Java driver. I am one of the maintainers of that project, and I found this problem while trying to provide a streaming abstraction over the new Rx module; for reference, the PR is neotypes/neotypes#221.
On GitHub Actions, the tests fail because of the problem described above (starting a new RxTransaction "fails").
After experimenting with the code and looking at metrics and logs, I was able to isolate the error to the RxSession.beginTransaction call.
I took a look at the source code, and I believe the problem is that, for some mysterious reason, the code is executing this line, meaning that there was no RxTransaction but also no error.
Steps to reproduce
Run the following code while a Neo4j server (Docker) is running in the background.
Note that the code is written in Scala (sorry, it is hard to go back to Java). Also note that using a single thread and executing the callbacks asynchronously (i.e. inside a Future) is essential to make the code fail, which makes me believe that this is a concurrency problem.
```scala
package neotypes

import org.neo4j.{driver => neo4j}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace

object Main {
  // A single-threaded ExecutionContext: required to reproduce the failure.
  implicit val ec =
    ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor()
    )

  def main(args: Array[String]): Unit = {
    val driver =
      neo4j.GraphDatabase.driver(
        "bolt://localhost:7687",
        neo4j.Config.builder
          .withoutEncryption
          .withDriverMetrics
          .withLogging(neo4j.Logging.slf4j)
          .build()
      )

    val neotypesDriver = new NeotypesDriver(driver)

    // Runs the same read query `attempts` times, printing the pool metrics before each run.
    def loop(attempts: Int): Future[Unit] = {
      println()
      println("--------------------------------------------------")
      println(s"Remaining attempts ${attempts}")
      println(s"Metrics: ${driver.metrics.connectionPoolMetrics.asScala}")

      neotypesDriver.run("MATCH (p: Person { name: 'Charlize Theron' }) RETURN p.name").flatMap { r =>
        println(s"Results: ${r}")
        if (attempts > 0) loop(attempts - 1)
        else Future.unit
      }
    }

    def setup: Future[Unit] =
      for {
        _ <- neotypesDriver.run("MATCH (n) DETACH DELETE n")
        _ <- neotypesDriver.run("CREATE (Charlize: Person { name: 'Charlize Theron', born: 1975 })")
      } yield ()

    val app = setup.flatMap { _ =>
      loop(attempts = 1000)
    } recover {
      case NoTransactionError =>
        println(s"Transaction was not created!")
      case ex =>
        println(s"Unexpected error ${ex.getMessage}")
        ex.printStackTrace()
    }

    Await.ready(app, Duration.Inf)

    println()
    println("-------------------------------------------------")
    println(s"Final metrics: ${driver.metrics.connectionPoolMetrics.asScala}")

    driver.close()
    ec.shutdown()
  }
}

final class NeotypesDriver(driver: neo4j.Driver)
                          (implicit ec: ExecutionContext) {
  import Syntax._

  def run(query: String): Future[Option[Map[String, String]]] = {
    def runQuery(tx: neo4j.reactive.RxTransaction): Future[Option[Map[String, String]]] =
      tx
        .run(query)
        .records
        .toFuture
        .map { recordOption =>
          recordOption.map { record =>
            record
              .fields
              .asScala
              .iterator
              .map(p => p.key -> p.value.toString)
              .toMap
          }
        }

    val session = driver.rxSession

    for {
      // An "empty" beginTransaction publisher surfaces here as NoTransactionError.
      tx <- session.beginTransaction.toFuture.transform(_.flatMap(_.toRight(left = NoTransactionError).toTry))
      result <- runQuery(tx)
      _ <- tx.commit[Unit].toFuture
      _ <- session.close[Unit].toFuture
    } yield result
  }
}

object Syntax {
  implicit final class PublisherOps[A] (private val publisher: Publisher[A]) extends AnyVal {
    // Converts a Publisher into a Future of its first element (None if it completes empty).
    def toFuture(implicit ec: ExecutionContext): Future[Option[A]] = {
      val promise = Promise[Option[A]]()

      val subscriber = new Subscriber[A] {
        var s: Subscription = _

        override def onSubscribe(subscription: Subscription): Unit = {
          s = subscription
          // request(1) and cancel() are deliberately run asynchronously on the EC;
          // this is essential to trigger the failure.
          Future(s.request(1))
        }

        override def onNext(a: A): Unit = {
          promise.success(Some(a))
          Future(s.cancel())
        }

        override def onError(ex: Throwable): Unit = {
          promise.failure(ex)
        }

        override def onComplete(): Unit = {
          if (!promise.isCompleted) {
            promise.success(None)
          }
        }
      }

      publisher.subscribe(subscriber)
      promise.future
    }
  }
}

object NoTransactionError extends Throwable("Transaction was not created!") with NoStackTrace
```
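Independently of the driver bug, the toFuture adapter above calls Promise.success and Promise.failure, which throw IllegalStateException if the promise is already completed, so a misbehaving publisher (e.g. onError after onNext) would make the callback itself throw. A more defensive sketch relies on trySuccess/tryFailure instead. It uses java.util.concurrent.Flow rather than org.reactivestreams so it runs without extra dependencies, and FlowSyntax.firstAsFuture is a hypothetical name. It also requests synchronously in onSubscribe, which changes the timing the reproducer relies on, so it is neither a replacement for the reproducer nor a fix for this issue.

```scala
import java.util.concurrent.Flow
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}

object FlowSyntax {
  // Converts a publisher expected to emit at most one element into a Future:
  // Some(a) for the first element, None for an empty completion, a failure for onError.
  def firstAsFuture[A](publisher: Flow.Publisher[A]): Future[Option[A]] = {
    val promise = Promise[Option[A]]()
    val subscription = new AtomicReference[Flow.Subscription]()

    publisher.subscribe(new Flow.Subscriber[A] {
      override def onSubscribe(s: Flow.Subscription): Unit = {
        subscription.set(s)
        s.request(1) // requested synchronously, unlike the reproducer
      }

      override def onNext(a: A): Unit =
        // Only the first element completes the promise; then stop the stream.
        if (promise.trySuccess(Some(a))) subscription.get().cancel()

      // tryFailure/trySuccess never throw if the promise is already completed.
      override def onError(t: Throwable): Unit = { promise.tryFailure(t); () }

      override def onComplete(): Unit = { promise.trySuccess(None); () }
    })

    promise.future
  }
}
```

For example, with a java.util.concurrent.SubmissionPublisher, subscribing, submitting "tx" and closing yields Some("tx"), while closing without submitting yields None.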
The code can be found here; the README includes instructions on how to run it.
The repo also contains an additional snippet (on its own branch) using fs2, a popular streaming library in the Scala ecosystem; that one is closer to the original code where the bug was found.
Extras
- If one attempts to begin a new RxTransaction after getting the "empty" one, the following exception is thrown:
org.neo4j.driver.exceptions.TransactionNestingException: You cannot begin a transaction on a session with an open transaction; either run from within the transaction or use a different session.
- I was able to "fix" the problem by replacing these lines in my implementation:
```scala
val txIO = session.beginTransaction.toStream.toIO.flatMap(opt => IO.fromOption(opt)(orElse = NoTransactionError))
```
With these (unsafe) ones:
```scala
val rxs = session.asInstanceOf[org.neo4j.driver.internal.reactive.InternalRxSession]
val f = rxs.getClass.getDeclaredField("session")
f.setAccessible(true)
val ns = f.get(rxs).asInstanceOf[org.neo4j.driver.internal.async.NetworkSession]
val txIO = IO.async[neo4j.reactive.RxTransaction] { cb =>
  ns.beginTransactionAsync(neo4j.TransactionConfig.empty).thenAccept { tx =>
    cb(Right(new org.neo4j.driver.internal.reactive.InternalRxTransaction(tx)))
  } exceptionally { ex =>
    cb(Left(ex))
    None.orNull
  }
}
```
(Do not worry too much about the Scala details; the point is the use of reflection to bypass the Publisher.)
This somewhat confirms that the error is related to the code mentioned above.
- I found something interesting by looking at Driver.metrics.connectionPoolMetrics at the end of the execution:
When there is no error they look like this:
localhost:33245-1027665495=[created=1, closed=0, creating=0, failedToCreate=0, acquiring=0, acquired=4, timedOutToAcquire=0, inUse=0, idle=1, totalAcquisitionTime=462, totalConnectionTime=449, totalInUseTime=3356, totalInUseCount=4]
When there is an error they look like this:
localhost:33241-1862328262=[created=1, closed=0, creating=0, failedToCreate=0, acquiring=0, acquired=3, timedOutToAcquire=0, inUse=1, idle=0, totalAcquisitionTime=516, totalConnectionTime=498, totalInUseTime=3301, totalInUseCount=2]
Note the "inUse=1" in the error case.
It seems that even though the RxTransaction was not created, its connection was acquired and never released!
Also, printing the metrics before the failed transaction did not show anything abnormal.
The logs, on the other hand, do not show anything relevant; whether or not the error happened, the final logs look similar to this:
22:09:09.261 [pool-1-thread-1-ScalaTest-running-Fs2Suite] INFO Driver - Closing driver instance 317516020
22:09:09.264 [Neo4jDriverIO-2-1] DEBUG OutboundMessageHandler - [0xc0839333][localhost:33269][bolt-2] C: GOODBYE
22:09:09.271 [Neo4jDriverIO-2-1] DEBUG ChannelErrorHandler - [0xc0839333][localhost:33269][bolt-2] Channel is inactive
22:09:09.274 [pool-1-thread-1-ScalaTest-running-Fs2Suite] INFO ConnectionPool - Closing connection pool towards localhost:33269
22:09:09.275 [Neo4jDriverIO-2-1] DEBUG ChannelErrorHandler - [0xc0839333][localhost:33269][bolt-2] Closing channel because of a failure 'org.neo4j.driver.exceptions.ServiceUnavailableException: Connection to the database terminated. Please ensure that your database is listening on the correct host and port and that you have compatible encryption settings both on Neo4j server and driver. Note that the default encryption setting has changed in Neo4j 4.0.'
22:09:09.487 [Neo4jDriverIO-2-1] DEBUG org.neo4j.driver.internal.shaded.io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: Neo4jDriverIO-2-1
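For comparison with the reflection-based workaround: a CompletionStage has no "empty" outcome; it either completes with a value or fails. Below is a minimal sketch of bridging one into a Scala Future with the standard scala.jdk.FutureConverters (Scala 2.13+); StageInterop is a hypothetical name. As an unverified assumption, the public AsyncSession.beginTransactionAsync() could supply such a stage without reflection, although it yields an AsyncTransaction rather than an RxTransaction, and I have not checked whether it avoids this problem.

```scala
import java.util.concurrent.CompletionStage
import scala.concurrent.Future
import scala.jdk.FutureConverters._

object StageInterop {
  // Bridges a CompletionStage into a Scala Future: a normal completion becomes a
  // successful Future and an exceptional completion becomes a failed one.
  // Unlike Publisher => Future[Option[A]], there is no "empty" case to handle.
  def toScala[A](stage: CompletionStage[A]): Future[A] = stage.asScala
}
```

For example, StageInterop.toScala(CompletableFuture.completedFuture("tx")) completes with "tx".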
Thanks in advance for the help, and let me know if I can do anything else to help.