Class: Fiber::Scheduler
Relationships & Source Files
Inherits: Object
Defined in: scheduler.c
Overview
This is not an existing class, but documentation of the interface that a Scheduler object should comply with in order to be used as the argument to Fiber.set_scheduler and handle non-blocking fibers. See also the “Non-blocking fibers” section in the ::Fiber class docs for explanations of some concepts.
Scheduler’s behavior and usage are expected to be as follows:
- When the execution in the non-blocking ::Fiber reaches some blocking operation (like sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler’s hook methods, listed below.
- Scheduler somehow registers what the current fiber is waiting on, and yields control to other fibers with Fiber.yield (so the fiber is suspended while waiting for its wait to end, and other fibers in the same thread can run).
- At the end of the current thread execution, the scheduler’s method #scheduler_close is called (or #close, if #scheduler_close is not implemented).
- The scheduler runs a wait loop, checking all the blocked fibers (which it has registered on hook calls) and resuming them when the awaited resource is ready (e.g. I/O ready or sleep time elapsed).
This way, concurrent execution is achieved transparently for every individual Fiber’s code.
Scheduler implementations are provided by gems, like Async.
Hook methods are:
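- #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, #io_select, and #io_close
- #process_wait
- #kernel_sleep
- #timeout_after
- #address_resolve
- #block and #unblock
- #blocking_operation_wait
- (the list is expanded as ::Ruby developers make more methods having non-blocking calls)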
When not specified otherwise, the hook implementations are mandatory: if they are not implemented, the methods trying to call the hook will fail. To provide backward compatibility, in the future hooks will be optional (if a hook is not implemented because the scheduler was written for an older ::Ruby version, the code which needs this hook will not fail, and will just behave in a blocking fashion).
It is also strongly recommended that the scheduler implements the #fiber method, which is delegated to by Fiber.schedule.
A sample toy implementation of the scheduler can be found in Ruby’s source code, in test/fiber/scheduler.rb.
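The life cycle described above can be sketched as a small, single-threaded scheduler. This is a toy written under stated assumptions (Ruby 3.2 or later; #unblock is only called from the scheduler's own thread; fibers blocked without a timeout and never unblocked are simply abandoned), not a reference implementation, and the class name ToyScheduler is hypothetical. It implements #block, #unblock, #kernel_sleep and #io_wait, which Fiber.set_scheduler expects to find, plus #fiber for Fiber.schedule, and it runs its event loop from #close as suggested below.

```ruby
# Toy scheduler sketch (assumes Ruby 3.2+). Single-threaded: #unblock must be
# called from the scheduler's own thread. Not production code.
class ToyScheduler
  def initialize
    @ready = []     # fibers released by #unblock
    @timers = {}    # fiber => monotonic deadline (sleep, block and io_wait timeouts)
    @readable = {}  # io => fiber waiting for it to become readable
    @writable = {}  # io => fiber waiting for it to become writable
    @blocked = {}   # fiber => true while parked in #block
  end

  # Fiber.schedule delegates here: run the block in a new non-blocking fiber.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    @timers[Fiber.current] = now + duration if duration
    Fiber.yield
  end

  def block(blocker, timeout = nil)
    fiber = Fiber.current
    @blocked[fiber] = true
    @timers[fiber] = now + timeout if timeout
    Fiber.yield
    # #unblock removes the entry first, so a leftover entry means a timeout.
    !@blocked.delete(fiber)
  end

  def unblock(blocker, fiber)
    @blocked.delete(fiber)
    @timers.delete(fiber)
    @ready << fiber
  end

  def io_wait(io, events, timeout)
    fiber = Fiber.current
    @readable[io] = fiber if events.anybits?(IO::READABLE)
    @writable[io] = fiber if events.anybits?(IO::WRITABLE)
    @timers[fiber] = now + timeout if timeout
    Fiber.yield # resumed with the ready events, or nil on timeout
  ensure
    @readable.delete(io)
    @writable.delete(io)
    @timers.delete(fiber)
  end

  # Called when the thread exits: run the event loop until nothing is waiting.
  def close
    until @ready.empty? && @timers.empty? && @readable.empty? && @writable.empty?
      run_once
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def run_once
    wake = {} # fiber => value passed to Fiber#resume
    delay = @ready.empty? ? @timers.values.min&.then { |t| [t - now, 0].max } : 0
    # This runs on the thread's root fiber, which is blocking, so IO.select and
    # sleep really block here instead of calling back into the scheduler.
    if @readable.any? || @writable.any?
      readable, writable = IO.select(@readable.keys, @writable.keys, nil, delay)
      readable&.each { |io| wake[@readable[io]] = IO::READABLE }
      writable&.each { |io| wake[@writable[io]] = wake.fetch(@writable[io], 0) | IO::WRITABLE }
    elsif delay
      sleep(delay)
    end
    current = now
    @timers.select { |_, deadline| deadline <= current }.each_key do |fiber|
      @timers.delete(fiber)
      wake[fiber] = nil unless wake.key?(fiber)
    end
    @ready.shift(@ready.size).each { |fiber| wake[fiber] = nil unless wake.key?(fiber) }
    wake.each { |fiber, value| fiber.resume(value) if fiber.alive? }
  end
end

# Usage: both fibers sleep concurrently, so the shorter sleep finishes first.
#   Thread.new do
#     Fiber.set_scheduler(ToyScheduler.new)
#     Fiber.schedule { sleep(0.05); puts "slow" }
#     Fiber.schedule { sleep(0.01); puts "fast" }
#   end.join
```

Running the loop from #close keeps the hooks themselves trivial: each one only records what the fiber is waiting for and calls Fiber.yield.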
Instance Method Summary
- #address_resolve(hostname) ⇒ array_of_strings?
  Invoked by any method that performs a non-reverse DNS lookup.
- #block(blocker, timeout = nil)
- #blocking_operation_wait(work)
  Invoked by Ruby’s core methods to run a blocking operation in a non-blocking way.
- #close
  Called when the current thread exits.
- #fiber
- #io_pread(io, buffer, from, length, offset) ⇒ read length, -errno
  Invoked by IO#pread or IO::Buffer#pread to read length bytes from io at offset from into a specified buffer (see ::IO::Buffer) at the given offset.
- #io_pwrite(io, buffer, from, length, offset) ⇒ written length, -errno
  Invoked by IO#pwrite or IO::Buffer#pwrite to write length bytes to io at offset from, from a specified buffer (see ::IO::Buffer) at the given offset.
- #io_read(io, buffer, length, offset) ⇒ read length, -errno
  Invoked by IO#read or IO::Buffer#read to read length bytes from io into a specified buffer (see ::IO::Buffer) at the given offset.
- #io_select(readables, writables, exceptables, timeout)
  Invoked by IO.select to ask whether the specified descriptors are ready for specified events within the specified timeout.
- #io_wait(io, events, timeout)
  Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the specified descriptor is ready for specified events within the specified timeout.
- #io_write(io, buffer, length, offset) ⇒ written length, -errno
  Invoked by IO#write or IO::Buffer#write to write length bytes to io from a specified buffer (see ::IO::Buffer) at the given offset.
- #kernel_sleep(duration = nil)
  Invoked by Kernel.sleep and Mutex#sleep and is expected to provide an implementation of sleeping in a non-blocking way.
- #process_wait(pid, flags)
  Invoked by Process::Status.wait in order to wait for a specified process.
- #timeout_after(duration, exception_class, *exception_arguments, &block) ⇒ result of block
  Invoked by Timeout.timeout to execute the given block within the given duration.
- #unblock(blocker, fiber)
Instance Method Details
#address_resolve(hostname) ⇒ array_of_strings?
Invoked by any method that performs a non-reverse DNS lookup. The most notable method is Addrinfo.getaddrinfo, but there are many others.
The method is expected to return an array of strings corresponding to the IP addresses the hostname is resolved to, or nil if it cannot be resolved.
Fairly exhaustive list of all possible call-sites:
- Addrinfo.getaddrinfo
- Addrinfo.tcp
- Addrinfo.udp
- Addrinfo.ip
- Addrinfo.new
- Addrinfo.marshal_load
- SOCKSSocket.new
- TCPServer.new
- TCPSocket.new
- IPSocket.getaddress
- TCPSocket.gethostbyname
- UDPSocket#connect
- UDPSocket#bind
- UDPSocket#send
- Socket.getaddrinfo
- Socket.gethostbyname
- Socket.pack_sockaddr_in
- Socket.sockaddr_in
- Socket.unpack_sockaddr_in
# File 'scheduler.c', line 703
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    VALUE arguments[] = {
        hostname
    };

    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
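A minimal sketch of #address_resolve, assuming a scheduler that first consults a hypothetical static OVERRIDES table and otherwise offloads the lookup to the standard library's blocking Resolv.getaddresses on a helper thread. Waiting on Thread#value from a non-blocking fiber goes through the scheduler's #block hook, so other fibers can run during the lookup. An empty result is mapped to nil, as the contract above requires; the class name is hypothetical.

```ruby
require "resolv"

class ResolvingScheduler
  # Hypothetical static overrides consulted before real name resolution.
  OVERRIDES = { "db.internal" => ["10.0.0.5"] }.freeze

  # Returns an array of IP address strings, or nil if hostname cannot be resolved.
  def address_resolve(hostname)
    return OVERRIDES[hostname].dup if OVERRIDES.key?(hostname)

    # Resolv.getaddresses blocks, so run it on a helper thread; waiting on
    # Thread#value from a non-blocking fiber goes through the #block hook.
    addresses = Thread.new { Resolv.getaddresses(hostname) }.value
    addresses.empty? ? nil : addresses
  end
end
```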
#block(blocker, timeout = nil)
Invoked by methods like Thread#join, and by Mutex, to signify that the current ::Fiber is blocked until further notice (e.g. #unblock) or until timeout has elapsed.
blocker is what we are waiting on, informational only (for debugging and logging). There is no guarantee about its value.
Expected to return a boolean, specifying whether the blocking operation was successful or not.
# File 'scheduler.c', line 391
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) { return rb_funcall(scheduler, id_block, 2, blocker, timeout); }
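The bookkeeping behind #block and its counterpart #unblock can be sketched as follows, assuming a single-threaded scheduler. The class BlockingHooks and its resume_ready method are hypothetical: resume_ready stands in for one step of the scheduler's event loop. The boolean that #block returns is whatever the loop resumes the fiber with: true when #unblock released it, false when its timeout elapsed.

```ruby
class BlockingHooks
  def initialize
    @blocked = {} # fiber => monotonic deadline, or nil for "no timeout"
    @ready = []   # fibers released by #unblock
  end

  # Park the current fiber; the loop resumes it with true or false.
  def block(blocker, timeout = nil)
    @blocked[Fiber.current] = timeout && monotonic_now + timeout
    Fiber.yield
  end

  def unblock(blocker, fiber)
    return unless @blocked.key?(fiber)

    @blocked.delete(fiber)
    @ready << fiber
  end

  # One event-loop step: unblocked fibers get true, timed-out fibers get false.
  def resume_ready
    @ready.shift(@ready.size).each { |fiber| fiber.resume(true) }
    now = monotonic_now
    expired = @blocked.select { |_, deadline| deadline && deadline <= now }.keys
    expired.each do |fiber|
      @blocked.delete(fiber)
      fiber.resume(false)
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```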
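#blocking_operation_wait(work)
Invoked by Ruby’s core methods to run a blocking operation in a non-blocking way. work is a Proc (created with rb_proc_new in the C source below) which performs the blocking operation when called.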
# File 'scheduler.c', line 753
VALUE
rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data,
        rb_unblock_function_t *unblock_function, void *data2, int flags,
        struct rb_fiber_scheduler_blocking_operation_state *state)
{
    struct rb_blocking_operation_wait_arguments arguments = {
        .function = function,
        .data = data,
        .unblock_function = unblock_function,
        .data2 = data2,
        .flags = flags,
        .state = state
    };

    VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);

    return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}
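A minimal sketch of #blocking_operation_wait, assuming one helper thread per operation (a thread pool would be a natural refinement). The work Proc runs on a new thread; because Thread#join called from a non-blocking fiber is routed through the scheduler's #block hook, the calling fiber is suspended while other fibers keep running. The class name OffloadingScheduler is hypothetical.

```ruby
class OffloadingScheduler
  # Run the blocking work on a helper thread. From a non-blocking fiber,
  # Thread#join goes through the scheduler's #block hook, so only the calling
  # fiber waits. Thread#join also re-raises any exception raised by the work.
  def blocking_operation_wait(work)
    Thread.new(&work).join
  end
end
```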
#close
Called when the current thread exits. The scheduler is expected to implement this method in order to allow all waiting fibers to finalize their execution.
The suggested pattern is to implement the main event loop in the #close method.
# File 'scheduler.c', line 248
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    RUBY_ASSERT(ruby_thread_has_gvl_p());

    VALUE result;

    // The reason for calling `scheduler_close` before calling `close` is for
    // legacy schedulers which implement `close` and expect the user to call
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
    // which should call `scheduler_close`. If it were to call `close`, it
    // would create an infinite loop.

    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    result = rb_check_funcall(scheduler, id_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    return Qnil;
}
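The comment in the C source above describes a split between #scheduler_close and a legacy, user-facing #close. A sketch of that split, assuming the scheduler keeps a simple run queue (the @pending queue and the enqueue helper are hypothetical): Ruby calls #scheduler_close, which runs the event loop and ignores repeated calls, while #close only detaches the scheduler with Fiber.set_scheduler(nil). Detaching makes Ruby call #scheduler_close rather than #close, so there is no recursion.

```ruby
class ClosingScheduler
  def initialize
    @pending = []   # hypothetical run queue filled by other hooks
    @closed = false
  end

  # Hypothetical helper other hooks would use to queue a fiber for resumption.
  def enqueue(fiber)
    @pending << fiber
  end

  # Called by Ruby when the thread exits or the scheduler is replaced.
  def scheduler_close
    return if @closed

    # Event loop: resume queued fibers until nothing is left to run.
    @pending.shift.resume until @pending.empty?
  ensure
    @closed = true
  end

  # Legacy-style entry point. Assumes this object is the current thread's
  # scheduler; detaching it makes Ruby call #scheduler_close, not #close.
  def close
    Fiber.set_scheduler(nil)
  end
end
```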
#fiber
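Implementation of Fiber.schedule, which delegates to this method. The method is expected to immediately run the given block of code in a separate non-blocking fiber, and to return that ::Fiber.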
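A sketch of the #fiber hook: it creates a non-blocking fiber with Fiber.new(blocking: false), resumes it immediately so the block runs until it first waits on a hook (or finishes), and returns the fiber. The class name SpawningScheduler is hypothetical.

```ruby
class SpawningScheduler
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume # runs until the block finishes or suspends inside a hook
    fiber
  end
end
```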
#io_pread(io, buffer, from, length, offset) ⇒ read length, -errno
Invoked by IO#pread or IO::Buffer#pread to read length bytes from io at offset from into a specified buffer (see ::IO::Buffer) at the given offset.
This method is semantically the same as #io_read, but it allows specifying the offset to read from and is often better for asynchronous ::IO on the same file.
The method should be considered experimental.
# File 'scheduler.c', line 542
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
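A sketch of #io_pread, assuming Ruby 3.2 or later (for Fiber.blocking) and a platform that provides IO#pread. It is written as a module, with the hypothetical name PreadHook, to be mixed into a scheduler. Fiber.blocking keeps the inner IO#pread call from being routed back into the scheduler's own hook. Regular files never report EAGAIN, so this version simply loops until at least length bytes are read or end of file is reached.

```ruby
module PreadHook
  def io_pread(io, buffer, from, length, offset)
    total = 0
    while offset < buffer.size
      chunk = begin
        Fiber.blocking { io.pread(buffer.size - offset, from) }
      rescue EOFError
        break # nothing left at this file offset
      end
      buffer.set_string(chunk, offset)
      total += chunk.bytesize
      from += chunk.bytesize   # advance the file position
      offset += chunk.bytesize # advance the buffer position
      break if total >= length # length is the minimum to read
    end
    total
  rescue SystemCallError => error
    -error.errno # negated errno, as the hook contract requires
  end
end
```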
#io_pwrite(io, buffer, from, length, offset) ⇒ written length, -errno
Invoked by IO#pwrite or IO::Buffer#pwrite to write length bytes to io at offset from, from a specified buffer (see ::IO::Buffer) at the given offset.
This method is semantically the same as #io_write, but it allows specifying the offset to write to and is often better for asynchronous ::IO on the same file.
The method should be considered experimental.
# File 'scheduler.c', line 605
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
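The matching write side, sketched under the same assumptions (Ruby 3.2 or later, a platform with IO#pwrite, hypothetical module name PwriteHook): it writes the rest of the buffer at file offset from, advancing both offsets, until at least length bytes have been written.

```ruby
module PwriteHook
  def io_pwrite(io, buffer, from, length, offset)
    total = 0
    while offset < buffer.size
      data = buffer.get_string(offset) # remaining bytes of the buffer
      written = Fiber.blocking { io.pwrite(data, from) }
      total += written
      from += written   # advance the file position
      offset += written # advance the buffer position
      break if total >= length # length is the minimum to write
    end
    total
  rescue SystemCallError => error
    -error.errno # negated errno, as the hook contract requires
  end
end
```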
#io_read(io, buffer, length, offset) ⇒ read length, -errno
Invoked by IO#read or IO::Buffer#read to read length bytes from io into a specified buffer (see ::IO::Buffer) at the given offset.
The length argument is the “minimum length to be read”. If the ::IO buffer size is 8KiB, but the length is 1024 (1KiB), up to 8KiB might be read, but at least 1KiB will be. Generally, the only case where less data than length will be read is if there is an error reading the data.
Specifying a length of 0 is valid and means try reading at least once and return any available data.
Suggested implementation should try to read from io in a non-blocking manner and call #io_wait if the io is not ready (which will yield control to other fibers).
See ::IO::Buffer for an interface available to return data.
Expected to return the number of bytes read, or, in case of an error, -errno (negated number corresponding to system’s error code).
The method should be considered experimental.
# File 'scheduler.c', line 518
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
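A sketch of the suggested #io_read strategy, assuming Ruby 3.2 or later. It reads with IO#read_nonblock (wrapped in Fiber.blocking so the read is not routed back into the scheduler), copies the data into the buffer with IO::Buffer#set_string, and calls the host scheduler's #io_wait when nothing is available yet. The module name ReadHook is hypothetical, and the including scheduler is assumed to implement #io_wait.

```ruby
module ReadHook
  def io_read(io, buffer, length, offset)
    total = 0
    while offset < buffer.size
      chunk = Fiber.blocking { io.read_nonblock(buffer.size - offset, exception: false) }
      case chunk
      when :wait_readable
        io_wait(io, IO::READABLE, nil) # suspend this fiber until io is readable
      when nil
        break # end of file
      else
        buffer.set_string(chunk, offset)
        total += chunk.bytesize
        offset += chunk.bytesize
        break if total >= length # length is the minimum to read
      end
    end
    total
  rescue SystemCallError => error
    -error.errno # negated errno, as the hook contract requires
  end
end
```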
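#io_select(readables, writables, exceptables, timeout)
Invoked by IO.select to ask whether the specified descriptors are ready for specified events within the specified timeout.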
# File 'scheduler.c', line 473
VALUE
rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    VALUE arguments[] = {
        readables, writables, exceptables, timeout
    };

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
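One way to satisfy #io_select, sketched under the assumption that offloading to a thread is acceptable: run the real IO.select on a helper thread (which has no scheduler, so it simply blocks) and wait for the result with Thread#value, which a non-blocking fiber routes through #block. The class name is hypothetical.

```ruby
class ThreadedSelectScheduler
  # Returns what IO.select returns: three arrays of ready IOs, or nil on timeout.
  def io_select(readables, writables, exceptables, timeout)
    Thread.new { IO.select(readables, writables, exceptables, timeout) }.value
  end
end
```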
#io_wait(io, events, timeout)
Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the specified descriptor is ready for specified events within the specified timeout.
events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
Suggested implementation should register which ::Fiber is waiting for which resources and immediately call Fiber.yield to pass control to other fibers. Then, in the #close method, the scheduler might dispatch all the I/O resources to the fibers waiting for them.
Expected to return the subset of events that are ready immediately.
# File 'scheduler.c', line 445
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); }
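The suggested #io_wait pattern (register, Fiber.yield, then dispatch from the event loop) can be sketched as below. IOWaitRegistry and its dispatch method are hypothetical: dispatch stands in for one iteration of the loop normally run from #close, and timeout handling is omitted for brevity. Each fiber is resumed with the subset of its requested events that are ready, which becomes the return value of #io_wait.

```ruby
class IOWaitRegistry
  def initialize
    @waiting = {} # fiber => [io, events]
  end

  # Register the current fiber and yield; timeout handling omitted in this sketch.
  def io_wait(io, events, timeout)
    @waiting[Fiber.current] = [io, events]
    Fiber.yield # resumed by #dispatch with the ready events
  ensure
    @waiting.delete(Fiber.current)
  end

  # One loop iteration: wait up to `timeout` seconds, then resume ready fibers.
  def dispatch(timeout)
    readers = @waiting.filter_map { |_, (io, events)| io if events.anybits?(IO::READABLE) }
    writers = @waiting.filter_map { |_, (io, events)| io if events.anybits?(IO::WRITABLE) }
    readable, writable = IO.select(readers, writers, nil, timeout)
    @waiting.to_a.each do |fiber, (io, events)|
      ready = 0
      ready |= IO::READABLE if readable&.include?(io) && events.anybits?(IO::READABLE)
      ready |= IO::WRITABLE if writable&.include?(io) && events.anybits?(IO::WRITABLE)
      fiber.resume(ready) unless ready.zero?
    end
  end
end
```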
#io_write(io, buffer, length, offset) ⇒ written length, -errno
Invoked by IO#write or IO::Buffer#write to write length bytes to io from a specified buffer (see ::IO::Buffer) at the given offset.
The length argument is the “minimum length to be written”. If the ::IO buffer size is 8KiB, but the length specified is 1024 (1KiB), at most 8KiB will be written, but at least 1KiB will be. Generally, the only case where less data than length will be written is if there is an error writing the data.
Specifying a length of 0 is valid and means try writing at least once, as much data as possible.
Suggested implementation should try to write to io in a non-blocking manner and call #io_wait if the io is not ready (which will yield control to other fibers).
See ::IO::Buffer for an interface available to get data from buffer efficiently.
Expected to return the number of bytes written, or, in case of an error, -errno (negated number corresponding to system’s error code).
The method should be considered experimental.
# File 'scheduler.c', line 580
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
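The write side of the same strategy, sketched under the same assumptions (Ruby 3.2 or later; the hypothetical module WriteHook is mixed into a scheduler that implements #io_wait). IO#write_nonblock with exception: false returns :wait_writable instead of raising when the io cannot accept data yet.

```ruby
module WriteHook
  def io_write(io, buffer, length, offset)
    total = 0
    while offset < buffer.size
      data = buffer.get_string(offset) # remaining bytes of the buffer
      written = Fiber.blocking { io.write_nonblock(data, exception: false) }
      if written == :wait_writable
        io_wait(io, IO::WRITABLE, nil) # suspend this fiber until io is writable
      else
        total += written
        offset += written
        break if total >= length # length is the minimum to write
      end
    end
    total
  rescue SystemCallError => error
    -error.errno # negated errno, as the hook contract requires
  end
end
```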
#kernel_sleep(duration = nil)
Invoked by Kernel.sleep and Mutex#sleep and is expected to provide an implementation of sleeping in a non-blocking way. Implementation might register the current fiber in some list of “which fiber waits until what moment”, call Fiber.yield to pass control, and then in #close resume the fibers whose wait period has elapsed.
# File 'scheduler.c', line 291
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout) { return rb_funcall(scheduler, id_kernel_sleep, 1, timeout); }
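A sketch of the “which fiber waits until what moment” list described above. SleepTimers and run_timers are hypothetical names; run_timers stands in for the loop run from #close and is assumed to run on a blocking fiber, where Kernel#sleep really sleeps instead of calling back into the scheduler.

```ruby
class SleepTimers
  def initialize
    @deadlines = {} # fiber => monotonic wake-up time
  end

  def kernel_sleep(duration = nil)
    # nil means "sleep forever": no deadline is recorded, so only #unblock
    # (not shown here) could wake the fiber.
    @deadlines[Fiber.current] = now + duration if duration
    Fiber.yield
  end

  # Resume fibers in deadline order, sleeping until each deadline is reached.
  def run_timers
    until @deadlines.empty?
      fiber, deadline = @deadlines.min_by { |_, time| time }
      delay = deadline - now
      sleep(delay) if delay.positive?
      @deadlines.delete(fiber)
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```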
#process_wait(pid, flags)
Invoked by Process::Status.wait in order to wait for a specified process. See that method's description for the meaning of the arguments.
Suggested minimal implementation:
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end
This hook is optional: if it is not present in the current scheduler, Process::Status.wait will behave as a blocking method.
Expected to return a ::Process::Status instance.
# File 'scheduler.c', line 367
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    VALUE arguments[] = {
        PIDT2NUM(pid), RB_INT2NUM(flags)
    };

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
#timeout_after(duration, exception_class, *exception_arguments, &block) ⇒ result of block
Invoked by Timeout.timeout to execute the given block within the given duration. It can also be invoked directly by the scheduler or user code.
Attempt to limit the execution time of a given block to the given duration if possible. When a non-blocking operation causes the block’s execution time to exceed the specified duration, that non-blocking operation should be interrupted by raising the specified exception_class constructed with the given exception_arguments.
General execution timeouts are often considered risky. This implementation will only interrupt non-blocking operations. This is by design because it’s expected that non-blocking operations can fail for a variety of unpredictable reasons, so applications should already be robust in handling these conditions and by implication timeouts.
However, as a result of this design, if the block does not invoke any non-blocking operations, it will be impossible to interrupt it. If you desire to provide predictable points for timeouts, consider adding sleep(0).
If the block is executed successfully, its result will be returned.
The exception will typically be raised using Fiber#raise.
# File 'scheduler.c', line 332
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    VALUE arguments[] = {
        timeout, exception, message
    };

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
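A sketch of #timeout_after following the description above: the deadline is registered, the block runs, and if the deadline passes while the fiber is suspended in a non-blocking operation, the event loop interrupts it with Fiber#raise. TimeoutHooks and fire_due_timers are hypothetical; fire_due_timers stands in for one step of the scheduler's loop. A block that never suspends cannot be interrupted, matching the design described above.

```ruby
class TimeoutHooks
  Timer = Struct.new(:deadline, :fiber, :exception_class, :exception_arguments)

  def initialize
    @timers = []
  end

  def timeout_after(duration, exception_class, *exception_arguments, &block)
    timer = Timer.new(now + duration, Fiber.current, exception_class, exception_arguments)
    @timers << timer
    block.call(duration)
  ensure
    @timers.delete_if { |t| t.equal?(timer) } # cancel, whether or not it fired
  end

  # Raise into every suspended fiber whose deadline has passed.
  def fire_due_timers
    current = now
    due, @timers = @timers.partition { |t| t.deadline <= current }
    due.each do |t|
      t.fiber.raise(t.exception_class, *t.exception_arguments) if t.fiber.alive?
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```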
#unblock(blocker, fiber)
Invoked to wake up a ::Fiber previously blocked with #block (for example, Mutex#lock calls #block and Mutex#unlock calls #unblock). The scheduler should use the fiber parameter to understand which fiber is unblocked.
blocker is what was awaited for, but it is informational only (for debugging and logging), and it is not guaranteed to be the same value as the blocker for #block.
# File 'scheduler.c', line 410
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    RUBY_ASSERT(rb_obj_is_fiber(fiber));

    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is
    // expected to be preserved. Therefore, we should save and restore it. For
    // example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno`
    // is reset to 0 by user code, it will break the error handling in
    // `io_write`.
    // If we explicitly preserve `errno` in `io_binwrite` and other similar
    // functions (e.g. by returning it), this code is no longer needed. I hope
    // in the future we will be able to remove it.
    int saved_errno = errno;

    VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);

    errno = saved_errno;

    return result;
}
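Because #unblock can be called from a thread other than the scheduler's own (for example when a Mutex is released there), a common approach is to only record the fiber in a thread-safe queue and wake the event loop through a pipe. A sketch under that assumption; the class name, wakeup_reader, and drain_unblocked are hypothetical, and the event loop is assumed to include wakeup_reader in its IO.select set.

```ruby
class CrossThreadUnblock
  attr_reader :wakeup_reader # the event loop selects on this to notice wake-ups

  def initialize
    @unblocked = Thread::Queue.new # safe to push from any thread
    @wakeup_reader, @wakeup_writer = IO.pipe
  end

  def unblock(blocker, fiber)
    @unblocked.push(fiber)
    # Interrupt a pending IO.select in the loop; if the pipe is full, a wake-up
    # is already pending, so the result can be ignored.
    @wakeup_writer.write_nonblock(".", exception: false)
  end

  # Called by the event loop on the scheduler's thread: fibers to resume.
  def drain_unblocked
    @wakeup_reader.read_nonblock(1024, exception: false)
    fibers = []
    fibers << @unblocked.pop until @unblocked.empty?
    fibers
  end
end
```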