Class: Mongo::Grid::FSBucket::Stream::Read

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, Enumerable
Inherits: Object
Defined in: lib/mongo/grid/stream/read.rb

Overview

A stream that reads files from the ::Mongo::Grid::FSBucket.

Since:

  • 2.1.0

Constructor Details

.new(fs, options) ⇒ Read

Create a stream for reading files from the ::Mongo::Grid::FSBucket.

Examples:

Create the stream.

Stream::Read.new(fs, options)

Parameters:

  • fs (FSBucket)

    The GridFS bucket object.

  • options (Hash)

    The read stream options.

Options Hash (options):

  • :file_info_doc (BSON::Document)

    For internal driver use only. A BSON document to use as file information.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 54

def initialize(fs, options)
  @fs = fs
  @options = options.dup
  @file_id = @options.delete(:file_id)
  @options.freeze
  @open = true
  @timeout_holder = CsotTimeoutHolder.new(
    operation_timeouts: {
      operation_timeout_ms: options[:timeout_ms],
      inherited_timeout_ms: fs.database.timeout_ms
    }
  )
end
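
The option handling in the constructor can be illustrated in isolation. The following is a minimal sketch, not driver code: `OptionsSketch` is a hypothetical stand-in class and the option values are made up. It only shows the effect of the `dup`, `delete(:file_id)` and `freeze` sequence seen above.

```ruby
# Hypothetical stand-in for the option handling in the constructor above.
# OptionsSketch is not part of the driver.
class OptionsSketch
  attr_reader :options, :file_id

  def initialize(options)
    @options = options.dup                # shallow copy, the caller's hash is untouched
    @file_id = @options.delete(:file_id)  # the id is pulled out of the stored options
    @options.freeze                       # the stored copy can no longer be modified
  end
end

caller_opts = { file_id: 42, read: { mode: :secondary } }
stream = OptionsSketch.new(caller_opts)

stream.file_id              # 42
stream.options              # { read: { mode: :secondary } }
stream.options.frozen?      # true
caller_opts.key?(:file_id)  # true, the original hash still has the id
```

Because the copy is shallow, nested values such as the `:read` hash are still shared with the caller's hash.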

Instance Attribute Details

#closed? ⇒ true, false (readonly)

Whether the stream is closed.

Examples:

Check whether the stream is closed.

stream.closed?

Returns:

  • (true, false)

    Whether the stream is closed.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 143

def closed?
  !@open
end

#file_id ⇒ BSON::ObjectId, Object (readonly)

Returns:

  • (BSON::ObjectId, Object)

    The id of the file being read.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 40

attr_reader :file_id

#fs ⇒ FSBucket (readonly)

Returns:

  • (FSBucket)

    The GridFS bucket from which this stream reads.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 30

attr_reader :fs

#options ⇒ Hash (readonly)

Returns:

  • (Hash)

    The stream options.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 35

attr_reader :options

Instance Method Details

#close ⇒ BSON::ObjectId, Object

Close the read stream.

If the stream is already closed, this method does nothing.

Examples:

Close the stream.

stream.close

Returns:

  • (BSON::ObjectId, Object)

    The file id.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 127

def close
  if @open
    view.close_query
    @open = false
  end
  file_id
end
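
The "does nothing if already closed" behavior can be sketched without a database. This is a minimal illustration under stated assumptions: `FakeView` and `CloseSketch` are hypothetical stand-ins, and `FakeView#close_query` only counts calls instead of closing a real cursor.

```ruby
# Hypothetical stand-ins; neither class is part of the driver.
class FakeView
  attr_reader :close_calls

  def initialize
    @close_calls = 0
  end

  # Counts how often the query would have been closed.
  def close_query
    @close_calls += 1
  end
end

class CloseSketch
  def initialize(view, file_id)
    @view = view
    @file_id = file_id
    @open = true
  end

  def closed?
    !@open
  end

  # Same shape as the close method above: close once, always return the id.
  def close
    if @open
      @view.close_query
      @open = false
    end
    @file_id
  end
end

view = FakeView.new
stream = CloseSketch.new(view, :some_id)
first = stream.close   # closes the query, returns :some_id
second = stream.close  # no-op, still returns :some_id
```

In this sketch both calls return the file id, and the underlying query is closed only once.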

#each {|data| ... } ⇒ Enumerator

Iterate through chunk data streamed from the ::Mongo::Grid::FSBucket.

Examples:

Iterate through the chunk data.

stream.each do |data|
  buffer << data
end

Yield Parameters:

  • data (String)

    Each chunk of file data.

Returns:

  • (Enumerator)

    The enumerator.

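Raises:

  • (Error::ClosedStream)

    If the stream is closed.

  • (Error::FileNotFound)

    If no file information document is found for the file id.

  • (Error::MissingFileChunk)

    If a chunk is out of sequence, or fewer chunks are read than the file length requires.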

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 82

def each
  ensure_readable!
  info = file_info
  num_chunks = (info.length + info.chunk_size - 1) / info.chunk_size
  num_read = 0
  if block_given?
    view.each_with_index.reduce(0) do |length_read, (doc, index)|
      chunk = Grid::File::Chunk.new(doc)
      validate!(index, num_chunks, chunk, length_read)
      data = chunk.data.data
      yield data
      num_read += 1
      length_read + data.size
    end.tap do
      raise Error::MissingFileChunk.new(num_chunks, num_read) if num_read < num_chunks
    end
  else
    view.to_enum
  end
end
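
The `num_chunks` line in the method above is integer ceiling division. A small worked sketch follows; `expected_chunks` is a hypothetical helper name and the chunk size is only an illustrative value.

```ruby
# Integer ceiling division, mirroring the num_chunks computation above.
# expected_chunks is a hypothetical helper, not a driver method.
def expected_chunks(length, chunk_size)
  (length + chunk_size - 1) / chunk_size
end

chunk_size = 261_120 # 255 KiB, an illustrative value

expected_chunks(0, chunk_size)               # 0, an empty file has no chunks
expected_chunks(1, chunk_size)               # 1
expected_chunks(chunk_size, chunk_size)      # 1, exactly one full chunk
expected_chunks(chunk_size + 1, chunk_size)  # 2, one full chunk plus one byte
```

Adding `chunk_size - 1` before dividing rounds any partial final chunk up to a whole chunk without using floating-point arithmetic.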

#ensure_file_info! (private)

Raises:

  • (Error::FileNotFound)

    If no file information document is found for the file id.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 198

def ensure_file_info!
  raise Error::FileNotFound.new(file_id, :id) unless file_info
end

#ensure_open! (private)

Raises:

  • (Error::ClosedStream)

    If the stream is closed.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 194

def ensure_open!
  raise Error::ClosedStream.new if closed?
end

#ensure_readable! (private)

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 202

def ensure_readable!
  ensure_open!
  ensure_file_info!
end

#file_info ⇒ File::Info

Note:

The file information is cached in the stream. Subsequent calls to file_info will return the same information that the first call returned, and will not query the database again.

Get the files collection file information document for the file being read.

Returns:

  • (File::Info)

    The file information object, or nil if no matching document is found.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 181

def file_info
  @file_info ||= begin
    doc = options[:file_info_doc] ||
          fs.files_collection.find(
            { _id: file_id },
            { timeout_ms: @timeout_holder.remaining_timeout_ms! }
          ).first
    File::Info.new(Options::Mapper.transform(doc, File::Info::MAPPINGS.invert)) if doc
  end
end
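
The caching described in the note relies on `||=`. The sketch below illustrates that pattern only: `InfoSketch` is a hypothetical stand-in, and a counter takes the place of the files collection query. One consequence worth noting is that `||=` caches only truthy results, so in this sketch a missing document is looked up again on each call.

```ruby
# Hypothetical stand-in for the memoization pattern; not driver code.
class InfoSketch
  attr_reader :lookups

  def initialize(doc)
    @doc = doc
    @lookups = 0
  end

  def file_info
    @file_info ||= begin
      @lookups += 1 # stands in for the files collection query
      @doc
    end
  end
end

found = InfoSketch.new({ length: 10 })
found.file_info
found.file_info
found.lookups    # 1, the second call reused the cached value

missing = InfoSketch.new(nil)
missing.file_info
missing.file_info
missing.lookups  # 2, a nil result is not cached by ||=
```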

#raise_unexpected_chunk_length!(chunk) (private)

Raises:

  • (Error::UnexpectedChunkLength)

    Always, after closing the stream.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 228

def raise_unexpected_chunk_length!(chunk)
  close
  raise Error::UnexpectedChunkLength.new(file_info.chunk_size, chunk)
end

#read ⇒ String

Read all file data.

Examples:

Read the file data.

stream.read

Returns:

  • (String)

    The file data.

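Raises:

  • (Error::ClosedStream)

    If the stream is closed.

  • (Error::FileNotFound)

    If no file information document is found for the file id.

  • (Error::MissingFileChunk)

    If a chunk is out of sequence, or fewer chunks are read than the file length requires.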

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 113

def read
  to_a.join
end
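
`read` works because the class includes Enumerable: `to_a` is built on `each`, so it collects every yielded chunk before `join` concatenates them. A minimal sketch of that mechanism, using a hypothetical `ChunkSource` class backed by an in-memory array instead of a GridFS bucket:

```ruby
# Hypothetical stand-in; ChunkSource is not part of the driver.
class ChunkSource
  include Enumerable

  def initialize(chunks)
    @chunks = chunks
  end

  # Yields each chunk's data in order, like a stream's each method.
  def each
    @chunks.each { |data| yield data }
  end

  # Enumerable#to_a calls each and collects the yielded values.
  def read
    to_a.join
  end
end

source = ChunkSource.new(["Hello, ", "Grid", "FS"])
source.read # "Hello, GridFS"
```

Since all chunks are collected into one string, this approach holds the whole file in memory; iterating with `each` processes one chunk at a time.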

#read_preference ⇒ BSON::Document

Note:

This method always returns a BSON::Document instance, even though the constructor specifies the type of :read as a Hash, not as a BSON::Document.

Get the read preference.

Returns:

  • (BSON::Document)

    The read preference. The document may have the following fields:

    • :mode – read preference specified as a symbol; valid values are :primary, :primary_preferred, :secondary, :secondary_preferred and :nearest.

    • :tag_sets – an array of hashes.

    • :local_threshold.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 160

def read_preference
  @read_preference ||= begin
    pref = options[:read] || fs.read_preference
    if pref.is_a?(BSON::Document)
      pref
    else
      BSON::Document.new(pref)
    end
  end
end

#validate!(index, num_chunks, chunk, length_read) (private)

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 223

def validate!(index, num_chunks, chunk, length_read)
  validate_n!(index, chunk)
  validate_length!(index, num_chunks, chunk, length_read)
end

#validate_length!(index, num_chunks, chunk, length_read) (private)

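Raises:

  • (Error::ExtraFileChunk)

    If a non-empty chunk has an index at or beyond the expected number of chunks.

  • (Error::UnexpectedChunkLength)

    If a chunk's size does not match the expected size for its position.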

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 233

def validate_length!(index, num_chunks, chunk, length_read)
  return unless num_chunks > 0 && chunk.data.data.size > 0
  raise Error::ExtraFileChunk.new unless index < num_chunks

  if index == num_chunks - 1
    raise_unexpected_chunk_length!(chunk) unless chunk.data.data.size + length_read == file_info.length
  elsif chunk.data.data.size != file_info.chunk_size
    raise_unexpected_chunk_length!(chunk)
  end
end
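
The length rules in the method above can be traced with plain integers. The sketch below is an illustration only: `check_chunk` is a hypothetical helper that returns a symbol instead of raising, and it takes sizes directly rather than chunk objects.

```ruby
# Hypothetical helper mirroring the branch structure of validate_length!.
# Returns :ok or a symbol naming the failed check instead of raising.
def check_chunk(index, num_chunks, size, length_read, file_length, chunk_size)
  return :ok unless num_chunks > 0 && size > 0
  return :extra_chunk unless index < num_chunks

  if index == num_chunks - 1
    # The final chunk must complete the file length exactly.
    size + length_read == file_length ? :ok : :unexpected_length
  elsif size != chunk_size
    # Every non-final chunk must be exactly chunk_size bytes.
    :unexpected_length
  else
    :ok
  end
end

# A 10-byte file stored with 4-byte chunks: expected sizes are 4, 4 and 2.
check_chunk(0, 3, 4, 0, 10, 4)   # :ok
check_chunk(1, 3, 3, 4, 10, 4)   # :unexpected_length, a middle chunk is short
check_chunk(2, 3, 2, 8, 10, 4)   # :ok, 8 bytes read plus 2 equals 10
check_chunk(3, 3, 1, 10, 10, 4)  # :extra_chunk, index 3 is past the last chunk
```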

#validate_n!(index, chunk) (private)

Raises:

  • (Error::MissingFileChunk)

    If the chunk's n value does not match the expected index.

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 244

def validate_n!(index, chunk)
  return if index == chunk.n

  close
  raise Error::MissingFileChunk.new(index, chunk)
end

#view (private)

Since:

  • 2.1.0

# File 'lib/mongo/grid/stream/read.rb', line 207

def view
  @view ||= begin
    opts = if read_preference
             options.merge(read: read_preference)
           else
             options
           end
    if @timeout_holder.csot?
      opts[:timeout_ms] = @timeout_holder.remaining_timeout_ms!
      opts[:timeout_mode] = :cursor_lifetime
    end

    fs.chunks_collection.find({ files_id: file_id }, opts).sort(n: 1)
  end
end