-
Notifications
You must be signed in to change notification settings - Fork 202
Open
Labels
questionFurther information is requestedFurther information is requested
Description
Description
I'm replacing Nashorn with graaljs in a Kafka Stream framework as a hookpoint engine. It's working fine for some of our extension points, but we have an extension point to configure some Kafka Stream steps.
This is the code we are using to test our implementation:
function showEngine() {
print("====================================")
if (typeof Graal != 'undefined') {
print("[ENGINE] Using GraalVM.js")
print(Graal.versionJS);
print(Graal.versionGraalVM);
print(Graal.isGraalRuntime());
} else {
print("[ENGINE] Using Nashorn")
}
}
function onCustom(_stream, _properties) {
return _stream.peek(function (_key, _value) {
showEngine();
print("==== PEEK BEFORE ====")
print("[PEEK BEFORE ] KEY = " + _key)
print("[PEEK BEFORE ] VALUE = " + _value)
})
.filter(function (_key, _value) {
showEngine();
print("==== FILTER ====")
print("[FILTER ] KEY = " + _key)
print("[FILTER ] VALUE = " + _value)
print("[FILTER ] FILTER= " + (_value.get('type').toString() == 'A'))
return _value.get('type').toString() == 'A';
})
.peek(function (_key, _value) {
showEngine();
print("==== PEEK AFTER ====")
print("[PEEK AFTER ] KEY = " + _key)
print("[PEEK AFTER ] VALUE = " + _value)
});
}
Our issue happens because after executing the execution Kafka Stream creates some threads and tries to call the created objects inside the javascript code. Then we have the exception bellow because more than one thread is accessing the same context simultaneously.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_2, processor=KSTREAM-SOURCE-0000000000, topic=streams-input, partition=2, offset=24, stacktrace=java.lang.IllegalStateException: Multi threaded access requested by thread StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
MetadataState:
Tasks:
0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
at com.sun.proxy.$Proxy60.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: Attached Guest Language Frames (1)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: java.lang.IllegalStateException: Multi threaded access requested by thread StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
MetadataState:
Tasks:
0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
at com.sun.proxy.$Proxy60.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
... 6 common frames omitted
Caused by: com.oracle.truffle.api.TruffleStackTrace$LazyStackTrace: null
This is the way we instantiate our script engine:
ScriptEngine engine = GraalJSScriptEngine.create(null, Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL)
.allowHostClassLookup(s -> true)
.allowExperimentalOptions(true)
.out(System.out)
.err(System.err)
.option("js.nashorn-compat", "true"));
Is there a way to provide Class (or Context) factory to avoid using the same context in different threads?
Environment
GraalVM JS version: 21.3.6
Java version: OpenJDK 8
Metadata
Metadata
Assignees
Labels
questionFurther information is requestedFurther information is requested