// This file is a part of Julia. License is MIT: https://julialang.org/license

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>

#include "julia.h"
#include "julia_internal.h"
#include "gc.h"
#include "threading.h"

#ifdef __cplusplus
extern "C" {
#endif


// thread sleep state

// default to DEFAULT_THREAD_SLEEP_THRESHOLD; set via $JULIA_THREAD_SLEEP_THRESHOLD
uint64_t sleep_threshold;

// thread should not be sleeping--it might need to do work.
static const int16_t not_sleeping = 0;

// it is acceptable for the thread to be sleeping.
static const int16_t sleeping = 1;

// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wake up. However, it may already be awake for a variety of reasons.
// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
// * Dequeuer:
//   * 1: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)`
// * Enqueuer:
//   * 2: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping`
// (the cycle's remaining operations, the dequeuer's queue-emptiness check (1b) and the enqueuer's queue push (2a), happen in Julia code)
// i.e., the dequeuer misses the enqueue and the enqueuer misses the sleep state transition.
// [^store_buffering_2]: and also
// * Enqueuer:
//   * 1a: `jl_atomic_store_relaxed(jl_uv_n_waiters, 1)` in `JL_UV_LOCK`
//   * 1b: "cheap read" of `handle->pending` in `uv_async_send` (via `JL_UV_LOCK`) loads `0`
// * Dequeuer:
//   * 2a: store `2` to `handle->pending` in `uv_async_send` (via `JL_UV_LOCK` in `jl_task_get_next`)
//   * 2b: `jl_atomic_load_relaxed(jl_uv_n_waiters)` in `jl_task_get_next` returns `0`
// i.e., the dequeuer misses that `n_waiters` is set, and the enqueuer misses the transition of the `uv_stop` flag (in `signal_async`) to cleared
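//
// A minimal sketch (not part of the runtime) of the store-buffering pattern
// these fences rule out, with C11 `atomic_thread_fence(memory_order_seq_cst)`
// standing in for `jl_fence()` and simplified names standing in for the real
// queue and state fields:
//
//     // Dequeuer (going to sleep)        // Enqueuer (adding work)
//     store(&sleep_state, SLEEPING);      store(&queue_nonempty, 1);
//     atomic_thread_fence(seq_cst);       atomic_thread_fence(seq_cst);
//     if (!load(&queue_nonempty))         if (load(&sleep_state) == SLEEPING)
//         sleep();                            wake();
//
// Without both fences, the two relaxed loads may each read the pre-store
// value (classic store buffering), so the dequeuer sleeps on a nonempty
// queue while the enqueuer decides no wakeup is needed.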

JULIA_DEBUG_SLEEPWAKE(
uint64_t wakeup_enter;
uint64_t wakeup_leave;
uint64_t io_wakeup_enter;
uint64_t io_wakeup_leave;
);

JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT
{
    // Try to acquire the lock on this task.
    int16_t was = jl_atomic_load_relaxed(&task->tid);
    if (was == tid)
        return 1;
    if (was == -1)
        return jl_atomic_cmpswap(&task->tid, &was, tid);
    return 0;
}
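
// Usage sketch (hypothetical caller, not part of this file): the cmpswap
// above makes this function behave like a try-lock on task ownership, so a
// scheduler can claim a task for a thread before running it:
//
//     if (jl_set_task_tid(task, tid))
//         run_task(task);    // claimed: task->tid is now tid
//     else
//         requeue(task);     // another thread owns it (hypothetical helper)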

JL_DLLEXPORT int jl_set_task_threadpoolid(jl_task_t *task, int8_t tpid) JL_NOTSAFEPOINT
{
    if (tpid < 0 || tpid >= jl_n_threadpools)
        return 0;
    task->threadpoolid = tpid;
    return 1;
}

// GC functions used
extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
                                         jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT;

// parallel task runtime
// ---

JL_DLLEXPORT uint32_t jl_rand_ptls(uint32_t max, uint32_t unbias)
{
    jl_ptls_t ptls = jl_current_task->ptls;
    // one-extend unbias back to 64-bits
    return cong(max, -(uint64_t)-unbias, &ptls->rngseed);
}
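
// Worked example of the one-extension above: for unbias == UINT32_MAX,
// `-unbias` is 1 as a uint32_t, and `-(uint64_t)1` is UINT64_MAX, so an
// all-ones 32-bit value widens to an all-ones 64-bit value instead of
// zero-extending to 0x00000000ffffffff. In general this maps 2^32 - k to
// 2^64 - k, preserving the two's-complement interpretation of the argument.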

// initialize the threading infrastructure
// (called only by the main thread)
void jl_init_threadinginfra(void)
{
    /* initialize the synchronization trees pool */
    sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
    char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME);
    if (cp) {
        if (!strncasecmp(cp, "infinite", 8))
            sleep_threshold = UINT64_MAX;
        else
            sleep_threshold = (uint64_t)strtol(cp, NULL, 10);
    }
}
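
// Example (illustrative; per the comment at the top of this file,
// THREAD_SLEEP_THRESHOLD_NAME names $JULIA_THREAD_SLEEP_THRESHOLD):
// JULIA_THREAD_SLEEP_THRESHOLD=infinite keeps threads spinning rather than
// sleeping, while a numeric value such as JULIA_THREAD_SLEEP_THRESHOLD=1000000
// sets a 1 ms threshold; the value is compared against `jl_hrtime()` deltas,
// i.e. nanoseconds.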


void JL_NORETURN jl_finish_task(jl_task_t *t);

// thread function: used by all except the main thread
void jl_threadfun(void *arg)
{
    jl_threadarg_t *targ = (jl_threadarg_t*)arg;

    // initialize this thread (set tid, create heap, set up root task)
    jl_ptls_t ptls = jl_init_threadtls(targ->tid);
    void *stack_lo, *stack_hi;
    jl_init_stack_limits(0, &stack_lo, &stack_hi);
    // warning: this changes `jl_current_task`, so be careful not to call that from this function
    jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
    JL_GC_PROMISE_ROOTED(ct);

    // wait for all threads
    jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0);
    uv_barrier_wait(targ->barrier);

    // free the thread argument here
    free(targ);

    (void)jl_gc_unsafe_enter(ptls);
    jl_finish_task(ct); // noreturn
}


int jl_running_under_rr(int recheck)
{
#ifdef _OS_LINUX_
#define RR_CALL_BASE 1000
#define SYS_rrcall_check_presence (RR_CALL_BASE + 8)
    static _Atomic(int) is_running_under_rr = 0;
    int rr = jl_atomic_load_relaxed(&is_running_under_rr);
    if (rr == 0 || recheck) {
        int ret = syscall(SYS_rrcall_check_presence, 0, 0, 0, 0, 0, 0);
        if (ret == -1)
            // Should always be ENOSYS, but who knows what people do for
            // unknown syscalls with their seccomp filters, so just say
            // that we don't have rr.
            rr = 2;
        else
            rr = 1;
        jl_atomic_store_relaxed(&is_running_under_rr, rr);
    }
    return rr == 1;
#else
    return 0;
#endif
}
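
// Note on the caching idiom above: the answer is memoized in a relaxed
// atomic (0 = unknown, 1 = under rr, 2 = not under rr), so the probe
// syscall normally runs once per process; `recheck` forces a re-probe.
// Racing initializers are harmless because every racer stores the same
// answer.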


// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
static int sleep_check_after_threshold(uint64_t *start_cycles)
{
    JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder
    /**
     * This wait loop is a bit of a worst case for rr - it needs timer access,
     * which is slow, and it busy-loops in user space, which prevents the
     * scheduling logic from switching to other threads. Just don't bother
     * trying to wait here.
     */
    if (jl_running_under_rr(0))
        return 1;
    if (!(*start_cycles)) {
        *start_cycles = jl_hrtime();
        return 0;
    }
    uint64_t elapsed_cycles = jl_hrtime() - (*start_cycles);
    if (elapsed_cycles >= sleep_threshold) {
        *start_cycles = 0;
        return 1;
    }
    return 0;
}
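
// Despite the `_cycles` names, `jl_hrtime()` returns nanoseconds, so
// `start_cycles` holds a timestamp in ns and the comparison against
// `sleep_threshold` (also ns) is unit-consistent.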


static int wake_thread(int16_t tid)
{
    jl_ptls_t other = jl_all_tls_states[tid];
    int8_t state = sleeping;

    if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
        if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
            JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
            uv_mutex_lock(&other->sleep_lock);
            uv_cond_signal(&other->wake_signal);
            uv_mutex_unlock(&other->sleep_lock);
            return 1;
        }
    }
    return 0;
}
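
// Note on the wake protocol above: the cmpswap ensures at most one waker
// wins the sleeping -> not_sleeping transition, so the condition variable
// is signaled once per sleep cycle. Taking sleep_lock around the signal
// pairs with the sleeper holding that lock across its may_sleep() check
// and uv_cond_wait() in jl_task_get_next, so the signal cannot slip in
// between the check and the wait.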


static void wake_libuv(void)
{
    JULIA_DEBUG_SLEEPWAKE( io_wakeup_enter = cycleclock() );
    jl_wake_libuv();
    JULIA_DEBUG_SLEEPWAKE( io_wakeup_leave = cycleclock() );
}

/* ensure thread tid is awake if necessary */
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
{
    jl_task_t *ct = jl_current_task;
    int16_t self = jl_atomic_load_relaxed(&ct->tid);
    if (tid != self)
        jl_fence(); // [^store_buffering_1]
    jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
    JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
    if (tid == self || tid == -1) {
        // we're already awake, but make sure we'll exit uv_run
        jl_ptls_t ptls = ct->ptls;
        if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping) {
            jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
            JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
        }
        if (uvlock == ct)
            uv_stop(jl_global_event_loop());
    }
    else {
        // something added to the sticky-queue: notify that thread
        if (wake_thread(tid)) {
            // check if we need to notify uv_run too
            jl_fence();
            jl_ptls_t other = jl_all_tls_states[tid];
            jl_task_t *tid_task = jl_atomic_load_relaxed(&other->current_task);
            // now that we have changed the thread to not-sleeping, ensure that
            // either it has not yet acquired the libuv lock, or that it will
            // observe the change of state to not_sleeping
            if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
                wake_libuv();
        }
    }
    // check if the other threads might be sleeping
    if (tid == -1) {
        // something added to the multi-queue: notify all threads
        // in the future, we might want to instead wake some fraction of threads,
        // and let each of those wake additional threads if they find work
        int anysleep = 0;
        for (tid = 0; tid < jl_n_threads; tid++) {
            if (tid != self)
                anysleep |= wake_thread(tid);
        }
        // check if we need to notify uv_run too
        if (uvlock != ct && anysleep) {
            jl_fence();
            if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
                wake_libuv();
        }
    }
    JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() );
}


// get the next runnable task
static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
{
    jl_gc_safepoint();
    jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1);
    if (jl_typeis(task, jl_task_type)) {
        int self = jl_atomic_load_relaxed(&jl_current_task->tid);
        jl_set_task_tid(task, self);
        return task;
    }
    return NULL;
}

static int check_empty(jl_value_t *checkempty)
{
    return jl_apply_generic(checkempty, NULL, 0) == jl_true;
}

static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
    // sleep_check_state is only transitioned from not_sleeping to sleeping
    // by the thread itself. As a result, if this returns false, it will
    // continue returning false. If it returns true, we know the total
    // modification order of the fences.
    jl_fence(); // [^store_buffering_1] [^store_buffering_2]
    return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}

extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
{
    jl_task_t *ct = jl_current_task;
    uint64_t start_cycles = 0;

    while (1) {
        jl_task_t *task = get_next_task(trypoptask, q);
        if (task)
            return task;

        // quick, racy check to see if there seems to be any stuff in there
        jl_cpu_pause();
        if (!check_empty(checkempty)) {
            start_cycles = 0;
            continue;
        }

        jl_cpu_pause();
        jl_ptls_t ptls = ct->ptls;
        if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
            // acquire sleep-check lock
            jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
            jl_fence(); // [^store_buffering_1]
            JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
            if (!check_empty(checkempty)) { // uses relaxed loads
                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                    JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls);
                }
                continue;
            }
            task = get_next_task(trypoptask, q); // note: this should not yield
            if (ptls != ct->ptls) {
                // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
                ptls = ct->ptls;
                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                    JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
                }
                if (task)
                    return task;
                continue;
            }
            if (task) {
                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                    JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
                }
                return task;
            }


            // IO is always permitted, but outside a threaded region, only
            // thread 0 will process messages.
            // Inside a threaded region, any thread can listen for IO messages,
            // and one thread should win this race and watch the event loop,
            // but we bias away from idle threads getting parked here.
            //
            // The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
            // - After decrementing _threadedregion, the thread is required to
            //   call jl_wakeup_thread(0), which will kick out any thread that is
            //   already there, and then eventually thread 0 will get here.
            // - Inside a _threadedregion, there must exist at least one
            //   thread with a happens-before relationship on the libuv lock
            //   before reaching this decision point in the code; that thread
            //   will see the lock as unlocked and thus must win this race here.
            int uvlock = 0;
            if (jl_atomic_load_relaxed(&_threadedregion)) {
                uvlock = jl_mutex_trylock(&jl_uv_mutex);
            }
            else if (ptls->tid == 0) {
                uvlock = 1;
                JL_UV_LOCK(); // jl_mutex_lock(&jl_uv_mutex);
            }
            if (uvlock) {
                int active = 1;
                // otherwise, we block until someone asks us for the lock
                uv_loop_t *loop = jl_global_event_loop();
                while (active && may_sleep(ptls)) {
                    if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
                        // but if we won the race against someone who actually needs
                        // the lock to do real work, we need to let them have it instead
                        break;
                    loop->stop_flag = 0;
                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
                    active = uv_run(loop, UV_RUN_ONCE);
                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
                    jl_gc_safepoint();
                }
                JL_UV_UNLOCK();
                // optimization: check again first if we may have work to do.
                // Otherwise this was a spurious wakeup caused by some other
                // thread that just wanted to steal libuv from us. We will just
                // go right back to sleep on the individual wake signal to let
                // them take it from us without conflict.
                if (!may_sleep(ptls)) {
                    start_cycles = 0;
                    continue;
                }
                if (!jl_atomic_load_relaxed(&_threadedregion) && active && ptls->tid == 0) {
                    // thread 0 is the only thread permitted to run the event loop,
                    // so it needs to stay alive, just spin-looping if necessary
                    if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
                        jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                        JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
                    }
                    start_cycles = 0;
                    continue;
                }
            }

            // the other threads will just wait for an individual wake signal to resume
            JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
            int8_t gc_state = jl_gc_safe_enter(ptls);
            uv_mutex_lock(&ptls->sleep_lock);
            while (may_sleep(ptls)) {
                uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
                // TODO: help with gc work here, if applicable
            }
            assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
            uv_mutex_unlock(&ptls->sleep_lock);
            JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
            jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
            start_cycles = 0;
        }
        else {
            // maybe check the kernel for new messages too
            jl_process_events();
        }
    }
}
423 : :
424 : : #ifdef __cplusplus
425 : : }
426 : : #endif