Module: Mongo::Operation::SessionsSupported Private
Relationships & Source Files | |
Extension / Inclusion / Inheritance Descendants | |
Included In:
Aggregate::OpMsg ,
Command::OpMsg ,
Count::OpMsg ,
Create::OpMsg ,
CreateIndex::OpMsg ,
CreateSearchIndexes::OpMsg ,
CreateUser::OpMsg ,
Delete::OpMsg ,
Distinct::OpMsg ,
Drop::OpMsg ,
DropDatabase::OpMsg ,
DropIndex::OpMsg ,
DropSearchIndex::OpMsg ,
Explain::OpMsg ,
Find::OpMsg ,
GetMore::OpMsg ,
Indexes::OpMsg ,
Insert::OpMsg ,
KillCursors::OpMsg ,
ListCollections::OpMsg ,
MapReduce::OpMsg ,
OpMsgBase ,
ParallelScan::OpMsg ,
RemoveUser::OpMsg ,
Update::OpMsg ,
UpdateSearchIndex::OpMsg ,
UpdateUser::OpMsg ,
UsersInfo::OpMsg ,
WriteCommand::OpMsg
| |
Defined in: | lib/mongo/operation/shared/sessions_supported.rb |
Overview
Shared behavior of operations that support a session.
Constant Summary
-
READ_COMMANDS =
[ :aggregate, :count, :dbStats, :distinct, :find, :geoNear, :geoSearch, :group, :mapReduce, :parallelCollectionScan ].freeze
-
ZERO_TIMESTAMP =
BSON::Timestamp.new(0, 0)
Instance Method Summary
-
#add_read_preference(sel, connection)
private
Internal use only
Adds $readPreference field to the command document.
- #add_write_concern!(sel) private Internal use only
- #apply_autocommit!(selector) private Internal use only
-
#apply_causal_consistency!(selector, connection)
private
Internal use only
Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.
-
#apply_causal_consistency_if_possible(selector, connection)
private
Internal use only
Adds causal consistency document to the selector, if one can be constructed.
- #apply_cluster_time!(selector, connection) private Internal use only
- #apply_read_pref!(selector) private Internal use only
- #apply_session_options(sel, connection) private Internal use only
- #apply_start_transaction!(selector) private Internal use only
- #apply_txn_num!(selector) private Internal use only
- #apply_txn_opts!(selector) private Internal use only
- #build_message(connection, context) private Internal use only
- #command(connection) private Internal use only
- #flags private Internal use only
- #read_command?(sel) ⇒ Boolean private Internal use only
- #suppress_read_write_concern!(selector) private Internal use only
- #validate_read_preference!(selector) private Internal use only
Instance Method Details
#add_read_preference(sel, connection) (private)
Adds $readPreference field to the command document.
$readPreference is only sent when the server is a mongos, following the rules described in github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#passing-read-preference-to-mongos. The topology does not matter for figuring out whether to send $readPreference since the decision is always made based on server type.
$readPreference is sent to OP_MSG-grokking replica set members.
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 167
def add_read_preference(sel, connection) Lint.assert_type(connection, Server::Connection) # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#topology-type-single read_doc = if connection.description.standalone? # Read preference is never sent to standalones. nil elsif connection.server.load_balancer? read&.to_mongos elsif connection.description.mongos? # When server is a mongos: # - $readPreference is never sent when mode is 'primary' # - Otherwise $readPreference is sent # When mode is 'secondaryPreferred' $readPreference is currently # required to only be sent when a non-mode field (i.e. tag_sets) # is present, but this causes wrong behavior (DRIVERS-1642). read&.to_mongos elsif connection.server.cluster.single? # In Single topology: # - If no read preference is specified by the application, the driver # adds mode: primaryPreferred. # - If a read preference is specified by the application, the driver # replaces the mode with primaryPreferred. read_doc = if read BSON::Document.new(read.to_doc) else BSON::Document.new end if [nil, 'primary'].include?(read_doc['mode']) read_doc['mode'] = 'primaryPreferred' end read_doc else # In replica sets, read preference is passed to the server if one # is specified by the application, except for primary read preferences. read_doc = BSON::Document.new(read&.to_doc || {}) if [nil, 'primary'].include?(read_doc['mode']) nil else read_doc end end if read_doc sel['$readPreference'] = read_doc end end
#add_write_concern!(sel) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 96
def add_write_concern!(sel) sel[:writeConcern] = write_concern. if write_concern end
#apply_autocommit!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 100
def apply_autocommit!(selector) session.add_autocommit!(selector) end
#apply_causal_consistency!(selector, connection) (private)
Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.
When operations are performed in a transaction, only the first operation (the one which starts the transaction via startTransaction) is allowed to have a read concern, and with it the causal consistency document, specified.
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 51
def apply_causal_consistency!(selector, connection) return unless selector[:startTransaction] apply_causal_consistency_if_possible(selector, connection) end
#apply_causal_consistency_if_possible(selector, connection) (private)
Adds causal consistency document to the selector, if one can be constructed.
In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, topology must be replica set or sharded cluster.
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 64
def apply_causal_consistency_if_possible(selector, connection) if !connection.description.standalone? cc_doc = session.send(:causal_consistency_doc) if cc_doc rc_doc = (selector[:readConcern] || read_concern || {}).merge(cc_doc) selector[:readConcern] = Options::Mapper.transform_values_to_strings( rc_doc) end end end
#apply_cluster_time!(selector, connection) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 79
def apply_cluster_time!(selector, connection) if !connection.description.standalone? cluster_time = [ connection.cluster_time, session&.cluster_time, ].compact.max if cluster_time selector['$clusterTime'] = cluster_time end end end
#apply_read_pref!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 112
def apply_read_pref!(selector) session.apply_read_pref!(selector) if read_command?(selector) end
#apply_session_options(sel, connection) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 215
def (sel, connection) apply_cluster_time!(sel, connection) sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num sel.merge!(lsid: session.session_id) apply_start_transaction!(sel) apply_causal_consistency!(sel, connection) apply_autocommit!(sel) apply_txn_opts!(sel) suppress_read_write_concern!(sel) validate_read_preference!(sel) apply_txn_num!(sel) if session.recovery_token && (sel[:commitTransaction] || sel[:abortTransaction]) then sel[:recoveryToken] = session.recovery_token end if session.snapshot? unless connection.description.server_version_gte?('5.0') raise Error::SnapshotSessionInvalidServerVersion end sel[:readConcern] = {level: 'snapshot'} if session. sel[:readConcern][:atClusterTime] = session. end end end
#apply_start_transaction!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 104
def apply_start_transaction!(selector) session.add_start_transaction!(selector) end
#apply_txn_num!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 108
def apply_txn_num!(selector) session.add_txn_num!(selector) end
#apply_txn_opts!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 116
def apply_txn_opts!(selector) session.add_txn_opts!(selector, read_command?(selector), context) end
#build_message(connection, context) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 244
def (connection, context) if self.session != context.session if self.session raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}" else # Some operations are not constructed with sessions but are # executed in a context where a session is available. # This could be OK or a driver issue. # TODO investigate. end end super.tap do || if session = context.session # Serialize the message to detect client-side problems, # such as invalid BSON keys or too large messages. # The message will be serialized again # later prior to being sent to the connection. buf = BSON::ByteBuffer.new .serialize(buf) if buf.length > connection. raise Error::MaxMessageSize.new(connection. ) end session.update_state! end end end
#command(connection) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 128
def command(connection) if Lint.enabled? unless connection.is_a?(Server::Connection) raise Error::LintError, "Connection is not a Connection instance: #{connection}" end end sel = BSON::Document.new(selector(connection)) add_write_concern!(sel) sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name add_read_preference(sel, connection) if connection.features.sessions_enabled? apply_cluster_time!(sel, connection) if session && (acknowledged_write? || session.in_transaction?) (sel, connection) end elsif session && session.explicit? (sel, connection) end sel end
#flags (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 75
def flags acknowledged_write? ? [] : [:more_to_come] end
#read_command?(sel) ⇒ Boolean
(private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 92
def read_command?(sel) READ_COMMANDS.any? { |c| sel[c] } end
#suppress_read_write_concern!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 120
def suppress_read_write_concern!(selector) session.suppress_read_write_concern!(selector) end
#validate_read_preference!(selector) (private)
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 124
def validate_read_preference!(selector) session.validate_read_preference!(selector) if read_command?(selector) end