123456789_123456789_123456789_123456789_123456789_

Module: Redis::Commands::Streams

Relationships & Source Files
Extension / Inclusion / Inheritance Descendants
Included In:
Defined in: lib/redis/commands/streams.rb

Instance Method Summary

Instance Method Details

#_xread(args, keys, ids, blocking_timeout_msec) (private)

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 385

# Shared tail of the XREAD / XREADGROUP commands: appends the STREAMS clause
# to +args+ (in place) and dispatches the command.
#
# @param args [Array] command words built so far; mutated by this method
# @param keys [Array<String>, String] one stream key or a list of them
# @param ids [Array<String>, String] one entry id or a list of them
# @param blocking_timeout_msec [Integer, nil] blocking timeout in
#   milliseconds; nil sends a regular (non-blocking) command
# @return whatever send_command / send_blocking_command returns, with
#   HashifyStreams passed along as the reply block
def _xread(args, keys, ids, blocking_timeout_msec)
  keys = [keys] unless keys.is_a?(Array)
  ids = [ids] unless ids.is_a?(Array)
  args.push('STREAMS').concat(keys).concat(ids)

  # No timeout given: plain, non-blocking command.
  return send_command(args, &HashifyStreams) if blocking_timeout_msec.nil?

  # A timeout that converts to zero is forwarded as the integer 0; anything
  # else is converted from milliseconds to seconds.
  timeout_msec = blocking_timeout_msec.to_f
  timeout = timeout_msec.zero? ? 0 : timeout_msec / 1_000
  send_blocking_command(args, timeout, &HashifyStreams)
end

#xack(key, group, *ids) ⇒ Integer

Removes one or multiple entries from the pending entries list of a stream consumer group.

Examples:

With an entry id

redis.xack('mystream', 'mygroup', '1526569495631-0')

With splatted entry ids

redis.xack('mystream', 'mygroup', '0-1', '0-2')

With arrayed entry ids

redis.xack('mystream', 'mygroup', %w[0-1 0-2])

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • ids (Array<String>)

    one or multiple entry ids

Returns:

  • (Integer)

    the number of entries successfully acknowledged

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 266

# Sends XACK for the given stream key, consumer group and entry ids.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param ids [Array<String>] entry ids, splatted or wrapped in array(s);
#   nested arrays are flattened before sending
# @return the result of send_command (documented as the Integer count of
#   acknowledged entries)
def xack(key, group, *ids)
  send_command([:xack, key, group, *ids.flatten])
end

#xadd(key, entry, approximate: nil, maxlen: nil, nomkstream: nil, id: '*') ⇒ String

Add new entry to the stream.

Examples:

Without options

redis.xadd('mystream', { f1: 'v1', f2: 'v2' })

With options

redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, approximate: true, nomkstream: true)

Parameters:

  • key (String)

    the stream key

  • entry (Hash)

    one or multiple field-value pairs

  • opts (Hash)

    several options for XADD command

Returns:

  • (String)

    the entry id

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 49

# Builds and sends an XADD command.
#
# Word order produced: XADD key [NOMKSTREAM] [MAXLEN [~] maxlen] id field value ...
#
# @param key [String] the stream key
# @param entry [Hash] field-value pairs; flattened into the command words
# @param approximate [Boolean, nil] adds the "~" modifier; only consulted
#   when +maxlen+ is given
# @param maxlen [Integer, nil] adds a MAXLEN clause when truthy
# @param nomkstream [Boolean, nil] adds NOMKSTREAM when truthy
# @param id [String] the entry id, '*' by default
# @return the result of send_command (documented as the entry id String)
def xadd(key, entry, approximate: nil, maxlen: nil, nomkstream: nil, id: '*')
  command = [:xadd, key]
  command.push('NOMKSTREAM') if nomkstream
  command.push(*(approximate ? ['MAXLEN', '~'] : ['MAXLEN']), maxlen) if maxlen
  command.push(id, *entry.flatten)
  send_command(command)
end

#xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) ⇒ Hash{String => Hash}, Array<String>

Transfers ownership of pending stream entries that match the specified criteria.

Examples:

Claim next pending message stuck > 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0')

Claim 50 next pending messages stuck > 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50)

Claim next pending message stuck > 1 hour and don't mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true)

Claim next pending message after this id stuck > 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0')

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • min_idle_time (Integer)

    the number of milliseconds

  • start (String)

    entry id to start scanning from or 0-0 for everything

  • count (Integer)

    maximum number of messages to claim (when omitted, no COUNT is sent and the Redis server default of 100 applies)

  • justid (Boolean)

    whether to fetch just an array of entry ids or not. Does not increment retry count when true

Returns:

  • (Hash{String => Hash})

    the entries successfully claimed

  • (Array<String>)

    the entry ids successfully claimed if justid option is true

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 336

# Builds and sends an XAUTOCLAIM command.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] minimum idle time, in milliseconds
# @param start [String] entry id to start scanning from
# @param count [Integer, nil] when given, sent as COUNT (stringified)
# @param justid [Boolean] when truthy, JUSTID is appended and the reply is
#   handled by HashifyStreamAutoclaimJustId instead of HashifyStreamAutoclaim
# @return the result of send_command after the selected reply block
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
  command = [:xautoclaim, key, group, consumer, min_idle_time, start]
  command.push('COUNT', count.to_s) if count

  if justid
    command.push('JUSTID')
    send_command(command, &HashifyStreamAutoclaimJustId)
  else
    send_command(command, &HashifyStreamAutoclaim)
  end
end

#xclaim(key, group, consumer, min_idle_time, *ids, **opts) ⇒ Hash{String => Hash}, Array<String>

Changes the ownership of a pending entry

Examples:

With splatted entry ids

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')

With arrayed entry ids

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])

With idle option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)

With time option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)

With retrycount option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)

With force option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)

With justid option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • min_idle_time (Integer)

    the number of milliseconds

  • ids (Array<String>)

    one or multiple entry ids

  • opts (Hash)

    several options for XCLAIM command

Options Hash (**opts):

  • :idle (Integer)

    the idle time to set on the entry, in milliseconds (time elapsed since it was last delivered)

  • :time (Integer)

    the number of milliseconds as a specific Unix Epoch time

  • :retrycount (Integer)

    the value to set the retry counter to

  • :force (Boolean)

    whether to create the pending entry to the pending entries list or not

  • :justid (Boolean)

    whether to fetch just an array of entry ids or not

Returns:

  • (Hash{String => Hash})

    the entries successfully claimed

  • (Array<String>)

    the entry ids successfully claimed if justid option is true

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 303

# Builds and sends an XCLAIM command.
#
# Recognised options are :idle, :time, :retrycount, :force and :justid; any
# other key in +opts+ is ignored.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] minimum idle time, in milliseconds
# @param ids [Array<String>] entry ids, splatted or wrapped in array(s)
# @param opts [Hash] XCLAIM options (see above)
# @return the result of send_command; the reply block is Noop when :justid
#   is truthy and HashifyStreamEntries otherwise
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
  idle, time, retrycount, force, justid =
    opts.values_at(:idle, :time, :retrycount, :force, :justid)

  command = [:xclaim, key, group, consumer, min_idle_time, *ids.flatten]
  command.push('IDLE', idle.to_i) if idle
  command.push('TIME', time.to_i) if time
  command.push('RETRYCOUNT', retrycount) if retrycount
  command.push('FORCE') if force
  command.push('JUSTID') if justid

  send_command(command, &(justid ? Noop : HashifyStreamEntries))
end

#xdel(key, *ids) ⇒ Integer

Delete entries by entry ids.

Examples:

With splatted entry ids

redis.xdel('mystream', '0-1', '0-2')

With arrayed entry ids

redis.xdel('mystream', ['0-1', '0-2'])

Parameters:

  • key (String)

    the stream key

  • ids (Array<String>)

    one or multiple entry ids

Returns:

  • (Integer)

    the number of entries actually deleted

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 106

# Sends XDEL for the given stream key and entry ids.
#
# @param key [String] the stream key
# @param ids [Array<String>] entry ids, splatted or wrapped in array(s);
#   nested arrays are flattened before sending
# @return the result of send_command (documented as the Integer count of
#   deleted entries)
def xdel(key, *ids)
  send_command([:xdel, key, *ids.flatten])
end

#xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) ⇒ String, Integer

Manages the consumer group of the stream.

Examples:

With create subcommand

redis.xgroup(:create, 'mystream', 'mygroup', '$')

With setid subcommand

redis.xgroup(:setid, 'mystream', 'mygroup', '$')

With destroy subcommand

redis.xgroup(:destroy, 'mystream', 'mygroup')

With delconsumer subcommand

redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')

Parameters:

  • subcommand (String)

    create setid destroy delconsumer

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • id_or_consumer (String) (defaults to: nil)
    • the entry id or $, required if subcommand is create or setid
    • the consumer name, required if subcommand is delconsumer
  • mkstream (Boolean)

    whether to create an empty stream automatically or not

Returns:

  • (String)

    OK if subcommand is create or setid

  • (Integer)

    affected count if subcommand is destroy or delconsumer

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 214

# Builds and sends an XGROUP command.
#
# Every nil word is dropped before sending, so an omitted +id_or_consumer+
# simply does not appear in the command.
#
# @param subcommand [String, Symbol] e.g. create, setid, destroy, delconsumer
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param id_or_consumer [String, nil] entry id or consumer name, depending on
#   the subcommand
# @param mkstream [Boolean] appends MKSTREAM when truthy
# @return the result of send_command
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
  command = [:xgroup, subcommand, key, group, id_or_consumer]
  command << 'MKSTREAM' if mkstream
  send_command(command.compact)
end

#xinfo(subcommand, key, group = nil) ⇒ Hash+

Returns information about the stream for the given subcommand.

Examples:

stream

redis.xinfo(:stream, 'mystream')

groups

redis.xinfo(:groups, 'mystream')

consumers

redis.xinfo(:consumers, 'mystream', 'mygroup')

Parameters:

  • subcommand (String)

    e.g. stream groups consumers

  • key (String)

    the stream key

  • group (String) (defaults to: nil)

    the consumer group name, required if subcommand is consumers

Returns:

  • (Hash)

    information of the stream if subcommand is stream

  • (Array<Hash>)

    information of the consumer groups if subcommand is groups

  • (Array<Hash>)

    information of the consumers if subcommand is consumers

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 22

# Builds and sends an XINFO command and picks a reply block by subcommand.
#
# The subcommand is matched case-insensitively: "stream" uses Hashify
# directly, "groups" and "consumers" apply Hashify to each element of the
# reply, and any other subcommand sends the command without a block.
#
# @param subcommand [String, Symbol] e.g. stream, groups, consumers
# @param key [String] the stream key
# @param group [String, nil] the consumer group name; dropped when nil
# @return the result of send_command
def xinfo(subcommand, key, group = nil)
  kind = subcommand.to_s.downcase

  formatter =
    if kind == 'stream'
      Hashify
    elsif %w[groups consumers].include?(kind)
      proc { |rows| rows.map(&Hashify) }
    end

  send_command([:xinfo, subcommand, key, group].compact, &formatter)
end

#xlen(key) ⇒ Integer

Returns the number of entries inside a stream.

Examples:

With key

redis.xlen('mystream')

Parameters:

  • key (String)

    the stream key

Returns:

  • (Integer)

    the number of entries

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 165

# Sends XLEN for the given stream key.
#
# @param key [String] the stream key
# @return the result of send_command (documented as the Integer number of
#   entries)
def xlen(key)
  command = [:xlen, key]
  send_command(command)
end

#xpending(key, group, *args, idle: nil) ⇒ Hash+

Fetches pending entries that have not been acknowledged yet

Examples:

With key and group

redis.xpending('mystream', 'mygroup')

With range options

redis.xpending('mystream', 'mygroup', '-', '+', 10)

With range and idle time options

redis.xpending('mystream', 'mygroup', '-', '+', 10, idle: 9000)

With range and consumer options

redis.xpending('mystream', 'mygroup', '-', '+', 10, 'consumer1')

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • start (String)

    start first entry id of range

  • end (String)

    end last entry id of range

  • count (Integer)

    count the number of entries as limit

  • consumer (String)

    the consumer name

  • opts (Hash)

    a customizable set of options

Returns:

  • (Hash)

    the summary of pending entries

  • (Array<Hash>)

    the pending entries details if options were specified

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 368

# Builds and sends an XPENDING command.
#
# With no extra positional arguments the summary form is sent and the reply
# goes through HashifyStreamPendings; with 3 (start, end, count) or 4
# (plus consumer) arguments the detail form is sent and the reply goes
# through HashifyStreamPendingDetails.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param args [Array] either nothing, or start, end, count[, consumer]
# @param idle [Integer, nil] when given, sent as IDLE ahead of the range
#   arguments after strict Integer() conversion
# @raise [ArgumentError] if +args+ has a size other than 0, 3 or 4, or if
#   +idle+ cannot be converted by Integer()
# @return the result of send_command
def xpending(key, group, *args, idle: nil)
  command = [:xpending, key, group]
  command.push('IDLE', Integer(idle)) if idle

  unless [0, 3, 4].include?(args.size)
    raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
  end

  command.concat(args)
  formatter = args.empty? ? HashifyStreamPendings : HashifyStreamPendingDetails
  send_command(command, &formatter)
end

#xrange(key, start = '-', range_end = '+', count: nil) ⇒ Array<Array<String, Hash>>

Fetches entries of the stream in ascending order.

Examples:

Without options

redis.xrange('mystream')

With a specific start

redis.xrange('mystream', '0-1')

With a specific start and end

redis.xrange('mystream', '0-1', '0-3')

With count options

redis.xrange('mystream', count: 10)

Parameters:

  • key (String)

    the stream key

  • start (String) (defaults to: '-')

    first entry id of range, default value is -

  • end (String)

    last entry id of range, default value is +

  • count (Integer)

    the number of entries as limit

Returns:

  • (Array<Array<String, Hash>>)

    the ids and entries pairs

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 128

# Builds and sends an XRANGE command (entries in ascending order).
#
# @param key [String] the stream key
# @param start [String] first entry id of the range, '-' by default
# @param range_end [String] last entry id of the range, '+' by default
# @param count [Integer, nil] when given, sent as a COUNT clause
# @return the result of send_command, with HashifyStreamEntries as the reply
#   block
def xrange(key, start = '-', range_end = '+', count: nil)
  command = [:xrange, key, start, range_end]
  command.push('COUNT', count) if count
  send_command(command, &HashifyStreamEntries)
end

#xread(keys, ids, count: nil, block: nil) ⇒ Hash{String => Hash{String => Hash}}

Fetches entries from one or multiple streams. Optionally blocking.

Examples:

With a key

redis.xread('mystream', '0-0')

With multiple keys

redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])

With count option

redis.xread('mystream', '0-0', count: 2)

With block option

redis.xread('mystream', '$', block: 1000)

Parameters:

  • keys (Array<String>)

    one or multiple stream keys

  • ids (Array<String>)

    one or multiple entry ids

  • count (Integer)

    the number of entries as limit per stream

  • block (Integer)

    the number of milliseconds as blocking timeout

Returns:

  • (Hash{String => Hash{String => Hash}})

    the entries

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 186

# Builds the leading words of an XREAD command and hands off to _xread,
# which appends the STREAMS clause and dispatches it.
#
# @param keys [Array<String>, String] one or multiple stream keys
# @param ids [Array<String>, String] one or multiple entry ids
# @param count [Integer, nil] when given, sent as a COUNT clause
# @param block [Integer, nil] blocking timeout in milliseconds; sent as
#   BLOCK (via to_i) and also forwarded unchanged to _xread
# @return the result of _xread
def xread(keys, ids, count: nil, block: nil)
  command = [:xread]
  command.push('COUNT', count) if count
  command.push('BLOCK', block.to_i) if block
  _xread(command, keys, ids, block)
end

#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) ⇒ Hash{String => Hash{String => Hash}}

Fetches a subset of the entries from one or multiple streams related with the consumer group. Optionally blocking.

Examples:

With a key

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')

With multiple keys

redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])

With count option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)

With block option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)

With noack option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)

Parameters:

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • keys (Array<String>)

    one or multiple stream keys

  • ids (Array<String>)

    one or multiple entry ids

  • opts (Hash)

    several options for XREADGROUP command

Returns:

  • (Hash{String => Hash{String => Hash}})

    the entries

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 244

# Builds the leading words of an XREADGROUP command and hands off to _xread,
# which appends the STREAMS clause and dispatches it.
#
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param keys [Array<String>, String] one or multiple stream keys
# @param ids [Array<String>, String] one or multiple entry ids
# @param count [Integer, nil] when given, sent as a COUNT clause
# @param block [Integer, nil] blocking timeout in milliseconds; sent as
#   BLOCK (via to_i) and also forwarded unchanged to _xread
# @param noack [Boolean, nil] appends NOACK when truthy
# @return the result of _xread
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)
  command = [:xreadgroup, 'GROUP', group, consumer]
  command.push('COUNT', count) if count
  command.push('BLOCK', block.to_i) if block
  command.push('NOACK') if noack
  _xread(command, keys, ids, block)
end

#xrevrange(key, range_end = '+', start = '-', count: nil) ⇒ Array<Array<String, Hash>>

Fetches entries of the stream in descending order.

Examples:

Without options

redis.xrevrange('mystream')

With a specific end

redis.xrevrange('mystream', '0-3')

With a specific end and start

redis.xrevrange('mystream', '0-3', '0-1')

With count options

redis.xrevrange('mystream', count: 10)

Parameters:

  • key (String)

    the stream key

  • end (String)

    first entry id of range, default value is +

  • start (String) (defaults to: '-')

    last entry id of range, default value is -

Returns:

  • (Array<Array<String, Hash>>)

    the ids and entries pairs

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 151

# Builds and sends an XREVRANGE command (entries in descending order).
#
# Note the positional order: the range end comes before the range start.
#
# @param key [String] the stream key
# @param range_end [String] first id sent after the key, '+' by default
# @param start [String] second id sent after the key, '-' by default
# @param count [Integer, nil] when given, sent as a COUNT clause
# @return the result of send_command, with HashifyStreamEntries as the reply
#   block
def xrevrange(key, range_end = '+', start = '-', count: nil)
  command = [:xrevrange, key, range_end, start]
  command.push('COUNT', count) if count
  send_command(command, &HashifyStreamEntries)
end

#xtrim(key, maxlen, strategy: 'MAXLEN', approximate: false, limit: nil) ⇒ Integer #xtrim(key, minid, strategy: 'MINID', approximate: false, limit: nil) ⇒ Integer

Trims older entries of the stream if needed.

Examples:

Without options

redis.xtrim('mystream', 1000)

With options

redis.xtrim('mystream', 1000, approximate: true)

With strategy

redis.xtrim('mystream', '1-0', strategy: 'MINID')

Overloads:

  • #xtrim(key, maxlen, strategy: 'MAXLEN', approximate: false, limit: nil) ⇒ Integer

    Parameters:

    • key (String)

      the stream key

    • maxlen (Integer)

      max length of entries

    • strategy (String)

      the limit strategy, must be MAXLEN

    • approximate (Boolean)

      whether to add ~ modifier of maxlen or not

    • limit (Integer)

      maximum count of entries to be evicted

  • #xtrim(key, minid, strategy: 'MINID', approximate: false, limit: nil) ⇒ Integer

    Parameters:

    • key (String)

      the stream key

    • minid (String)

      minimum id of entries

    • strategy (String)

      the limit strategy, must be MINID

    • approximate (Boolean)

      whether to add ~ modifier of minid or not

    • limit (Integer)

      maximum count of entries to be evicted

Returns:

  • (Integer)

    the number of entries actually deleted

[ GitHub ]

  
# File 'lib/redis/commands/streams.rb', line 85

# Builds and sends an XTRIM command.
#
# Word order produced: XTRIM key STRATEGY [~] len_or_id [LIMIT limit]
#
# @param key [String] the stream key
# @param len_or_id [Integer, String] the trimming threshold: a maximum length
#   for MAXLEN or a minimum entry id for MINID
# @param strategy [String, Symbol] the trimming strategy; stringified and
#   upcased before sending, 'MAXLEN' by default
# @param approximate [Boolean] adds the "~" modifier when truthy (off by
#   default)
# @param limit [Integer, nil] when given, sent as a LIMIT clause
# @return the result of send_command (documented as the Integer count of
#   deleted entries)
def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)
  command = [:xtrim, key, strategy.to_s.upcase]
  command.push('~') if approximate
  command.push(len_or_id)
  command.push('LIMIT', limit) if limit
  send_command(command)
end