Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions ext/-test-/postponed_job/postponed_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,54 @@ pjob_preregister_calls_with_last_argument(VALUE self)
return ary;
}

/* internal (vm_trace.c); exported for this test */
void rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE ractor);

static rb_postponed_job_handle_t pjob_for_ractor_handle = POSTPONED_JOB_HANDLE_INVALID;

static void
pjob_for_ractor_callback(void *data)
{
VALUE ary = (VALUE)data;
Check_Type(ary, T_ARRAY);

/* record which Ractor executed the job */
rb_ary_push(ary, rb_funcall(rb_path2class("Ractor"), rb_intern("current"), 0));
}

static VALUE
pjob_preregister_for_ractor(VALUE self, VALUE ary)
{
pjob_for_ractor_handle = rb_postponed_job_preregister(0, pjob_for_ractor_callback, (void *)ary);
if (pjob_for_ractor_handle == POSTPONED_JOB_HANDLE_INVALID) {
rb_raise(rb_eRuntimeError, "preregister failed");
}
return self;
}

static VALUE
pjob_trigger_for_ractor(VALUE self, VALUE ractor)
{
if (pjob_for_ractor_handle == POSTPONED_JOB_HANDLE_INVALID) {
rb_raise(rb_eRuntimeError, "not preregistered");
}
rb_postponed_job_trigger_for_ractor(pjob_for_ractor_handle, ractor);
return self;
}

void
Init_postponed_job(VALUE self)
{
#ifdef HAVE_RB_EXT_RACTOR_SAFE
rb_ext_ractor_safe(true);
#endif
VALUE mBug = rb_define_module("Bug");
rb_define_module_function(mBug, "postponed_job_call_direct", pjob_call_direct, 1);
rb_define_module_function(mBug, "postponed_job_preregister_and_call_with_sleep", pjob_preregister_and_call_with_sleep, 1);
rb_define_module_function(mBug, "postponed_job_preregister_and_call_without_sleep", pjob_preregister_and_call_without_sleep, 1);
rb_define_module_function(mBug, "postponed_job_preregister_multiple_times", pjob_preregister_multiple_times, 0);
rb_define_module_function(mBug, "postponed_job_preregister_calls_with_last_argument", pjob_preregister_calls_with_last_argument, 0);
rb_define_module_function(mBug, "postponed_job_preregister_for_ractor", pjob_preregister_for_ractor, 1);
rb_define_module_function(mBug, "postponed_job_trigger_for_ractor", pjob_trigger_for_ractor, 1);
}

6 changes: 6 additions & 0 deletions ractor_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ struct rb_ractor_struct {
rb_thread_t *main;
} threads;

/* Postponed jobs targeted at this Ractor
* (rb_postponed_job_trigger_for_ractor): bits index the VM-wide
* preregistration table; any of this Ractor's threads drains them
* in rb_postponed_job_flush. */
rb_atomic_t postponed_job_triggered_bits;

VALUE thgroup_default;

VALUE name;
Expand Down
25 changes: 25 additions & 0 deletions test/-ext-/postponed_job/test_postponed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,31 @@
require '-test-/postponed_job'

class TestPostponed_job < Test::Unit::TestCase
def test_trigger_for_ractor
omit 'Ractor not defined' unless defined?(Ractor)
assert_separately([], __FILE__, __LINE__, <<-'RUBY')
require '-test-/postponed_job'
Warning[:experimental] = false

executed_in = []
Bug.postponed_job_preregister_for_ractor(executed_in)

# trigger from a sub-Ractor, targeting the main Ractor
r = Ractor.new(Ractor.current) do |main|
Bug.postponed_job_trigger_for_ractor(main)
:done
end
assert_equal :done, r.value

# main picks the job up at one of its next interrupt checks
50.times do
break unless executed_in.empty?
sleep 0.02
end
assert_equal [Ractor.current], executed_in
RUBY
end

def test_preregister_and_trigger
assert_separately([], __FILE__, __LINE__, <<-'RUBY')
require '-test-/postponed_job'
Expand Down
1 change: 1 addition & 0 deletions vm_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,7 @@ extern void rb_resume_coverages(void);
extern void rb_suspend_coverages(void);

void rb_postponed_job_flush(rb_vm_t *vm);
void rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE running_ractor);

// ractor.c
RUBY_EXTERN VALUE rb_eRactorUnsafeError;
Expand Down
43 changes: 40 additions & 3 deletions vm_trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -1863,9 +1863,12 @@ rb_vm_postponed_job_atfork(void)
{
rb_postponed_job_queues_t *pjq = &postponed_job_queue;
/* make sure we set the interrupt flag on _this_ thread if we carried any pjobs over
* from the other side of the fork */
if (pjq->triggered_bitset) {
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(GET_VM()));
* from the other side of the fork (including jobs that targeted the
* forking Ractor, which is the child's main Ractor now; jobs targeted
* at any other Ractor die with it) */
rb_execution_context_t *ec = get_valid_ec(GET_VM());
if (pjq->triggered_bitset || rb_ec_ractor_ptr(ec)->postponed_job_triggered_bits) {
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
}

}
Expand Down Expand Up @@ -1922,6 +1925,37 @@ rb_postponed_job_trigger(rb_postponed_job_handle_t h)
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(GET_VM()));
}

/* Like rb_postponed_job_trigger(), but run the job on running_ractor
* instead of the caller's or the main Ractor: set the handle's bit in that
* Ractor's mask and post a POSTPONED_JOB interrupt to its running EC (or
* its main thread's EC before it starts). Delivery is lazy: the target is
* not unblocked, and a job for a Ractor that exits first is discarded.
*
* rb_postponed_job_trigger() is safe from a signal handler or any thread
* because it only touches the VM-global queue and the caller's or main EC.
* This one dereferences running_ractor and its EC, so the caller must keep
* running_ractor alive and running for the whole call: holding its VALUE
* is not enough, and a terminated Ractor is unsafe. The main Ractor is
* always safe. */
void
rb_postponed_job_trigger_for_ractor(unsigned int h, VALUE running_ractor)
{
VM_ASSERT(rb_ractor_p(running_ractor));
rb_ractor_t *r = (rb_ractor_t *)DATA_PTR(running_ractor);

RUBY_ATOMIC_OR(r->postponed_job_triggered_bits, (((rb_atomic_t)1UL) << h));

/* The racy running_ec read is benign: whichever of the target's threads
* checks interrupts first drains the whole per-Ractor mask. */
rb_execution_context_t *target_ec = r->threads.running_ec;
if (target_ec == NULL && r->threads.main) {
target_ec = r->threads.main->ec;
}
if (target_ec) {
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(target_ec);
}
}

void
rb_postponed_job_flush(rb_vm_t *vm)
{
Expand All @@ -1940,6 +1974,9 @@ rb_postponed_job_flush(rb_vm_t *vm)

rb_atomic_t triggered_bits = RUBY_ATOMIC_EXCHANGE(pjq->triggered_bitset, 0);

/* jobs targeted at this Ractor (rb_postponed_job_trigger_for_ractor) */
triggered_bits |= RUBY_ATOMIC_EXCHANGE(rb_ec_ractor_ptr(ec)->postponed_job_triggered_bits, 0);

ec->errinfo = Qnil;
/* mask POSTPONED_JOB dispatch */
ec->interrupt_mask |= block_mask;
Expand Down