123456789_123456789_123456789_123456789_123456789_

Class: ActiveStorage::Service::MirrorService

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Service
Instance Chain:
self, Service
Inherits: Service
  • ::Object
Defined in: activestorage/lib/active_storage/service/mirror_service.rb

Overview

Active Storage Mirror Service

Wraps a set of mirror services and provides a single ::ActiveStorage::Service object that will all have the files uploaded to them. A #primary service is designated to answer calls to:

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(primary:, mirrors:) ⇒ MirrorService

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 30

def initialize(primary:, mirrors:)
  @primary, @mirrors = primary, mirrors
  @executor = Concurrent::ThreadPoolExecutor.new(
    name: "ActiveStorage-mirror-service",
    min_threads: 1,
    max_threads: mirrors.size,
    max_queue: 0,
    fallback_policy: :caller_runs,
    idle_time: 60
  )
end

Class Method Details

.build(primary:, mirrors:, name:, configurator:, **options)

This method is for internal use only.

Stitch together from named services.

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 21

def self.build(primary:, mirrors:, name:, configurator:, **options) # :nodoc:
  new(
    primary: configurator.build(primary),
    mirrors: mirrors.collect { |mirror_name| configurator.build mirror_name }
  ).tap do |service_instance|
    service_instance.name = name
  end
end

Instance Attribute Details

#compose (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#download (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#download_chunk (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#headers_for_direct_upload (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#mirrors (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 15

attr_reader :primary, :mirrors

#path_for (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#primary (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 15

attr_reader :primary, :mirrors

#url (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#url_for_direct_upload (readonly)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

Instance Method Details

#delete(key)

Delete the file at the key on all services.

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 52

def delete(key)
  perform_across_services :delete, key
end

#delete_prefixed(prefix)

Delete files at keys starting with the prefix on all services.

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 57

def delete_prefixed(prefix)
  perform_across_services :delete_prefixed, prefix
end

#each_service(&block) (private)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 95

def each_service(&block)
  [ primary, *mirrors ].each(&block)
end

#exist?Boolean

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 17

delegate :download, :download_chunk, :exist?, :url,
  :url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary

#mirror(key, checksum:)

Copy the file at the key from the primary service to each of the mirrors where it doesn't already exist. Both the existence checks and the uploads run in parallel across mirrors using the internal thread pool.

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 67

def mirror(key, checksum:)
  instrument :mirror, key: key, checksum: checksum do
    mirrors_in_need_of_mirroring = mirrors_needing_mirroring(key)
    if mirrors_in_need_of_mirroring.any?
       = ActiveStorage::Blob.find_by(key: key)&. || {}

      primary.open(key, checksum: checksum, verify: checksum.present?) do |io|
        io.rewind
        content = io.read.freeze
        tasks = mirrors_in_need_of_mirroring.map do |service|
          Concurrent::Promise.execute(executor: @executor) do
            service.upload key, StringIO.new(content), checksum: checksum, **
          end
        end
        tasks.each(&:value!)
      end
    end
  end
end

#mirror_later(key, checksum:)

This method is for internal use only.
[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 61

def mirror_later(key, checksum:) # :nodoc:
  ActiveStorage::MirrorJob.perform_later key, checksum: checksum
end

#mirrors_needing_mirroring(key) (private)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 88

def mirrors_needing_mirroring(key)
  tasks = mirrors.map do |service|
    [ service, Concurrent::Promise.execute(executor: @executor) { service.exist?(key) } ]
  end
  tasks.reject { |_, promise| promise.value! }.map(&:first)
end

#perform_across_services(method, *args) (private)

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 99

def perform_across_services(method, *args)
  tasks = each_service.collect do |service|
    Concurrent::Promise.execute(executor: @executor) do
      service.public_send method, *args
    end
  end
  tasks.each(&:value!)
end

#upload(key, io, checksum: nil, **options)

Upload the io to the key specified to all services. The upload to the primary service is done synchronously whereas the upload to the mirrors is done asynchronously. If a checksum is provided, all services will ensure a match when the upload has completed or raise an ::ActiveStorage::IntegrityError.

[ GitHub ]

  
# File 'activestorage/lib/active_storage/service/mirror_service.rb', line 45

def upload(key, io, checksum: nil, **options)
  io.rewind
  primary.upload key, io, checksum: checksum, **options
  mirror_later key, checksum: checksum
end