diff --git a/src/taskcopy.jl b/src/taskcopy.jl index cc2be1b5..9b1eace2 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -32,7 +32,7 @@ function Base.copy(t::Task) newt end -produce(v) = begin +function produce(v) ct = current_task() if ct.storage == nothing @@ -56,7 +56,12 @@ produce(v) = begin wait() end + + @info "[produce]: Got task $t and value: $v" + + # TODO: Do something else here? t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable")) + if empty Base.schedule_and_wait(t, v) ct = current_task() # When a task is copied, ct should be updated to new task ID. @@ -81,15 +86,22 @@ produce(v) = begin end end -consume(p::Task, values...) = begin +function consume(p::Task, values...) + + p.exception != nothing ? rethrow(p.exception) : nothing if p.storage == nothing p.storage = IdDict() end + haskey(p.storage, :consumers) || (p.storage[:consumers] = nothing) if istaskdone(p) - return wait(p) + try + return wait(p) + catch ex + rethrow(e) + end end ct = current_task() @@ -113,5 +125,51 @@ consume(p::Task, values...) = begin push!(p.storage[:consumers].waitq, ct) end - p.state == :runnable ? Base.schedule_and_wait(p) : wait() # don't attempt to queue it twice + @info "[consume]: Consumers of task p: $(p.storage[:consumers])" + @info "[consume]: Using task $p" + if p.state == :runnable + @info "[consume]: Starting task $p" + + # Start the task. This is probably very problematic. + yield(p) + p.exception != nothing ? rethrow(p.exception) : nothing + + @info "[consume]: Before schedule and wait for task: $p" + Base.schedule_and_wait(p) + + + # NOTE: The lines below are not sufficient to catch the exceptions. + """ + if isempty(Base.Workqueue) + @info "yieldto" + try + yieldto(p) + catch ex + rethrow(ex) + Base.throwto(p, InterruptException()) + end + else + @info "schedule" + try + Base.schedule(p) + catch ex + rethrow(ex) + Base.throwto(p, InterruptException()) + end + end + @info "before wait" + try + Base.wait() + catch ex + rethrow(ex) + Base.throwto(p, InterruptException()) + end + """ + else + try + Base.wait() + catch ex + rethrow(ex) + end + end end diff --git a/test/brokenTask.jl b/test/brokenTask.jl new file mode 100644 index 00000000..ba40f0eb --- /dev/null +++ b/test/brokenTask.jl @@ -0,0 +1,46 @@ +using Libtask +using Test + +r = @testset "Broken Functions Tests" begin + + @testset "Error Test" begin + + function ftest() + x = 1 + while true + @error "test" + produce(x) + x += 1 + end + end + + t = Task(ftest) + try + consume(t) + catch ex + @test isa(ex, InterruptException) + end + @test isa(t.exception, InterruptException) + end + + @testset "OutOfBounds Test Before" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + x[3] = 3 + produce(x[1]) + end + end + + t = Task(ftest) + try + consume(t) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t.exception, BoundsError) + end +end +Test.print_test_results(r) diff --git a/test/clonetask.jl b/test/clonetask.jl index f94ea695..d0c06125 100644 --- a/test/clonetask.jl +++ b/test/clonetask.jl @@ -41,3 +41,17 @@ a = copy(t); @test consume(t) == 5 @test consume(a) == 6 @test consume(a) == 7 + +# Breaking test +function g_break() + t = 0 + while true + t[3] = 1 + produce(t) + t = t + 1 + end +end + +t = Task(g_break) + +@test_throws MethodError consume(t)