class Ractor::Selector
Public Instance Methods
_wait
(p1, p2, p3, p4)
/*
 * Core of the selector wait: blocks until one of the enabled events
 * happens and returns a two-element array [source, value].
 *
 *   selv         - the selector object.
 *   do_receivev  - truthy to also accept a message from the current
 *                  ractor's recv_queue.
 *   do_yieldv    - truthy to also try to yield yield_value to a taker in
 *                  the current ractor's takers_queue.
 *   yield_value  - value handed to ractor_try_yield() when yielding.
 *   move         - forwarded unchanged to ractor_try_yield().
 *
 * Result shapes produced below:
 *   [:receive, message]      - a message was received,
 *   [:yield, nil]            - yield_value was yielded,
 *   [sender, accepted value] - a basket arrived from a registered ractor.
 *
 * Raises rb_eRactorError ("no taking ractors") when there is nothing to
 * wait for: no registered ractors and neither receive nor yield requested.
 */
static VALUE
ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
{
    rb_execution_context_t *ec = GET_EC();
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
    struct rb_ractor_basket *tb = &s->take_basket;
    struct rb_ractor_basket taken_basket;
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    bool do_receive = !!RTEST(do_receivev);
    bool do_yield = !!RTEST(do_yieldv);
    VALUE ret_v, ret_r;
    enum rb_ractor_wait_status wait_status;
    struct rb_ractor_queue *rq = &cr->sync.recv_queue;
    struct rb_ractor_queue *ts = &cr->sync.takers_queue;

    RUBY_DEBUG_LOG("start");

  retry:
    RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);

    // Build the wait_status mask from the events this call is interested in.
    // It is rebuilt on every retry because the set of registered ractors can
    // shrink (see the ractor_selector_remove() calls in the switch below).
    wait_status = wait_none;
    if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
    if (do_receive) wait_status |= wait_receiving;
    if (do_yield) wait_status |= wait_yielding;

    RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));

    if (wait_status == wait_none) {
        rb_raise(rb_eRactorError, "no taking ractors");
    }

    // Fast path 1: check recv_queue for an already available message.
    if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
        ret_r = ID2SYM(rb_intern("receive"));
        goto success;
    }

    // Fast path 2: check takers; if the yield succeeds we are done.
    if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
        ret_v = Qnil;
        ret_r = ID2SYM(rb_intern("yield"));
        goto success;
    }

    // Slow path: open the take_basket. It is expected to be in the
    // "reserved" state between waits; switching it to "none" marks it empty.
    VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
    s->take_basket.type.e = basket_type_none;
    // kick all take target ractors
    st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);

    RACTOR_LOCK_SELF(cr);
    {
      retry_waiting:
        // Sleep until the basket is filled, a message can be received, or a
        // taker is available for yielding.
        while (1) {
            if (!basket_none_p(tb)) {
                RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
                               tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
                break;
            }
            if (do_receive && !ractor_queue_empty_p(cr, rq)) {
                RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
                break;
            }
            if (do_yield && ractor_check_take_basket(cr, ts)) {
                RUBY_DEBUG_LOG("can yield");
                break;
            }

            ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
        }

        // Work on a snapshot of the basket from here on.
        taken_basket = *tb;

        // ensure
        //   tb->type.e = basket_type_reserved # do it atomic in the following code
        //
        // Go back to waiting when the snapshot is still "yielding", or when
        // the CAS result differs from the snapshot (the basket type changed
        // after the snapshot was taken).
        if (taken_basket.type.e == basket_type_yielding ||
            RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {

            if (basket_type_p(tb, basket_type_yielding)) {
                // Drop our lock around the zero-length sleep before looking
                // at the basket again.
                RACTOR_UNLOCK_SELF(cr);
                {
                    rb_thread_sleep(0);
                }
                RACTOR_LOCK_SELF(cr);
            }
            goto retry_waiting;
        }
    }
    RACTOR_UNLOCK_SELF(cr);

    // check the taken result
    switch (taken_basket.type.e) {
      case basket_type_none:
        // Nothing was put into the basket, so the wakeup was for a receive
        // or a yield: restart and let the fast paths above handle it.
        VM_ASSERT(do_receive || do_yield);
        goto retry;
      case basket_type_yielding:
        // A "yielding" snapshot always jumps back to retry_waiting above.
        rb_bug("unreachable");
      case basket_type_deleted: {
          ractor_selector_remove(selv, taken_basket.sender);

          rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
          if (ractor_take_will_lock(r, &taken_basket)) {
              RUBY_DEBUG_LOG("has_will");
          }
          else {
              RUBY_DEBUG_LOG("no will");
              // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
              // remove and retry wait
              goto retry;
          }
          break;
      }
      case basket_type_will:
        // no more messages
        ractor_selector_remove(selv, taken_basket.sender);
        break;
      default:
        break;
    }

    RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));

    ret_v = ractor_basket_accept(&taken_basket);
    ret_r = taken_basket.sender;
  success:
    return rb_ary_new_from_args(2, ret_r, ret_v);
}
add
(p1)
/*
 * Ractor::Selector#add(ractor)
 *
 * Registers rv as a take target of the selector selv and returns rv.
 *
 * Raises rb_eArgError when rv is not a ractor object ("Not a ractor
 * object") or is already present in the selector ("already added").
 * Raises NoMemoryError (via rb_memerror()) when the per-ractor take config
 * cannot be allocated.
 *
 * Ownership: on successful registration the heap-allocated config is stored
 * as the table value for the ractor and is released with free() by
 * ractor_selector_remove().
 */
static VALUE
ractor_selector_add(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "already added");
    }

    // Plain malloc() on purpose: the matching release in
    // ractor_selector_remove() uses plain free().
    struct rb_ractor_selector_take_config *config = malloc(sizeof *config);
    if (config == NULL) {
        // VM_ASSERT is compiled out in non-debug builds, so the allocation
        // failure has to be checked explicitly.
        rb_memerror();
    }
    config->closed = false;
    config->oneshot = false;

    if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
        st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
    }
    else {
        // Registration was refused, so the config never reaches the table
        // and nobody else would free it.
        // NOTE(review): assumes a failed ractor_register_take() keeps no
        // reference to config - confirm against its definition.
        free(config);
    }

    return rv;
}
clear
()
/*
 * Ractor::Selector#clear
 *
 * Runs ractor_selector_clear_i over every registered entry (the selector
 * object itself is passed as the callback argument), then empties the
 * table. Returns the selector.
 */
static VALUE
ractor_selector_clear(VALUE selv)
{
    struct rb_ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);

    // Per-entry cleanup first ...
    st_foreach(selector->take_ractors, ractor_selector_clear_i, (st_data_t)selv);

    // ... then drop whatever is left in the table.
    st_clear(selector->take_ractors);

    return selv;
}
empty?
()
/*
 * Ractor::Selector#empty?
 *
 * Returns Qtrue when no ractor is registered in the selector (the
 * take_ractors table has zero entries), Qfalse otherwise.
 */
static VALUE
ractor_selector_empty_p(VALUE selv)
{
    struct rb_ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);

    if (selector->take_ractors->num_entries != 0) {
        return Qfalse;
    }
    return Qtrue;
}
remove
(p1)
/*
 * Ractor::Selector#remove(ractor)
 *
 * Deregisters rv from the selector selv, releases the take config that was
 * stored for it and returns rv.
 *
 * Raises rb_eArgError when rv is not a ractor object ("Not a ractor
 * object") or is not registered in the selector ("not added yet").
 */
static VALUE
ractor_selector_remove(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));

    if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "not added yet");
    }

    ractor_deregister_take(r, &s->take_basket);

    // st_delete() reads and writes through st_data_t pointers. Hand it real
    // st_data_t objects instead of casting the addresses of differently
    // typed pointer variables, and free only what it actually returned.
    st_data_t key = (st_data_t)r;
    st_data_t val = 0;
    if (st_delete(s->take_ractors, &key, &val)) {
        // The value is the config allocated with malloc() when the ractor
        // was added.
        free((struct rb_ractor_selector_take_config *)val);
    }

    return rv;
}
wait
(p1 = {})
/*
 * Ractor::Selector#wait(receive: false, yield_value: <none>, move: false)
 *
 * Parses the optional keyword arguments and delegates to
 * ractor_selector__wait(), whose result is returned unchanged.
 *
 *   receive:     truthy to also wait for an incoming message.
 *   yield_value: when given (any value, including nil), also try to yield
 *                it; yielding is enabled by the presence of the keyword.
 *   move:        forwarded as the move flag; treated as false when omitted.
 */
static VALUE
ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
{
    VALUE options;
    ID keywords[3];
    VALUE values[3];

    keywords[0] = rb_intern("receive");
    keywords[1] = rb_intern("yield_value");
    keywords[2] = rb_intern("move");

    rb_scan_args(argc, argv, "0:", &options);
    // All three keywords are optional; the ones not passed come back as
    // Qundef in values[].
    rb_get_kwargs(options, keywords, 0, numberof(values), values);

    // Hand real Ruby booleans to ractor_selector__wait() rather than raw C
    // ints: its parameters are VALUEs that it evaluates with RTEST().
    VALUE do_receive = (values[0] != Qundef && RTEST(values[0])) ? Qtrue : Qfalse;
    VALUE do_yield = (values[1] != Qundef) ? Qtrue : Qfalse;

    // An omitted move: must not leak Qundef. Qundef is neither Qnil nor
    // Qfalse, so a truthiness check downstream would read it as "move".
    // NOTE(review): how the flag is consumed inside ractor_try_yield() is
    // not visible here - confirm.
    VALUE move = (values[2] == Qundef) ? Qfalse : values[2];

    // values[1] may still be Qundef here; ractor_selector__wait() only
    // reads yield_value when do_yield is true.
    return ractor_selector__wait(selector, do_receive, do_yield, values[1], move);
}