Skip to content

Commit b518de0

Browse files
committed
Port new produce mechanism from #100.
1 parent 067f926 commit b518de0

File tree

1 file changed

+19
-1
lines changed

1 file changed

+19
-1
lines changed

src/tapedtask.jl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ struct TapedTask
88
counter::Ref{Int}
99
produce_ch::Channel{Any}
1010
consume_ch::Channel{Int}
11+
produced_val::Vector{Any}
12+
13+
function TapedTask(
14+
t::Task, tf::TapedFunction, counter, pch::Channel{Any}, cch::Channel{Int})
15+
new(t, tf, counter, pch, cch, Any[])
16+
end
1117
end
1218

1319
function TapedTask(tf::TapedFunction, args...)
@@ -55,6 +61,13 @@ function step_in(tf::TapedFunction, counter::Ref{Int}, args)
5561
tf.tape[counter[]]()
5662
counter[] += 1
5763
end
64+
# produce and wait after an instruction is done
65+
ttask = t.owner.owner
66+
if length(ttask.produced_val) > 0
67+
val = pop!(ttask.produced_val)
68+
put!(ttask.produce_ch, val)
69+
take!(ttask.consume_ch) # wait for next consumer
70+
end
5871
end
5972

6073
# A way (the old way) to impl `produce`, which does NOT
@@ -68,7 +81,12 @@ function internal_produce(instr::Instruction, val)
6881
end
6982

7083
function produce(val)
71-
error("Libtask.produce can only be directly called in a task!")
84+
## error("Libtask.produce can only be directly called in a task!")
85+
# put!(ttask.produce_ch, val)
86+
# take!(ttask.consume_ch) # wait for next consumer
87+
length(ttask.produced_val) > 1 &&
88+
error("There is a produced value which is not consumed.")
89+
push!(ttask.produced_val, val)
7290
end
7391

7492
function (instr::Instruction{typeof(produce)})()

0 commit comments

Comments
 (0)