Class: Mongo::Grid::FSBucket::Stream::Read
Relationships & Source Files | |
Super Chains via Extension / Inclusion / Inheritance | |
Instance Chain:
self,
Enumerable
|
|
Inherits: | Object |
Defined in: | lib/mongo/grid/stream/read.rb |
Overview
A stream that reads files from the ::Mongo::Grid::FSBucket.
Class Method Summary
-
.new(fs, options) ⇒ Read
constructor
Create a stream for reading files from the
::Mongo::Grid::FSBucket
.
Instance Attribute Summary
-
#closed? ⇒ true, false
readonly
Is the stream closed.
- #file_id ⇒ BSON::ObjectId, Object readonly
- #fs ⇒ FSBucket readonly
- #options ⇒ Hash readonly
Instance Method Summary
-
#close ⇒ BSON::ObjectId, Object
Close the read stream.
-
#each {|Each| ... } ⇒ Enumerator
Iterate through chunk data streamed from the ::Mongo::Grid::FSBucket.
-
#file_info ⇒ File::Info
Get the files collection file information document for the file being read.
-
#read ⇒ String
Read all file data.
-
#read_preference ⇒ BSON::Document
Get the read preference.
- #ensure_file_info! private
- #ensure_open! private
- #ensure_readable! private
- #raise_unexpected_chunk_length!(chunk) private
- #validate!(index, num_chunks, chunk, length_read) private
- #validate_length!(index, num_chunks, chunk, length_read) private
- #validate_n!(index, chunk) private
- #view private
Constructor Details
.new(fs, options) ⇒ Read
Create a stream for reading files from the ::Mongo::Grid::FSBucket.
# File 'lib/mongo/grid/stream/read.rb', line 56
# Create a stream for reading files from the FSBucket.
#
# @param [ FSBucket ] fs The GridFS bucket object.
# @param [ Hash ] options The read stream options.
#
# @option options [ BSON::ObjectId, Object ] :file_id The id of the file to
#   read. It is removed from the stored options and exposed via #file_id.
# @option options :timeout_ms Passed to the timeout holder as the
#   operation-level timeout; the database's timeout_ms is passed as the
#   inherited value.
def initialize(fs, options)
  @fs = fs
  # Work on a copy so the caller's hash is neither mutated nor frozen.
  @options = options.dup
  @file_id = @options.delete(:file_id)
  @options.freeze
  @open = true
  @timeout_holder = CsotTimeoutHolder.new(
    operation_timeouts: {
      operation_timeout_ms: options[:timeout_ms],
      inherited_timeout_ms: fs.database.timeout_ms
    }
  )
end
Instance Attribute Details
#closed? ⇒ true, false (readonly)
Is the stream closed.
# File 'lib/mongo/grid/stream/read.rb', line 147
# Is the stream closed.
#
# @return [ true, false ] true unless the stream's open flag is set.
def closed?
  @open ? false : true
end
#file_id ⇒ BSON::ObjectId
, Object
(readonly)
# File 'lib/mongo/grid/stream/read.rb', line 42
# @return [ BSON::ObjectId, Object ] file_id The id of the file being read.
attr_reader :file_id
#fs ⇒ FSBucket (readonly)
# File 'lib/mongo/grid/stream/read.rb', line 32
# @return [ FSBucket ] fs The fs bucket from which this stream reads.
attr_reader :fs
#options ⇒ Hash
(readonly)
# File 'lib/mongo/grid/stream/read.rb', line 37
# @return [ Hash ] options The stream options.
attr_reader :options
Instance Method Details
#close ⇒ BSON::ObjectId
, Object
Close the read stream.
If the stream is already closed, this method does nothing.
#each {|Each| ... } ⇒ Enumerator
Iterate through chunk data streamed from the ::Mongo::Grid::FSBucket.
# File 'lib/mongo/grid/stream/read.rb', line 84
# Iterate through chunk data streamed from the FSBucket.
#
# With a block, every chunk document from the view is wrapped in a
# Grid::File::Chunk, validated, and its payload is yielded. Without a
# block, an enumerator over the underlying chunk view is returned.
#
# @yieldparam [ String ] data The payload of each chunk, in view order.
#
# @return [ Integer, Enumerator ] The total size of the yielded data when a
#   block is given, otherwise an enumerator over the chunk view.
#
# @raise [ Error::MissingFileChunk ] If fewer chunks were read than the
#   file's length and chunk size call for.
def each
  ensure_readable!
  info = file_info
  # Ceiling division: a trailing partial chunk still counts as a chunk.
  expected_chunks = (info.length + info.chunk_size - 1) / info.chunk_size

  return view.to_enum unless block_given?

  chunks_seen = 0
  bytes_seen = 0
  view.each_with_index do |doc, index|
    chunk = Grid::File::Chunk.new(doc)
    validate!(index, expected_chunks, chunk, bytes_seen)
    data = chunk.data.data
    yield data
    chunks_seen += 1
    bytes_seen += data.size
  end

  if chunks_seen < expected_chunks
    raise Error::MissingFileChunk.new(expected_chunks, chunks_seen)
  end

  bytes_seen
end
#ensure_file_info! (private)
# File 'lib/mongo/grid/stream/read.rb', line 206
# Verify that a files collection document exists for the file being read.
#
# @raise [ Error::FileNotFound ] If #file_info returns nothing.
def ensure_file_info!
  return if file_info

  raise Error::FileNotFound.new(file_id, :id)
end
#ensure_open! (private)
# File 'lib/mongo/grid/stream/read.rb', line 202
# Verify that the stream has not been closed.
#
# @raise [ Error::ClosedStream ] If the stream is closed.
def ensure_open!
  return unless closed?

  raise Error::ClosedStream.new
end
#ensure_readable! (private)
# File 'lib/mongo/grid/stream/read.rb', line 210
# Verify that the stream can be read from: it must be open, and the file's
# information document must be available. The open check runs first.
def ensure_readable!
  ensure_open!
  ensure_file_info!
end
#file_info ⇒ File::Info
The file information is cached in the stream. Subsequent calls to file_info will return the same information that the first call returned, and will not query the database again.
Get the files collection file information document for the file being read.
# File 'lib/mongo/grid/stream/read.rb', line 185
# Get the files collection file information document for the file
# being read.
#
# A document supplied through the :file_info_doc option is used when
# present; otherwise the files collection is queried by file id. A found
# result is cached, so later calls return the same object without querying
# again.
#
# @return [ File::Info, nil ] The file information object, or nil when no
#   document is available.
def file_info
  @file_info ||= begin
    doc = options[:file_info_doc] ||
          fs.files_collection.find(
            { _id: file_id },
            { timeout_ms: @timeout_holder.remaining_timeout_ms! }
          ).first
    File::Info.new(Options::Mapper.transform(doc, File::Info::MAPPINGS.invert)) if doc
  end
end
#raise_unexpected_chunk_length!(chunk) (private)
# File 'lib/mongo/grid/stream/read.rb', line 236
# Close the stream and report a chunk whose size does not match what the
# file information document calls for.
#
# @param [ Grid::File::Chunk ] chunk The offending chunk.
#
# @raise [ Error::UnexpectedChunkLength ] Always.
def raise_unexpected_chunk_length!(chunk)
  close
  expected_size = file_info.chunk_size
  raise Error::UnexpectedChunkLength.new(expected_size, chunk)
end
#read ⇒ String
Read all file data.
# File 'lib/mongo/grid/stream/read.rb', line 117
# Read all file data.
#
# @return [ String ] The chunk data yielded by #each, joined into a single
#   string.
def read
  pieces = to_a
  pieces.join
end
#read_preference ⇒ BSON::Document
This method always returns a BSON::Document instance, even though the constructor specifies the type of :read as a Hash, not as a BSON::Document.
Get the read preference.
#validate!(index, num_chunks, chunk, length_read) (private)
# File 'lib/mongo/grid/stream/read.rb', line 231
# Run both chunk checks: the sequence number first, then the length.
#
# @param [ Integer ] index The position of the chunk in the view.
# @param [ Integer ] num_chunks The number of chunks the file should have.
# @param [ Grid::File::Chunk ] chunk The chunk to check.
# @param [ Integer ] length_read The amount of data read before this chunk.
def validate!(index, num_chunks, chunk, length_read)
  validate_n!(index, chunk)
  validate_length!(index, num_chunks, chunk, length_read)
end
#validate_length!(index, num_chunks, chunk, length_read) (private)
# File 'lib/mongo/grid/stream/read.rb', line 241
# Check a chunk's payload size against the file information document.
#
# Nothing is checked when the file should have no chunks or the chunk's
# payload is empty. Otherwise the last expected chunk must bring the running
# total to exactly the file length, and every earlier chunk must be exactly
# chunk_size long.
#
# @param [ Integer ] index The position of the chunk in the view.
# @param [ Integer ] num_chunks The number of chunks the file should have.
# @param [ Grid::File::Chunk ] chunk The chunk to check.
# @param [ Integer ] length_read The amount of data read before this chunk.
#
# @raise [ Error::ExtraFileChunk ] If a non-empty chunk lies beyond the
#   expected number of chunks.
def validate_length!(index, num_chunks, chunk, length_read)
  return unless num_chunks > 0

  payload_size = chunk.data.data.size
  return unless payload_size > 0

  raise Error::ExtraFileChunk.new unless index < num_chunks

  if index == num_chunks - 1
    raise_unexpected_chunk_length!(chunk) unless payload_size + length_read == file_info.length
  elsif payload_size != file_info.chunk_size
    raise_unexpected_chunk_length!(chunk)
  end
end
#validate_n!(index, chunk) (private)
# File 'lib/mongo/grid/stream/read.rb', line 254
# Check that a chunk's sequence number matches its position in the view.
# On a mismatch the stream is closed before the error is raised.
#
# @param [ Integer ] index The position of the chunk in the view.
# @param [ Grid::File::Chunk ] chunk The chunk to check.
#
# @raise [ Error::MissingFileChunk ] If the chunk's n differs from index.
def validate_n!(index, chunk)
  return if index == chunk.n

  close
  raise Error::MissingFileChunk.new(index, chunk)
end
#view (private)
# File 'lib/mongo/grid/stream/read.rb', line 215
# Get the memoized view over this file's chunks, sorted by chunk number.
#
# The find options are the stream options, plus the read preference when
# one is available, plus timeout settings when the timeout holder reports
# that client-side operation timeouts are in use.
#
# @return The result of find(...).sort(n: 1) on the chunks collection.
def view
  @view ||= begin
    pref = read_preference
    # The stream options are frozen in the constructor and this method adds
    # timeout keys below, so both branches must produce a fresh hash.
    opts = pref ? options.merge(read: pref) : options.dup
    if @timeout_holder.csot?
      opts[:timeout_ms] = @timeout_holder.remaining_timeout_ms!
      opts[:timeout_mode] = :cursor_lifetime
    end
    fs.chunks_collection.find({ :files_id => file_id }, opts).sort(:n => 1)
  end
end