diff --git a/ext/-test-/postponed_job/postponed_job.c b/ext/-test-/postponed_job/postponed_job.c index 4426fc3104ccf5..41f0eea2c3b235 100644 --- a/ext/-test-/postponed_job/postponed_job.c +++ b/ext/-test-/postponed_job/postponed_job.c @@ -138,14 +138,54 @@ pjob_preregister_calls_with_last_argument(VALUE self) return ary; } +/* internal (vm_trace.c); exported for this test */ +void rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE ractor); + +static rb_postponed_job_handle_t pjob_for_ractor_handle = POSTPONED_JOB_HANDLE_INVALID; + +static void +pjob_for_ractor_callback(void *data) +{ + VALUE ary = (VALUE)data; + Check_Type(ary, T_ARRAY); + + /* record which Ractor executed the job */ + rb_ary_push(ary, rb_funcall(rb_path2class("Ractor"), rb_intern("current"), 0)); +} + +static VALUE +pjob_preregister_for_ractor(VALUE self, VALUE ary) +{ + pjob_for_ractor_handle = rb_postponed_job_preregister(0, pjob_for_ractor_callback, (void *)ary); + if (pjob_for_ractor_handle == POSTPONED_JOB_HANDLE_INVALID) { + rb_raise(rb_eRuntimeError, "preregister failed"); + } + return self; +} + +static VALUE +pjob_trigger_for_ractor(VALUE self, VALUE ractor) +{ + if (pjob_for_ractor_handle == POSTPONED_JOB_HANDLE_INVALID) { + rb_raise(rb_eRuntimeError, "not preregistered"); + } + rb_postponed_job_trigger_for_ractor(pjob_for_ractor_handle, ractor); + return self; +} + void Init_postponed_job(VALUE self) { +#ifdef HAVE_RB_EXT_RACTOR_SAFE + rb_ext_ractor_safe(true); +#endif VALUE mBug = rb_define_module("Bug"); rb_define_module_function(mBug, "postponed_job_call_direct", pjob_call_direct, 1); rb_define_module_function(mBug, "postponed_job_preregister_and_call_with_sleep", pjob_preregister_and_call_with_sleep, 1); rb_define_module_function(mBug, "postponed_job_preregister_and_call_without_sleep", pjob_preregister_and_call_without_sleep, 1); rb_define_module_function(mBug, "postponed_job_preregister_multiple_times", pjob_preregister_multiple_times, 0); rb_define_module_function(mBug, "postponed_job_preregister_calls_with_last_argument", pjob_preregister_calls_with_last_argument, 0); + rb_define_module_function(mBug, "postponed_job_preregister_for_ractor", pjob_preregister_for_ractor, 1); + rb_define_module_function(mBug, "postponed_job_trigger_for_ractor", pjob_trigger_for_ractor, 1); } diff --git a/ractor_core.h b/ractor_core.h index c692ebbbbfc638..032e578603a398 100644 --- a/ractor_core.h +++ b/ractor_core.h @@ -80,6 +80,12 @@ struct rb_ractor_struct { rb_thread_t *main; } threads; + /* Postponed jobs targeted at this Ractor + * (rb_postponed_job_trigger_for_ractor): bits index the VM-wide + * preregistration table; any of this Ractor's threads drains them + * in rb_postponed_job_flush. */ + rb_atomic_t postponed_job_triggered_bits; + VALUE thgroup_default; VALUE name; diff --git a/test/-ext-/postponed_job/test_postponed_job.rb b/test/-ext-/postponed_job/test_postponed_job.rb index 01d6015de15be7..06fdb310f81223 100644 --- a/test/-ext-/postponed_job/test_postponed_job.rb +++ b/test/-ext-/postponed_job/test_postponed_job.rb @@ -3,6 +3,31 @@ require '-test-/postponed_job' class TestPostponed_job < Test::Unit::TestCase + def test_trigger_for_ractor + omit 'Ractor not defined' unless defined?(Ractor) + assert_separately([], __FILE__, __LINE__, <<-'RUBY') + require '-test-/postponed_job' + Warning[:experimental] = false + + executed_in = [] + Bug.postponed_job_preregister_for_ractor(executed_in) + + # trigger from a sub-Ractor, targeting the main Ractor + r = Ractor.new(Ractor.current) do |main| + Bug.postponed_job_trigger_for_ractor(main) + :done + end + assert_equal :done, r.value + + # main picks the job up at one of its next interrupt checks + 50.times do + break unless executed_in.empty? + sleep 0.02 + end + assert_equal [Ractor.current], executed_in + RUBY + end + def test_preregister_and_trigger assert_separately([], __FILE__, __LINE__, <<-'RUBY') require '-test-/postponed_job' diff --git a/vm_core.h b/vm_core.h index 25d34db954c05b..04bf0f81fbf442 100644 --- a/vm_core.h +++ b/vm_core.h @@ -2419,6 +2419,7 @@ extern void rb_resume_coverages(void); extern void rb_suspend_coverages(void); void rb_postponed_job_flush(rb_vm_t *vm); +void rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE running_ractor); // ractor.c RUBY_EXTERN VALUE rb_eRactorUnsafeError; diff --git a/vm_trace.c b/vm_trace.c index 542b1869e08a84..746f43038ab7e0 100644 --- a/vm_trace.c +++ b/vm_trace.c @@ -1863,9 +1863,12 @@ rb_vm_postponed_job_atfork(void) { rb_postponed_job_queues_t *pjq = &postponed_job_queue; /* make sure we set the interrupt flag on _this_ thread if we carried any pjobs over - * from the other side of the fork */ - if (pjq->triggered_bitset) { - RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(GET_VM())); + * from the other side of the fork (including jobs that targeted the + * forking Ractor, which is the child's main Ractor now; jobs targeted + * at any other Ractor die with it) */ + rb_execution_context_t *ec = get_valid_ec(GET_VM()); + if (pjq->triggered_bitset || rb_ec_ractor_ptr(ec)->postponed_job_triggered_bits) { + RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec); } } @@ -1922,6 +1925,37 @@ rb_postponed_job_trigger(rb_postponed_job_handle_t h) RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(GET_VM())); } +/* Like rb_postponed_job_trigger(), but run the job on running_ractor + * instead of the caller's or the main Ractor: set the handle's bit in that + * Ractor's mask and post a POSTPONED_JOB interrupt to its running EC (or + * its main thread's EC before it starts). Delivery is lazy: the target is + * not unblocked, and a job for a Ractor that exits first is discarded. + * + * rb_postponed_job_trigger() is safe from a signal handler or any thread + * because it only touches the VM-global queue and the caller's or main EC. + * This one dereferences running_ractor and its EC, so the caller must keep + * running_ractor alive and running for the whole call: holding its VALUE + * is not enough, and a terminated Ractor is unsafe. The main Ractor is + * always safe. */ +void +rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE running_ractor) +{ + VM_ASSERT(rb_ractor_p(running_ractor)); + rb_ractor_t *r = (rb_ractor_t *)DATA_PTR(running_ractor); + + RUBY_ATOMIC_OR(r->postponed_job_triggered_bits, (((rb_atomic_t)1UL) << h)); + + /* The racy running_ec read is benign: whichever of the target's threads + * checks interrupts first drains the whole per-Ractor mask. */ + rb_execution_context_t *target_ec = r->threads.running_ec; + if (target_ec == NULL && r->threads.main) { + target_ec = r->threads.main->ec; + } + if (target_ec) { + RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(target_ec); + } +} + void rb_postponed_job_flush(rb_vm_t *vm) { @@ -1940,6 +1974,9 @@ rb_postponed_job_flush(rb_vm_t *vm) rb_atomic_t triggered_bits = RUBY_ATOMIC_EXCHANGE(pjq->triggered_bitset, 0); + /* jobs targeted at this Ractor (rb_postponed_job_trigger_for_ractor) */ + triggered_bits |= RUBY_ATOMIC_EXCHANGE(rb_ec_ractor_ptr(ec)->postponed_job_triggered_bits, 0); + ec->errinfo = Qnil; /* mask POSTPONED_JOB dispatch */ ec->interrupt_mask |= block_mask;