class Ractor::Selector

Public Instance Methods

_wait (p1, p2, p3, p4)
/*
 * Core of Selector#wait: block until one of the requested events happens
 * and report it as a two-element array [source, value].
 *
 *   selv        - the selector object
 *   do_receivev - truthy to also wait for a message on the current
 *                 ractor's recv_queue
 *   do_yieldv   - truthy to also try to yield `yield_value` to a taker
 *   yield_value - value handed to ractor_try_yield() when yielding
 *   move        - forwarded unchanged to ractor_try_yield()
 *
 * Result array:
 *   [:receive, msg]    - a message was obtained from recv_queue
 *   [:yield, nil]      - ractor_try_yield() succeeded
 *   [sender, value]    - a basket arrived from one of the registered
 *                        ractors; `value` is ractor_basket_accept()'s result
 *
 * Raises Ractor::Error ("no taking ractors") when there is nothing to wait
 * for: no registered ractors, and neither receive nor yield was requested.
 */
static VALUE
ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
{
    rb_execution_context_t *ec = GET_EC();
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
    struct rb_ractor_basket *tb = &s->take_basket;
    struct rb_ractor_basket taken_basket;
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    bool do_receive = !!RTEST(do_receivev);
    bool do_yield = !!RTEST(do_yieldv);
    VALUE ret_v, ret_r;
    enum rb_ractor_wait_status wait_status;
    struct rb_ractor_queue *rq = &cr->sync.recv_queue;
    struct rb_ractor_queue *ts = &cr->sync.takers_queue;

    RUBY_DEBUG_LOG("start");

    // Re-entered whenever a wakeup turned out not to deliver a basket
    // (see the switch below); the set of registered ractors may have
    // shrunk in the meantime, so wait_status is recomputed each time.
  retry:
    RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);

    // setup wait_status
    wait_status = wait_none;
    if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
    if (do_receive)                       wait_status |= wait_receiving;
    if (do_yield)                         wait_status |= wait_yielding;

    RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));

    if (wait_status == wait_none) {
        rb_raise(rb_eRactorError, "no taking ractors");
    }

    // check recv_queue
    // (fast path: a message is already available, no need to sleep)
    if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
        ret_r = ID2SYM(rb_intern("receive"));
        goto success;
    }

    // check takers
    // (fast path: the yield could be completed immediately)
    if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
        ret_v = Qnil;
        ret_r = ID2SYM(rb_intern("yield"));
        goto success;
    }

    // check take_basket
    // The basket is expected to be `reserved` between waits; switching it
    // to `none` here is what makes it eligible to be filled.
    VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
    s->take_basket.type.e = basket_type_none;
    // kick all take target ractors
    st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);

    RACTOR_LOCK_SELF(cr);
    {
      retry_waiting:
        // Sleep until one of the three wake conditions holds.
        while (1) {
            if (!basket_none_p(tb)) {
                RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
                               tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
                break;
            }
            if (do_receive && !ractor_queue_empty_p(cr, rq)) {
                RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
                break;
            }
            if (do_yield && ractor_check_take_basket(cr, ts)) {
                RUBY_DEBUG_LOG("can yield");
                break;
            }

            ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
        }

        // Snapshot the basket; the switch below works on this copy only.
        taken_basket = *tb;

        // ensure
        //   tb->type.e = basket_type_reserved # do it atomic in the following code
        // A snapshot in the `yielding` state, or a failed CAS (the type
        // changed after the snapshot), means the copy is not usable yet:
        // go back and wait again.
        if (taken_basket.type.e == basket_type_yielding ||
            RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {

            if (basket_type_p(tb, basket_type_yielding)) {
                // Drop our lock around rb_thread_sleep(0) while the basket
                // is still marked yielding, then re-check.
                RACTOR_UNLOCK_SELF(cr);
                {
                    rb_thread_sleep(0);
                }
                RACTOR_LOCK_SELF(cr);
            }
            goto retry_waiting;
        }
    }
    RACTOR_UNLOCK_SELF(cr);

    // check the taken result
    switch (taken_basket.type.e) {
      case basket_type_none:
        // Nothing was put in the basket, so the loop above was left because
        // of the receive or yield condition; the fast paths at `retry`
        // pick that up.
        VM_ASSERT(do_receive || do_yield);
        goto retry;
      case basket_type_yielding:
        // Filtered out by the check before the CAS above.
        rb_bug("unreachable");
      case basket_type_deleted: {
          ractor_selector_remove(selv, taken_basket.sender);

          rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
          if (ractor_take_will_lock(r, &taken_basket)) {
              // falls out of the switch and accepts taken_basket below
              RUBY_DEBUG_LOG("has_will");
          }
          else {
              RUBY_DEBUG_LOG("no will");
              // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
              // remove and retry wait
              goto retry;
          }
          break;
      }
      case basket_type_will:
        // no more messages
        ractor_selector_remove(selv, taken_basket.sender);
        break;
      default:
        break;
    }

    RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));

    ret_v = ractor_basket_accept(&taken_basket);
    ret_r = taken_basket.sender;
  success:
    return rb_ary_new_from_args(2, ret_r, ret_v);
}
add (p1)
/*
 * Selector#add(ractor): register `rv` as a take target of this selector.
 *
 * Raises ArgumentError when `rv` is not a Ractor or is already registered.
 * Returns `rv`.
 *
 * A per-target config record is heap-allocated here. On successful
 * registration it is stored as the value of the take_ractors entry, so the
 * table owns it until the entry is deleted. When registration is refused
 * the record is released immediately instead of being leaked.
 */
static VALUE
ractor_selector_add(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "already added");
    }

    struct rb_ractor_selector_take_config *config = malloc(sizeof(*config));
    // NOTE(review): this only guards allocation failure where VM_ASSERT is
    // active; consider an unconditional out-of-memory error here.
    VM_ASSERT(config != NULL);
    config->closed = false;
    config->oneshot = false;

    if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
        st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
    }
    else {
        // Not registered, so no table entry will ever own `config`.
        // Assumes ractor_register_take() keeps no reference to it when it
        // returns false -- TODO confirm against its definition.
        free(config);
    }

    return rv;
}
clear ()
/*
 * Selector#clear: run ractor_selector_clear_i over every registered ractor
 * (with the selector as its argument), then empty the take_ractors table.
 * Returns the selector itself.
 */
static VALUE
ractor_selector_clear(VALUE selv)
{
    struct rb_ractor_selector *sel = RACTOR_SELECTOR_PTR(selv);

    // per-entry teardown first ...
    st_foreach(sel->take_ractors, ractor_selector_clear_i, (st_data_t)selv);

    // ... then drop whatever entries are left in the table
    st_clear(sel->take_ractors);

    return selv;
}
empty? ()
/*
 * Selector#empty?: Qtrue when no ractor is registered in the selector's
 * take_ractors table, Qfalse otherwise.
 */
static VALUE
ractor_selector_empty_p(VALUE selv)
{
    struct rb_ractor_selector *sel = RACTOR_SELECTOR_PTR(selv);

    if (sel->take_ractors->num_entries != 0) {
        return Qfalse;
    }
    return Qtrue;
}
remove (p1)
/*
 * Selector#remove(ractor): unregister `rv` from this selector.
 *
 * Raises ArgumentError when `rv` is not a Ractor or is not currently
 * registered. Returns `rv`.
 *
 * The take_ractors entry for the ractor is deleted and the config record
 * stored as its value (malloc'ed when the ractor was added) is freed.
 */
static VALUE
ractor_selector_remove(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));

    if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "not added yet");
    }

    ractor_deregister_take(r, &s->take_basket);

    // st_delete() writes through st_data_t pointers; hand it real st_data_t
    // objects instead of reinterpreting the addresses of typed pointers.
    st_data_t key = (st_data_t)r;
    st_data_t val = 0;
    if (st_delete(s->take_ractors, &key, &val)) {
        // only free what the table actually handed back
        free((struct rb_ractor_selector_take_config *)val);
    }

    return rv;
}
wait (p1 = {})
/*
 * Selector#wait(receive: false, yield_value: <unspecified>, move: false)
 *
 * Parses the optional keyword arguments and delegates to
 * ractor_selector__wait():
 *   receive:     truthy to also wait for an incoming message
 *   yield_value: when given (any value, including nil), also try to yield it
 *   move:        forwarded for the yield; treated as false when omitted
 *
 * Keywords that were not passed come back from rb_get_kwargs() as Qundef.
 * Qundef is an internal marker, so it is normalised here and never passed
 * on as the `move` flag. The boolean flags are passed as Qtrue/Qfalse
 * rather than raw C ints, since the callee's parameters are VALUEs.
 */
static VALUE
ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
{
    VALUE options;
    ID keywords[3];
    VALUE values[3];

    keywords[0] = rb_intern("receive");
    keywords[1] = rb_intern("yield_value");
    keywords[2] = rb_intern("move");

    rb_scan_args(argc, argv, "0:", &options);
    rb_get_kwargs(options, keywords, 0, numberof(values), values);

    VALUE do_receive = (values[0] != Qundef && RTEST(values[0])) ? Qtrue : Qfalse;
    // presence of the keyword, not its truthiness, enables yielding
    VALUE do_yield = (values[1] != Qundef) ? Qtrue : Qfalse;
    VALUE move = (values[2] == Qundef) ? Qfalse : values[2];

    // values[1] is only consumed by the callee when do_yield is true
    return ractor_selector__wait(selector, do_receive, do_yield, values[1], move);
}