diff --git a/README.md b/README.md index 3b0a34d9..2b9fe2f2 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ function f_ct() end t = CTask(f_ct) -# or t = Task(f_ct) |> enable_stack_copying consume(t) == 0 consume(t) == 1 @@ -42,8 +41,6 @@ function f_ct2() end t = CTask(f_ct2) -# or t = Task(f_ct2) |> enable_stack_copying - consume(t) == 0 consume(t) == 1 @@ -66,7 +63,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t) == 0 consume(t) == 1 diff --git a/src/Libtask.jl b/src/Libtask.jl index 7bd6caf1..2746eb93 100644 --- a/src/Libtask.jl +++ b/src/Libtask.jl @@ -1,6 +1,6 @@ module Libtask -export enable_stack_copying, CTask, consume, produce, TArray, get, tzeros, tfill, TRef +export CTask, consume, produce, TArray, get, tzeros, tfill, TRef include("../deps/deps.jl"); check_deps(); include("taskcopy.jl") diff --git a/src/taskcopy.jl b/src/taskcopy.jl index 4568cc10..6d296cf3 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -22,7 +22,42 @@ function enable_stack_copying(t::Task) return ccall((:jl_enable_stack_copying, libtask), Any, (Any,), t)::Task end -CTask(func) = Task(func) |> enable_stack_copying +""" + + task_wrapper() + +`task_wrapper` is a wordaround for set the result/exception to the +correct task which maybe copied/forked from another one(the original +one). Without this, the result/exception is always sent to the +original task. That is done in `JULIA_PROJECT/src/task.c`, the +function `start_task` and `finish_task`. + +This workaround is not the proper way to do the work it does. The +proper way is refreshing the `current_task` (the variable `t`) in +`start_task` after the call to `jl_apply` returns. + +""" +function task_wrapper(func) + () -> + try + res = func() + ct = current_task() + ct.result = res + isa(ct.storage, Nothing) && (ct.storage = IdDict()) + ct.storage[:_libtask_state] = :done + wait() + catch ex + ct = current_task() + ct.exception = ex + ct.result = ex + ct.backtrace = catch_backtrace() + isa(ct.storage, Nothing) && (ct.storage = IdDict()) + ct.storage[:_libtask_state] = :failed + wait() + end +end + +CTask(func) = Task(task_wrapper(func)) |> enable_stack_copying function Base.copy(t::Task) t.state != :runnable && t.state != :done && @@ -72,7 +107,10 @@ produce(v) = begin wait() end - t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable")) + if !(t.state in [:runnable, :queued]) + throw(AssertionError("producer.consumer.state in [:runnable, :queued]")) + end + if t.state == :queued yield() end if empty Base.schedule_and_wait(t, v) ct = current_task() # When a task is copied, ct should be updated to new task ID. @@ -129,5 +167,16 @@ consume(p::Task, values...) = begin push!(p.storage[:consumers].waitq, ct) end - p.state == :runnable ? Base.schedule_and_wait(p) : wait() # don't attempt to queue it twice + if p.state == :runnable + Base.schedule(p) + yield() + + isa(p.storage, IdDict) && haskey(p.storage, :_libtask_state) && + (p.state = p.storage[:_libtask_state]) + + if p.exception != nothing + throw(p.exception) + end + end + wait() end diff --git a/test/brokentask.jl b/test/brokentask.jl new file mode 100644 index 00000000..8546d141 --- /dev/null +++ b/test/brokentask.jl @@ -0,0 +1,88 @@ +using Libtask +using Test + +r = @testset "Broken Functions Tests" begin + + @testset "Error Test" begin + function ftest() + x = 1 + while true + error("error test") + produce(x) + x += 1 + end + end + + t = CTask(ftest) + try + consume(t) + catch ex + @test isa(ex, ErrorException) + end + @test isa(t.exception, ErrorException) + end + + @testset "OutOfBounds Test Before" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + x[3] = 3 + produce(x[1]) + end + end + + t = CTask(ftest) + try + consume(t) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t.exception, BoundsError) + end + + @testset "OutOfBounds Test After `produce`" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + produce(x[2]) + x[3] = 3 + end + end + + t = CTask(ftest) + @test consume(t) == 2 + try + consume(t) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t.exception, BoundsError) + end + + @testset "OutOfBounds Test After `copy`" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + produce(x[2]) + x[3] = 3 + end + end + + t = CTask(ftest) + @test consume(t) == 2 + t_copy = copy(t) + try + consume(t_copy) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t_copy.exception, BoundsError) + end +end +Test.print_test_results(r) diff --git a/test/clonetask.jl b/test/clonetask.jl index 8376bb47..e3c967c9 100644 --- a/test/clonetask.jl +++ b/test/clonetask.jl @@ -12,7 +12,7 @@ function f_ct() end end -t = Task(f_ct) |> enable_stack_copying +t = CTask(f_ct) @test consume(t) == 0 @test consume(t) == 1 @@ -43,3 +43,17 @@ a = copy(t); @test consume(t) == 5 @test consume(a) == 6 @test consume(a) == 7 + + +# Breaking test +function g_break() + t = 0 + while true + t[3] = 1 + produce(t) + t = t + 1 + end +end + +t = CTask(g_break) +@test_throws MethodError consume(t) diff --git a/test/runtests.jl b/test/runtests.jl index e2eb1ddc..a8c02f5b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,4 @@ include("clonetask.jl") +include("brokentask.jl") include("tarray.jl") include("tarray2.jl") diff --git a/test/tarray.jl b/test/tarray.jl index c8c2f57e..87de9f85 100644 --- a/test/tarray.jl +++ b/test/tarray.jl @@ -13,7 +13,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t); consume(t) a = copy(t); diff --git a/test/tref.jl b/test/tref.jl index 190a4d8c..ced7211b 100644 --- a/test/tref.jl +++ b/test/tref.jl @@ -12,7 +12,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t); consume(t) a = copy(t); @@ -32,7 +32,7 @@ function dict_test() end end -t = Task(dict_test) |> enable_stack_copying +t = CTask(dict_test) consume(t); consume(t) a = copy(t);