Skip to content

Commit 0f3bf61

Browse files
committed
remove threadedregion and move jl_threading_run to julia
1 parent 79ea337 commit 0f3bf61

File tree

5 files changed

+23
-111
lines changed

5 files changed

+23
-111
lines changed

base/threadingconstructs.jl

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,17 @@ on `threadid()`.
1818
"""
1919
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
2020

21-
# Only read/written by the main thread
22-
const in_threaded_loop = Ref(false)
23-
2421
function _threadsfor(iter,lbody)
2522
lidx = iter.args[1] # index
2623
range = iter.args[2]
2724
quote
28-
local threadsfor_fun
2925
let range = $(esc(range))
30-
function threadsfor_fun(onethread=false)
26+
function threadsfor_fun(tid)
3127
r = range # Load into local variable
3228
lenr = length(r)
33-
# divide loop iterations among threads
34-
if onethread
35-
tid = 1
36-
len, rem = lenr, 0
37-
else
38-
tid = threadid()
39-
len, rem = divrem(lenr, nthreads())
40-
end
29+
# divide loop iterations among tasks
30+
ngrains = min(nthreads(), lenr)
31+
len, rem = divrem(lenr, ngrains)
4132
# not enough iterations for all the threads?
4233
if len == 0
4334
if tid > rem
@@ -64,21 +55,25 @@ function _threadsfor(iter,lbody)
6455
$(esc(lbody))
6556
end
6657
end
67-
end
68-
# Hack to make nested threaded loops kinda work
69-
if threadid() != 1 || in_threaded_loop[]
70-
# We are in a nested threaded loop
71-
Base.invokelatest(threadsfor_fun, true)
72-
else
73-
in_threaded_loop[] = true
74-
# the ccall is not expected to throw
75-
ccall(:jl_threading_run, Cvoid, (Any,), threadsfor_fun)
76-
in_threaded_loop[] = false
58+
threading_run(threadsfor_fun, length(range))
7759
end
7860
nothing
7961
end
8062
end
8163

64+
function threading_run(func, len)
65+
ngrains = min(nthreads(), len)
66+
tasks = Vector{Task}(undef, ngrains)
67+
for tid = 1:ngrains
68+
t = Task(()->func(tid))
69+
t.sticky = false
70+
tasks[tid] = t
71+
schedule(t)
72+
end
73+
Base.sync_end(tasks)
74+
return nothing
75+
end
76+
8277
"""
8378
Threads.@threads
8479

src/jl_uv.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,10 @@ JL_DLLEXPORT void jl_uv_req_set_data(uv_req_t *req, void *data) { req->data = da
201201
JL_DLLEXPORT void *jl_uv_handle_data(uv_handle_t *handle) { return handle->data; }
202202
JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; }
203203

204-
extern volatile unsigned _threadedregion;
205-
206204
JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
207205
{
208206
jl_ptls_t ptls = jl_get_ptls_states();
209-
if (loop && (_threadedregion || ptls->tid == 0)) {
207+
if (loop) {
210208
jl_gc_safepoint_(ptls);
211209
JL_UV_LOCK();
212210
loop->stop_flag = 0;
@@ -220,7 +218,7 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
220218
JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
221219
{
222220
jl_ptls_t ptls = jl_get_ptls_states();
223-
if (loop && (_threadedregion || ptls->tid == 0)) {
221+
if (loop) {
224222
jl_gc_safepoint_(ptls);
225223
if (jl_mutex_trylock(&jl_uv_mutex)) {
226224
loop->stop_flag = 0;

src/partr.c

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,6 @@ static int may_sleep(jl_ptls_t ptls)
391391
return jl_atomic_load(&sleep_check_state) == sleeping && jl_atomic_load(&ptls->sleep_check_state) == sleeping;
392392
}
393393

394-
extern volatile unsigned _threadedregion;
395-
396394
JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
397395
{
398396
jl_ptls_t ptls = jl_get_ptls_states();
@@ -412,7 +410,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
412410
}
413411

414412
jl_cpu_pause();
415-
if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) {
413+
if (sleep_check_after_threshold(&start_cycles)) {
416414
if (!sleep_check_now(ptls->tid))
417415
continue;
418416
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
@@ -424,14 +422,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
424422
// although none are allowed to create new ones
425423
// outside of threaded regions, all IO is permitted,
426424
// but only on thread 1
427-
int uvlock = 0;
428-
if (_threadedregion) {
429-
uvlock = jl_mutex_trylock(&jl_uv_mutex);
430-
}
431-
else if (ptls->tid == 0) {
432-
uvlock = 1;
433-
JL_UV_LOCK();
434-
}
425+
int uvlock = jl_mutex_trylock(&jl_uv_mutex);
435426
if (uvlock) {
436427
int active = 1;
437428
if (jl_atomic_load(&jl_uv_n_waiters) != 0) {
@@ -461,9 +452,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
461452
// to the last thread to do an explicit operation,
462453
// which may starve other threads of critical work
463454
}
464-
if (!_threadedregion && active && ptls->tid == 0) {
465-
// thread 0 is the only thread permitted to run the event loop
466-
// so it needs to stay alive
455+
if (active) {
467456
start_cycles = 0;
468457
continue;
469458
}

src/threading.c

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -462,74 +462,6 @@ void jl_start_threads(void)
462462
uv_barrier_wait(&thread_init_done);
463463
}
464464

465-
unsigned volatile _threadedregion; // HACK: keep track of whether it is safe to do IO
466-
467-
// simple fork/join mode code
468-
JL_DLLEXPORT void jl_threading_run(jl_value_t *func)
469-
{
470-
jl_ptls_t ptls = jl_get_ptls_states();
471-
int8_t gc_state = jl_gc_unsafe_enter(ptls);
472-
size_t world = jl_world_counter;
473-
jl_method_instance_t *mfunc = jl_lookup_generic(&func, 1, jl_int32hash_fast(jl_return_address()), world);
474-
// Ignore constant return value for now.
475-
jl_code_instance_t *fptr = jl_compile_method_internal(mfunc, world);
476-
if (fptr->invoke == jl_fptr_const_return)
477-
return;
478-
479-
size_t nthreads = jl_n_threads;
480-
jl_svec_t *ts = jl_alloc_svec(nthreads);
481-
JL_GC_PUSH1(&ts);
482-
jl_value_t *wait_func = jl_get_global(jl_base_module, jl_symbol("wait"));
483-
jl_value_t *schd_func = jl_get_global(jl_base_module, jl_symbol("schedule"));
484-
// create and schedule all tasks
485-
_threadedregion += 1;
486-
for (int i = 0; i < nthreads; i++) {
487-
jl_value_t *args2[2];
488-
args2[0] = (jl_value_t*)jl_task_type;
489-
args2[1] = func;
490-
jl_task_t *t = (jl_task_t*)jl_apply(args2, 2);
491-
jl_svecset(ts, i, t);
492-
t->sticky = 1;
493-
t->tid = i;
494-
args2[0] = schd_func;
495-
args2[1] = (jl_value_t*)t;
496-
jl_apply(args2, 2);
497-
if (i == 1) {
498-
// let threads know work is coming (optimistic)
499-
jl_wakeup_thread(-1);
500-
}
501-
}
502-
if (nthreads > 2) {
503-
// let threads know work is ready (guaranteed)
504-
jl_wakeup_thread(-1);
505-
}
506-
// join with all tasks
507-
JL_TRY {
508-
for (int i = 0; i < nthreads; i++) {
509-
jl_value_t *t = jl_svecref(ts, i);
510-
jl_value_t *args[2] = { wait_func, t };
511-
jl_apply(args, 2);
512-
}
513-
}
514-
JL_CATCH {
515-
_threadedregion -= 1;
516-
jl_wake_libuv();
517-
JL_UV_LOCK();
518-
JL_UV_UNLOCK();
519-
jl_rethrow();
520-
}
521-
// make sure no threads are sitting in the event loop
522-
_threadedregion -= 1;
523-
jl_wake_libuv();
524-
// make sure no more callbacks will run while user code continues
525-
// outside thread region and might touch an I/O object.
526-
JL_UV_LOCK();
527-
JL_UV_UNLOCK();
528-
JL_GC_POP();
529-
jl_gc_unsafe_leave(ptls, gc_state);
530-
}
531-
532-
533465
// Make gc alignment available for threading
534466
// see threads.jl alignment
535467
JL_DLLEXPORT int jl_alignment(size_t sz)

test/threads_exec.jl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,9 @@ for period in (0.06, Dates.Millisecond(60))
394394
t = Timer(period)
395395
wait(t)
396396
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
397-
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
398397
wait(c)
399398
sleep(period)
400399
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
401-
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
402400
end))
403401
wait(c)
404402
notify(c)

0 commit comments

Comments
 (0)