123456789_123456789_123456789_123456789_123456789_

Class: Thread::SizedQueue

Relationships & Source Files
Inherits: Object
Defined in: thread_sync.c,
thread_sync.c

Overview

This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.

See Queue for an example of how a SizedQueue works.

Class Method Summary

  • .new(max) constructor

    Creates a fixed-length queue with a maximum size of #max.

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(max)

Creates a fixed-length queue with a maximum size of #max.

[ GitHub ]

  
# File 'thread_sync.c', line 1147

static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}

Instance Attribute Details

#empty?Boolean (readonly)

Returns true if the queue is empty.

[ GitHub ]

  
# File 'thread_sync.c', line 1377

static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return RBOOL(queue_length(self, &sq->q) == 0);
}

#max (rw)

Returns the maximum size of the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1197

static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}

#max=(number) (rw)

Sets the maximum size of the queue to the given number.

[ GitHub ]

  
# File 'thread_sync.c', line 1210

static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long max = NUM2LONG(vmax);
    long diff = 0;
    struct rb_szqueue *sq = szqueue_ptr(self);

    if (max <= 0) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }
    if (max > sq->max) {
	diff = max - sq->max;
    }
    sq->max = max;
    sync_wakeup(szqueue_pushq(sq), diff);
    return vmax;
}

Instance Method Details

#push(object, non_block = false) #enq(object, non_block = false) #<<(object)
Also known as: #push, #enq

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.

[ GitHub ]

  
# File 'thread_sync.c', line 1253

static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            break;
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.sq = sq}
            };

            struct list_head *pushq = szqueue_pushq(sq);

            list_add_tail(pushq, &queue_waiter.w.node);
            sq->num_waiting_push++;

            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}

#clear

Removes all objects from the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1329

static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}

#close

Similar to Queue#close.

The difference is behavior with waiting enqueuing threads.

If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError(‘queue closed’).

[ GitHub ]

  
# File 'thread_sync.c', line 1178

static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
	struct rb_szqueue *sq = szqueue_ptr(self);

	FL_SET(self, QUEUE_CLOSED);
	wakeup_all(szqueue_waitq(sq));
	wakeup_all(szqueue_pushq(sq));
    }
    return self;
}

#pop(non_block = false) #deq(non_block = false) #shift(non_block = false)
Also known as: #pop, #shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ::ThreadError is raised.

[ GitHub ]

  
# File 'thread_sync.c', line 1316

static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}

#push(object, non_block = false) #enq(object, non_block = false) #<<(object)

Alias for #<<.

#length #size
Also known as: #size

Returns the length of the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1348

static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}

#num_waiting

Returns the number of threads waiting on the queue.

[ GitHub ]

  
# File 'thread_sync.c', line 1362

static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}

#pop(non_block = false) #deq(non_block = false) #shift(non_block = false)

Alias for #deq.

#push(object, non_block = false) #enq(object, non_block = false) #<<(object)

Alias for #<<.

#pop(non_block = false) #deq(non_block = false) #shift(non_block = false)

Alias for #deq.

#length #size

Alias for #length.