123456789_123456789_123456789_123456789_123456789_

Module: Mongo::Operation::SessionsSupported Private

Do not use. This module is for internal use only.

Overview

Shared behavior of operations that support a session.

Since:

  • 2.5.2

Constant Summary

Instance Method Summary

Instance Method Details

#add_read_preference(sel, connection) (private)

Adds $readPreference field to the command document.

$readPreference is only sent when the server is a mongos, following the rules described in https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#passing-read-preference-to-mongos. The topology does not matter for figuring out whether to send $readPreference since the decision is always made based on server type.

$readPreference is also sent to replica set members that understand OP_MSG.

Parameters:

  • sel (Hash)

    Existing command document which will be mutated.

  • connection (Server::Connection)

    The connection that the operation will be executed on.

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 167

# Stores a '$readPreference' entry in the command document when the
# target server calls for one.
#
# @param [ Hash ] sel Existing command document; mutated in place.
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ Object | nil ] The document written under '$readPreference',
#   or nil when nothing was written.
def add_read_preference(sel, connection)
  Lint.assert_type(connection, Server::Connection)

  # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#topology-type-single
  preference =
    if connection.description.standalone?
      # Standalones never receive a read preference.
      nil
    elsif connection.server.load_balancer? || connection.description.mongos?
      # Load balancers and mongos servers get the same treatment.
      # For a mongos:
      # - $readPreference is never sent when mode is 'primary'
      # - Otherwise $readPreference is sent
      # When mode is 'secondaryPreferred' $readPreference is currently
      # required to only be sent when a non-mode field (i.e. tag_sets)
      # is present, but this causes wrong behavior (DRIVERS-1642).
      read&.to_mongos
    elsif connection.server.cluster.single?
      # Single topology:
      # - Without an application-supplied read preference the driver
      #   adds mode: primaryPreferred.
      # - A supplied mode of primary is replaced with primaryPreferred.
      doc = read ? BSON::Document.new(read.to_doc) : BSON::Document.new
      mode = doc['mode']
      doc['mode'] = 'primaryPreferred' if mode.nil? || mode == 'primary'
      doc
    else
      # Replica sets: forward the application's read preference unless
      # it is absent or primary.
      doc = BSON::Document.new(read&.to_doc || {})
      mode = doc['mode']
      mode.nil? || mode == 'primary' ? nil : doc
    end

  return unless preference

  sel['$readPreference'] = preference
end

#add_write_concern!(sel) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 96

# Copies the operation's write concern options into the command document
# under :writeConcern. Does nothing when there is no write concern.
#
# @param [ Hash ] sel The command document; mutated in place.
#
# @return [ Object | nil ] The stored options, or nil when no write
#   concern is set.
def add_write_concern!(sel)
  return unless write_concern

  sel[:writeConcern] = write_concern.options
end

#apply_autocommit!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 100

# Hands the selector to the session's add_autocommit! method and returns
# whatever that call returns.
#
# @param [ Object ] selector The command document being built.
#   NOTE(review): what the session adds to it is defined outside this
#   listing - confirm in the session class.
def apply_autocommit!(selector)
  session.add_autocommit!(selector)
end

#apply_causal_consistency!(selector, connection) (private)

Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.

When operations are performed in a transaction, only the first operation (the one which starts the transaction via startTransaction) is allowed to have a read concern, and with it the causal consistency document, specified.

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 51

# Applies the causal consistency document only when the selector carries
# a truthy :startTransaction entry; any other selector is left alone.
#
# @param [ Hash ] selector The command document being built.
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ Object | nil ] The result of
#   apply_causal_consistency_if_possible, or nil when skipped.
def apply_causal_consistency!(selector, connection)
  apply_causal_consistency_if_possible(selector, connection) if selector[:startTransaction]
end

#apply_causal_consistency_if_possible(selector, connection) (private)

Adds causal consistency document to the selector, if one can be constructed.

In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, topology must be replica set or sharded cluster.

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 64

# Merges the session's causal consistency document into the selector's
# read concern. Nothing happens when the server is a standalone or the
# session yields no causal consistency document.
#
# The base read concern is the selector's own :readConcern, then the
# operation's read_concern, then an empty hash.
#
# @param [ Hash ] selector The command document; mutated in place.
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ Object | nil ] The read concern stored in the selector, or
#   nil when nothing was applied.
def apply_causal_consistency_if_possible(selector, connection)
  return if connection.description.standalone?

  consistency = session.send(:causal_consistency_doc)
  return unless consistency

  base = selector[:readConcern] || read_concern || {}
  selector[:readConcern] =
    Options::Mapper.transform_values_to_strings(base.merge(consistency))
end

#apply_cluster_time!(selector, connection) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 79

# Writes the greater of the connection's and the session's cluster time
# into the selector under '$clusterTime'. Standalone servers are skipped,
# and nothing is written when neither cluster time is available.
#
# @param [ Hash ] selector The command document; mutated in place.
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ Object | nil ] The cluster time written, or nil when none.
def apply_cluster_time!(selector, connection)
  return if connection.description.standalone?

  latest = [connection.cluster_time, session&.cluster_time].compact.max
  selector['$clusterTime'] = latest if latest
end

#apply_read_pref!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 112

# Asks the session to apply its read preference to the selector, but only
# when read_command? reports the selector as a read command.
#
# @param [ Hash ] selector The command document being built.
#
# @return [ Object | nil ] The session call's result, or nil when the
#   selector is not a read command.
def apply_read_pref!(selector)
  return unless read_command?(selector)

  session.apply_read_pref!(selector)
end

#apply_session_options(sel, connection) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 215

# Decorates the command document with everything the session contributes.
# The steps run in a fixed order: cluster time, transaction number,
# session id, the transaction helpers, the recovery token and finally the
# snapshot read concern.
#
# @param [ Hash ] sel The command document; mutated in place.
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ Object | nil ] The snapshot timestamp when one was written,
#   nil otherwise.
#
# @raise [ Error::SnapshotSessionInvalidServerVersion ] If the session is
#   a snapshot session and the server reports a version below 5.0.
def apply_session_options(sel, connection)
  apply_cluster_time!(sel, connection)
  sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
  sel.merge!(lsid: session.session_id)
  apply_start_transaction!(sel)
  apply_causal_consistency!(sel, connection)
  apply_autocommit!(sel)
  apply_txn_opts!(sel)
  suppress_read_write_concern!(sel)
  validate_read_preference!(sel)
  apply_txn_num!(sel)

  # The recovery token only accompanies commit and abort commands.
  wants_token = session.recovery_token &&
    (sel[:commitTransaction] || sel[:abortTransaction])
  sel[:recoveryToken] = session.recovery_token if wants_token

  return unless session.snapshot?

  unless connection.description.server_version_gte?('5.0')
    raise Error::SnapshotSessionInvalidServerVersion
  end

  # A snapshot session replaces whatever read concern was set so far.
  sel[:readConcern] = {level: 'snapshot'}
  sel[:readConcern][:atClusterTime] = session.snapshot_timestamp if session.snapshot_timestamp
end

#apply_start_transaction!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 104

# Hands the selector to the session's add_start_transaction! method and
# returns whatever that call returns.
#
# @param [ Object ] selector The command document being built.
#   NOTE(review): what the session adds to it is defined outside this
#   listing - confirm in the session class.
def apply_start_transaction!(selector)
  session.add_start_transaction!(selector)
end

#apply_txn_num!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 108

# Hands the selector to the session's add_txn_num! method and returns
# whatever that call returns.
#
# @param [ Object ] selector The command document being built.
#   NOTE(review): what the session adds to it is defined outside this
#   listing - confirm in the session class.
def apply_txn_num!(selector)
  session.add_txn_num!(selector)
end

#apply_txn_opts!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 116

# Hands the selector to the session's add_txn_opts! method, together with
# the result of read_command? for that same selector.
#
# @param [ Object ] selector The command document being built.
#   NOTE(review): how the session uses the read-command flag is defined
#   outside this listing - confirm in the session class.
def apply_txn_opts!(selector)
  session.add_txn_opts!(selector, read_command?(selector))
end

#build_message(connection, context) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 244

# Builds the message through the parent implementation, then validates it
# client-side when the context carries a session.
#
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
# @param [ Object ] context The operation context; its session is
#   compared with the operation's own session.
#
# @return [ Object ] The message produced by the parent implementation.
#
# @raise [ Error::InternalDriverError ] If the operation has a session
#   that differs from the context's session.
# @raise [ Error::MaxMessageSize ] If the serialized message is larger
#   than the connection's maximum message size.
def build_message(connection, context)
  # A mismatch is only fatal when the operation has a session of its own.
  # Some operations are not constructed with sessions but are executed in
  # a context where a session is available. This could be OK or a driver
  # issue.
  # TODO investigate.
  if session != context.session && session
    raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}"
  end

  message = super
  context_session = context.session
  return message unless context_session

  # Serialize the message to detect client-side problems, such as invalid
  # BSON keys or too large messages. The message will be serialized again
  # later prior to being sent to the connection.
  buf = BSON::ByteBuffer.new
  message.serialize(buf)
  too_large = buf.length > connection.max_message_size
  raise Error::MaxMessageSize.new(connection.max_message_size) if too_large

  context_session.update_state!
  message
end

#command(connection) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 128

# Assembles the command document to send over the given connection.
#
# The document starts from selector(connection) and then receives the
# write concern, the database name, the read preference and, depending on
# the server's session support, the cluster time and session options.
#
# @param [ Server::Connection ] connection The connection that the
#   operation will be executed on.
#
# @return [ BSON::Document ] The assembled command document.
#
# @raise [ Error::LintError ] If linting is enabled and the argument is
#   not a Server::Connection.
def command(connection)
  if Lint.enabled? && !connection.is_a?(Server::Connection)
    raise Error::LintError, "Connection is not a Connection instance: #{connection}"
  end

  BSON::Document.new(selector(connection)).tap do |doc|
    add_write_concern!(doc)
    doc[Protocol::Msg::DATABASE_IDENTIFIER] = db_name

    add_read_preference(doc, connection)

    if connection.features.sessions_enabled?
      apply_cluster_time!(doc, connection)
      # Session options need an acknowledged write or an open transaction.
      use_session = session && (acknowledged_write? || session.in_transaction?)
      apply_session_options(doc, connection) if use_session
    elsif session && session.explicit?
      apply_session_options(doc, connection)
    end
  end
end

#flags (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 75

# Lists the message flags for this operation: none for an acknowledged
# write, otherwise :more_to_come.
#
# @return [ Array<Symbol> ] The flags.
def flags
  return [] if acknowledged_write?

  [:more_to_come]
end

#read_command?(sel) ⇒ Boolean (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 92

# Reports whether the selector has a truthy value under at least one of
# the entries in READ_COMMANDS.
#
# @param [ Object ] sel The command document; only read through #[].
#
# @return [ true | false ] Whether any READ_COMMANDS entry is set.
def read_command?(sel)
  READ_COMMANDS.any? { |c| sel[c] }
end

#suppress_read_write_concern!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 120

# Hands the selector to the session's suppress_read_write_concern! method
# and returns whatever that call returns.
#
# @param [ Object ] selector The command document being built.
#   NOTE(review): which entries the session removes is defined outside
#   this listing - confirm in the session class.
def suppress_read_write_concern!(selector)
  session.suppress_read_write_concern!(selector)
end

#validate_read_preference!(selector) (private)

Since:

  • 2.5.2

[ GitHub ]

  
# File 'lib/mongo/operation/shared/sessions_supported.rb', line 124

# Asks the session to validate the read preference for the selector, but
# only when read_command? reports the selector as a read command.
#
# @param [ Hash ] selector The command document being built.
#
# @return [ Object | nil ] The session call's result, or nil when the
#   selector is not a read command.
def validate_read_preference!(selector)
  return unless read_command?(selector)

  session.validate_read_preference!(selector)
end