diff --git a/src/Libtask.jl b/src/Libtask.jl index d08b9146..69a85640 100644 --- a/src/Libtask.jl +++ b/src/Libtask.jl @@ -1,6 +1,6 @@ module Libtask -export consume, produce, TArray, get, tzeros, tfill +export consume, produce, TArray, get, tzeros, tfill, TRef include("../deps/deps.jl"); check_deps(); include("taskcopy.jl") diff --git a/src/tref.jl b/src/tref.jl new file mode 100644 index 00000000..1c0edfed --- /dev/null +++ b/src/tref.jl @@ -0,0 +1,87 @@ +########## +# TRef # +########## + +""" + TRef(x) + +Implementation of an abstract data structure that +automatically performs copy-on-write after task copying. + +Atomic (single-valued) TRef objects must be set or updated +by indexing. For example, to access `val = TRef(1)`, you +must use `val[]`. + +Usage: + +```julia +TRef(x) +``` + +Example: + +```julia +# Initialize an atomic value +z = TRef(19.2) +z[] += 31 + +# Initialize a multi-index object +x = TRef([1 2 3; 4 5 6]) +x[1, 3] = 999 + +# Initialize a TRef holding a dictionary. +d = TRef(Dict("A" => 1, 5 => "B")) +d["A"] = 10 +``` +""" +struct TRef + ref :: Symbol # object_id + orig_task :: Task + TRef() = new(gensym(), current_task()) +end + +function TRef(x) + res = TRef(); + n = n_copies() + task_local_storage(res.ref, (n,Ref(x))) + return res +end + +function Base.getindex(S::TRef, I::Vararg{Any,N}) where {N} + _, d = task_local_storage(S.ref) + return d[][I...] +end + +function Base.setindex!(S::TRef, x, I::Vararg{Any,N}) where {N} + n, d = task_local_storage(S.ref) + cn = n_copies() + newd = d + if cn > n + # println("[setindex!]: $(S.ref) copying data") + newd = deepcopy(d) + task_local_storage(S.ref, (cn, newd)) + end + + if isa(newd[], Real) + newd[] = x + else + setindex!(newd[], x, I...) + end + return newd[] +end + +function Base.display(S::TRef) + display("Please use show(::TRef) instead.") +end + +Base.show(io::IO, S::TRef) = Base.show(io::IO, task_local_storage(S.ref)[2][]) +Base.size(S::TRef) = Base.size(task_local_storage(S.ref)[2][]) +Base.ndims(S::TRef) = Base.ndims(task_local_storage(S.ref)[2][]) + +Base.get(S::TRef) = (current_task().storage[S.ref][2][]) + +# Implements eltype, firstindex, lastindex, and iterate +# functions. +for F in (:eltype, :firstindex, :lastindex, :iterate) + @eval Base.$F(a::TRef, args...) = $F(get(a), args...) +end diff --git a/test/tref.jl b/test/tref.jl new file mode 100644 index 00000000..bd9c35aa --- /dev/null +++ b/test/tref.jl @@ -0,0 +1,52 @@ +using Libtask +using Test + +# Test atomic values. +function f_cta() + t = TRef(1); + t[] = 0; + while true + produce(t[]) + t[] + t[] += 1 + end +end + +t = Task(f_cta) + +consume(t); consume(t) +a = copy(t); +consume(a); consume(a) + +Base.@assert consume(t) == 2 +Base.@assert consume(a) == 4 + +# Test dictionary functionality. +function dict_test() + t = TRef(Dict("A" => 1, 5 => "B")); + t["A"] = 0; + while true + produce(t["A"]) + t["A"] + t["A"] += 1 + end +end + +t = Task(dict_test) + +consume(t); consume(t) +a = copy(t); +consume(a); consume(a) + +Base.@assert consume(t) == 2 +Base.@assert consume(a) == 4 + +# Create a TRef storing a matrix. +x = TRef([1 2 3; 4 5 6]) +x[1, 3] = 900 +Base.@assert x[1,3] == 900 + +# TRef holding an array. +y = TRef([1,2,3]) +y[2] = 19 +Base.@assert y[2] == 19