123456789_123456789_123456789_123456789_123456789_

Module: Concurrent

Relationships & Source Files
Namespace Children
Modules:
Classes:
Exceptions:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
Defined in: lib/concurrent-ruby/concurrent.rb,
ext/concurrent-ruby-ext/rb_concurrent.c,
lib/concurrent-ruby-edge/concurrent/actor.rb,
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge.rb,
lib/concurrent-ruby-edge/concurrent/lazy_register.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/context.rb,
lib/concurrent-ruby-edge/concurrent/actor/core.rb,
lib/concurrent-ruby-edge/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent-ruby-edge/concurrent/actor/envelope.rb,
lib/concurrent-ruby-edge/concurrent/actor/errors.rb,
lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb,
lib/concurrent-ruby-edge/concurrent/actor/public_delegations.rb,
lib/concurrent-ruby-edge/concurrent/actor/reference.rb,
lib/concurrent-ruby-edge/concurrent/actor/root.rb,
lib/concurrent-ruby-edge/concurrent/actor/type_check.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb,
lib/concurrent-ruby-edge/concurrent/edge/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_queue.rb,
lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb,
lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb,
lib/concurrent-ruby-edge/concurrent/edge/promises.rb,
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb,
lib/concurrent-ruby-edge/concurrent/edge/version.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb,
lib/concurrent-ruby/concurrent/agent.rb,
lib/concurrent-ruby/concurrent/array.rb,
lib/concurrent-ruby/concurrent/async.rb,
lib/concurrent-ruby/concurrent/atom.rb,
lib/concurrent-ruby/concurrent/configuration.rb,
lib/concurrent-ruby/concurrent/constants.rb,
lib/concurrent-ruby/concurrent/dataflow.rb,
lib/concurrent-ruby/concurrent/delay.rb,
lib/concurrent-ruby/concurrent/errors.rb,
lib/concurrent-ruby/concurrent/exchanger.rb,
lib/concurrent-ruby/concurrent/future.rb,
lib/concurrent-ruby/concurrent/hash.rb,
lib/concurrent-ruby/concurrent/immutable_struct.rb,
lib/concurrent-ruby/concurrent/ivar.rb,
lib/concurrent-ruby/concurrent/map.rb,
lib/concurrent-ruby/concurrent/maybe.rb,
lib/concurrent-ruby/concurrent/mutable_struct.rb,
lib/concurrent-ruby/concurrent/mvar.rb,
lib/concurrent-ruby/concurrent/options.rb,
lib/concurrent-ruby/concurrent/promise.rb,
lib/concurrent-ruby/concurrent/promises.rb,
lib/concurrent-ruby/concurrent/re_include.rb,
lib/concurrent-ruby/concurrent/scheduled_task.rb,
lib/concurrent-ruby/concurrent/set.rb,
lib/concurrent-ruby/concurrent/settable_struct.rb,
lib/concurrent-ruby/concurrent/synchronization.rb,
lib/concurrent-ruby/concurrent/timer_task.rb,
lib/concurrent-ruby/concurrent/tuple.rb,
lib/concurrent-ruby/concurrent/tvar.rb,
lib/concurrent-ruby/concurrent/version.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_markable_reference.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_reference.rb,
lib/concurrent-ruby/concurrent/atomic/count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent-ruby/concurrent/atomic/event.rb,
lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/locals.rb,
lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb,
lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent-ruby/concurrent/atomic/semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb,
lib/concurrent-ruby/concurrent/atomic_reference/atomic_direct_update.rb,
lib/concurrent-ruby/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent-ruby/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/lock_free_stack.rb,
lib/concurrent-ruby/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/truffleruby_map_backend.rb,
lib/concurrent-ruby/concurrent/concern/deprecation.rb,
lib/concurrent-ruby/concurrent/concern/dereferenceable.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/concern/obligation.rb,
lib/concurrent-ruby/concurrent/concern/observable.rb,
lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/executor_service.rb,
lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/safe_task_executor.rb,
lib/concurrent-ruby/concurrent/executor/serial_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/timer_set.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb,
lib/concurrent-ruby/concurrent/synchronization/condition.rb,
lib/concurrent-ruby/concurrent/synchronization/full_memory_barrier.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/lock.rb,
lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/object.rb,
lib/concurrent-ruby/concurrent/synchronization/safe_initialization.rb,
lib/concurrent-ruby/concurrent/synchronization/volatile.rb,
lib/concurrent-ruby/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent-ruby/concurrent/thread_safe/util.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/striped64.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/volatile.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent-ruby/concurrent/utility/engine.rb,
lib/concurrent-ruby/concurrent/utility/monotonic_time.rb,
lib/concurrent-ruby/concurrent/utility/native_extension_loader.rb,
lib/concurrent-ruby/concurrent/utility/native_integer.rb,
lib/concurrent-ruby/concurrent/utility/processor_counter.rb

Overview

=== Concurrent Ruby

Gem Version License</a> Gitter chat

Modern concurrency tools for Ruby. Inspired by href="http://www.erlang.org/doc/reference_manual/processes.html">http://www.erlang.org/doc/reference_manual/processes.html Erlang, Clojure, Scala, Haskell, F#, C#, href="http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html">http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html Java, and classic concurrency patterns.

The design goals of this gem are:

  • Be an 'unopinionated' toolbox that provides useful utilities without debating which is better or why
  • Remain free of external gem dependencies
  • Stay true to the spirit of the languages providing inspiration
  • But implement in a way that makes sense for Ruby
  • Keep the semantics as idiomatic Ruby as possible
  • Support features that make sense in Ruby
  • Exclude features that don't make sense in Ruby
  • Be small, lean, and loosely coupled
  • Thread-safety
  • Backward compatibility

==== Contributing

This gem depends on contributions and we appreciate your help. Would you like to contribute? Great! Have a look at issues with looking-for-contributor label. And if you pick something up let us know on the issue.

You can also get started by triaging issues which may include reproducing bug reports or asking for vital information, such as version numbers or reproduction instructions. If you would like to start triaging issues, one easy way to get started is to subscribe to concurrent-ruby on CodeTriage. Open Source Helpers

==== Thread Safety

Concurrent Ruby makes one of the strongest thread safety guarantees of any Ruby concurrency library, providing consistent behavior and guarantees on all three main Ruby interpreters (MRI/CRuby, JRuby, ::TruffleRuby).

Every abstraction in this library is thread safe. Specific thread safety guarantees are documented with each abstraction.

It is critical to remember, however, that Ruby is a language of mutable references. No concurrency library for Ruby can ever prevent the user from making thread safety mistakes (such as sharing a mutable object between threads and modifying it on both threads) or from creating deadlocks through incorrect use of locks. All the library can do is provide safe abstractions which encourage safe practices. Concurrent Ruby provides more safe concurrency abstractions than any other Ruby library, many of which support the mantra of "Do not communicate by sharing memory; instead, share memory by communicating". Concurrent Ruby is also the only Ruby library which provides a full suite of thread safe and immutable variable types and data structures.

We've also initiated discussion to document the memory model of Ruby which would provide consistent behaviour and guarantees on all three main Ruby interpreters (MRI/CRuby, JRuby, ::TruffleRuby).

==== Features & Documentation

The primary site for documentation is the automatically generated API documentation which is up to date with latest release. This readme matches the master so may contain new stuff not yet released.

We also have a IRC (gitter).

===== Versioning

  • concurrent-ruby uses Semantic Versioning
  • concurrent-ruby-ext has always same version as concurrent-ruby
  • concurrent-ruby-edge will always be 0.y.z therefore following point 4 applies "Major version zero (0.y.z) is for initial development. Anything may change at any time. The public API should not be considered stable." However we additionally use following rules:
    • Minor version increment means incompatible changes were made
    • Patch version increment means only compatible changes were made

====== General-purpose Concurrency Abstractions

====== Thread-safe Value Objects, Structures, and Collections

Collection classes that were originally part of the (deprecated) thread_safe gem:

Value objects inspired by other languages:

Structure classes derived from Ruby's href="http://ruby-doc.org/core/Struct.html">http://ruby-doc.org/core/Struct.html Struct:

Thread-safe variables:

====== Java-inspired ThreadPools and Other Executors

  • See the thread pool overview, which also contains a list of other Executors available.

====== Thread Synchronization Classes and Algorithms

====== Deprecated

Deprecated features are still available and bugs are being fixed, but new features will not be added.

===== Edge Features

These are available in the concurrent-ruby-edge companion gem.

These features are under active development and may change frequently. They are expected not to keep backward compatibility (there may also lack tests and documentation). Semantic versions will be obeyed though. Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when final.

==== Supported Ruby versions

  • MRI 2.3 and above
  • Latest JRuby 9000
  • Latest TruffleRuby

==== Usage

Everything within this gem can be loaded simply by requiring it:

require 'concurrent'

You can also require a specific abstraction part of the public documentation since concurrent-ruby 1.2.0, for example:

require 'concurrent/map'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/executor/fixed_thread_pool'

To use the tools in the Edge gem it must be required separately:

require 'concurrent-edge'

If the library does not behave as expected, Concurrent.use_simple_logger(:DEBUG) could help to reveal the problem.

==== Installation

gem install concurrent-ruby

or add the following line to Gemfile:

gem 'concurrent-ruby', require: 'concurrent'

and run bundle install from your shell.

===== Edge Gem Installation

The Edge gem must be installed separately from the core gem:

gem install concurrent-ruby-edge

or add the following line to Gemfile:

gem 'concurrent-ruby-edge', require: 'concurrent-edge'

and run bundle install from your shell.

===== C Extensions for MRI

Potential performance improvements may be achieved under MRI by installing optional C extensions. To minimise installation errors the C extensions are available in the concurrent-ruby-ext extension gem. concurrent-ruby and concurrent-ruby-ext are always released together with same version. Simply install the extension gem too:

gem install concurrent-ruby-ext

or add the following line to Gemfile:

gem 'concurrent-ruby-ext'

and run bundle install from your shell.

In code it is only necessary to

require 'concurrent'

The concurrent-ruby gem will automatically detect the presence of the concurrent-ruby-ext gem and load the appropriate C extensions.

====== Note For gem developers

No gems should depend on concurrent-ruby-ext. Doing so will force C extensions on your users. The best practice is to depend on concurrent-ruby and let users to decide if they want C extensions.

==== Building the gem

===== Requirements

  • Recent CRuby
  • JRuby, rbenv install jruby-9.2.17.0
  • Set env variable CONCURRENT_JRUBY_HOME to point to it, e.g. /usr/local/opt/rbenv/versions/jruby-9.2.17.0
  • Install Docker, required for Windows builds

===== Publishing the Gem

  • Update version.rb
  • Update the CHANGELOG
  • Add the new version to docs-source/signpost.md. Needs to be done only if there are visible changes in the documentation.
  • Commit (and push) the changes.
  • Use bundle exec rake release to release the gem. It consists of ['release:checks', 'release:build', 'release:test', 'release:publish'] steps. It will ask at the end before publishing anything. Steps can also be executed individually.

==== Maintainers

===== Special Thanks to

to the past maintainers

and to Ruby Association for sponsoring a project "Enhancing Ruby’s concurrency tooling" in 2018.

==== License and Copyright

Concurrent Ruby is free software released under the MIT License.

The Concurrent Ruby logo was designed by David Jones. It is Copyright © 2014 Jerry D'Antonio. All Rights Reserved.

Constant Summary

Class Attribute Summary

Class Method Summary

Utility::NativeExtensionLoader - Extended

Utility::EngineDetector - Extended

Concern::Deprecation - Extended

Concern::Logging - Included

log

Logs through .global_logger, it can be overridden by setting @logger.

Instance Method Summary

Class Attribute Details

.global_logger (rw)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 114

def self.global_logger
  GLOBAL_LOGGER.value
end

.global_logger=(value) (rw)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 118

def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end

.mutex_owned_per_thread?Boolean (readonly, private)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb', line 7

def self.mutex_owned_per_thread?
  return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?

  mutex = Mutex.new
  # Lock the mutex:
  mutex.synchronize do
    # Check if the mutex is still owned in a child fiber:
    Fiber.new { mutex.owned? }.resume
  end
end

Class Method Details

.abort_transaction (mod_func)

Abort a currently running transaction - see Concurrent::atomically.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139

def abort_transaction
  raise Transaction::AbortError.new
end

.atomically (mod_func)

Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the TVar objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed

    more than once. In most cases your code should be free of
    side-effects, except for via TVar.
  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of

    the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = new TVar(100_000)
b = new TVar(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end

.available_processor_countFloat

Number of processors cores available for process scheduling. This method takes in account the CPU quota if the process is inside a cgroup with a dedicated CPU quota (typically Docker). Otherwise it returns the same value as #processor_count but as a Float.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float)

    number of available processors

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 194

def self.available_processor_count
  processor_counter.available_processor_count
end

.call_dataflow(method, executor, *inputs, &block) (mod_func)

Raises:

  • (ArgumentError)
[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56

def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end

.cpu_quotanil, Float

The maximum number of processors cores available for process scheduling. Returns nil if there is no enforced limit, or a Float if the process is inside a cgroup with a dedicated CPU quota (typically Docker).

Note that nothing prevents setting a CPU quota higher than the actual number of cores on the system.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (nil, Float)

    Maximum number of available processors as set by a cgroup CPU quota, or nil if none set

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 209

def self.cpu_quota
  processor_counter.cpu_quota
end

.cpu_sharesFloat?

The CPU shares requested by the process. For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float, nil)

    CPU shares requested by the process, or nil if not set

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 217

def self.cpu_shares
  processor_counter.cpu_shares
end

.create_simple_logger(level = :FATAL, output = $stderr)

Create a simple logger with provided level and output.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 38

def self.create_simple_logger(level = :FATAL, output = $stderr)
  level = Concern::Logging.const_get(level) unless level.is_a?(Integer)

  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level

    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end

    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Concern::Logging::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end

.create_stdlib_logger(level = :FATAL, output = $stderr)

Deprecated.

Create a stdlib logger with provided level and output. If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 73

def self.create_stdlib_logger(level = :FATAL, output = $stderr)
  require 'logger'
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end

.dataflow(*inputs) {|inputs| ... } ⇒ Object (mod_func)

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

==== Example

A dataflow task is created with the dataflow method, passing in a block.

task = <code>dataflow</code> { 14 }

This produces a simple Future value. The task will run immediately, as it has no dependencies. We can also specify Future values that must be available before a task will run. When we do this we get the value of those futures passed to our block.

a = <code>dataflow</code> { 1 }
b = <code>dataflow</code> { 2 }
c = {Concurrent.dataflow}(a, b) { |av, bv| av + bv }

Using the dataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.

==== Derivation

This section describes how we could derive dataflow from other primitives in this library.

Consider a naive fibonacci calculator.

def fib(n)
  if n < 2
    n
  else
    fib(n - 1) + fib(n - 2)
  end
end

puts fib(14) #=> 377

We could modify this to use futures.

def fib(n)
  if n < 2
    Concurrent::Future.new { n }
  else
    n1 = fib(n - 1).execute
    n2 = fib(n - 2).execute
    Concurrent::Future.new { n1.value + n2.value }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ...
f.execute   #=> #<Concurrent::Future:0x000001019ef5a0 ...

sleep(0.5)

puts f.value #=> 377

One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.

To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.

class CountingObserver

  def initialize(count, &block)
    @count = count
    @block = block
  end

  def update(time, value, reason)
    @count -= 1

    if @count <= 0
      @block.call()
    end
  end

end

def fib(n)
  if n < 2
    Concurrent::Future.new { n }.execute
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)

    result = Concurrent::Future.new { n1.value + n2.value }

    barrier = CountingObserver.new(2) { result.execute }
    n1.add_observer barrier
    n2.add_observer barrier

    n1.execute
    n2.execute

    result
  end
end

We can wrap this up in a dataflow utility.

f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)

puts f.value #=> 377

def dataflow(*inputs, &block)
  result = Concurrent::Future.new(&block)

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    dataflow(n1, n2) { n1.value + n2.value }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)

puts f.value #=> 377

Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.

def dataflow(*inputs, &block)
  result = Concurrent::Future.new do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    Concurrent::dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
sleep(0.5)

puts f.value #=> 377

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end

.dataflow!(*inputs, &block) (mod_func)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44

def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end

.dataflow_with(executor, *inputs, &block) (mod_func)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39

def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end

.dataflow_with!(executor, *inputs, &block) (mod_func)

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49

def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end

.disable_at_exit_handlers!

Deprecated.

Has no effect since it is no longer needed, see github.com/ruby-concurrency/concurrent-ruby/pull/841.

Note:

this option should be needed only because of at_exit ordering issues which may arise when running some of the testing frameworks. E.g. Minitest’s test-suite runs itself in at_exit callback which executes after the pools are already terminated. Then auto termination needs to be disabled and called manually after test-suite ends.

Note:

This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.

Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer’s responsibility to ensure that the handlers are shutdown properly prior to application exit by calling AtExit.run method.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 48

def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end

.executor(executor_identifier) ⇒ Executor

General access point to global executors.

Parameters:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 83

def self.executor(executor_identifier)
  Options.executor(executor_identifier)
end

.global_fast_executorThreadPoolExecutor

Global thread pool optimized for short, fast operations.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 55

def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value!
end

.global_immediate_executor

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 66

def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end

.global_io_executorThreadPoolExecutor

Global thread pool optimized for long, blocking (IO) tasks.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 62

def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value!
end

.global_timer_setConcurrent::TimerSet

Global thread pool user for global timers.

Returns:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 73

def self.global_timer_set
  GLOBAL_TIMER_SET.value!
end

.leave_transaction (mod_func)

Leave a transaction without committing or aborting - see Concurrent::atomically.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144

def leave_transaction
  raise Transaction::LeaveError.new
end

.monotonic_time(unit = :float_second) ⇒ Float (mod_func)

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Returns the current time as tracked by the application monotonic clock.

Parameters:

  • unit (Symbol) (defaults to: :float_second)

    the time unit to be returned, can be either :float_second, :float_millisecond, :float_microsecond, :second, :millisecond, :microsecond, or :nanosecond default to :float_second.

Returns:

  • (Float)

    The current monotonic time since some unspecified starting point

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 15

def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end

.new_fast_executor(opts = {})

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 87

def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end

.new_io_executor(opts = {})

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 98

def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end

.physical_processor_countInteger

Number of physical processor cores on the current system. For performance reasons the calculated value will be memoized on the first call.

On Windows the Win32 API will be queried for the ‘NumberOfCores from Win32_Processor`. This will return the total number “of cores for the current instance of the processor.” On Unix-like operating systems either the hwprefs or sysctl utility will be called in a subshell and the returned value will be used. In the rare case where none of these methods work or an exception is raised the function will simply return 1.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 181

def self.physical_processor_count
  processor_counter.physical_processor_count
end

.processor_countInteger

Number of processors seen by the OS and used for process scheduling. For performance reasons the calculated value will be memoized on the first call.

When running under JRuby the Java runtime call java.lang.Runtime.getRuntime.availableProcessors will be used. According to the Java documentation this “value may change during a particular invocation of the virtual machine… [applications] should therefore occasionally poll this property.” We still memoize this value once under JRuby.

Otherwise Ruby’s Etc.nprocessors will be used.

Returns:

  • (Integer)

    number of processors seen by the OS or Java runtime

See Also:

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 160

def self.processor_count
  processor_counter.processor_count
end

.use_simple_logger(level = :FATAL, output = $stderr)

Use logger created by #create_simple_logger to log concurrent-ruby messages.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 66

def self.use_simple_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end

.use_stdlib_logger(level = :FATAL, output = $stderr)

Deprecated.

Use logger created by #create_stdlib_logger to log concurrent-ruby messages.

[ GitHub ]

  
# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 101

def self.use_stdlib_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

Instance Method Details

#exchange(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

In some edge cases when a timeout is given a return value of nil may be ambiguous. Specifically, if nil is a valid value in the exchange it will be impossible to tell whether nil is the actual return value or if it signifies timeout. When nil is a valid value in the exchange consider using #exchange! or #try_exchange instead.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread or nil on timeout

[ GitHub ]

#exchange!(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

On timeout a TimeoutError exception will be raised.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread

Raises:

[ GitHub ]

#initialize(opts = {})

Create a new thread pool.

Options Hash (opts):

  • :fallback_policy (Symbol) — default: :discard

    the policy for handling new tasks that are received when the queue size has reached max_queue or the executor has shut down

Raises:

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:

[ GitHub ]

#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

The return value will be a Maybe set to Just on success or Nothing on timeout.

Examples:

exchanger = Concurrent::Exchanger.new

result = exchanger.exchange(:foo, 0.5)

if result.just?
  puts result.value #=> :bar
else
  puts 'timeout'
end

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Concurrent::Maybe)

    on success a Just maybe will be returned with the item exchanged by the other thread as ‘#value`; on timeout a Nothing maybe will be returned with TimeoutError as #reason

[ GitHub ]