123456789_123456789_123456789_123456789_123456789_

Class: Mongo::Grid::FSBucket::Stream::Read

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Instance Chain:
self, Enumerable
Inherits: Object
Defined in: lib/mongo/grid/stream/read.rb

Overview

A stream that reads files from the ::Mongo::Grid::FSBucket.

Since:

  • 2.1.0

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(fs, options) ⇒ Read

Create a stream for reading files from the ::Mongo::Grid::FSBucket.

Examples:

Create the stream.

Stream::Read.new(fs, options)

Parameters:

  • fs (FSBucket)

    The GridFS bucket object.

  • options (Hash)

    The read stream options.

Options Hash (options):

  • :file_info_doc (BSON::Document)

    For internal driver use only. A BSON document to use as file information.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 56

def initialize(fs, options)
  @fs = fs
  @options = options.dup
  @file_id = @options.delete(:file_id)
  @options.freeze
  @open = true
  @timeout_holder = CsotTimeoutHolder.new(
    operation_timeouts: {
      operation_timeout_ms: options[:timeout_ms],
      inherited_timeout_ms: fs.database.timeout_ms
    }
  )
end

Instance Attribute Details

#closed?true, false (readonly)

Is the stream closed.

Examples:

Is the stream closd.

stream.closed?

Returns:

  • (true, false)

    Whether the stream is closed.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 147

def closed?
  !@open
end

#file_idBSON::ObjectId, Object (readonly)

Returns:

  • (BSON::ObjectId, Object)

    file_id The id of the file being read.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 42

attr_reader :file_id

#fsFSBucket (readonly)

Returns:

  • (FSBucket)

    fs The fs bucket from which this stream reads.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 32

attr_reader :fs

#optionsHash (readonly)

Returns:

  • (Hash)

    options The stream options.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 37

attr_reader :options

Instance Method Details

#closeBSON::ObjectId, Object

Close the read stream.

If the stream is already closed, this method does nothing.

Examples:

Close the stream.

stream.close

Returns:

  • (BSON::ObjectId, Object)

    The file id.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 131

def close
  if @open
    view.close_query
    @open = false
  end
  file_id
end

#each {|Each| ... } ⇒ Enumerator

Iterate through chunk data streamed from the ::Mongo::Grid::FSBucket.

Examples:

Iterate through the chunk data.

stream.each do |data|
  buffer << data
end

Yield Parameters:

  • Each (Hash)

    chunk of file data.

Returns:

  • (Enumerator)

    The enumerator.

Raises:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 84

def each
  ensure_readable!
  info = file_info
  num_chunks = (info.length + info.chunk_size - 1) / info.chunk_size
  num_read = 0
  if block_given?
    view.each_with_index.reduce(0) do |length_read, (doc, index)|
      chunk = Grid::File::Chunk.new(doc)
      validate!(index, num_chunks, chunk, length_read)
      data = chunk.data.data
      yield data
      num_read += 1
      length_read += data.size
    end.tap do
      if num_read < num_chunks
        raise Error::MissingFileChunk.new(num_chunks, num_read)
      end
    end
  else
    view.to_enum
  end
end

#ensure_file_info! (private)

Raises:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 206

def ensure_file_info!
  raise Error::FileNotFound.new(file_id, :id) unless file_info
end

#ensure_open! (private)

Raises:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 202

def ensure_open!
  raise Error::ClosedStream.new if closed?
end

#ensure_readable! (private)

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 210

def ensure_readable!
  ensure_open!
  ensure_file_info!
end

#file_infoFile::Info

Note:

The file information is cached in the stream. Subsequent calls to file_info will return the same information that the first call returned, and will not query the database again.

Get the files collection file information document for the file being read.

Returns:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 185

def file_info
  @file_info ||= begin
    doc = options[:file_info_doc] ||
      fs.files_collection.find(
        { _id: file_id },
        { timeout_ms: @timeout_holder.remaining_timeout_ms! }
      ).first
    if doc
      File::Info.new(Options::Mapper.transform(doc, File::Info::MAPPINGS.invert))
    else
      nil
    end
  end
end

#raise_unexpected_chunk_length!(chunk) (private)

Raises:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 236

def raise_unexpected_chunk_length!(chunk)
  close
  raise Error::UnexpectedChunkLength.new(file_info.chunk_size, chunk)
end

#readString

Read all file data.

Examples:

Read the file data.

stream.read

Returns:

  • (String)

    The file data.

Raises:

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 117

def read
  to_a.join
end

#read_preferenceBSON::Document

Note:

This method always returns a BSON::Document instance, even though the constructor specifies the type of :read as a Hash, not as a BSON::Document.

Get the read preference.

Returns:

  • (BSON::Document)

    The read preference. The document may have the following fields:

    • :mode – read preference specified as a symbol; valid values are :primary, :primary_preferred, :secondary, :secondary_preferred and :nearest.

    • :tag_sets – an array of hashes.

    • :local_threshold.

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 164

def read_preference
  @read_preference ||= begin
    pref = options[:read] || fs.read_preference
    if BSON::Document === pref
      pref
    else
      BSON::Document.new(pref)
    end
  end
end

#validate!(index, num_chunks, chunk, length_read) (private)

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 231

def validate!(index, num_chunks, chunk, length_read)
  validate_n!(index, chunk)
  validate_length!(index, num_chunks, chunk, length_read)
end

#validate_length!(index, num_chunks, chunk, length_read) (private)

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 241

def validate_length!(index, num_chunks, chunk, length_read)
  if num_chunks > 0 && chunk.data.data.size > 0
    raise Error::ExtraFileChunk.new unless index < num_chunks
    if index == num_chunks - 1
      unless chunk.data.data.size + length_read == file_info.length
        raise_unexpected_chunk_length!(chunk)
      end
    elsif chunk.data.data.size != file_info.chunk_size
      raise_unexpected_chunk_length!(chunk)
    end
  end
end

#validate_n!(index, chunk) (private)

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 254

def validate_n!(index, chunk)
  unless index == chunk.n
    close
    raise Error::MissingFileChunk.new(index, chunk)
  end
end

#view (private)

Since:

  • 2.1.0

[ GitHub ]

  
# File 'lib/mongo/grid/stream/read.rb', line 215

def view
  @view ||= begin
    opts = if read_preference
      options.merge(read: read_preference)
    else
      options
    end
    if @timeout_holder.csot?
      opts[:timeout_ms] = @timeout_holder.remaining_timeout_ms!
      opts[:timeout_mode] = :cursor_lifetime
    end

    fs.chunks_collection.find({ :files_id => file_id }, opts).sort(:n => 1)
  end
end