123456789_123456789_123456789_123456789_123456789_

Class: ActiveSupport::Ractors::Logger::Writer

Do not use. This class is for internal use only.
Relationships & Source Files
Namespace Children
Classes:
Inherits: Object
Defined in: activesupport/lib/active_support/ractors/logger/writer.rb

Class Method Summary

Instance Method Summary

Constructor Details

.newWriter

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 11

def initialize
  unless defined?(::Ractor::Port)
    raise NotImplementedError, "ActiveSupport::Ractors::Logger requires Ractor::Port support"
  end

  @port = ::Ractor::Port.new
end

Class Method Details

.spawn

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 7

def self.spawn(...)
  new.spawn(...)
end

Instance Method Details

#async(message)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 25

def async(message)
  @port << [:write, message]
rescue ::Ractor::ClosedError
  # Consumer is gone; drop the line rather than raising in the caller.
  nil
end

#build_logdev(logdev, shift_age, shift_size, binmode, shift_period_suffix) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 65

def build_logdev(logdev, shift_age, shift_size, binmode, shift_period_suffix)
  return NullDevice.new if logdev.nil?

  ::Logger::LogDevice.new(logdev,
    shift_age: shift_age,
    shift_size: shift_size,
    shift_period_suffix: shift_period_suffix,
    binmode: binmode)
end

#call(operation, *args)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 32

def call(operation, *args)
  reply = reply_port
  @port << [:call, reply, operation, args]
  status, value = reply.receive
  raise RuntimeError, value if status == :error
  value
rescue ::Ractor::ClosedError
  # Consumer is gone; degrade to a no-op rather than raising in the caller.
  nil
end

#flush

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 43

def flush
  call(:flush)
end

#flush_device(logdev) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 128

def flush_device(logdev)
  dev = logdev.respond_to?(:dev) ? logdev.dev : logdev
  dev.flush if dev.respond_to?(:flush)
end

#reopen(log, options)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 47

def reopen(log, options)
  call(:reopen, log, options)
end

#reply_port (private)

Reuse one reply port per calling thread instead of allocating one per call. A thread blocks on its own reply, so its calls are serial and the port is safe to reuse.

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 60

def reply_port
  ::Thread.current.thread_variable_get(:active_support_shareable_logger_reply_port) ||
    ::Thread.current.thread_variable_set(:active_support_shareable_logger_reply_port, ::Ractor::Port.new)
end

#shutdown

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 51

def shutdown
  call(:shutdown) unless @port.closed?
ensure
  @port.close
end

#spawn(logdev = nil, shift_age = 0, shift_size = 1048576, binmode: false, shift_period_suffix: "%Y%m%d")

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 19

def spawn(logdev = nil, shift_age = 0, shift_size = 1048576, binmode: false, shift_period_suffix: "%Y%m%d")
  device = build_logdev(logdev, shift_age, shift_size, binmode, shift_period_suffix)
  start_consumer(@port, device)
  ::Ractor.make_shareable(self)
end

#start_consumer(port, logdev) (private)

The consumer must survive a failing log device the same way the stock ::ActiveSupport::Ractors::Logger does: a write error is swallowed and reported to stderr, and the logger keeps working.

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 77

def start_consumer(port, logdev)
  Thread.new do
    closed = false
    until closed
      begin
        message = port.receive
      rescue ::Ractor::ClosedError
        # Port closed; nothing left to consume.
        break
      end
      case message[0]
      when :write
        begin
          logdev.write(message[1])
        rescue StandardError => error
          warn_failure(error)
        end
      when :call
        _, reply, operation, args = message
        begin
          case operation
          when :flush
            flush_device(logdev)
          when :reopen
            logdev.reopen(args[0], **args[1])
          when :shutdown
            flush_device(logdev)
            logdev.close
            closed = true
          end
        rescue StandardError => error
          warn_failure(error)
        ensure
          reply << [:ok, operation == :shutdown ? true : :ok]
        end
      end
    end
  ensure
    # On an unexpected exit close the port too, so producers get a ClosedError instead of blocking forever on a
    # dead consumer.
    unless closed
      logdev.close
      port.close
    end
  end
end

#warn_failure(error) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/ractors/logger/writer.rb', line 124

def warn_failure(error)
  warn "[ActiveSupport::Ractors::Logger::Writer] #{error.class}: #{error.message}"
end