Module: Concurrent
Overview
=== Concurrent Ruby
Modern concurrency tools for Ruby. Inspired by Erlang (http://www.erlang.org/doc/reference_manual/processes.html), Clojure, Scala, Haskell, F#, C#, Java (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html), and classic concurrency patterns.
The design goals of this gem are:
- Be an 'unopinionated' toolbox that provides useful utilities without debating which is better or why
- Remain free of external gem dependencies
- Stay true to the spirit of the languages providing inspiration
- But implement in a way that makes sense for Ruby
- Keep the semantics as idiomatic Ruby as possible
- Support features that make sense in Ruby
- Exclude features that don't make sense in Ruby
- Be small, lean, and loosely coupled
- Thread-safety
- Backward compatibility
==== Contributing
This gem depends on contributions and we appreciate your help. Would you like to contribute? Great! Have a look at issues with the looking-for-contributor label, and if you pick something up let us know on the issue.
You can also get started by triaging issues, which may include reproducing bug reports or asking for vital information such as version numbers or reproduction instructions. If you would like to start triaging issues, one easy way to get started is to subscribe to concurrent-ruby on CodeTriage.
==== Thread Safety
Concurrent Ruby makes one of the strongest thread safety guarantees of any Ruby concurrency library, providing consistent behavior and guarantees on all three main Ruby interpreters (MRI/CRuby, JRuby, and TruffleRuby).
Every abstraction in this library is thread safe. Specific thread safety guarantees are documented with each abstraction.
It is critical to remember, however, that Ruby is a language of mutable references. No
concurrency library for Ruby can ever prevent the user from making thread safety mistakes (such as
sharing a mutable object between threads and modifying it on both threads) or from creating
deadlocks through incorrect use of locks. All the library can do is provide safe abstractions which
encourage safe practices. Concurrent Ruby provides more safe concurrency abstractions than any other Ruby library, many of which support the mantra of "Do not communicate by sharing memory; instead, share memory by communicating". Concurrent Ruby is also the only Ruby library which provides a full suite of thread safe and immutable variable types and data structures.
We've also initiated discussion to document the memory model of Ruby, which would provide consistent behaviour and guarantees on all three main Ruby interpreters (MRI/CRuby, JRuby, and TruffleRuby).
==== Features & Documentation
The primary site for documentation is the automatically generated API documentation, which is up to date with the latest release. This README matches master and so may describe features not yet released.
We also have an IRC channel (Gitter).
===== Versioning
- concurrent-ruby uses Semantic Versioning.
- concurrent-ruby-ext always has the same version as concurrent-ruby.
- concurrent-ruby-edge will always be 0.y.z, therefore point 4 of Semantic Versioning applies: "Major version zero (0.y.z) is for initial development. Anything may change at any time. The public API should not be considered stable." However, we additionally use the following rules:
  - A minor version increment means incompatible changes were made.
  - A patch version increment means only compatible changes were made.
====== General-purpose Concurrency Abstractions
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Async.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Async.html Async: A mixin module that provides simple asynchronous behavior to a class. Loosely based on Erlang's href="http://www.erlang.org/doc/man/gen_server.html">http://www.erlang.org/doc/man/gen_server.html gen_server.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ScheduledTask.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ScheduledTask.html ScheduledTask: Like a Future scheduled for a specific future time.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TimerTask.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TimerTask.html TimerTask: A Thread that periodically wakes up to perform work at regular intervals.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises.html Promises:
Unified implementation of futures and promises which combines features of previous
Future
,Promise
,IVar
,Event
, .dataflow,Delay
, and (partially)TimerTask
into a single framework. It extensively uses the new synchronization layer to make all the features non-blocking and lock-free, with the exception of obviously blocking operations like#wait
,#value
. It also offers better performance.
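To illustrate, here is a minimal sketch of chaining with Promises; the values and steps are arbitrary and only the public Promises.future/#then/#rescue/#value! API is assumed:
require 'concurrent'

# Each step runs when the previous one resolves; nothing here blocks the calling thread.
doubled = Concurrent::Promises.future(10) { |n| n * 2 } # runs on a background thread
chained = doubled.then { |n| n + 1 }                     # runs after `doubled` resolves
safe    = chained.rescue { |error| -1 }                  # fallback value if any step raised

puts safe.value! #=> 21; #value! blocks until resolved and re-raises failures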
====== Thread-safe Value Objects, Structures, and Collections
Collection classes that were originally part of the (deprecated) thread_safe gem:
- Array (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Array.html): A thread-safe subclass of Ruby's standard Array (http://ruby-doc.org/core/Array.html).
- Hash (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Hash.html): A thread-safe subclass of Ruby's standard Hash (http://ruby-doc.org/core/Hash.html).
- Set (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Set.html): A thread-safe subclass of Ruby's standard Set (http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html).
- Map (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Map.html): A hash-like object that should have much better performance characteristics, especially under high concurrency, than Hash (see the sketch after this list).
- Tuple (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Tuple.html): A fixed size array with volatile (synchronized, thread safe) getters/setters.
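As an illustration, a small sketch of Concurrent::Map used as a shared registry of counters across threads; only Map#compute_if_absent and AtomicFixnum#increment from this library are assumed, the key name is arbitrary:
require 'concurrent'

counters = Concurrent::Map.new

threads = 4.times.map do
  Thread.new do
    1_000.times do
      # compute_if_absent atomically creates the counter on first use.
      counters.compute_if_absent(:hits) { Concurrent::AtomicFixnum.new }.increment
    end
  end
end
threads.each(&:join)

puts counters[:hits].value #=> 4000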
Value objects inspired by other languages:
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Maybe.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Maybe.html Maybe A thread-safe, immutable object representing an optional value, based on Haskell Data.Maybe.
Structure classes derived from Ruby's href="http://ruby-doc.org/core/Struct.html">http://ruby-doc.org/core/Struct.html Struct:
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ImmutableStruct.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ImmutableStruct.html ImmutableStruct Immutable struct where values are set at construction and cannot be changed later.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/MutableStruct.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/MutableStruct.html MutableStruct Synchronized, mutable struct where values can be safely changed at any time.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/SettableStruct.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/SettableStruct.html SettableStruct Synchronized, write-once struct where values can be set at most once, either at construction or any time thereafter.
Thread-safe variables:
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Agent.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Agent.html Agent: A way to manage shared, mutable, asynchronous, independent state. Based on Clojure's Agent.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Atom.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Atom.html Atom: A way to manage shared, mutable, synchronous, independent state. Based on Clojure's Atom.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicBoolean.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicBoolean.html AtomicBoolean A boolean value that can be updated atomically.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicFixnum.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicFixnum.html AtomicFixnum A numeric value that can be updated atomically.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicReference.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicReference.html AtomicReference An object reference that may be updated atomically.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Exchanger.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Exchanger.html Exchanger A synchronization point at which threads can pair and swap elements within pairs. Based on Java's href="http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html">http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html Exchanger.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/MVar.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/MVar.html MVar A synchronized single element container. Based on Haskell's href="https://hackage.haskell.org/package/base-4.8.1.0/docs/Control-Concurrent-MVar.html">https://hackage.haskell.org/package/base-4.8.1.0/docs/Control-Concurrent-MVar.html MVar and Scala's MVar.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadLocalVar.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadLocalVar.html ThreadLocalVar A variable where the value is different for each thread.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TVar.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TVar.html TVar A transactional variable implementing software transactional memory (STM). Based on Clojure's Ref.
====== Java-inspired ThreadPools and Other Executors
- See the thread pool overview, which also contains a list of other Executors available.
====== Thread Synchronization Classes and Algorithms
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/CountDownLatch.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/CountDownLatch.html CountDownLatch A synchronization object that allows one thread to wait on multiple other threads.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/CyclicBarrier.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/CyclicBarrier.html CyclicBarrier A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Event.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Event.html Event Old school kernel-style event.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ReadWriteLock.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ReadWriteLock.html ReadWriteLock A lock that supports multiple readers but only one writer.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ReentrantReadWriteLock.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ReentrantReadWriteLock.html ReentrantReadWriteLock A read/write lock with reentrant and upgrade features.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Semaphore.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Semaphore.html Semaphore A counting-based locking mechanism that uses permits.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicMarkableReference.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/AtomicMarkableReference.html AtomicMarkableReference
====== Deprecated
Deprecated features are still available and bugs are being fixed, but new features will not be added.
- ~~Future (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future.html): An asynchronous operation that produces a value.~~ Replaced by Promises (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises.html); see the comparison sketch after this list.
- ~~Concurrent.dataflow: Built on Futures, Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.~~ Replaced by Promises.
- ~~Promise (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promise.html): Similar to Futures, with more features.~~ Replaced by Promises.
- ~~Delay (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Delay.html): Lazy evaluation of a block yielding an immutable result. Based on Clojure's delay.~~ Replaced by Promises.
- ~~IVar (http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/IVar.html): Similar to a "future" but can be manually assigned once, after which it becomes immutable.~~ Replaced by Promises.
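For reference, a minimal sketch showing the deprecated Future next to its Promises replacement (the computation is arbitrary):
require 'concurrent'

# Deprecated style; still works, but receives no new features:
old_style = Concurrent::Future.execute { 6 * 7 }
puts old_style.value  #=> 42

# Replacement using the unified Promises framework:
new_style = Concurrent::Promises.future { 6 * 7 }
puts new_style.value! #=> 42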
===== Edge Features
These are available in the concurrent-ruby-edge companion gem.
These features are under active development and may change frequently. They are expected not to keep backward compatibility (and may also lack tests and documentation). Semantic versions will be obeyed though. Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when final.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Actor.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Actor.html Actor: Implements the Actor Model, where concurrent actors exchange messages. Status: Partial documentation and tests; depends on new future/promise framework; stability is good.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Channel.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Channel.html Channel: Communicating Sequential Processes (CSP). Functionally equivalent to Go channels with additional inspiration from Clojure core.async. Status: Partial documentation and tests.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/LazyRegister.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/LazyRegister.html LazyRegister
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Edge/LockFreeLinkedSet.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Edge/LockFreeLinkedSet.html LockFreeLinkedSet Status: will be moved to core soon.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/LockFreeStack.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/LockFreeStack.html LockFreeStack Status: missing documentation and tests.
- href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises/Channel.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises/Channel.html Promises::Channel
A first in first out channel that accepts messages with push family of methods and returns
messages with pop family of methods.
Pop and push operations can be represented as futures, see
#pop_op
and#push_op
. The capacity of the channel can be limited to support back pressure, use capacity option in #initialize.#pop
method blocks ans#pop_op
returns pending future if there is no message in the channel. If the capacity is limited the#push
method blocks and#push_op
returns pending future. href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html Cancellation The Cancellation abstraction provides cooperative cancellation.
The standard methods Thread#raise and Thread#kill available in Ruby are very dangerous (see the blog posts linked below). Therefore concurrent-ruby provides an alternative.
- https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/
- http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
- http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html
It provides an object which represents a task that can be executed; the task has to get a reference to the object and periodically, cooperatively check that it has not been cancelled. Good practices for making tasks cancellable:
- check cancellation on every cycle of a loop which does significant work,
- do all blocking actions in a loop with a timeout; on timeout, check cancellation and, if not cancelled, block again with the timeout.
href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Throttle.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Throttle.html Throttle A tool managing concurrency level of tasks.
href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ErlangActor.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ErlangActor.html ErlangActor Actor implementation which precisely matches Erlang actor behaviour. Requires at least Ruby 2.1 otherwise it's not loaded.
href="http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/WrappingExecutor.html">http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/WrappingExecutor.html WrappingExecutor A delegating executor which modifies each task before the task is given to the target executor it delegates to.
==== Supported Ruby versions
- MRI 2.3 and above
- Latest JRuby 9000
- Latest TruffleRuby
==== Usage
Everything within this gem can be loaded simply by requiring it:
require 'concurrent'
Since concurrent-ruby 1.2.0 you can also require a specific abstraction that is part of the public documentation, for example:
require 'concurrent/map'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/executor/fixed_thread_pool'
To use the tools in the Edge gem it must be required separately:
require 'concurrent-edge'
If the library does not behave as expected, Concurrent.use_simple_logger(:DEBUG) could help to reveal the problem.
==== Installation
gem install concurrent-ruby
or add the following line to Gemfile:
gem 'concurrent-ruby', require: 'concurrent'
and run bundle install
from your shell.
===== Edge Gem Installation
The Edge gem must be installed separately from the core gem:
gem install concurrent-ruby-edge
or add the following line to Gemfile:
gem 'concurrent-ruby-edge', require: 'concurrent-edge'
and run bundle install
from your shell.
===== C Extensions for MRI
Potential performance improvements may be achieved under MRI by installing optional C extensions.
To minimise installation errors the C extensions are available in the concurrent-ruby-ext extension gem. concurrent-ruby and concurrent-ruby-ext are always released together with the same version. Simply install the extension gem too:
gem install concurrent-ruby-ext
or add the following line to Gemfile:
gem 'concurrent-ruby-ext'
and run bundle install
from your shell.
In code it is only necessary to
require 'concurrent'
The concurrent-ruby gem will automatically detect the presence of the concurrent-ruby-ext gem and load the appropriate C extensions.
====== Note for gem developers
No gems should depend on concurrent-ruby-ext. Doing so will force C extensions on your users. The best practice is to depend on concurrent-ruby and let users decide if they want C extensions.
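For example, a downstream gem's gemspec would declare only the portable dependency; an illustrative sketch (gem name, version, and metadata are placeholders):
# my_library.gemspec (illustrative; every value here is a placeholder)
Gem::Specification.new do |spec|
  spec.name    = 'my_library'
  spec.version = '0.1.0'
  spec.summary = 'Example gem that uses concurrent-ruby'
  spec.authors = ['Example Author']
  spec.files   = Dir['lib/**/*.rb']

  # Depend only on the portable gem; users can add concurrent-ruby-ext themselves.
  spec.add_dependency 'concurrent-ruby', '~> 1.0'
end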
==== Building the gem
===== Requirements
- Recent CRuby
- JRuby, rbenv install jruby-9.2.17.0
- Set the environment variable CONCURRENT_JRUBY_HOME to point to it, e.g. /usr/local/opt/rbenv/versions/jruby-9.2.17.0
- Install Docker, required for Windows builds
===== Publishing the Gem
- Update version.rb.
- Update the CHANGELOG.
- Add the new version to docs-source/signpost.md. Needs to be done only if there are visible changes in the documentation.
- Commit (and push) the changes.
- Use bundle exec rake release to release the gem. It consists of the ['release:checks', 'release:build', 'release:test', 'release:publish'] steps. It will ask at the end before publishing anything. Steps can also be executed individually.
==== Maintainers
===== Special Thanks to
- Jerry D'Antonio for creating the gem
- Brian Durand for the ref gem
- Charles Oliver Nutter for the atomic and thread_safe gems
- thedarkone for the thread_safe gem
and to the past maintainers, and to Ruby Association for sponsoring the project "Enhancing Ruby's concurrency tooling" in 2018.
==== License and Copyright
Concurrent Ruby is free software released under the MIT License.
The Concurrent Ruby logo was designed by David Jones. It is Copyright © 2014 Jerry D'Antonio. All Rights Reserved.
Constant Summary
-
ArrayImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby?
  # Array is not fully thread-safe on CRuby, see
  # https://github.com/ruby-concurrency/concurrent-ruby/issues/929
  # So we will need to add synchronization here
  ::Array

when Concurrent.on_jruby?
  require 'jruby/synchronized'

  class JRubyArray < ::Array
    include JRuby::Synchronized
  end

  JRubyArray

when Concurrent.on_truffleruby?
  require 'concurrent/thread_safe/util/data_structures'

  class TruffleRubyArray < ::Array
  end

  ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubyArray
  TruffleRubyArray

else
  warn 'Possibly unsupported Ruby implementation'
  ::Array
end
-
AtomicBooleanImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby? && Concurrent.c_extensions_loaded?
  CAtomicBoolean
when Concurrent.on_jruby?
  JavaAtomicBoolean
else
  MutexAtomicBoolean
end
-
AtomicFixnumImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby? && Concurrent.c_extensions_loaded?
  CAtomicFixnum
when Concurrent.on_jruby?
  JavaAtomicFixnum
else
  MutexAtomicFixnum
end
-
AtomicReferenceImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby? && Concurrent.c_extensions_loaded?
  # @!visibility private
  # @!macro internal_implementation_note
  class CAtomicReference
    include AtomicDirectUpdate
    include AtomicNumericCompareAndSetWrapper
    alias_method :compare_and_swap, :compare_and_set
  end
  CAtomicReference
when Concurrent.on_jruby?
  # @!visibility private
  # @!macro internal_implementation_note
  class JavaAtomicReference
    include AtomicDirectUpdate
  end
  JavaAtomicReference
when Concurrent.on_truffleruby?
  class TruffleRubyAtomicReference < TruffleRuby::AtomicReference
    include AtomicDirectUpdate
    alias_method :value, :get
    alias_method :value=, :set
    alias_method :compare_and_swap, :compare_and_set
    alias_method :swap, :get_and_set
  end
  TruffleRubyAtomicReference
else
  MutexAtomicReference
end
-
CancelledOperationError =
Raised when an asynchronous operation is cancelled before execution.
Class.new(Error)
-
ConfigurationError =
Raised when errors occur during configuration.
Class.new(Error)
-
CountDownLatchImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_jruby?
  JavaCountDownLatch
else
  MutexCountDownLatch
end
-
EDGE_VERSION =
# File 'lib/concurrent-ruby-edge/concurrent/edge/version.rb', line 2
'0.7.1'
-
Error =
# File 'lib/concurrent-ruby/concurrent/errors.rb', line 3
Class.new(StandardError)
-
ExchangerImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_jruby?
  JavaExchanger
else
  RubyExchanger
end
-
GLOBAL_FAST_EXECUTOR =
private
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 18
Delay.new { Concurrent.new_fast_executor }
-
GLOBAL_IMMEDIATE_EXECUTOR =
private
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 30
ImmediateExecutor.new
-
GLOBAL_IO_EXECUTOR =
private
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 22
Delay.new { Concurrent.new_io_executor }
-
GLOBAL_LOGGER =
private
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 111
AtomicReference.new(create_simple_logger(:WARN))
-
GLOBAL_TIMER_SET =
private
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 26
Delay.new { TimerSet.new }
-
HashImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby?
  # Hash is not fully thread-safe on CRuby, see
  # https://bugs.ruby-lang.org/issues/19237
  # https://github.com/ruby/ruby/commit/ffd52412ab
  # https://github.com/ruby-concurrency/concurrent-ruby/issues/929
  # So we will need to add synchronization here (similar to Concurrent::Map).
  ::Hash

when Concurrent.on_jruby?
  require 'jruby/synchronized'

  class JRubyHash < ::Hash
    include JRuby::Synchronized
  end

  JRubyHash

when Concurrent.on_truffleruby?
  require 'concurrent/thread_safe/util/data_structures'

  class TruffleRubyHash < ::Hash
  end

  ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubyHash
  TruffleRubyHash

else
  warn 'Possibly unsupported Ruby implementation'
  ::Hash
end
-
IllegalOperationError =
Raised when an operation is attempted which is not legal given the receiver’s current state
Class.new(Error)
-
ImmutabilityError =
Raised when an attempt is made to violate an immutability guarantee.
Class.new(Error)
-
InitializationError =
Raised when an object’s methods are called when it has not been properly initialized.
Class.new(Error)
-
LifecycleError =
Raised when a lifecycle method (such as stop) is called in an improper sequence or when the object is in an inappropriate state.
Class.new(Error)
-
MaxRestartFrequencyError =
Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.
Class.new(Error)
-
NULL =
private
Various classes within the library allow nil values to be stored, so a special NULL token is required to indicate "nil-ness".
::Object.new
-
NULL_LOGGER =
Suppresses all output when used for logging.
lambda { |level, progname, message = nil, &block| }
-
PromiseExecutionError =
# File 'lib/concurrent-ruby/concurrent/promise.rb', line 11
Class.new(StandardError)
-
RejectedExecutionError =
Raised by an Executor when it is unable to process a given task, possibly because of a reject policy or other internal error.
Class.new(Error)
-
ResourceLimitError =
Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.
Class.new(Error)
-
SemaphoreImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
if Concurrent.on_jruby?
  require 'concurrent/utility/native_extension_loader'
  JavaSemaphore
else
  MutexSemaphore
end
-
SetImplementation =
private
Note:
**Private Implementation:** This abstraction is a private, internal implementation detail. It should never be used directly.
case
when Concurrent.on_cruby?
  # The CRuby implementation of Set is written in Ruby itself and is
  # not thread safe for certain methods.
  require 'monitor'
  require 'concurrent/thread_safe/util/data_structures'

  class CRubySet < ::Set
  end

  ThreadSafe::Util.make_synchronized_on_cruby CRubySet
  CRubySet

when Concurrent.on_jruby?
  require 'jruby/synchronized'

  class JRubySet < ::Set
    include JRuby::Synchronized
  end

  JRubySet

when Concurrent.on_truffleruby?
  require 'concurrent/thread_safe/util/data_structures'

  class TruffleRubySet < ::Set
  end

  ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet
  TruffleRubySet

else
  warn 'Possibly unsupported Ruby implementation'
  ::Set
end
-
SingleThreadExecutorImplementation =
private
# File 'lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb', line 10
case
when Concurrent.on_jruby?
  JavaSingleThreadExecutor
else
  RubySingleThreadExecutor
end
-
ThreadPoolExecutorImplementation =
private
# File 'lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb', line 10
case
when Concurrent.on_jruby?
  JavaThreadPoolExecutor
else
  RubyThreadPoolExecutor
end
-
TimeoutError =
Raised when an operation times out.
Class.new(Error)
-
VERSION =
# File 'lib/concurrent-ruby/concurrent/version.rb', line 2
'1.3.4'
Class Attribute Summary
- .global_logger rw
- .global_logger=(value) rw
- .mutex_owned_per_thread? ⇒ Boolean readonly private
Utility::NativeExtensionLoader (extended)
Utility::EngineDetector (extended)
Class Method Summary
-
.available_processor_count ⇒ Float
Number of processor cores available for process scheduling.
-
.cpu_quota ⇒ nil, Float
The maximum number of processor cores available for process scheduling.
-
.cpu_shares ⇒ Float?
The CPU shares requested by the process.
-
.create_simple_logger(level = :FATAL, output = $stderr)
Create a simple logger with provided level and output.
- .create_stdlib_logger(level = :FATAL, output = $stderr) deprecated Deprecated.
-
.disable_at_exit_handlers!
deprecated
Deprecated.
Has no effect since it is no longer needed, see github.com/ruby-concurrency/concurrent-ruby/pull/841.
-
.executor(executor_identifier) ⇒ Executor
General access point to global executors.
-
.global_fast_executor ⇒ ThreadPoolExecutor
Global thread pool optimized for short, fast operations.
- .global_immediate_executor
-
.global_io_executor ⇒ ThreadPoolExecutor
Global thread pool optimized for long, blocking (IO) tasks.
-
.global_timer_set ⇒ Concurrent::TimerSet
Global thread pool used for global timers.
- .new_fast_executor(opts = {})
- .new_io_executor(opts = {})
-
.physical_processor_count ⇒ Integer
Number of physical processor cores on the current system.
-
.processor_count ⇒ Integer
Number of processors seen by the OS and used for process scheduling.
-
.use_simple_logger(level = :FATAL, output = $stderr)
Use logger created by #create_simple_logger to log concurrent-ruby messages.
- .use_stdlib_logger(level = :FATAL, output = $stderr) deprecated Deprecated.
-
.abort_transaction
mod_func
Abort a currently running transaction - see Concurrent::atomically.
-
.atomically
mod_func
Run a block that reads and writes TVars as a single atomic transaction.
- .call_dataflow(method, executor, *inputs, &block) mod_func
-
.dataflow(*inputs) {|inputs| ... } ⇒ Object
mod_func
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
- .dataflow!(*inputs, &block) mod_func
- .dataflow_with(executor, *inputs, &block) mod_func
- .dataflow_with!(executor, *inputs, &block) mod_func
-
.leave_transaction
mod_func
Leave a transaction without committing or aborting - see Concurrent::atomically.
-
.monotonic_time(unit = :float_second) ⇒ Float
mod_func
Returns the current time as tracked by the application monotonic clock.
Utility::NativeExtensionLoader (extended): load_native_extensions, load_error_path, set_c_extensions_loaded, set_java_extensions_loaded, try_load_c_extension
Utility::EngineDetector (extended)
Concern::Deprecation (extended)
Concern::Logging (included): log - logs through .global_logger; it can be overridden by setting @logger.
Instance Method Summary
-
#exchange(value, timeout = nil) ⇒ Object
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
-
#exchange!(value, timeout = nil) ⇒ Object
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
-
#initialize(opts = {})
Create a new thread pool.
-
#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
Class Attribute Details
.global_logger (rw)
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 114
def self.global_logger
  GLOBAL_LOGGER.value
end
.global_logger=(value) (rw)
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 118
def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
.mutex_owned_per_thread? ⇒ Boolean
(readonly, private)
# File 'lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb', line 7
def self.mutex_owned_per_thread?
  return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?

  mutex = Mutex.new
  # Lock the mutex:
  mutex.synchronize do
    # Check if the mutex is still owned in a child fiber:
    Fiber.new { mutex.owned? }.resume
  end
end
Class Method Details
.abort_transaction (mod_func)
Abort a currently running transaction - see Concurrent::atomically
.
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139
def abort_transaction
  raise Transaction::AbortError.new
end
.atomically (mod_func)
Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the TVar objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
- Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except via TVar.
- If an exception escapes an atomically block it will abort the transaction.
- It is undefined behaviour to use callcc or Fiber with atomically.
- If you create a new thread within an atomically block, it will not be part of the transaction. Creating a thread counts as a side-effect.
Transactions within transactions are flattened to a single transaction.
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction
  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do
        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end

        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end
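A small usage sketch of atomically with TVar, transferring between two transactional variables (the account values are arbitrary):
require 'concurrent'

account_a = Concurrent::TVar.new(100)
account_b = Concurrent::TVar.new(0)

# Either both writes commit or neither does, even with concurrent transactions.
Concurrent::atomically do
  account_a.value -= 20
  account_b.value += 20
end

puts account_a.value #=> 80
puts account_b.value #=> 20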
.available_processor_count ⇒ Float
Number of processor cores available for process scheduling. This method takes into account the CPU quota if the process is inside a cgroup with a dedicated CPU quota (typically Docker). Otherwise it returns the same value as #processor_count, but as a Float.
For performance reasons the calculated value will be memoized on the first call.
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 194
def self.available_processor_count
  processor_counter.available_processor_count
end
.call_dataflow(method, executor, *inputs, &block) (mod_func)
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{inputs.inspect}")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
.cpu_quota ⇒ nil, Float
The maximum number of processor cores available for process scheduling. Returns nil if there is no enforced limit, or a Float if the process is inside a cgroup with a dedicated CPU quota (typically Docker).
Note that nothing prevents setting a CPU quota higher than the actual number of cores on the system.
For performance reasons the calculated value will be memoized on the first call.
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 209
def self.cpu_quota
  processor_counter.cpu_quota
end
.create_simple_logger(level = :FATAL, output = $stderr)
Create a simple logger with provided level and output.
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 38
def self.create_simple_logger(level = :FATAL, output = $stderr)
  level = Concern::Logging.const_get(level) unless level.is_a?(Integer)

  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level

    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end

    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Concern::Logging::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end
.create_stdlib_logger(level = :FATAL, output = $stderr)
Create a stdlib logger with provided level and output. If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+.
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 73
def self.create_stdlib_logger(level = :FATAL, output = $stderr)
  require 'logger'
  logger = Logger.new(output)
  logger.level = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end
.dataflow(*inputs) {|inputs| ... } ⇒ Object (mod_func)
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future
values. The dataflow task itself is also a Future
value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.
Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.
The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.
==== Example
A dataflow task is created with the dataflow
method, passing in a block.
task = Concurrent::dataflow { 14 }
This produces a simple Future
value. The task will run immediately, as it has no dependencies. We can also specify Future
values that must be available before a task will run. When we do this we get the value of those futures passed to our block.
a = Concurrent::dataflow { 1 }
b = Concurrent::dataflow { 2 }
c = Concurrent::dataflow(a, b) { |av, bv| av + bv }
Using the dataflow
method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.
==== Derivation
This section describes how we could derive dataflow from other primitives in this library.
Consider a naive fibonacci calculator.
def fib(n)
if n < 2
n
else
fib(n - 1) + fib(n - 2)
end
end
puts fib(14) #=> 377
We could modify this to use futures.
def fib(n)
if n < 2
Concurrent::Future.new { n }
else
n1 = fib(n - 1).execute
n2 = fib(n - 2).execute
Concurrent::Future.new { n1.value + n2.value }
end
end
f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ...
f.execute #=> #<Concurrent::Future:0x000001019ef5a0 ...
sleep(0.5)
puts f.value #=> 377
One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.
To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.
class CountingObserver
def initialize(count, &block)
@count = count
@block = block
end
def update(time, value, reason)
@count -= 1
if @count <= 0
@block.call()
end
end
end
def fib(n)
if n < 2
Concurrent::Future.new { n }.execute
else
n1 = fib(n - 1)
n2 = fib(n - 2)
result = Concurrent::Future.new { n1.value + n2.value }
barrier = CountingObserver.new(2) { result.execute }
n1.add_observer barrier
n2.add_observer barrier
n1.execute
n2.execute
result
end
end
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)
puts f.value #=> 377
We can wrap this up in a dataflow utility.
def dataflow(*inputs, &block)
result = Concurrent::Future.new(&block)
if inputs.empty?
result.execute
else
barrier = CountingObserver.new(inputs.size) { result.execute }
inputs.each do |input|
input.add_observer barrier
end
end
result
end
def fib(n)
if n < 2
dataflow { n }
else
n1 = fib(n - 1)
n2 = fib(n - 2)
dataflow(n1, n2) { n1.value + n2.value }
end
end
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)
puts f.value #=> 377
Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.
def dataflow(*inputs, &block)
result = Concurrent::Future.new do
values = inputs.map { |input| input.value }
block.call(*values)
end
if inputs.empty?
result.execute
else
barrier = CountingObserver.new(inputs.size) { result.execute }
inputs.each do |input|
input.add_observer barrier
end
end
result
end
def fib(n)
if n < 2
Concurrent::dataflow { n }
else
n1 = fib(n - 1)
n2 = fib(n - 2)
Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
end
end
f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
sleep(0.5)
puts f.value #=> 377
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end
.dataflow!(*inputs, &block) (mod_func)
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end
.dataflow_with(executor, *inputs, &block) (mod_func)
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
.dataflow_with!(executor, *inputs, &block) (mod_func)
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
.disable_at_exit_handlers!
Has no effect since it is no longer needed, see github.com/ruby-concurrency/concurrent-ruby/pull/841.
this option should be needed only because of at_exit
ordering issues which may arise when running some of the testing frameworks. E.g. Minitest’s test-suite runs itself in at_exit
callback which executes after the pools are already terminated. Then auto termination needs to be disabled and called manually after test-suite ends.
This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.
Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer’s responsibility to ensure that the handlers are shutdown properly prior to application exit by calling AtExit.run
method.
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 48
def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end
.executor(executor_identifier) ⇒ Executor
General access point to global executors.
.global_fast_executor ⇒ ThreadPoolExecutor
Global thread pool optimized for short, fast operations.
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 55
def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value!
end
.global_immediate_executor
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 66
def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end
.global_io_executor ⇒ ThreadPoolExecutor
Global thread pool optimized for long, blocking (IO) tasks.
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 62
def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value!
end
.global_timer_set ⇒ Concurrent::TimerSet
Global thread pool used for global timers.
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 73
def self.global_timer_set
  GLOBAL_TIMER_SET.value!
end
.leave_transaction (mod_func)
Leave a transaction without committing or aborting - see Concurrent::atomically
.
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144
def leave_transaction
  raise Transaction::LeaveError.new
end
.monotonic_time(unit = :float_second) ⇒ Float
(mod_func)
Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
Returns the current time as tracked by the application monotonic clock.
# File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 15
def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end
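For example, measuring an elapsed interval with the monotonic clock (the sleep stands in for real work):
require 'concurrent'

start   = Concurrent.monotonic_time
sleep 0.25                          # stand-in for real work
elapsed = Concurrent.monotonic_time - start

puts format('took %.3f s', elapsed) # unaffected by system clock changes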
.new_fast_executor(opts = {})
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 87
def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end
.new_io_executor(opts = {})
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 98
def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end
.physical_processor_count ⇒ Integer
Number of physical processor cores on the current system. For performance reasons the calculated value will be memoized on the first call.
On Windows the Win32 API will be queried for the NumberOfCores from Win32_Processor. This will return the total number "of cores for the current instance of the processor." On Unix-like operating systems either the hwprefs or sysctl utility will be called in a subshell and the returned value will be used. In the rare case where none of these methods work or an exception is raised the function will simply return 1.
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 181
def self.physical_processor_count
  processor_counter.physical_processor_count
end
.processor_count ⇒ Integer
Number of processors seen by the OS and used for process scheduling. For performance reasons the calculated value will be memoized on the first call.
When running under JRuby the Java runtime call java.lang.Runtime.getRuntime.availableProcessors
will be used. According to the Java documentation this “value may change during a particular invocation of the virtual machine… [applications] should therefore occasionally poll this property.” We still memoize this value once under JRuby.
Otherwise Ruby’s Etc.nprocessors
will be used.
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 160
def self.processor_count
  processor_counter.processor_count
end
.use_simple_logger(level = :FATAL, output = $stderr)
Use logger created by #create_simple_logger
to log concurrent-ruby messages.
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 66
def self.use_simple_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end
.use_stdlib_logger(level = :FATAL, output = $stderr)
Use logger created by #create_stdlib_logger
to log concurrent-ruby messages.
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 101
def self.use_stdlib_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end
Instance Method Details
#exchange(value, timeout = nil) ⇒ Object
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil
the method will block indefinitely.
In some edge cases when a timeout is given, a return value of nil may be ambiguous. Specifically, if nil is a valid value in the exchange it will be impossible to tell whether nil is the actual return value or if it signifies timeout. When nil is a valid value in the exchange, consider using #exchange! or #try_exchange instead.
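A brief sketch of two threads swapping values through an Exchanger (the symbols and the 2-second timeout are arbitrary):
require 'concurrent'

exchanger = Concurrent::Exchanger.new

t1 = Thread.new { puts "t1 received: #{exchanger.exchange(:from_t1, 2)}" }
t2 = Thread.new { puts "t2 received: #{exchanger.exchange(:from_t2, 2)}" }

[t1, t2].each(&:join)
#=> t1 received: from_t2
#=> t2 received: from_t1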
#exchange!(value, timeout = nil) ⇒ Object
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil
the method will block indefinitely.
On timeout a TimeoutError exception will be raised.
#initialize(opts = {})
Create a new thread pool.
#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil
the method will block indefinitely.
The return value will be a Maybe
set to Just
on success or Nothing
on timeout.