Module: EventMachine
Overview
Top-level EventMachine namespace. If you are looking for EventMachine examples, see the EventMachine tutorial.
Key methods
Starting and stopping the event loop
Implementing clients
Implementing servers
Working with timers
Working with blocking tasks
Efficient proxying
Constant Summary
- ConnectionAccepted = INT2NUM(EM_CONNECTION_ACCEPTED)
  # File 'ext/rubymain.cpp', line 1605
- ConnectionCompleted = INT2NUM(EM_CONNECTION_COMPLETED)
  # File 'ext/rubymain.cpp', line 1606
- ConnectionData = INT2NUM(EM_CONNECTION_READ)
  # File 'ext/rubymain.cpp', line 1603
- ConnectionNotifyReadable = INT2NUM(EM_CONNECTION_NOTIFY_READABLE)
  # File 'ext/rubymain.cpp', line 1608
- ConnectionNotifyWritable = INT2NUM(EM_CONNECTION_NOTIFY_WRITABLE)
  # File 'ext/rubymain.cpp', line 1609
- ConnectionUnbound = INT2NUM(EM_CONNECTION_UNBOUND)
  # File 'ext/rubymain.cpp', line 1604
- EM_PROTO_SSLv2 = INT2NUM(EM_PROTO_SSLv2)
  SSL Protocols
- EM_PROTO_SSLv3 = INT2NUM(EM_PROTO_SSLv3)
  # File 'ext/rubymain.cpp', line 1617
- EM_PROTO_TLSv1 = INT2NUM(EM_PROTO_TLSv1)
  # File 'ext/rubymain.cpp', line 1618
- EM_PROTO_TLSv1_1 = INT2NUM(EM_PROTO_TLSv1_1)
  # File 'ext/rubymain.cpp', line 1619
- EM_PROTO_TLSv1_2 = INT2NUM(EM_PROTO_TLSv1_2)
  # File 'ext/rubymain.cpp', line 1620
- EM_PROTO_TLSv1_3 = INT2NUM(EM_PROTO_TLSv1_3)
  # File 'ext/rubymain.cpp', line 1622
- ERRNOS =
  Internal use only. System errnos.
  Errno::constants.grep(/^E/).inject(Hash.new(:unknown)) { |hash, name|
    errno = Errno.__send__(:const_get, name)
    hash[errno::Errno] = errno
    hash
  }
- LoopbreakSignalled = INT2NUM(EM_LOOPBREAK_SIGNAL)
  # File 'ext/rubymain.cpp', line 1607
- OPENSSL_LIBRARY_VERSION = rb_str_new2(OpenSSL_version(OPENSSL_VERSION))
  Version of OpenSSL that EventMachine loaded with
- OPENSSL_NO_SSL2 = Qtrue
  # File 'ext/rubymain.cpp', line 1628
- OPENSSL_NO_SSL3 = Qtrue
  True if SSL3 is not available
- OPENSSL_VERSION = rb_str_new2(OPENSSL_VERSION_TEXT)
  Version of OpenSSL that EventMachine was compiled with
- P = EventMachine::Protocols
  Alias for Protocols
- SslHandshakeCompleted = INT2NUM(EM_SSL_HANDSHAKE_COMPLETED)
  # File 'ext/rubymain.cpp', line 1610
- SslVerify = INT2NUM(EM_SSL_VERIFY)
  # File 'ext/rubymain.cpp', line 1611
- TimerFired = INT2NUM(EM_TIMER_FIRED)
  Connection states
- VERSION = "1.3.0.dev.1"
  # File 'lib/em/version.rb', line 2
Class Attribute Summary
-
.defers_finished? ⇒ Boolean
readonly
Returns true if all deferred actions are done executing and their callbacks have been fired.
-
.heartbeat_interval ⇒ Integer
rw
Retrieve the heartbeat interval.
-
.heartbeat_interval=(time)
rw
Set the heartbeat interval.
-
.reactor_running? ⇒ Boolean
readonly
Tells you whether the EventMachine reactor loop is currently running.
-
.reactor_thread ⇒ Thread
readonly
Exposed to allow joining on the thread, when run in a multithreaded environment.
- .reactor_thread? ⇒ Boolean readonly
- .threadpool readonly Internal use only
-
.threadpool_size ⇒ Number
rw
Size of the EventMachine.defer threadpool (defaults to 20).
-
.epoll
readonly
mod_func
t__epoll.
-
.epoll? ⇒ Boolean
readonly
mod_func
t__epoll_p.
-
.kqueue
readonly
mod_func
t__kqueue.
-
.kqueue? ⇒ Boolean
readonly
mod_func
t__kqueue_p.
-
.ssl? ⇒ Boolean
readonly
mod_func
t__ssl_p.
-
.stopping? ⇒ Boolean
readonly
mod_func
t_stopping.
Class Method Summary
- ._open_file_for_writing(filename, handler = nil) Internal use only
-
.add_periodic_timer(*args, &block)
Adds a periodic timer to the event loop.
-
.add_shutdown_hook(&block)
Adds a block to call as the reactor is shutting down.
-
.add_timer(*args, &block)
Adds a one-shot timer to the event loop.
-
.attach(io, handler = nil, *args, &blk)
Attaches an IO object or file descriptor to the eventloop as a regular connection.
- .attach_io(io, watch_mode, handler = nil, *args) Internal use only
-
.attach_server(sock, handler = nil, *args, &block)
Attach to an existing socket's file descriptor.
-
.bind_connect(bind_addr, bind_port, server, port = nil, handler = nil, *args)
This method is like .connect, but allows for a local address/port to bind the connection to.
-
Callback(object, method) ⇒ <#call>
Utility method for coercing arguments to an object that responds to :call.
-
.cancel_timer(timer_or_sig)
Cancel a timer (can be a callback or a Timer instance).
-
.cleanup_machine
Clean up Ruby space following a release_machine.
-
.connect(server, port = nil, handler = nil, *args, &blk)
Initiates a TCP connection to a remote server and sets up event handling for the connection.
-
.connect_unix_domain(socketname, *args, &blk)
Make a connection to a Unix-domain socket.
-
.connection_count ⇒ Integer
Returns the total number of connections (file descriptors) currently held by the reactor.
-
.defer(op = nil, callback = nil, errback = nil, &blk)
EventMachine.defer is used for integrating blocking operations into EventMachine's control flow.
-
.disable_proxy(from)
Takes just one argument, a Connection that has proxying enabled via .enable_proxy.
-
.enable_proxy(from, to, bufsize = 0, length = 0)
This method allows for direct writing of incoming data back out to another descriptor, at the C++ level in the reactor.
-
.error_handler(cb = nil, &blk)
Catch-all for errors raised during event loop callbacks.
- .event_callback(conn_binding, opcode, data) Internal use only
-
.fork_reactor(&block)
Forks a new process, properly stops the reactor and then calls .run inside of it again, passing your block.
-
.get_max_timers ⇒ Integer
Gets the current maximum number of allowed timers.
- .klass_from_handler(klass = Connection, handler = nil, *args) Internal use only
-
.next_tick(pr = nil, &block)
Schedules a proc for execution immediately after the next "turn" through the reactor core.
-
.open_datagram_socket(address, port, handler = nil, *args)
Used for UDP-based protocols.
-
.open_keyboard(handler = nil, *args)
Internal use only
(Experimental).
-
.popen(cmd, handler = nil, *args) {|c| ... }
Runs an external process.
-
.reconnect(server, port, handler)
Connect to a given host/port and re-use the provided Connection instance.
-
.run(blk = nil, tail = nil, &block)
Initializes and runs an event loop.
-
.run_block(&block)
Sugars a common use case.
-
.run_deferred_callbacks
Internal use only
This is the responder for the loopbreak-signalled event.
-
.schedule(*a, &b)
Runs the given callback on the reactor thread, or immediately if called from the reactor thread.
-
.set_descriptor_table_size(n_descriptors = nil) ⇒ Integer
Sets the maximum number of file or socket descriptors that your process may open.
-
.set_effective_user(username)
A wrapper over the setuid system call.
-
.set_max_timers(ct)
Sets the maximum number of timers and periodic timers that may be outstanding at any given time.
-
.set_quantum(mills)
For advanced users.
-
.spawn(&block)
Spawn an erlang-style process.
- .spawn_threadpool Internal use only
-
.start_server(server, port = nil, handler = nil, *args, &block)
Initiates a TCP server (socket acceptor) on the specified IP address and port.
-
.start_unix_domain_server(filename, *args, &block)
Start a Unix-domain server.
-
.stop_event_loop
Causes the processing loop to stop executing, which will cause all open connections and accepting servers to be run down and closed.
-
.stop_server(signature)
Stop a TCP server socket that was started with .start_server.
-
.system(cmd, *args, &cb)
EM::system is a simple wrapper for EM::popen.
-
.tick_loop(*a, &b)
Creates and immediately starts a TickLoop.
-
.watch(io, handler = nil, *args, &blk)
.watch registers a given file descriptor or IO object with the eventloop.
-
.watch_file(filename, handler = nil, *args)
EventMachine's file monitoring API.
-
.watch_process(pid, handler = nil, *args)
EventMachine's process monitoring API.
- .yield(&block) Internal use only
- .yield_and_notify(&block) Internal use only
-
.add_oneshot_timer
mod_func
t_add_oneshot_timer.
-
.attach_sd(sd)
mod_func
t_attach_sd.
-
.bind_connect_server
mod_func
t_bind_connect_server.
-
.close_connection
mod_func
t_close_connection.
-
.connect_server
mod_func
t_connect_server.
-
.connect_unix_server
mod_func
t_connect_unix_server.
-
.current_time
mod_func
t_get_loop_time.
-
.epoll=
readonly
mod_func
t__epoll_set.
- .get_cipher_bits mod_func
- .get_cipher_name mod_func
- .get_cipher_protocol mod_func
-
.get_comm_inactivity_timeout
mod_func
t_get_comm_inactivity_timeout.
-
.get_connection_count
mod_func
t_get_connection_count.
-
.get_heartbeat_interval
mod_func
t_get_heartbeat_interval.
-
.get_idle_time
mod_func
t_get_idle_time.
-
.get_max_timer_count
mod_func
t_get_max_timer_count.
- .get_peer_cert mod_func
-
.get_peername
mod_func
t_get_peername.
-
.get_pending_connect_timeout
mod_func
t_get_pending_connect_timeout.
-
.get_simultaneous_accept_count
mod_func
t_get/set_simultaneous_accept_count.
- .get_sni_hostname mod_func
-
.get_sockname
mod_func
t_get_sockname.
-
.get_subprocess_pid
mod_func
t_get_subprocess_pid.
-
.get_subprocess_status
mod_func
t_get_subprocess_status.
-
.get_timer_count
mod_func
t_get_timer_count.
-
.initialize_event_machine
mod_func
t_initialize_event_machine.
-
.invoke_popen
mod_func
t_invoke_popen.
-
.kqueue=
readonly
mod_func
t__kqueue_set.
-
.library_type
mod_func
t_library_type.
-
.open_udp_socket
mod_func
t_open_udp_socket.
-
.read_keyboard
mod_func
t_read_keyboard.
-
.release_machine
mod_func
t_release_machine.
-
.report_connection_error_status
mod_func
t_report_connection_error_status.
-
.run_machine
(also: .run_machine_without_threads)
mod_func
t_run_machine.
-
.run_machine_once
mod_func
t_run_machine_once.
-
.run_machine_without_threads
mod_func
Alias for .run_machine.
-
.send_data
mod_func
t_send_data.
-
.send_datagram
mod_func
t_send_datagram.
-
.send_file_data
mod_func
t_send_file_data.
-
.set_comm_inactivity_timeout
mod_func
t_set_comm_inactivity_timeout.
-
.set_heartbeat_interval
mod_func
t_set_heartbeat_interval.
-
.set_max_timer_count
mod_func
t_set_max_timer_count.
-
.set_pending_connect_timeout
mod_func
t_set_pending_connect_timeout.
-
.set_rlimit_nofile
mod_func
t_set_rlimit_nofile.
- .set_simultaneous_accept_count mod_func
-
.set_timer_quantum
mod_func
t_set_timer_quantum.
-
.set_tls_parms
mod_func
t_set_tls_parms.
-
.setuid_string
mod_func
t_setuid_string.
-
.signal_loopbreak
mod_func
t_signal_loopbreak.
-
.start_tcp_server
mod_func
t_start_server.
-
.start_tls
mod_func
t_start_tls.
-
.start_unix_server
mod_func
t_start_unix_server.
-
.stop
mod_func
t_stop.
-
.stop_tcp_server
mod_func
t_stop_server.
Class Attribute Details
.defers_finished? ⇒ Boolean
(readonly)
Returns true if all deferred actions are done executing and their callbacks have been fired.
# File 'lib/eventmachine.rb', line 1096
def self.defers_finished?
  return false if @threadpool and !@all_threads_spawned
  return false if @threadqueue and not @threadqueue.empty?
  return false if @resultqueue and not @resultqueue.empty?
  return false if @threadpool and @threadqueue.num_waiting != @threadpool.size
  return true
end
.epoll (readonly, mod_func)
t__epoll
# File 'ext/rubymain.cpp', line 1159
static VALUE t__epoll (VALUE self UNUSED)
{
  if (t__epoll_p(self) == Qfalse)
    return Qfalse;

  evma_set_epoll (1);
  return Qtrue;
}
.epoll? ⇒ Boolean
(readonly, mod_func)
t__epoll_p
# File 'ext/rubymain.cpp', line 1146
static VALUE t__epoll_p (VALUE self UNUSED)
{
#ifdef HAVE_EPOLL
  return Qtrue;
#else
  return Qfalse;
#endif
}
.heartbeat_interval ⇒ Integer
(rw)
Retrieve the heartbeat interval. This is how often EventMachine will check for dead connections that have had an inactivity timeout set via Connection#set_comm_inactivity_timeout. Default is 2 seconds.
# File 'lib/eventmachine.rb', line 1450
def self.heartbeat_interval
  EM::get_heartbeat_interval
end
.heartbeat_interval=(time) (rw)
Set the heartbeat interval. This is how often EventMachine will check for dead connections that have had an inactivity timeout set via Connection#set_comm_inactivity_timeout. Takes a Numeric number of seconds. Default is 2.
# File 'lib/eventmachine.rb', line 1459
def self.heartbeat_interval=(time)
  EM::set_heartbeat_interval time.to_f
end
.kqueue (readonly, mod_func)
t__kqueue
# File 'ext/rubymain.cpp', line 1199
static VALUE t__kqueue (VALUE self UNUSED)
{
  if (t__kqueue_p(self) == Qfalse)
    return Qfalse;

  evma_set_kqueue (1);
  return Qtrue;
}
.kqueue? ⇒ Boolean
(readonly, mod_func)
t__kqueue_p
# File 'ext/rubymain.cpp', line 1186
static VALUE t__kqueue_p (VALUE self UNUSED)
{
#ifdef HAVE_KQUEUE
  return Qtrue;
#else
  return Qfalse;
#endif
}
.reactor_running? ⇒ Boolean
(readonly)
Tells you whether the EventMachine reactor loop is currently running.
Useful when writing libraries that want to run event-driven code, but may be running in programs that are already event-driven. In such cases, if reactor_running? returns false, your code can invoke .run and run your application code inside the block passed to that method. If this method returns true, just execute your event-aware code.
# File 'lib/eventmachine.rb', line 1228
def self.reactor_running?
  @reactor_running && Process.pid == @reactor_pid
end
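The library pattern described above could be sketched roughly as follows. This is only an illustration: `run_evented` is a hypothetical helper name, and `FakeReactor` is a stand-in (not part of EventMachine) that exists purely so the sketch runs without the gem installed.

```ruby
# Hypothetical helper: run `work` inside a reactor, starting one only if needed.
# `reactor` is anything that responds to reactor_running? and run, such as
# the EventMachine module itself.
def run_evented(reactor, &work)
  if reactor.reactor_running?
    work.call            # already inside an event loop: just run the code
  else
    reactor.run(&work)   # no loop yet: start one and run the code inside it
  end
end

# Stand-in reactor (an assumption for illustration, not EventMachine code).
class FakeReactor
  attr_reader :starts

  def initialize
    @running = false
    @starts = 0
  end

  def reactor_running?
    @running
  end

  def run
    @starts += 1
    @running = true
    yield
  ensure
    @running = false
  end
end

reactor = FakeReactor.new
log = []
run_evented(reactor) do
  log << :outer
  run_evented(reactor) { log << :inner }  # nested call reuses the running loop
end
```

With the real library the first argument would be the EventMachine module. Note that .run does not return until the event loop is stopped, so the block (or something it schedules) would need to stop the loop eventually.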
.reactor_thread ⇒ Thread
(readonly)
Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endeavor).
# File 'lib/eventmachine.rb', line 80
attr_reader :reactor_thread
.reactor_thread? ⇒ Boolean
(readonly)
# File 'lib/eventmachine.rb', line 228
def self.reactor_thread?
  Thread.current == @reactor_thread
end
.ssl? ⇒ Boolean
(readonly, mod_func)
t__ssl_p
# File 'ext/rubymain.cpp', line 1226
static VALUE t__ssl_p (VALUE self UNUSED)
{
#ifdef WITH_SSL
  return Qtrue;
#else
  return Qfalse;
#endif
}
.stopping? ⇒ Boolean
(readonly, mod_func)
t_stopping
# File 'ext/rubymain.cpp', line 1239
static VALUE t_stopping ()
{
  if (evma_stopping()) {
    return Qtrue;
  } else {
    return Qfalse;
  }
}
.threadpool (readonly)
# File 'lib/eventmachine.rb', line 1106
attr_reader :threadpool
.threadpool_size ⇒ Number
(rw)
Size of the EventMachine.defer threadpool (defaults to 20)
# File 'lib/eventmachine.rb', line 1110
attr_accessor :threadpool_size
Class Method Details
._open_file_for_writing(filename, handler = nil)
# File 'lib/eventmachine.rb', line 1565
def self._open_file_for_writing filename, handler=nil
  klass = klass_from_handler(Connection, handler)

  s = _write_file filename
  c = klass.new s
  @conns[s] = c
  block_given? and yield c
  c
end
.add_oneshot_timer (mod_func)
t_add_oneshot_timer
# File 'ext/rubymain.cpp', line 291
static VALUE t_add_oneshot_timer (VALUE self UNUSED, VALUE interval)
{
  const uintptr_t f = evma_install_oneshot_timer (FIX2LONG (interval));
  if (!f)
    rb_raise (rb_eRuntimeError, "%s", "ran out of timers; use #set_max_timers to increase limit");
  return BSIG2NUM (f);
}
.add_periodic_timer(*args, &block)
Adds a periodic timer to the event loop. It takes the same parameters as the one-shot timer method, .add_timer. This method schedules execution of the given block repeatedly, at intervals of time at least as great as the number of seconds given in the first parameter to the call.
# File 'lib/eventmachine.rb', line 352
def self.add_periodic_timer *args, &block
  interval = args.shift
  code = args.shift || block

  EventMachine::PeriodicTimer.new(interval, code)
end
.add_shutdown_hook(&block)
Adds a block to call as the reactor is shutting down.
These callbacks are called in the reverse of the order in which they were added.
# File 'lib/eventmachine.rb', line 292
def self.add_shutdown_hook &block
  @tails << block
end
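The reverse ordering can be pictured with a plain array used as a stack. This is a sketch under the assumption that the hooks collected by `@tails << block` are later popped from the end of the array; `run_shutdown_hooks` is a hypothetical name, not an EventMachine method.

```ruby
# Hypothetical helper: call hooks last-in, first-out until none remain.
def run_shutdown_hooks(tails)
  tails.pop.call until tails.empty?
end

order = []
tails = []
tails << -> { order << :added_first }
tails << -> { order << :added_second }
run_shutdown_hooks(tails)
# order now lists :added_second before :added_first
```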
.add_timer(*args, &block)
Adds a one-shot timer to the event loop.
Call it with one or two parameters. The first parameter is a delay time expressed in seconds (not milliseconds). The second parameter, if present, must be an object that responds to :call. If the second parameter is not given, you can instead pass a block to the method call.
This method may be called from the block passed to .run or from any callback method. It schedules execution of the proc or block passed to it after an interval at least as long as the number of seconds given in the first parameter.
add_timer is a non-blocking method. Callbacks can and will be called during the interval of time that the timer is in effect. The number of timers that can be outstanding at any given time is capped; see .set_max_timers and .get_max_timers.
# File 'lib/eventmachine.rb', line 324
def self.add_timer *args, &block
  interval = args.shift
  code = args.shift || block
  if code
    # check too many timers!
    s = add_oneshot_timer((interval.to_f * 1000).to_i)
    @timers[s] = code
    s
  end
end
.attach(io, handler = nil, *args, &blk)
Attaches an IO object or file descriptor to the eventloop as a regular connection.
The file descriptor will be set as non-blocking, and EventMachine will process receive_data and send_data events on it as it would for any other connection.
To watch a fd instead, use .watch, which does not alter the state of the socket and fires notify_readable and notify_writable events instead.
# File 'lib/eventmachine.rb', line 742
def EventMachine::attach io, handler=nil, *args, &blk
  attach_io io, false, handler, *args, &blk
end
.attach_io(io, watch_mode, handler = nil, *args)
# File 'lib/eventmachine.rb', line 747
def EventMachine::attach_io io, watch_mode, handler=nil, *args
  klass = klass_from_handler(Connection, handler, *args)

  if !watch_mode and klass.public_instance_methods.any?{|m| [:notify_readable, :notify_writable].include? m.to_sym }
    raise ArgumentError, "notify_readable/writable with EM.attach is not supported. Use EM.watch(io){ |c| c.notify_readable = true }"
  end

  if io.respond_to?(:fileno)
    # getDescriptorByFileno deprecated in JRuby 1.7.x, removed in JRuby 9000
    if defined?(JRuby) && JRuby.runtime.respond_to?(:getDescriptorByFileno)
      fd = JRuby.runtime.getDescriptorByFileno(io.fileno).getChannel
    else
      fd = io.fileno
    end
  else
    fd = io
  end

  s = attach_fd fd, watch_mode
  c = klass.new s, *args

  c.instance_variable_set(:@io, io)
  c.instance_variable_set(:@watch_mode, watch_mode)
  c.instance_variable_set(:@fd, fd)

  @conns[s] = c
  block_given? and yield c
  c
end
.attach_sd(sd) (mod_func)
t_attach_sd
# File 'ext/rubymain.cpp', line 339
static VALUE t_attach_sd(VALUE self UNUSED, VALUE sd)
{
  const uintptr_t f = evma_attach_sd(FIX2INT(sd));
  if (!f)
    rb_raise (rb_eRuntimeError, "%s", "no socket descriptor acceptor");
  return BSIG2NUM (f);
}
.attach_server(sock, handler = nil, *args, &block)
Attach to an existing socket's file descriptor. The socket may have been started with .start_server.
# File 'lib/eventmachine.rb', line 542
def self.attach_server sock, handler=nil, *args, &block
  klass = klass_from_handler(Connection, handler, *args)
  sd = sock.respond_to?(:fileno) ? sock.fileno : sock
  s = attach_sd(sd)
  @acceptors[s] = [klass,args,block,sock]
  s
end
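The listing accepts either an IO-like object or a raw descriptor number for `sock`. A minimal standalone sketch of that coercion, using only the Ruby standard library (`descriptor_for` is a hypothetical name):

```ruby
# Hypothetical helper mirroring the descriptor lookup in attach_server:
# accept either an IO-like object or a raw integer descriptor.
def descriptor_for(sock)
  sock.respond_to?(:fileno) ? sock.fileno : sock
end

reader, writer = IO.pipe
from_io = descriptor_for(reader)          # IO objects report their own descriptor
from_int = descriptor_for(reader.fileno)  # integers pass through unchanged
reader.close
writer.close
```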
.bind_connect(bind_addr, bind_port, server, port = nil, handler = nil, *args)
This method is like .connect, but allows for a local address/port to bind the connection to.
# File 'lib/eventmachine.rb', line 662
def self.bind_connect bind_addr, bind_port, server, port=nil, handler=nil, *args
  begin
    port = Integer(port)
  rescue ArgumentError, TypeError
    # there was no port, so server must be a unix domain socket
    # the port argument is actually the handler, and the handler is one of the args
    args.unshift handler if handler
    handler = port
    port = nil
  end if port

  klass = klass_from_handler(Connection, handler, *args)

  s = if port
        if bind_addr
          bind_connect_server bind_addr, bind_port.to_i, server, port
        else
          connect_server server, port
        end
      else
        connect_unix_server server
      end

  c = klass.new s, *args
  @conns[s] = c
  block_given? and yield c
  c
end
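The argument shuffling at the top of the listing decides whether the call targets a TCP port or a Unix-domain socket. The sketch below isolates that step so it can be run on its own; `normalize_connect_args` and `EchoHandler` are hypothetical names introduced only for illustration.

```ruby
# Mirrors the port handling at the top of bind_connect.
# Returns [port, handler, args] after normalization.
def normalize_connect_args(port, handler, args)
  args = args.dup
  if port
    begin
      port = Integer(port)
    rescue ArgumentError, TypeError
      # No usable port: the target is a Unix-domain socket, so the value in
      # the port slot is really the handler, and the old handler (if any)
      # becomes the first extra argument.
      args.unshift(handler) if handler
      handler = port
      port = nil
    end
  end
  [port, handler, args]
end

module EchoHandler; end  # placeholder handler, not part of EventMachine

tcp_call = normalize_connect_args("8080", EchoHandler, [])
unix_call = normalize_connect_args(EchoHandler, :option, [1])
```

In the first call the string port is converted to an integer. In the second, the handler sits in the port position, as it would when .connect is given only a socket path and a handler.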
.bind_connect_server (mod_func)
t_bind_connect_server
# File 'ext/rubymain.cpp', line 725
static VALUE t_bind_connect_server (VALUE self UNUSED, VALUE bind_addr, VALUE bind_port, VALUE server, VALUE port)
{
  // Avoid FIX2INT in this case, because it doesn't deal with type errors properly.
  // Specifically, if the value of port comes in as a string rather than an integer,
  // NUM2INT will throw a type error, but FIX2INT will generate garbage.

  try {
    const uintptr_t f = evma_connect_to_server (StringValueCStr(bind_addr), NUM2INT(bind_port), StringValueCStr(server), NUM2INT(port));
    if (!f)
      rb_raise (EM_eConnectionError, "%s", "no connection");
    return BSIG2NUM (f);
  } catch (std::runtime_error e) {
    rb_raise (EM_eConnectionError, "%s", e.what());
  }
  return Qnil;
}
Callback(object, method) ⇒ <#call>
Callback(object) ⇒ <#call>
Callback(&block) ⇒ <#call>
Utility method for coercing arguments to an object that responds to :call.
Accepts an object and a method name to send to, or a block, or an object that responds to :call.
# File 'lib/em/callback.rb', line 47
def self.Callback(object = nil, method = nil, &blk)
  if object && method
    lambda { |*args| object.__send__ method, *args }
  else
    if object.respond_to? :call
      object
    else
      blk || raise(ArgumentError)
    end # if
  end # if
end
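The three accepted call shapes can be tried out with a standalone copy of the coercion logic. The copy below is renamed `coerce_callback` (a hypothetical name) so that it runs without the gem; with EventMachine loaded, the same arguments would be passed to Callback instead.

```ruby
# Standalone mirror of the coercion logic shown in the listing.
def coerce_callback(object = nil, method = nil, &blk)
  if object && method
    lambda { |*args| object.__send__(method, *args) }
  elsif object.respond_to?(:call)
    object
  else
    blk || raise(ArgumentError)
  end
end

by_method   = coerce_callback([3, 1, 2], :sort)   # object plus method name
by_callable = coerce_callback(->(x) { x * 2 })    # already responds to :call
by_block    = coerce_callback { |x| x + 1 }       # plain block

sorted  = by_method.call
doubled = by_callable.call(4)
bumped  = by_block.call(1)
```

Calling it with no object and no block raises ArgumentError, matching the final branch of the listing.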
.cancel_timer(timer_or_sig)
Cancel a timer (can be a callback or a Timer instance).
# File 'lib/eventmachine.rb', line 364
def self.cancel_timer timer_or_sig
  if timer_or_sig.respond_to? :cancel
    timer_or_sig.cancel
  else
    @timers[timer_or_sig] = false if @timers.has_key?(timer_or_sig)
  end
end
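Taken together, the add_timer, cancel_timer and TimerFired listings on this page suggest a small bookkeeping scheme: a cancelled timer is marked false rather than deleted, and the mark is checked when the timer later fires. The following is a simplified model of that scheme, not the library's implementation; `TimerTable` and its method names are hypothetical, and signatures here are plain counters rather than values handed out by the reactor.

```ruby
# Simplified model of the timer table used by add_timer, cancel_timer and
# the TimerFired branch of event_callback.
class TimerTable
  UnknownTimerFired = Class.new(StandardError)

  def initialize
    @timers = {}
    @next_sig = 0
  end

  # Returns [signature, delay in milliseconds], using the same conversion
  # as add_timer: (interval.to_f * 1000).to_i
  def add(interval, &code)
    sig = (@next_sig += 1)
    @timers[sig] = code
    [sig, (interval.to_f * 1000).to_i]
  end

  # Cancelling keeps the entry but marks it false, so a later fire is
  # recognised and skipped instead of being reported as unknown.
  def cancel(sig)
    @timers[sig] = false if @timers.key?(sig)
  end

  # Returns :cancelled for visibility; the listing simply returns nil.
  def fire(sig)
    t = @timers.delete(sig)
    return :cancelled if t == false
    raise UnknownTimerFired, "timer data: #{sig}" unless t
    t.call
  end
end

table = TimerTable.new
kept, kept_ms = table.add(0.25) { :ran }
dropped, _dropped_ms = table.add(2) { :never }
table.cancel(dropped)
outcomes = [table.fire(kept), table.fire(dropped)]
```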
.cleanup_machine
Clean up Ruby space following a release_machine
# File 'lib/eventmachine.rb', line 262
def self.cleanup_machine
  if @threadpool && !@threadpool.empty?
    # Tell the threads to stop
    @threadpool.each { |t| t.exit }
    # Join the threads or bump the stragglers one more time
    @threadpool.each { |t| t.join 0.01 || t.exit }
  end
  @threadpool = nil
  @threadqueue = nil
  @resultqueue = nil
  @all_threads_spawned = false
  @next_tick_queue = []
end
.close_connection (mod_func)
t_close_connection
# File 'ext/rubymain.cpp', line 682
static VALUE t_close_connection (VALUE self UNUSED, VALUE signature, VALUE after_writing)
{
  evma_close_connection (NUM2BSIG (signature), ((after_writing == Qtrue) ? 1 : 0));
  return Qnil;
}
.connect(server, port = nil, handler = nil, *args, &blk)
Initiates a TCP connection to a remote server and sets up event handling for the connection.
connect requires the event loop to be running (see .run).
connect takes the IP address (or hostname) and port of the remote server you want to connect to. It also takes an optional handler (a module or a subclass of Connection) which you must define, and which contains the callbacks that will be invoked by the event loop on behalf of the connection.
Learn more about connection lifecycle callbacks in the EventMachine tutorial and the EventMachine::Connection lifecycle guide.
# File 'lib/eventmachine.rb', line 632
def self.connect server, port=nil, handler=nil, *args, &blk
  # EventMachine::connect initiates a TCP connection to a remote
  # server and sets up event-handling for the connection.
  # It internally creates an object that should not be handled
  # by the caller. HOWEVER, it's often convenient to get the
  # object to set up interfacing to other objects in the system.
  # We return the newly-created anonymous-class object to the caller.
  # It's expected that a considerable amount of code will depend
  # on this behavior, so don't change it.
  #
  # Ok, added support for a user-defined block, 13Apr06.
  # This leads us to an interesting choice because of the
  # presence of the post_init call, which happens in the
  # initialize method of the new object. We call the user's
  # block and pass the new object to it. This is a great
  # way to do protocol-specific initiation. It happens
  # AFTER post_init has been called on the object, which I
  # certainly hope is the right choice.
  # Don't change this lightly, because accepted connections
  # are different from connected ones and we don't want
  # to have them behave differently with respect to post_init
  # if at all possible.

  bind_connect nil, nil, server, port, handler, *args, &blk
end
.connect_server (mod_func)
t_connect_server
# File 'ext/rubymain.cpp', line 704
static VALUE t_connect_server (VALUE self UNUSED, VALUE server, VALUE port)
{
  // Avoid FIX2INT in this case, because it doesn't deal with type errors properly.
  // Specifically, if the value of port comes in as a string rather than an integer,
  // NUM2INT will throw a type error, but FIX2INT will generate garbage.

  try {
    const uintptr_t f = evma_connect_to_server (NULL, 0, StringValueCStr(server), NUM2INT(port));
    if (!f)
      rb_raise (EM_eConnectionError, "%s", "no connection");
    return BSIG2NUM (f);
  } catch (std::runtime_error e) {
    rb_raise (EM_eConnectionError, "%s", e.what());
  }
  return Qnil;
}
.connect_unix_domain(socketname, *args, &blk)
UNIX sockets, as the name suggests, are not available on Microsoft Windows.
Make a connection to a Unix-domain socket. This method is simply an alias for .connect, which can connect to both TCP and Unix-domain sockets. Make sure that your process has sufficient permissions to open the socket it is given.
# File 'lib/eventmachine.rb', line 814
def self.connect_unix_domain socketname, *args, &blk
  connect socketname, *args, &blk
end
.connect_unix_server (mod_func)
t_connect_unix_server
# File 'ext/rubymain.cpp', line 746
static VALUE t_connect_unix_server (VALUE self UNUSED, VALUE serversocket)
{
  const uintptr_t f = evma_connect_to_unix_server (StringValueCStr(serversocket));
  if (!f)
    rb_raise (rb_eRuntimeError, "%s", "no connection");
  return BSIG2NUM (f);
}
.connection_count ⇒ Integer
Returns the total number of connections (file descriptors) currently held by the reactor. Note that a tick must pass after the 'initiation' of a connection for this number to increment. It's usually accurate, but don't rely on the exact precision of this number unless you really know EM internals.
# File 'lib/eventmachine.rb', line 956
def self.connection_count
  self.get_connection_count
end
.current_time (mod_func)
t_get_loop_time
# File 'ext/rubymain.cpp', line 1352
static VALUE t_get_loop_time (VALUE self UNUSED)
{
  uint64_t current_time = evma_get_current_loop_time();
  if (current_time == 0) {
    return Qnil;
  }

  // Generally the industry has moved to 64-bit time_t, this is just in case we're 32-bit time_t.
  if (sizeof(time_t) < 8 && current_time > INT_MAX) {
    return rb_funcall(rb_cTime, Intern_at, 2, INT2NUM(current_time / 1000000), INT2NUM(current_time % 1000000));
  } else {
    return rb_time_new(current_time / 1000000, current_time % 1000000);
  }
}
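The C++ above splits a single counter into whole seconds and a remainder before building a Time, which implies the loop time is kept in microseconds. A rough Ruby equivalent of that conversion, under that assumption (`loop_time_to_time` is a hypothetical helper, not an EventMachine method):

```ruby
# Hypothetical helper: convert a loop time in microseconds into a Time.
# Mirrors the listing: 0 maps to nil, otherwise the value is split into
# whole seconds and leftover microseconds.
def loop_time_to_time(microseconds)
  return nil if microseconds == 0
  Time.at(microseconds / 1_000_000, microseconds % 1_000_000)
end

stamp = loop_time_to_time(1_500_000)  # 1 second and 500_000 microseconds
```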
.defer(op = nil, callback = nil, errback = nil, &blk)
EventMachine.defer is used for integrating blocking operations into EventMachine's control flow. The action of .defer is to take the block specified in the first parameter (the "operation") and schedule it for asynchronous execution on an internal thread pool maintained by EventMachine. When the operation completes, it will pass the result computed by the block (if any) back to the EventMachine reactor. Then, EventMachine calls the block specified in the second parameter to .defer (the "callback"), as part of its normal event handling loop. The result computed by the operation block is passed as a parameter to the callback. You may omit the callback parameter if you don't need to execute any code after the operation completes. If the operation raises an unhandled exception, the exception will be passed to the third parameter to .defer (the "errback"), as part of its normal event handling loop. If no errback is provided, the exception will be allowed to blow through to the main thread immediately.
Caveats
Note carefully that the code in your deferred operation will be executed on a separate thread from the main EventMachine processing and all other Ruby threads that may exist in your program. Also, multiple deferred operations may be running at once! Therefore, you are responsible for ensuring that your operation code is threadsafe.
Don't write a deferred operation that will block forever. If you do, the current implementation will not detect the problem, and the thread will never be returned to the pool. EventMachine limits the number of threads in its pool, so if you do this enough times, your subsequent deferred operations won't get a chance to run.
The threads within EventMachine's thread pool have abort_on_exception set to true. As a result, if an unhandled exception is raised by the deferred operation and an errback is not provided, it will blow through to the main thread immediately. If the main thread is within an indiscriminate rescue block at that time, the exception could be handled improperly by the main thread.
# File 'lib/eventmachine.rb', line 1044
def self.defer op = nil, callback = nil, errback = nil, &blk
  # OBSERVE that #next_tick hacks into this mechanism, so don't make any changes here
  # without syncing there.
  #
  # Running with $VERBOSE set to true gives a warning unless all ivars are defined when
  # they appear in rvalues. But we DON'T ever want to initialize @threadqueue unless we
  # need it, because the Ruby threads are so heavyweight. We end up with this bizarre
  # way of initializing @threadqueue because EventMachine is a Module, not a Class, and
  # has no constructor.

  unless @threadpool
    @threadpool = []
    @threadqueue = ::Queue.new
    @resultqueue = ::Queue.new
    spawn_threadpool
  end

  @threadqueue << [op||blk,callback,errback]
end
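The flow described above (operation on a pool thread, result handed back, callback or errback run on the reactor side) can be modelled with two queues from the Ruby standard library. This is a toy model under simplifying assumptions, not EventMachine's implementation: `DeferModel` and its methods are hypothetical names, the caller's thread plays the role of the reactor, and an error with no errback is silently dropped here rather than raised.

```ruby
# Toy model of the defer flow using two queues.
class DeferModel
  def initialize(size = 2)
    @threadqueue = Queue.new
    @resultqueue = Queue.new
    @pending = 0
    @workers = Array.new(size) do
      Thread.new do
        # A nil job is the stop marker for this worker.
        while (job = @threadqueue.pop)
          op, callback, errback = job
          begin
            @resultqueue << [callback, op.call]
          rescue StandardError => e
            @resultqueue << [errback, e]
          end
        end
      end
    end
  end

  # Queue an operation; callback and errback are optional callables.
  def defer(op, callback = nil, errback = nil)
    @pending += 1
    @threadqueue << [op, callback, errback]
  end

  # Runs handlers on the calling thread, standing in for the reactor thread.
  def drain
    @pending.times do
      handler, value = @resultqueue.pop
      handler&.call(value)
    end
    @pending = 0
  end

  def shutdown
    @workers.each { @threadqueue << nil }  # one stop marker per worker
    @workers.each(&:join)
  end
end

pool = DeferModel.new
seen = []
pool.defer(-> { 6 * 7 }, ->(result) { seen << result })
pool.defer(-> { raise "boom" }, nil, ->(error) { seen << error.message })
pool.drain
pool.shutdown
```

Because the two operations run on separate threads, the order in which their handlers fire is not fixed. That is the same reason the caveats above ask for threadsafe operation code.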
.disable_proxy(from)
Takes just one argument, a Connection that has proxying enabled via .enable_proxy. Calling this method will remove that functionality and your connection will begin receiving data via Connection#receive_data again.
# File 'lib/eventmachine.rb', line 1441
def self.disable_proxy(from)
  EM::stop_proxy(from.signature)
end
.enable_proxy(from, to, bufsize = 0, length = 0)
This method allows for direct writing of incoming data back out to another descriptor, at the C++ level in the reactor. This is very efficient and especially useful for proxies where high performance is required. Propagating data from a server response all the way up to Ruby, and then back down to the reactor to be sent back to the client, is often unnecessary and incurs a significant performance decrease.
The first two arguments, 'from' and 'to', are instances of Connection subclasses. 'from' is the connection whose inbound data you want relayed back out. 'to' is the connection to write it to.
Once you call this method, the 'from' connection will no longer get receive_data callbacks from the reactor, except when the 'to' connection has already closed by the time the reactor attempts to write to it. In that case proxy_target_unbound will be called, and further incoming data will be passed into receive_data as normal.
Note also that this feature supports different types of descriptors: TCP, UDP, and pipes. You can relay data from one kind to another, for example, feed a pipe from a UDP stream.
# File 'lib/eventmachine.rb', line 1431
def self.enable_proxy(from, to, bufsize=0, length=0)
  EM::start_proxy(from.signature, to.signature, bufsize, length)
end
.epoll= (readonly, mod_func)
t__epoll_set
# File 'ext/rubymain.cpp', line 1172
static VALUE t__epoll_set (VALUE self, VALUE val)
{
  if (t__epoll_p(self) == Qfalse && val == Qtrue)
    rb_raise (EM_eUnsupported, "%s", "epoll is not supported on this platform");

  evma_set_epoll (val == Qtrue ? 1 : 0);
  return val;
}
.error_handler(cb = nil, &blk)
Catch-all for errors raised during event loop callbacks.
# File 'lib/eventmachine.rb', line 1364
def self.error_handler cb = nil, &blk
  if cb or blk
    @error_handler = cb || blk
  elsif instance_variable_defined? :@error_handler
    remove_instance_variable :@error_handler
  end
end
.event_callback(conn_binding, opcode, data)
# File 'lib/eventmachine.rb', line 1464
def self.event_callback conn_binding, opcode, data
  #
  # Changed 27Dec07: Eliminated the hookable error handling.
  # No one was using it, and it degraded performance significantly.
  # It's in original_event_callback, which is dead code.
  #
  # Changed 25Jul08: Added a partial solution to the problem of exceptions
  # raised in user-written event-handlers. If such exceptions are not caught,
  # we must cause the reactor to stop, and then re-raise the exception.
  # Otherwise, the reactor doesn't stop and it's left on the call stack.
  # This is partial because we only added it to #unbind, where it's critical
  # (to keep unbind handlers from being re-entered when a stopping reactor
  # runs down open connections). It should go on the other calls to user
  # code, but the performance impact may be too large.
  #
  if opcode == ConnectionUnbound
    if c = @conns.delete( conn_binding )
      begin
        if c.original_method(:unbind).arity != 0
          c.unbind(data == 0 ? nil : EventMachine::ERRNOS[data])
        else
          c.unbind
        end
        # If this is an attached (but not watched) connection, close the underlying io object.
        if c.instance_variable_defined?(:@io) and !c.instance_variable_get(:@watch_mode)
          io = c.instance_variable_get(:@io)
          begin
            io.close
          rescue Errno::EBADF, IOError
          end
        end
      # As noted above, unbind absolutely must not raise an exception or the reactor will crash.
      # If there is no EM.error_handler, or if the error_handler rethrows, then stop the reactor,
      # stash the exception in @wrapped_exception, and the exception will be raised after the
      # reactor is cleaned up (see the last line of self.run).
      rescue Exception => error
        if instance_variable_defined? :@error_handler
          begin
            @error_handler.call error
            # No need to stop unless error_handler rethrows
          rescue Exception => error
            @wrapped_exception = error
            stop
          end
        else
          @wrapped_exception = error
          stop
        end
      end
    elsif c = @acceptors.delete( conn_binding )
      # no-op
    else
      if $! # Bubble user generated errors.
        @wrapped_exception = $!
        stop
      else
        raise ConnectionNotBound, "received ConnectionUnbound for an unknown signature: #{conn_binding}"
      end
    end
  elsif opcode == ConnectionAccepted
    accep,args,blk = @acceptors[conn_binding]
    raise NoHandlerForAcceptedConnection unless accep
    c = accep.new data, *args
    @conns[data] = c
    blk and blk.call(c)
    c # (needed?)
    ##
    # The remaining code is a fallback for the pure ruby and java reactors.
    # In the C++ reactor, these events are handled in the C event_callback() in rubymain.cpp
  elsif opcode == ConnectionCompleted
    c = @conns[conn_binding] or raise ConnectionNotBound, "received ConnectionCompleted for unknown signature: #{conn_binding}"
    c.connection_completed
  elsif opcode == SslHandshakeCompleted
    c = @conns[conn_binding] or raise ConnectionNotBound, "received SslHandshakeCompleted for unknown signature: #{conn_binding}"
    c.ssl_handshake_completed
  elsif opcode == SslVerify
    c = @conns[conn_binding] or raise ConnectionNotBound, "received SslVerify for unknown signature: #{conn_binding}"
    result = c.ssl_verify_peer(data) or c.close_connection
    result
  elsif opcode == TimerFired
    t = @timers.delete( data )
    return if t == false # timer cancelled
    t or raise UnknownTimerFired, "timer data: #{data}"
    t.call
  elsif opcode == ConnectionData
    c = @conns[conn_binding] or raise ConnectionNotBound, "received data #{data} for unknown signature: #{conn_binding}"
    c.receive_data data
  elsif opcode == LoopbreakSignalled
    run_deferred_callbacks
  elsif opcode == ConnectionNotifyReadable
    c = @conns[conn_binding] or raise ConnectionNotBound
    c.notify_readable
  elsif opcode == ConnectionNotifyWritable
    c = @conns[conn_binding] or raise ConnectionNotBound
    c.notify_writable
  end
end
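The unbind branch of the listing passes `nil` for a clean close and otherwise looks the numeric code up in `EventMachine::ERRNOS`. The following is a minimal sketch of that lookup in plain Ruby, with the table rebuilt the same way the `ERRNOS` constant is defined in the Constant Summary. The helper name `reason_for` is hypothetical and exists only for this illustration.

```ruby
# Rebuild an errno lookup table the same way EventMachine::ERRNOS is defined.
# Codes with no matching Errno class fall back to :unknown (the Hash default).
ERRNOS = Errno.constants.grep(/^E/).inject(Hash.new(:unknown)) do |hash, name|
  errno = Errno.const_get(name)
  hash[errno::Errno] = errno
  hash
end

# Hypothetical helper mirroring `data == 0 ? nil : ERRNOS[data]` from the listing.
def reason_for(data)
  data == 0 ? nil : ERRNOS[data]
end

p reason_for(0)                           # clean close, no reason
p reason_for(Errno::ECONNREFUSED::Errno)  # the matching Errno class
p reason_for(-12345)                      # no such errno
```

An `unbind` handler that declares one parameter would receive a value of this kind; a handler with no parameters is simply called without it.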
.fork_reactor(&block)
Forks a new process, properly stops the reactor and then calls .run inside of it again, passing your block.
# File 'lib/eventmachine.rb', line 244
def self.fork_reactor &block
  # This implementation is subject to change, especially if we clean up the relationship
  # of EM#run to @reactor_running.
  # Original patch by Aman Gupta.
  #
  Kernel.fork do
    if reactor_running?
      stop_event_loop
      release_machine
      cleanup_machine
      @reactor_running = false
      @reactor_thread = nil
    end
    run block
  end
end
.get_cipher_bits (mod_func)
# File 'ext/rubymain.cpp', line 434
static VALUE t_get_cipher_bits (VALUE self UNUSED, VALUE signature UNUSED) { return Qnil; }
.get_cipher_name (mod_func)
# File 'ext/rubymain.cpp', line 454
static VALUE t_get_cipher_name (VALUE self UNUSED, VALUE signature UNUSED) { return Qnil; }
.get_cipher_protocol (mod_func)
# File 'ext/rubymain.cpp', line 474
static VALUE t_get_cipher_protocol (VALUE self UNUSED, VALUE signature UNUSED) { return Qnil; }
.get_comm_inactivity_timeout (mod_func)
t_get_comm_inactivity_timeout
# File 'ext/rubymain.cpp', line 625
static VALUE t_get_comm_inactivity_timeout (VALUE self UNUSED, VALUE signature) { return rb_float_new(evma_get_comm_inactivity_timeout(NUM2BSIG (signature))); }
.get_connection_count (mod_func)
t_get_connection_count
# File 'ext/rubymain.cpp', line 616
static VALUE t_get_connection_count (VALUE self UNUSED) { return INT2NUM(evma_get_connection_count()); }
.get_heartbeat_interval (mod_func)
t_get_heartbeat_interval
# File 'ext/rubymain.cpp', line 1440
static VALUE t_get_heartbeat_interval (VALUE self UNUSED) { return rb_float_new(evma_get_heartbeat_interval()); }
.get_idle_time (mod_func)
t_get_idle_time
# File 'ext/rubymain.cpp', line 1415
static VALUE t_get_idle_time (VALUE self UNUSED, VALUE from) { try{ uint64_t current_time = evma_get_current_loop_time(); uint64_t time = evma_get_last_activity_time(NUM2BSIG (from)); if (current_time != 0 && time != 0) { if (time >= current_time) return BSIG2NUM(0); else { uint64_t diff = current_time - time; float seconds = diff / (1000.0*1000.0); return rb_float_new(seconds); } return Qnil; } } catch (std::runtime_error e) { rb_raise (EM_eConnectionError, "%s", e.what()); } return Qnil; }
.get_max_timer_count (mod_func)
t_get_max_timer_count
# File 'ext/rubymain.cpp', line 982
static VALUE t_get_max_timer_count (VALUE self UNUSED) { return INT2FIX (evma_get_max_timer_count()); }
.get_max_timers ⇒ Integer
Gets the current maximum number of allowed timers
# File 'lib/eventmachine.rb', line 925
def self.get_max_timers
  get_max_timer_count
end
.get_peer_cert (mod_func)
# File 'ext/rubymain.cpp', line 415
static VALUE t_get_peer_cert (VALUE self UNUSED, VALUE signature UNUSED) { return Qnil; }
.get_peername (mod_func)
t_get_peername
# File 'ext/rubymain.cpp', line 504
static VALUE t_get_peername (VALUE self UNUSED, VALUE signature) { char buf[1024]; socklen_t len = sizeof buf; try { if (evma_get_peername (NUM2BSIG (signature), (struct sockaddr*)buf, &len)) { return rb_str_new (buf, len); } } catch (std::runtime_error e) { rb_raise (rb_eRuntimeError, "%s", e.what()); } return Qnil; }
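The string returned by the function above is the raw `sockaddr` structure, not a printable address. The following is a minimal sketch of decoding such a string with Ruby's standard `Socket.unpack_sockaddr_in`. The packed bytes are produced locally with `Socket.pack_sockaddr_in` as a stand-in for what `get_peername` would return, and the address and port are made-up values.

```ruby
require 'socket'

# Stand-in for the binary string that a call such as get_peername returns:
# a packed sockaddr_in for 127.0.0.1:8080 (illustrative values).
raw = Socket.pack_sockaddr_in(8080, '127.0.0.1')

# Decode the packed structure into [port, ip_address].
port, ip = Socket.unpack_sockaddr_in(raw)
puts "peer is #{ip}:#{port}"
```

The same decoding applies to the string returned by `get_sockname`, which describes the local end of the socket.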
.get_pending_connect_timeout (mod_func)
t_get_pending_connect_timeout
# File 'ext/rubymain.cpp', line 647
static VALUE t_get_pending_connect_timeout (VALUE self UNUSED, VALUE signature) { return rb_float_new(evma_get_pending_connect_timeout(NUM2BSIG (signature))); }
.get_simultaneous_accept_count (mod_func)
t_get/set_simultaneous_accept_count
# File 'ext/rubymain.cpp', line 1001
static VALUE t_get_simultaneous_accept_count (VALUE self UNUSED) { return INT2FIX (evma_get_simultaneous_accept_count()); }
.get_sni_hostname (mod_func)
# File 'ext/rubymain.cpp', line 494
static VALUE t_get_sni_hostname (VALUE self UNUSED, VALUE signature UNUSED) { return Qnil; }
.get_sockname (mod_func)
t_get_sockname
# File 'ext/rubymain.cpp', line 523
static VALUE t_get_sockname (VALUE self UNUSED, VALUE signature) { char buf[1024]; socklen_t len = sizeof buf; try { if (evma_get_sockname (NUM2BSIG (signature), (struct sockaddr*)buf, &len)) { return rb_str_new (buf, len); } } catch (std::runtime_error e) { rb_raise (rb_eRuntimeError, "%s", e.what()); } return Qnil; }
.get_subprocess_pid (mod_func)
t_get_subprocess_pid
# File 'ext/rubymain.cpp', line 542
static VALUE t_get_subprocess_pid (VALUE self UNUSED, VALUE signature) { pid_t pid; if (evma_get_subprocess_pid (NUM2BSIG (signature), &pid)) { return INT2NUM (pid); } return Qnil; }
.get_subprocess_status (mod_func)
t_get_subprocess_status
# File 'ext/rubymain.cpp', line 556
static VALUE t_get_subprocess_status (VALUE self UNUSED, VALUE signature) { VALUE proc_status = Qnil; int status; pid_t pid; if (evma_get_subprocess_status (NUM2BSIG (signature), &status)) { if (evma_get_subprocess_pid (NUM2BSIG (signature), &pid)) { #if defined(IS_RUBY_3_3_OR_LATER) proc_status = rb_obj_alloc(rb_cProcessStatus); struct rb_process_status *data = NULL; data = (rb_process_status*)RTYPEDDATA_GET_DATA(proc_status); data->pid = pid; data->status = status; #elif defined(IS_RUBY_3_OR_LATER) struct rb_process_status *data = NULL; /* Defined to match static definition from MRI Ruby 3.0 process.c * * Older C++ compilers before GCC 8 don't allow static initialization of a * struct without every field specified, so the definition here is at runtime */ static rb_data_type_t rb_process_status_type; rb_process_status_type.wrap_struct_name = "Process::Status"; rb_process_status_type.function.dfree = RUBY_DEFAULT_FREE; rb_process_status_type.flags = RUBY_TYPED_FREE_IMMEDIATELY; proc_status = TypedData_Make_Struct(rb_cProcessStatus, struct rb_process_status, &rb_process_status_type, data); data->pid = pid; data->status = status; #else proc_status = rb_obj_alloc(rb_cProcessStatus); /* MRI Ruby uses hidden instance vars */ rb_ivar_set(proc_status, rb_intern_const("status"), INT2FIX(status)); rb_ivar_set(proc_status, rb_intern_const("pid"), INT2FIX(pid)); #endif #ifdef RUBINIUS /* Rubinius uses standard instance vars */ rb_iv_set(proc_status, "@pid", INT2FIX(pid)); if (WIFEXITED(status)) { rb_iv_set(proc_status, "@status", INT2FIX(WEXITSTATUS(status))); } else if (WIFSIGNALED(status)) { rb_iv_set(proc_status, "@termsig", INT2FIX(WTERMSIG(status))); } else if (WIFSTOPPED(status)) { rb_iv_set(proc_status, "@stopsig", INT2FIX(WSTOPSIG(status))); } #endif } } rb_obj_freeze(proc_status); return proc_status; }
.get_timer_count (mod_func)
t_get_timer_count
# File 'ext/rubymain.cpp', line 282
static VALUE t_get_timer_count () { return SIZET2NUM (evma_get_timer_count ()); }
.initialize_event_machine (mod_func)
t_initialize_event_machine
# File 'ext/rubymain.cpp', line 247
static VALUE t_initialize_event_machine (VALUE self UNUSED) { EmConnsHash = rb_ivar_get (EmModule, Intern_at_conns); EmTimersHash = rb_ivar_get (EmModule, Intern_at_timers); assert(EmConnsHash != Qnil); assert(EmTimersHash != Qnil); evma_initialize_library ((EMCallback)event_callback_wrapper); return Qnil; }
.invoke_popen (mod_func)
t_invoke_popen
# File 'ext/rubymain.cpp', line 1028
static VALUE t_invoke_popen (VALUE self UNUSED, VALUE cmd) { #ifdef OS_WIN32 rb_raise (EM_eUnsupported, "popen is not available on this platform"); #endif int len = RARRAY_LEN(cmd); if (len >= 2048) rb_raise (rb_eRuntimeError, "%s", "too many arguments to popen"); char *strings [2048]; for (int i=0; i < len; i++) { VALUE ix = INT2FIX (i); VALUE s = rb_ary_aref (1, &ix, cmd); strings[i] = StringValueCStr (s); } strings[len] = NULL; uintptr_t f = 0; try { f = evma_popen (strings); } catch (std::runtime_error e) { rb_raise (rb_eRuntimeError, "%s", e.what()); } if (!f) { char *err = strerror (errno); char buf[100]; memset (buf, 0, sizeof(buf)); snprintf (buf, sizeof(buf)-1, "no popen: %s", (err?err:"???")); rb_raise (rb_eRuntimeError, "%s", buf); } return BSIG2NUM (f); }
.klass_from_handler(klass = Connection, handler = nil, *args)
# File 'lib/eventmachine.rb', line 1576
def self.klass_from_handler(klass = Connection, handler = nil, *args)
  klass = if handler and handler.is_a?(Class)
    raise ArgumentError, "must provide module or subclass of #{klass.name}" unless klass >= handler
    handler
  elsif handler
    if defined?(handler::EM_CONNECTION_CLASS)
      handler::EM_CONNECTION_CLASS
    else
      handler::const_set(:EM_CONNECTION_CLASS, Class.new(klass) {include handler})
    end
  else
    klass
  end

  arity = klass.instance_method(:initialize).arity
  expected = arity >= 0 ? arity : -(arity + 1)
  if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected)
    raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})"
  end

  klass
end
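The argument check at the end of the listing relies on how Ruby reports method arity: a non-negative value means exactly that many arguments, and a negative value `-(n + 1)` means at least `n`. The following sketch restates that check in isolation. The three classes and the helper `args_accepted?` are hypothetical names used only for illustration.

```ruby
# Hypothetical handler classes, used only to illustrate the arity check.
class NoArgs
  def initialize; end
end

class TwoArgs
  def initialize(a, b); end
end

class OneOrMore
  def initialize(a, *rest); end
end

# Simplified restatement of the check in klass_from_handler:
# a non-negative arity means "exactly that many arguments";
# a negative arity -(n + 1) means "at least n arguments".
def args_accepted?(klass, args)
  arity = klass.instance_method(:initialize).arity
  expected = arity >= 0 ? arity : -(arity + 1)
  arity >= 0 ? args.size == expected : args.size >= expected
end

p args_accepted?(NoArgs, [])          # exact match on zero arguments
p args_accepted?(TwoArgs, [1])        # too few for an exact arity of 2
p args_accepted?(OneOrMore, [1, 2])   # at least one argument supplied
```

Where this helper returns false, `klass_from_handler` raises `ArgumentError`. The mismatch therefore surfaces when the server or connection is set up, not later when a connection object is instantiated.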
.kqueue= (readonly, mod_func)
t__kqueue_set
# File 'ext/rubymain.cpp', line 1212
static VALUE t__kqueue_set (VALUE self, VALUE val) { if (t__kqueue_p(self) == Qfalse && val == Qtrue) rb_raise (EM_eUnsupported, "%s", "kqueue is not supported on this platform"); evma_set_kqueue (val == Qtrue ? 1 : 0); return val; }
.library_type (mod_func)
t_library_type
# File 'ext/rubymain.cpp', line 961
static VALUE t_library_type (VALUE self UNUSED) { return rb_eval_string (":extension"); }
.next_tick(pr = nil, &block)
Schedules a proc for execution immediately after the next "turn" through the reactor core. An advanced technique, this can be useful for improving memory management and/or application responsiveness, especially when scheduling large amounts of data for writing to a network connection.
This method takes either a single argument (which must be a callable object) or a block.
# File 'lib/eventmachine.rb', line 1122
def self.next_tick pr=nil, &block
  # This works by adding to the @resultqueue that's used for #defer.
  # The general idea is that next_tick is used when we want to give the reactor a chance
  # to let other operations run, either to balance the load out more evenly, or to let
  # outbound network buffers drain, or both. So we probably do NOT want to block, and
  # we probably do NOT want to be spinning any threads. A program that uses next_tick
  # but not #defer shouldn't suffer the penalty of having Ruby threads running. They're
  # extremely expensive even if they're just sleeping.

  raise ArgumentError, "no proc or block given" unless ((pr && pr.respond_to?(:call)) or block)
  @next_tick_mutex.synchronize do
    @next_tick_queue << ( pr || block )
  end
  signal_loopbreak if reactor_running?
end
.open_datagram_socket(address, port, handler = nil, *args)
Used for UDP-based protocols. Its usage is similar to that of .start_server.
This method will create a new UDP (datagram) socket and bind it to the address and port that you specify. The normal callbacks (see .start_server) will be called as events of interest occur on the newly-created socket, but there are some differences in how they behave.
Connection#receive_data will be called when a datagram packet is received on the socket, but unlike TCP sockets, the message boundaries of the received data will be respected. In other words, if the remote peer sent you a datagram of a particular size, you may rely on Connection#receive_data to give you the exact data in the packet, with the original data length. Also observe that Connection#receive_data may be called with a zero-length data payload, since empty datagrams are permitted in UDP.
Connection#send_data is available with UDP packets as with TCP, but there is an important difference. Because UDP communications are connectionless, there is no implicit recipient for the packets you send. Ordinarily you must specify the recipient for each packet you send. However, EventMachine provides for the typical pattern of receiving a UDP datagram from a remote peer, performing some operation, and then sending one or more packets in response to the same remote peer. To support this model easily, just use Connection#send_data in the code that you supply for Connection#receive_data. EventMachine will provide an implicit return address for any messages sent to Connection#send_data within the context of a Connection#receive_data callback, and your response will automatically go to the correct remote peer.
Observe that the port number that you supply to open_datagram_socket may be zero. In this case, EventMachine will create a UDP socket that is bound to an ephemeral port. This is not appropriate for servers that must publish a well-known port to which remote peers may send datagrams. But it can be useful for clients that send datagrams to other servers. If you do this, you will receive any responses from the remote servers through the normal Connection#receive_data callback. Observe that you will probably have issues with firewalls blocking the ephemeral port numbers, so this technique is most appropriate for LANs.
If you wish to send datagrams to arbitrary remote peers (not necessarily ones that have sent data to which you are responding), then see Connection#send_datagram.
DO NOT call send_data from a datagram socket outside of a Connection#receive_data method. Use Connection#send_datagram. If you do use Connection#send_data outside of a Connection#receive_data method, you'll get a confusing error because there is no "peer," as #send_data requires (inside of Connection#receive_data, Connection#send_data "fakes" the peer as described above).
# File 'lib/eventmachine.rb', line 873
def self.open_datagram_socket address, port, handler=nil, *args
  # Replaced the implementation on 01Oct06. Thanks to Tobias Gustafsson for pointing
  # out that this originally did not take a class but only a module.

  klass = klass_from_handler(Connection, handler, *args)
  s = open_udp_socket address, port.to_i
  c = klass.new s, *args
  @conns[s] = c
  block_given? and yield c
  c
end
.open_keyboard(handler = nil, *args)
(Experimental)
# File 'lib/eventmachine.rb', line 1236
def self.open_keyboard handler=nil, *args
  klass = klass_from_handler(Connection, handler, *args)
  s = read_keyboard
  c = klass.new s, *args
  @conns[s] = c
  block_given? and yield c
  c
end
.open_udp_socket (mod_func)
t_open_udp_socket
# File 'ext/rubymain.cpp', line 916
static VALUE t_open_udp_socket (VALUE self UNUSED, VALUE server, VALUE port) { const uintptr_t f = evma_open_datagram_socket (StringValueCStr(server), FIX2INT(port)); if (!f) rb_raise (rb_eRuntimeError, "%s", "no datagram socket"); return BSIG2NUM(f); }
.popen(cmd, handler = nil, *args) {|c| ... }
This method is not supported on Microsoft Windows
Runs an external process.
# File 'lib/eventmachine.rb', line 1199
def self.popen cmd, handler=nil, *args
  # At this moment, it's only available on Unix.
  # Perhaps misnamed since the underlying function uses socketpair and is full-duplex.

  klass = klass_from_handler(Connection, handler, *args)
  w = case cmd
      when Array
        cmd
      when String
        Shellwords::shellwords( cmd )
      end
  w.unshift( w.first ) if w.first
  s = invoke_popen( w )
  c = klass.new s, *args
  @conns[s] = c
  yield(c) if block_given?
  c
end
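The command handling in the listing can be tried in isolation with Ruby's standard `shellwords` library. The sketch below shows how a String command is split and what the array looks like after the first word is duplicated. The command itself is an arbitrary example. Why the first word is duplicated is not shown in the listing; presumably one copy names the program to execute and the rest forms its argument vector, but that reading is an assumption.

```ruby
require 'shellwords'

# A command given as a String is split the way a POSIX shell would split it.
cmd = "grep -i 'hello world' notes.txt"   # illustrative command
w = Shellwords.shellwords(cmd)
p w   # the quoted phrase stays together as one word

# popen then duplicates the first word before handing the array to invoke_popen.
w.unshift(w.first) if w.first
p w
```

When `cmd` is already an Array, the listing uses it directly, so the `unshift` modifies the caller's array; pass a copy if the original must stay unchanged.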
.read_keyboard (mod_func)
t_read_keyboard
# File 'ext/rubymain.cpp', line 1066
static VALUE t_read_keyboard (VALUE self UNUSED) { const uintptr_t f = evma_open_keyboard(); if (!f) rb_raise (rb_eRuntimeError, "%s", "no keyboard reader"); return BSIG2NUM (f); }
.reconnect(server, port, handler)
Connects to a given host/port and re-uses the provided Connection instance.
Consider also Connection#reconnect.
# File 'lib/eventmachine.rb', line 782
def self.reconnect server, port, handler
  # Observe, the test for already-connected FAILS if we call a reconnect inside post_init,
  # because we haven't set up the connection in @conns by that point.
  # RESIST THE TEMPTATION to "fix" this problem by redefining the behavior of post_init.
  #
  # Changed 22Nov06: if called on an already-connected handler, just return the
  # handler and do nothing more. Originally this condition raised an exception.
  # We may want to change it yet again and call the block, if any.

  raise "invalid handler" unless handler.respond_to?(:connection_completed)
  #raise "still connected" if @conns.has_key?(handler.signature)
  return handler if @conns.has_key?(handler.signature)

  s = if port
        connect_server server, port
      else
        connect_unix_server server
      end
  handler.signature = s
  @conns[s] = handler
  block_given? and yield handler
  handler
end
.release_machine (mod_func)
t_release_machine
# File 'ext/rubymain.cpp', line 930
static VALUE t_release_machine (VALUE self UNUSED) { evma_release_library(); return Qnil; }
.report_connection_error_status (mod_func)
t_report_connection_error_status
# File 'ext/rubymain.cpp', line 692
static VALUE t_report_connection_error_status (VALUE self UNUSED, VALUE signature) { int b = evma_report_connection_error_status (NUM2BSIG (signature)); return INT2NUM (b); }
.run(blk = nil, tail = nil, &block)
This method blocks the calling thread. If you need to start the EventMachine event loop from a Web app running on a non-event-driven server (Unicorn, Apache Passenger, Mongrel), do it in a separate thread, as demonstrated in one of the examples.
Initializes and runs an event loop. This method only returns if code inside the block passed to this method calls .stop_event_loop. The block is executed after the internal event loop has been initialized but before the loop starts running, so it is the right place to call any code that needs the event loop to run, for example .start_server, .connect, or similar methods of libraries that use EventMachine under the hood (like EventMachine::HttpRequest.new or AMQP.start).
Programs that run for long periods of time (e.g. servers) usually start the event loop by calling run and let it run "forever". It is also possible to use run to make a single client connection to a remote server, process the data flow from that single connection, and then call .stop_event_loop to stop. In other words, you can run the event loop for a short period of time (long enough to complete some operation) and then shut it down.
Once the event loop is running, it is perfectly possible to start multiple servers and clients simultaneously: content-aware proxies like Proxymachine do just that.
Using EventMachine with Ruby on Rails and other Web application frameworks
Standalone applications often run the event loop on the main thread, thus blocking for their entire lifespan. In the case of Web applications, if you are running an EventMachine-based app server such as Thin or Goliath, the server starts the event loop for you. Servers like Unicorn, Apache Passenger or Mongrel occupy the main Ruby thread to serve HTTP(S) requests. This means that calling run on the same thread is not an option (it would result in the Web server never binding to the socket). In that case, start the event loop in a separate thread.
# File 'lib/eventmachine.rb', line 150
def self.run blk=nil, tail=nil, &block
  # Obsoleted the use_threads mechanism.
  # 25Nov06: Added the begin/ensure block. We need to be sure that release_machine
  # gets called even if an exception gets thrown within any of the user code
  # that the event loop runs. The best way to see this is to run a unit
  # test with two functions, each of which calls {EventMachine.run} and each of
  # which throws something inside of #run. Without the ensure, the second test
  # will start without release_machine being called and will immediately throw
  #
  if @reactor_running and @reactor_pid != Process.pid
    # Reactor was started in a different parent, meaning we have forked.
    # Clean up reactor state so a new reactor boots up in this child.
    stop_event_loop
    release_machine
    cleanup_machine
    @reactor_running = false
  end

  tail and @tails.unshift(tail)

  if reactor_running?
    (b = blk || block) and b.call # next_tick(b)
  else
    @conns = {}
    @acceptors = {}
    @timers = {}
    @wrapped_exception = nil
    @next_tick_queue ||= []
    @tails ||= []
    begin
      initialize_event_machine
      @reactor_pid = Process.pid
      @reactor_thread = Thread.current
      @reactor_running = true

      (b = blk || block) and add_timer(0, b)
      if @next_tick_queue && !@next_tick_queue.empty?
        add_timer(0) { signal_loopbreak }
      end

      # Rubinius needs to come back into "Ruby space" for GC to work,
      # so we'll crank the machine here.
      if defined?(RUBY_ENGINE) && RUBY_ENGINE == "rbx"
        while run_machine_once; end
      else
        run_machine
      end

    ensure
      until @tails.empty?
        @tails.pop.call
      end

      release_machine
      cleanup_machine
      @reactor_running = false
      @reactor_thread = nil
    end

    raise @wrapped_exception if @wrapped_exception
  end
end
.run_block(&block)
.run_deferred_callbacks
This is the responder for the loopbreak-signalled event (LoopbreakSignalled). It can be fired either by code running on a separate thread (.defer) or on the main thread (.next_tick). It will often happen that a next_tick handler will reschedule itself. We capture the size of the tick queue at the start of each pass, so that tick events scheduled by tick events have to wait for the next pass through the reactor core.
# File 'lib/eventmachine.rb', line 968
def self.run_deferred_callbacks
  until (@resultqueue ||= []).empty?
    result,cback = @resultqueue.pop
    cback.call result if cback
  end

  # Capture the size at the start of this tick...
  size = @next_tick_mutex.synchronize { @next_tick_queue.size }
  size.times do |i|
    callback = @next_tick_mutex.synchronize { @next_tick_queue.shift }
    begin
      callback.call
    rescue
      exception_raised = true
      raise
    ensure
      # This is a little nasty. The problem is, if an exception occurs during
      # the callback, then we need to send a signal to the reactor to actually
      # do some work during the next_tick. The only mechanism we have from the
      # ruby side is next_tick itself, although ideally, we'd just drop a byte
      # on the loopback descriptor.
      next_tick {} if exception_raised
    end
  end
end
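The effect of capturing the queue size before running callbacks can be seen in a simplified, single-threaded model. The class `TickQueue` and its methods are hypothetical. The model has no mutex and no reactor, and shows only the ordering behaviour.

```ruby
# Simplified, single-threaded model of the tick queue (no mutex, no reactor).
# TickQueue and its method names are hypothetical, for illustration only.
class TickQueue
  def initialize
    @queue = []
  end

  def next_tick(&blk)
    @queue << blk
  end

  # One "pass through the reactor core": run only the callbacks that were
  # queued when the pass started; anything they schedule waits for the next pass.
  def run_pass
    size = @queue.size
    size.times { @queue.shift.call }
  end

  def pending
    @queue.size
  end
end

log = []
q = TickQueue.new
q.next_tick do
  log << :first
  q.next_tick { log << :rescheduled }   # scheduled from inside a tick callback
end
q.next_tick { log << :second }

q.run_pass
p log   # after the first pass
q.run_pass
p log   # after the second pass
```

Without the size snapshot, a callback that keeps rescheduling itself would be picked up again within the same pass and could keep the loop from ever returning to network events.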
.run_machine (mod_func) Also known as: .run_machine_without_threads
t_run_machine
# File 'ext/rubymain.cpp', line 272
static VALUE t_run_machine (VALUE self UNUSED) { evma_run_machine(); return Qnil; }
.run_machine_once (mod_func)
t_run_machine_once
# File 'ext/rubymain.cpp', line 262
static VALUE t_run_machine_once (VALUE self UNUSED) { return evma_run_machine_once () ? Qtrue : Qfalse; }
.run_machine_without_threads (mod_func)
Alias for .run_machine.
.schedule(*a, &b)
Runs the given callback on the reactor thread, or immediately if called from the reactor thread. Accepts the same arguments as EventMachine::Callback.
# File 'lib/eventmachine.rb', line 234
def self.schedule(*a, &b)
  cb = Callback(*a, &b)
  if reactor_running? && reactor_thread?
    cb.call
  else
    next_tick { cb.call }
  end
end
.send_data (mod_func)
t_send_data
# File 'ext/rubymain.cpp', line 352
static VALUE t_send_data (VALUE self UNUSED, VALUE signature, VALUE data, VALUE data_length) { int b = evma_send_data_to_connection (NUM2BSIG (signature), StringValuePtr (data), FIX2INT (data_length)); return INT2NUM (b); }
.send_datagram (mod_func)
t_send_datagram
# File 'ext/rubymain.cpp', line 669
static VALUE t_send_datagram (VALUE self UNUSED, VALUE signature, VALUE data, VALUE data_length, VALUE address, VALUE port)
{
	int b = evma_send_datagram (NUM2BSIG (signature), StringValuePtr (data), FIX2INT (data_length), StringValueCStr(address), FIX2INT(port));
	if (b < 0)
		rb_raise (EM_eConnectionError, "%s", "error in sending datagram"); // FIXME: this could be more specific.
	return INT2NUM (b);
}
.send_file_data (mod_func)
t_send_file_data
# File 'ext/rubymain.cpp', line 1253
static VALUE t_send_file_data (VALUE self UNUSED, VALUE signature, VALUE filename) { /* The current implementation of evma_send_file_data_to_connection enforces a strict * upper limit on the file size it will transmit (currently 32K). The function returns * zero on success, -1 if the requested file exceeds its size limit, and a positive * number for other errors. * TODO: Positive return values are actually errno's, which is probably the wrong way to * do this. For one thing it's ugly. For another, we can't be sure zero is never a real errno. */ int b = evma_send_file_data_to_connection (NUM2BSIG (signature), StringValueCStr(filename)); if (b == -1) rb_raise(rb_eRuntimeError, "%s", "File too large. send_file_data() supports files under 32k."); if (b > 0) { char *err = strerror (b); char buf[1024]; memset (buf, 0, sizeof(buf)); snprintf (buf, sizeof(buf)-1, ": %s %s", StringValueCStr(filename),(err?err:"???")); rb_raise (rb_eIOError, "%s", buf); } return INT2NUM (0); }
.set_comm_inactivity_timeout (mod_func)
t_set_comm_inactivity_timeout
# File 'ext/rubymain.cpp', line 634
static VALUE t_set_comm_inactivity_timeout (VALUE self UNUSED, VALUE signature, VALUE timeout) { float ti = RFLOAT_VALUE(timeout); if (evma_set_comm_inactivity_timeout(NUM2BSIG(signature), ti)) { return Qtrue; } return Qfalse; }
.set_descriptor_table_size(n_descriptors = nil) ⇒ Integer
Sets the maximum number of file or socket descriptors that your process may open. If you call this method with no arguments, it will simply return the current size of the descriptor table without attempting to change it.
The new limit on open descriptors only applies to sockets and other descriptors that belong to EventMachine. It has no effect on the number of descriptors you can create in ordinary Ruby code.
Not available on all platforms. Increasing the number of descriptors beyond its default limit usually requires superuser privileges. (See .set_effective_user for a way to drop superuser privileges while your program is running.)
# File 'lib/eventmachine.rb', line 1169
def self.set_descriptor_table_size n_descriptors=nil
  EventMachine::set_rlimit_nofile n_descriptors
end
.set_effective_user(username)
This method has no effective implementation on Windows or in the pure-Ruby implementation of EventMachine.
A wrapper over the setuid system call. Particularly useful when opening a network server on a privileged port because you can use this call to drop privileges after opening the port. Also very useful after a call to .set_descriptor_table_size, which generally requires that you start your process with root privileges.
This method is intended for use in enforcing security requirements. Consequently, it will throw a fatal error and end your program if it fails.
# File 'lib/eventmachine.rb', line 1150
def self.set_effective_user username
  EventMachine::setuid_string username
end
.set_heartbeat_interval (mod_func)
t_set_heartbeat_interval
# File 'ext/rubymain.cpp', line 1450
static VALUE t_set_heartbeat_interval (VALUE self UNUSED, VALUE interval) { float iv = RFLOAT_VALUE(interval); if (evma_set_heartbeat_interval(iv)) return Qtrue; return Qfalse; }
.set_max_timer_count (mod_func)
t_set_max_timer_count
# File 'ext/rubymain.cpp', line 991
static VALUE t_set_max_timer_count (VALUE self UNUSED, VALUE ct) { evma_set_max_timer_count (FIX2INT (ct)); return Qnil; }
.set_max_timers(ct)
This method must be called before the event loop is started.
Sets the maximum number of timers and periodic timers that may be outstanding at any given time. You only need to call .set_max_timers if you need more than the default number of timers, which on most platforms is 1000.
# File 'lib/eventmachine.rb', line 918
def self.set_max_timers ct
  set_max_timer_count ct
end
.set_pending_connect_timeout (mod_func)
t_set_pending_connect_timeout
# File 'ext/rubymain.cpp', line 656
static VALUE t_set_pending_connect_timeout (VALUE self UNUSED, VALUE signature, VALUE timeout) { float ti = RFLOAT_VALUE(timeout); if (evma_set_pending_connect_timeout(NUM2BSIG(signature), ti)) { return Qtrue; } return Qfalse; }
.set_quantum(mills)
For advanced users. This function sets the default timer granularity, which by default is slightly smaller than 100 milliseconds. Call this function to set a higher or lower granularity. The function affects the behavior of .add_timer and .add_periodic_timer. Most applications will not need to call this function.
Avoid setting the quantum to very low values because that may reduce performance under some extreme conditions. We recommend that you not use values lower than 10.
This method can only be used while the event loop is running.
# File 'lib/eventmachine.rb', line 903
def self.set_quantum mills
  set_timer_quantum mills.to_i
end
.set_rlimit_nofile (mod_func)
t_set_rlimit_nofile
# File 'ext/rubymain.cpp', line 1284
static VALUE t_set_rlimit_nofile (VALUE self UNUSED, VALUE arg) { int arg_int = (NIL_P(arg)) ? -1 : NUM2INT (arg); return INT2NUM (evma_set_rlimit_nofile (arg_int)); }
.set_simultaneous_accept_count (mod_func)
# File 'ext/rubymain.cpp', line 1006
static VALUE t_set_simultaneous_accept_count (VALUE self UNUSED, VALUE ct) { evma_set_simultaneous_accept_count (FIX2INT (ct)); return Qnil; }
.set_timer_quantum (mod_func)
t_set_timer_quantum
# File 'ext/rubymain.cpp', line 972
static VALUE t_set_timer_quantum (VALUE self UNUSED, VALUE interval) { evma_set_timer_quantum (FIX2INT (interval)); return Qnil; }
.set_tls_parms (mod_func)
t_set_tls_parms
# File 'ext/rubymain.cpp', line 377
static VALUE t_set_tls_parms (VALUE self UNUSED, VALUE signature, VALUE privkeyfile, VALUE privkey, VALUE privkeypass, VALUE certchainfile, VALUE cert, VALUE verify_peer, VALUE fail_if_no_peer_cert, VALUE snihostname, VALUE cipherlist, VALUE ecdh_curve, VALUE dhparam, VALUE ssl_version) { /* set_tls_parms takes a series of positional arguments for specifying such things * as private keys and certificate chains. * It's expected that the parameter list will grow as we add more supported features. * ALL of these parameters are optional, and can be specified as empty or NULL strings. */ evma_set_tls_parms (NUM2BSIG (signature), StringValueCStr (privkeyfile), StringValueCStr (privkey), StringValueCStr (privkeypass), StringValueCStr (certchainfile), StringValueCStr (cert), (verify_peer == Qtrue ? 1 : 0), (fail_if_no_peer_cert == Qtrue ? 1 : 0), StringValueCStr (snihostname), StringValueCStr (cipherlist), StringValueCStr (ecdh_curve), StringValueCStr (dhparam), NUM2INT (ssl_version)); return Qnil; }
.setuid_string (mod_func)
t_setuid_string
# File 'ext/rubymain.cpp', line 1016
static VALUE t_setuid_string (VALUE self UNUSED, VALUE username) { evma_setuid_string (StringValueCStr (username)); return Qnil; }
.signal_loopbreak (mod_func)
t_signal_loopbreak
# File 'ext/rubymain.cpp', line 951
static VALUE t_signal_loopbreak (VALUE self UNUSED) { evma_signal_loopbreak(); return Qnil; }
.spawn(&block)
Spawns an Erlang-style process.
# File 'lib/em/spawnable.rb', line 69
def self.spawn &block
  s = SpawnedProcess.new
  s.set_receiver block
  s
end
.spawn_threadpool
# File 'lib/eventmachine.rb', line 1066
def self.spawn_threadpool
  until @threadpool.size == @threadpool_size.to_i
    thread = Thread.new do
      Thread.current.abort_on_exception = true
      while true
        begin
          op, cback, eback = *@threadqueue.pop
        rescue ThreadError
          $stderr.puts $!.message
          break # Ruby 2.0 may fail at Queue.pop
        end
        begin
          result = op.call
          @resultqueue << [result, cback]
        rescue Exception => error
          raise error unless eback
          @resultqueue << [error, eback]
        end
        signal_loopbreak
      end
    end
    @threadpool << thread
  end
  @all_threads_spawned = true
end
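The worker loop in the listing follows a common two-queue pattern: workers pop operations from one queue and push results, paired with their callbacks, onto another. The following is a simplified model of that pattern in plain Ruby. The queue names, the `:shutdown` sentinel, and the squaring workload are all illustrative and not part of EventMachine.

```ruby
# Simplified model of the worker loop: workers pop [operation, callback] pairs
# from a work queue and push [result, callback] pairs onto a result queue.
# All names here are illustrative; this is not EventMachine's own code.
work_queue   = Queue.new
result_queue = Queue.new

workers = 3.times.map do
  Thread.new do
    # :shutdown is a sentinel used only by this sketch to end the worker.
    while (job = work_queue.pop) != :shutdown
      op, cback = job
      result_queue << [op.call, cback]
    end
  end
end

collected = []
5.times do |i|
  work_queue << [proc { i * i }, proc { |r| collected << r }]
end
workers.size.times { work_queue << :shutdown }
workers.each(&:join)

# In EventMachine the callbacks are run by run_deferred_callbacks on the
# reactor thread; here the main thread plays that role.
until result_queue.empty?
  result, cback = result_queue.pop
  cback.call(result)
end

p collected.sort
```

Running the callbacks on a single thread means callback code does not need its own locking, which is the point of pushing results back through a queue instead of invoking callbacks inside the workers.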
.start_server(server, port = nil, handler = nil, *args, &block)
Don't forget that in order to bind to ports < 1024 on Linux, *BSD and Mac OS X your process must have superuser privileges.
Initiates a TCP server (socket acceptor) on the specified IP address and port.
The IP address must be valid on the machine where the program
runs, and the process must be privileged enough to listen
on the specified port (on Unix-like systems, superuser privileges
are usually required to listen on any port lower than 1024).
Only one listener may be running on any given address/port
combination. start_server will fail if the given address and port
are already listening on the machine, either because of a prior call
to .start_server
or some unrelated process running on the machine.
If .start_server succeeds, the new network listener becomes active
immediately and starts accepting connections from remote peers,
and these connections generate callback events that are processed
by the code specified in the handler parameter to .start_server.
The optional handler which is passed to this method is the key
to EventMachine's ability to handle particular network protocols.
The handler parameter passed to start_server must be a Ruby Module
that you must define. When the network server that is started by
start_server accepts a new connection, it instantiates a new
object of an anonymous class that is inherited from Connection,
into which your handler module has been included. Arguments passed into start_server
after the handler are passed into the constructor during the instantiation.
Your handler module may override any of the methods in Connection,
such as Connection#receive_data, in order to implement the specific behavior
of the network protocol.
Callbacks invoked in response to network events always take place
within the execution context of the object derived from Connection
extended by your handler module. There is one object per connection, and
all of the callbacks invoked for a particular connection take the form
of instance methods called against the corresponding Connection
object. Therefore, you are free to define whatever instance variables you
wish, in order to contain the per-connection state required by the network protocol you are
implementing.
start_server is usually called inside the block passed to .run,
but it can be called from any EventMachine callback. start_server will fail
unless the EventMachine event loop is currently running (which is why
it's often called in the block supplied to .run).
You may call start_server any number of times to start up network listeners on different address/port combinations. The servers will all run simultaneously. More interestingly, each individual call to start_server can specify a different handler module and thus implement a different network protocol from all the others.
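As a rough illustration of the handler mechanism described above, the sketch below mixes a handler module into an anonymous subclass and creates one object per connection. `FakeConnection`, `CountingEcho`, and `build_connection` are hypothetical names: `FakeConnection` is a stand-in for Connection that writes to a string buffer so the example runs without the gem, and it is not how EventMachine itself is implemented.

```ruby
# Hypothetical stand-in for EventMachine::Connection so the sketch runs
# without the gem; the real class writes to a socket, not a string buffer.
class FakeConnection
  attr_reader :sent

  def initialize(*args)
    @sent = String.new
    post_init
  end

  # Default callback; handler modules may override it.
  def post_init; end

  def send_data(data)
    @sent << data
  end
end

# A handler module: overrides callbacks and keeps per-connection state
# in ordinary instance variables.
module CountingEcho
  def post_init
    @bytes_seen = 0
  end

  def receive_data(data)
    @bytes_seen += data.bytesize
    send_data("#{@bytes_seen}:#{data}")
  end
end

# Models the behavior described in the text: an anonymous subclass with the
# handler included, instantiated once per accepted connection.
def build_connection(handler, *args)
  Class.new(FakeConnection) { include handler }.new(*args)
end
```

With the real library, the same module would be passed as the handler, for example `EventMachine.run { EventMachine.start_server "127.0.0.1", 8081, CountingEcho }` (the address and port here are arbitrary).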
# File 'lib/eventmachine.rb', line 518
def self.start_server server, port=nil, handler=nil, *args, &block
  begin
    port = Integer(port)
  rescue ArgumentError, TypeError
    # there was no port, so server must be a unix domain socket
    # the port argument is actually the handler, and the handler is one of the args
    args.unshift handler if handler
    handler = port
    port = nil
  end if port

  klass = klass_from_handler(Connection, handler, *args)

  s = if port
        start_tcp_server server, port
      else
        start_unix_server server
      end
  @acceptors[s] = [klass,args,block]
  s
end
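The argument handling in the source above decides between a TCP and a Unix-domain listener by checking whether the second argument converts to an integer. The helper below isolates that decision so it can be tried on its own. `classify_server_args` and its return shape are hypothetical and exist only for this sketch.

```ruby
# Hypothetical helper mirroring the argument handling in start_server.
# Returns [kind, address_or_path, port, handler, extra_args].
def classify_server_args(server, port = nil, handler = nil, *args)
  if port
    begin
      port = Integer(port)
    rescue ArgumentError, TypeError
      # No usable port: treat `server` as a Unix-domain socket path.
      # What arrived as `port` is really the handler, and what arrived
      # as `handler` is the first extra constructor argument.
      args.unshift(handler) if handler
      handler = port
      port = nil
    end
  end
  [port ? :tcp : :unix, server, port, handler, args]
end
```

Under this logic a numeric string such as `"8080"` is accepted as a port, while a module in the second position shifts every later argument one place to the right.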
.start_tcp_server (mod_func)
t_start_server
# File 'ext/rubymain.cpp', line 304
static VALUE t_start_server (VALUE self UNUSED, VALUE server, VALUE port) { const uintptr_t f = evma_create_tcp_server (StringValueCStr(server), FIX2INT(port)); if (!f) rb_raise (rb_eRuntimeError, "%s", "no acceptor (port is in use or requires root privileges)"); return BSIG2NUM (f); }
.start_tls (mod_func)
t_start_tls
# File 'ext/rubymain.cpp', line 363
static VALUE t_start_tls (VALUE self UNUSED, VALUE signature) { try { evma_start_tls (NUM2BSIG (signature)); } catch (const std::runtime_error& e) { rb_raise (EM_eInvalidPrivateKey, e.what(), signature); } return Qnil; }
.start_unix_domain_server(filename, *args, &block)
Start a Unix-domain server.
Note that this is an alias for .start_server, which can be used to start both TCP and Unix-domain servers.
# File 'lib/eventmachine.rb', line 562
def self.start_unix_domain_server filename, *args, &block
  start_server filename, *args, &block
end
.start_unix_server (mod_func)
t_start_unix_server
# File 'ext/rubymain.cpp', line 327
static VALUE t_start_unix_server (VALUE self UNUSED, VALUE filename) { const uintptr_t f = evma_create_unix_domain_server (StringValueCStr(filename)); if (!f) rb_raise (rb_eRuntimeError, "%s", "no unix-domain acceptor"); return BSIG2NUM (f); }
.stop (mod_func)
t_stop
# File 'ext/rubymain.cpp', line 941
static VALUE t_stop (VALUE self UNUSED) { evma_stop_machine(); return Qnil; }
.stop_event_loop
Causes the processing loop to stop executing, which will cause all open connections and accepting servers
to be run down and closed. Connection termination callbacks added using .add_shutdown_hook
will be called as part of running this method.
When all of this processing is complete, the call to .run which started the processing loop will return and program flow will resume from the statement following the .run call.
# File 'lib/eventmachine.rb', line 418
def self.stop_event_loop
  EventMachine::stop
end
.stop_server(signature)
Stop a TCP server socket that was started with .start_server.
# File 'lib/eventmachine.rb', line 552
def self.stop_server signature
  EventMachine::stop_tcp_server signature
end
.stop_tcp_server (mod_func)
t_stop_server
# File 'ext/rubymain.cpp', line 316
static VALUE t_stop_server (VALUE self UNUSED, VALUE signature) { evma_stop_tcp_server (NUM2BSIG (signature)); return Qnil; }
.system(cmd, *args, &cb)
EM::system is a simple wrapper for EM::popen. It is similar to Kernel::system, but requires a single string argument for the command and performs no shell expansion.
The block or proc passed to EM::system is called with two arguments: the output generated by the command, and a Process::Status that contains information about the command's execution.
EM.run{ EM.system('ls'){ |output,status| puts output if status.exitstatus == 0 } }
You can also supply an additional proc to send some data to the process:
EM.run{
  EM.system('sh', proc{ |process|
    process.send_data("echo hello\n")
    process.send_data("exit\n")
  }, proc{ |out,status|
    puts(out)
  })
}
Like EventMachine.popen, EventMachine.system currently does not work on Windows.
It returns the pid of the spawned process.
# File 'lib/em/processes.rb', line 112
def EventMachine::system cmd, *args, &cb
  cb ||= args.pop if args.last.is_a? Proc
  init = args.pop if args.last.is_a? Proc

  # merge remaining arguments into the command
  cmd = [cmd, *args] if args.any?

  EM.get_subprocess_pid(EM.popen(cmd, SystemCmd, cb) do |c|
    init[c] if init
  end.signature)
end
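The first lines of the source above determine which trailing proc is the completion callback and which one feeds data to the process. The helper below reproduces only that sorting step so the rules can be checked in isolation. `split_system_args` and its return value are hypothetical and are not part of EventMachine.

```ruby
# Hypothetical helper reproducing how the source above sorts out its
# trailing procs. Returns [command, init_proc, callback].
def split_system_args(cmd, *args, &cb)
  # Without a block, the last proc is the completion callback.
  cb ||= args.pop if args.last.is_a?(Proc)
  # A remaining trailing proc is the initializer that sends data to the process.
  init = args.pop if args.last.is_a?(Proc)
  # Anything left over is merged into the command as extra arguments.
  cmd = [cmd, *args] if args.any?
  [cmd, init, cb]
end
```

When a block is given, it takes the callback role and a single trailing proc is treated as the initializer.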
.tick_loop(*a, &b)
Creates and immediately starts a TickLoop.
.watch(io, handler = nil, *args, &blk)
watch registers a given file descriptor or IO object with the event loop. The
file descriptor will not be modified (it will remain blocking or non-blocking).
The event loop can be used to process readable and writable events on the file descriptor, using Connection#notify_readable= and Connection#notify_writable=.
Connection#notify_readable? and Connection#notify_writable? can be used to check what events are enabled on the connection.
To detach the file descriptor, use Connection#detach.
# File 'lib/eventmachine.rb', line 732
def EventMachine::watch io, handler=nil, *args, &blk
  attach_io io, true, handler, *args, &blk
end
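To make the idea of readable notifications concrete, here is a small model built on `IO.select` from Ruby's standard library. `WatchedIO`, `PipeReader`, and `poll_once` are hypothetical names, and the assumption that the handler reaches the descriptor through an `@io` instance variable is specific to this sketch. It shows the general readiness-callback pattern, not EventMachine's implementation.

```ruby
# Hypothetical stand-in for a watched connection object; it only stores the IO.
class WatchedIO
  def initialize(io)
    @io = io
  end
end

# Handler module with a readiness callback. It assumes the watched object
# keeps the descriptor in @io (an assumption made for this sketch).
module PipeReader
  attr_reader :chunks

  def notify_readable
    (@chunks ||= []) << @io.read_nonblock(1024)
  end
end

# One pass of a toy event loop: wait until the descriptor is readable
# (or the timeout expires) and invoke the callback if it is.
def poll_once(watcher, io, timeout = 1)
  readable, = IO.select([io], nil, nil, timeout)
  watcher.notify_readable if readable
  watcher
end
```

The descriptor itself is left untouched here, as in the description above. Only the moment at which the callback fires depends on readiness.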
.watch_file(filename, handler = nil, *args)
The ability to pick up on the new filename after a rename is not yet supported. Calling #path will always return the filename you originally used.
EventMachine's file monitoring API. Currently supported are the following events on individual files, using inotify on Linux systems, and kqueue for *BSD and Mac OS X:
- File modified (written to)
- File moved/renamed
- File deleted
EventMachine::watch_file takes a filename and a handler Module containing your custom callback methods.
This will set up the low-level monitoring on the specified file, and create a new FileWatch
object with your Module mixed in. FileWatch is a subclass of Connection, so callbacks on this object
work in the familiar way. The callbacks that will be fired by EventMachine are:
- file_modified
- file_moved
- file_deleted
You can access the filename being monitored from within this object using FileWatch#path.
When a file is deleted, FileWatch#stop_watching will be called after your file_deleted callback,
to clean up the underlying monitoring and remove EventMachine's reference to the now-useless FileWatch instance.
This will in turn call unbind, if you wish to use it.
The corresponding system-level Errno will be raised when attempting to monitor non-existent files, files with wrong permissions, or if an error occurs dealing with inotify/kqueue.
# File 'lib/eventmachine.rb', line 1310
def self.watch_file(filename, handler=nil, *args)
  klass = klass_from_handler(FileWatch, handler, *args)

  s = EM::watch_filename(filename)
  c = klass.new s, *args
  # we have to set the path like this because of how Connection.new works
  c.instance_variable_set("@path", filename)
  @conns[s] = c
  block_given? and yield c
  c
end
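The callback sequence described above (file_deleted first, then stop_watching, which in turn triggers unbind) can be modelled without the gem. In the sketch below, `FakeFileWatch`, `simulate_delete`, `LogWatcher`, and `build_watch` are hypothetical names. `FakeFileWatch` imitates only the ordering described in the text and performs no real file monitoring.

```ruby
# Hypothetical stand-in for EventMachine::FileWatch that models only the
# callback ordering described in the text.
class FakeFileWatch
  attr_reader :path

  def initialize(path)
    @path = path
  end

  # Default no-op callbacks; a handler module may override any of them.
  def file_modified; end
  def file_moved; end
  def file_deleted; end
  def unbind; end

  def stop_watching
    unbind
  end

  # Simulates the loop reporting a deletion: user callback first, then cleanup.
  def simulate_delete
    file_deleted
    stop_watching
  end
end

# A handler module that records what happened to the watched file.
module LogWatcher
  def events
    @events ||= []
  end

  def file_modified
    events << "#{path} modified"
  end

  def file_deleted
    events << "#{path} deleted"
  end

  def unbind
    events << "#{path} unwatched"
  end
end

def build_watch(path, handler)
  Class.new(FakeFileWatch) { include handler }.new(path)
end
```

With the real library, the same module would be passed as the handler from inside a running event loop, for example `EventMachine.watch_file("/tmp/foo.log", LogWatcher)` (the path is arbitrary).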
.watch_process(pid, handler = nil, *args)
EventMachine's process monitoring API. On Mac OS X and *BSD this method is implemented using kqueue.
# File 'lib/eventmachine.rb', line 1341
def self.watch_process(pid, handler=nil, *args)
  pid = pid.to_i

  klass = klass_from_handler(ProcessWatch, handler, *args)

  s = EM::watch_pid(pid)
  c = klass.new s, *args
  # we have to set the path like this because of how Connection.new works
  c.instance_variable_set("@pid", pid)
  @conns[s] = c
  block_given? and yield c
  c
end
.yield(&block)
# File 'lib/em/spawnable.rb', line 76
def self.yield &block
  return YieldBlockFromSpawnedProcess.new( block, false )
end
.yield_and_notify(&block)
# File 'lib/em/spawnable.rb', line 81
def self.yield_and_notify &block
  return YieldBlockFromSpawnedProcess.new( block, true )
end