Skip to content

Catch task exception #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ function f_ct()
end

t = CTask(f_ct)
# or t = Task(f_ct) |> enable_stack_copying

consume(t) == 0
consume(t) == 1
Expand All @@ -42,8 +41,6 @@ function f_ct2()
end

t = CTask(f_ct2)
# or t = Task(f_ct2) |> enable_stack_copying


consume(t) == 0
consume(t) == 1
Expand All @@ -66,7 +63,7 @@ function f_cta()
end
end

t = Task(f_cta) |> enable_stack_copying
t = CTask(f_cta)

consume(t) == 0
consume(t) == 1
Expand Down
2 changes: 1 addition & 1 deletion src/Libtask.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Libtask

export enable_stack_copying, CTask, consume, produce, TArray, get, tzeros, tfill, TRef
export CTask, consume, produce, TArray, get, tzeros, tfill, TRef

include("../deps/deps.jl"); check_deps();
include("taskcopy.jl")
Expand Down
55 changes: 52 additions & 3 deletions src/taskcopy.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,42 @@ function enable_stack_copying(t::Task)
return ccall((:jl_enable_stack_copying, libtask), Any, (Any,), t)::Task
end

CTask(func) = Task(func) |> enable_stack_copying
"""

task_wrapper()

`task_wrapper` is a wordaround for set the result/exception to the
correct task which maybe copied/forked from another one(the original
one). Without this, the result/exception is always sent to the
original task. That is done in `JULIA_PROJECT/src/task.c`, the
function `start_task` and `finish_task`.

This workaround is not the proper way to do the work it does. The
proper way is refreshing the `current_task` (the variable `t`) in
`start_task` after the call to `jl_apply` returns.

"""
function task_wrapper(func)
() ->
try
res = func()
ct = current_task()
ct.result = res
isa(ct.storage, Nothing) && (ct.storage = IdDict())
ct.storage[:_libtask_state] = :done
wait()
catch ex
ct = current_task()
ct.exception = ex
ct.result = ex
ct.backtrace = catch_backtrace()
isa(ct.storage, Nothing) && (ct.storage = IdDict())
ct.storage[:_libtask_state] = :failed
wait()
end
end

CTask(func) = Task(task_wrapper(func)) |> enable_stack_copying

function Base.copy(t::Task)
t.state != :runnable && t.state != :done &&
Expand Down Expand Up @@ -72,7 +107,10 @@ produce(v) = begin
wait()
end

t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
if !(t.state in [:runnable, :queued])
throw(AssertionError("producer.consumer.state in [:runnable, :queued]"))
end
if t.state == :queued yield() end
if empty
Base.schedule_and_wait(t, v)
ct = current_task() # When a task is copied, ct should be updated to new task ID.
Expand Down Expand Up @@ -129,5 +167,16 @@ consume(p::Task, values...) = begin
push!(p.storage[:consumers].waitq, ct)
end

p.state == :runnable ? Base.schedule_and_wait(p) : wait() # don't attempt to queue it twice
if p.state == :runnable
Base.schedule(p)
yield()

isa(p.storage, IdDict) && haskey(p.storage, :_libtask_state) &&
(p.state = p.storage[:_libtask_state])

if p.exception != nothing
throw(p.exception)
end
end
wait()
end
88 changes: 88 additions & 0 deletions test/brokentask.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using Libtask
using Test

r = @testset "Broken Functions Tests" begin

@testset "Error Test" begin
function ftest()
x = 1
while true
error("error test")
produce(x)
x += 1
end
end

t = CTask(ftest)
try
consume(t)
catch ex
@test isa(ex, ErrorException)
end
@test isa(t.exception, ErrorException)
end

@testset "OutOfBounds Test Before" begin
function ftest()
x = zeros(2)
while true
x[1] = 1
x[2] = 2
x[3] = 3
produce(x[1])
end
end

t = CTask(ftest)
try
consume(t)
catch ex
@test isa(ex, BoundsError)
end
@test isa(t.exception, BoundsError)
end

@testset "OutOfBounds Test After `produce`" begin
function ftest()
x = zeros(2)
while true
x[1] = 1
x[2] = 2
produce(x[2])
x[3] = 3
end
end

t = CTask(ftest)
@test consume(t) == 2
try
consume(t)
catch ex
@test isa(ex, BoundsError)
end
@test isa(t.exception, BoundsError)
end

@testset "OutOfBounds Test After `copy`" begin
function ftest()
x = zeros(2)
while true
x[1] = 1
x[2] = 2
produce(x[2])
x[3] = 3
end
end

t = CTask(ftest)
@test consume(t) == 2
t_copy = copy(t)
try
consume(t_copy)
catch ex
@test isa(ex, BoundsError)
end
@test isa(t_copy.exception, BoundsError)
end
end
Test.print_test_results(r)
16 changes: 15 additions & 1 deletion test/clonetask.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function f_ct()
end
end

t = Task(f_ct) |> enable_stack_copying
t = CTask(f_ct)

@test consume(t) == 0
@test consume(t) == 1
Expand Down Expand Up @@ -43,3 +43,17 @@ a = copy(t);
@test consume(t) == 5
@test consume(a) == 6
@test consume(a) == 7


# Breaking test
function g_break()
t = 0
while true
t[3] = 1
produce(t)
t = t + 1
end
end

t = CTask(g_break)
@test_throws MethodError consume(t)
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
include("clonetask.jl")
include("brokentask.jl")
include("tarray.jl")
include("tarray2.jl")
2 changes: 1 addition & 1 deletion test/tarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function f_cta()
end
end

t = Task(f_cta) |> enable_stack_copying
t = CTask(f_cta)

consume(t); consume(t)
a = copy(t);
Expand Down
4 changes: 2 additions & 2 deletions test/tref.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function f_cta()
end
end

t = Task(f_cta) |> enable_stack_copying
t = CTask(f_cta)

consume(t); consume(t)
a = copy(t);
Expand All @@ -32,7 +32,7 @@ function dict_test()
end
end

t = Task(dict_test) |> enable_stack_copying
t = CTask(dict_test)

consume(t); consume(t)
a = copy(t);
Expand Down