Module: Redis::Commands::Streams
| Relationships & Source Files | |
| Extension / Inclusion / Inheritance Descendants | |
| Included In: | |
| Defined in: | lib/redis/commands/streams.rb | 
Instance Method Summary
- 
    
      #xack(key, group, *ids)  ⇒ Integer 
    
    Removes one or multiple entries from the pending entries list of a stream consumer group. 
- 
    
      #xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')  ⇒ String 
    
    Add new entry to the stream. 
- 
    
      #xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)  ⇒ Hash{String => Hash}, Array<String> 
    
    Transfers ownership of pending stream entries that match the specified criteria. 
- 
    
      #xclaim(key, group, consumer, min_idle_time, *ids, **opts)  ⇒ Hash{String => Hash}, Array<String> 
    
    Changes the ownership of a pending entry. 
- 
    
      #xdel(key, *ids)  ⇒ Integer 
    
    Delete entries by entry ids. 
- 
    
      #xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)  ⇒ String, Integer 
    
    Manages the consumer group of the stream. 
- 
    
      #xinfo(subcommand, key, group = nil)  ⇒ Hash+ 
    
    Returns stream information for the given subcommand. 
- 
    
      #xlen(key)  ⇒ Integer 
    
    Returns the number of entries inside a stream. 
- 
    
      #xpending(key, group, *args, idle: nil)  ⇒ Hash+ 
    
    Fetches pending entries that have not yet been acknowledged. 
- 
    
      #xrange(key, start = '-', range_end = '+', count: nil)  ⇒ Array<Array<String, Hash>> 
    
    Fetches entries of the stream in ascending order. 
- 
    
      #xread(keys, ids, count: nil, block: nil)  ⇒ Hash{String => Hash{String => Hash}} 
    
    Fetches entries from one or multiple streams. 
- 
    
      #xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)  ⇒ Hash{String => Hash{String => Hash}} 
    
    Fetches a subset of the entries from one or multiple streams related with the consumer group. 
- 
    
      #xrevrange(key, range_end = '+', start = '-', count: nil)  ⇒ Array<Array<String, Hash>> 
    
    Fetches entries of the stream in descending order. 
- 
    
      #xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)  ⇒ Integer 
    
    Trims older entries of the stream if needed. 
- #_xread(args, keys, ids, blocking_timeout_msec) private
Instance Method Details
#_xread(args, keys, ids, blocking_timeout_msec) (private)
[ GitHub ]# File 'lib/redis/commands/streams.rb', line 392
# Shared tail of the stream read commands: appends the STREAMS clause to
# +args+ and dispatches the command, blocking or non-blocking.
#
# @param args [Array] command prefix built by the caller; mutated in place
# @param keys [Array, Object] a single stream key or an array of keys
# @param ids [Array, Object] a single entry id or an array of ids
# @param blocking_timeout_msec [nil, #to_f] nil sends a plain command;
#   any other value sends a blocking command with the timeout converted
#   from milliseconds to seconds (a zero timeout is passed through as 0)
def _xread(args, keys, ids, blocking_timeout_msec)
  key_list = keys.is_a?(Array) ? keys : [keys]
  id_list = ids.is_a?(Array) ? ids : [ids]
  args.push('STREAMS', *key_list, *id_list)

  return send_command(args, &HashifyStreams) if blocking_timeout_msec.nil?

  timeout_msec = blocking_timeout_msec.to_f
  # Zero is forwarded as the Integer 0 rather than 0.0.
  timeout = timeout_msec.zero? ? 0 : timeout_msec / 1_000
  send_blocking_command(args, timeout, &HashifyStreams)
end
    #xack(key, group, *ids)  ⇒ Integer 
  
Removes one or multiple entries from the pending entries list of a stream consumer group.
# File 'lib/redis/commands/streams.rb', line 273
# Sends XACK for the given stream key, consumer group and entry ids.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param ids [Array] entry ids; nested arrays are flattened before sending
# @return [Integer] per the documented signature
def xack(key, group, *ids)
  send_command([:xack, key, group, *ids.flatten])
end
    #xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')  ⇒ String 
  
Add new entry to the stream.
# File 'lib/redis/commands/streams.rb', line 50
# Builds and sends an XADD command.
#
# @param key [String] the stream key
# @param entry [#flatten] field/value pairs; flattened into the argument list
# @param approximate [Boolean, nil] adds the `~` modifier to the trim clause
#   (only has an effect when maxlen or minid is given)
# @param maxlen [Object, nil] threshold for a MAXLEN trim clause
# @param minid [Object, nil] threshold for a MINID trim clause
# @param nomkstream [Boolean, nil] adds the NOMKSTREAM flag when truthy
# @param id [String] entry id argument, `*` by default
# @raise [ArgumentError] when both maxlen and minid are supplied
# @return [String] per the documented signature
def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: '*')
  raise ArgumentError, "can't supply both maxlen and minid" if maxlen && minid

  command = [:xadd, key]
  command.push('NOMKSTREAM') if nomkstream

  # maxlen takes precedence; at this point at most one of the two is set.
  trim_strategy, threshold = maxlen ? ['MAXLEN', maxlen] : ['MINID', minid]
  if threshold
    command.push(trim_strategy)
    command.push('~') if approximate
    command.push(threshold)
  end

  command.push(id, *entry.flatten)
  send_command(command)
end
    #xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)  ⇒ Hash{String => Hash}, Array<String> 
  
Transfers ownership of pending stream entries that match the specified criteria.
# File 'lib/redis/commands/streams.rb', line 343
# Builds and sends an XAUTOCLAIM command.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] minimum idle time argument, passed through as-is
# @param start [String] start id argument, passed through as-is
# @param count [Object, nil] when given, sent as COUNT followed by its string form
# @param justid [Boolean] adds JUSTID and switches the reply transformer
# @return [Hash{String => Hash}, Array<String>] per the documented signature
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
  command = [:xautoclaim, key, group, consumer, min_idle_time, start]
  command.push('COUNT', count.to_s) if count
  command.push('JUSTID') if justid

  formatter = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim
  send_command(command, &formatter)
end
    #xclaim(key, group, consumer, min_idle_time, *ids, **opts)  ⇒ Hash{String => Hash}, Array<String> 
  
Changes the ownership of a pending entry.
# File 'lib/redis/commands/streams.rb', line 310
# Builds and sends an XCLAIM command.
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] minimum idle time argument, passed through as-is
# @param ids [Array] entry ids; nested arrays are flattened
# @param opts [Hash] recognised keys: :idle and :time (sent via #to_i),
#   :retrycount (sent as-is), :force and :justid (flags)
# @return [Hash{String => Hash}, Array<String>] per the documented signature
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
  idle, time, retrycount, force, justid = opts.values_at(:idle, :time, :retrycount, :force, :justid)

  command = [:xclaim, key, group, consumer, min_idle_time, *ids.flatten]
  command.push('IDLE', idle.to_i) if idle
  command.push('TIME', time.to_i) if time
  command.push('RETRYCOUNT', retrycount) if retrycount
  command.push('FORCE') if force
  command.push('JUSTID') if justid

  # With JUSTID the reply is handed to Noop instead of the entry hashifier.
  formatter = justid ? Noop : HashifyStreamEntries
  send_command(command, &formatter)
end
    #xdel(key, *ids)  ⇒ Integer 
  
Delete entries by entry ids.
# File 'lib/redis/commands/streams.rb', line 113
# Sends XDEL for the given stream key and entry ids.
#
# @param key [String] the stream key
# @param ids [Array] entry ids; nested arrays are flattened before sending
# @return [Integer] per the documented signature
def xdel(key, *ids)
  send_command([:xdel, key, *ids.flatten])
end
    #xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)  ⇒ String, Integer 
  
Manages the consumer group of the stream.
# File 'lib/redis/commands/streams.rb', line 221
# Builds and sends an XGROUP command.
#
# @param subcommand [String, Symbol] the XGROUP subcommand, passed through as-is
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param id_or_consumer [String, nil] optional trailing id or consumer name
# @param mkstream [Boolean] appends MKSTREAM when truthy
# @return [String, Integer] per the documented signature
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
  command = [:xgroup, subcommand, key, group, id_or_consumer]
  command << 'MKSTREAM' if mkstream
  # nil arguments (e.g. an omitted id_or_consumer) are dropped before sending.
  send_command(command.compact)
end
    #xinfo(subcommand, key, group = nil)  ⇒ Hash+ 
  
Returns stream information for the given subcommand.
# File 'lib/redis/commands/streams.rb', line 22
# Builds and sends an XINFO command, choosing a reply transformer from the
# subcommand name (compared case-insensitively).
#
# @param subcommand [String, Symbol] 'stream', 'groups' or 'consumers' select a
#   transformer; any other value sends the command without one
# @param key [String] the stream key
# @param group [String, nil] optional group name; omitted from the command when nil
# @return [Hash, Array<Hash>] per the documented signature
def xinfo(subcommand, key, group = nil)
  kind = subcommand.to_s.downcase
  formatter =
    if kind == 'stream'
      Hashify
    elsif %w[groups consumers].include?(kind)
      # These replies are lists, so each element is hashified individually.
      proc { |reply| reply.map(&Hashify) }
    end

  send_command([:xinfo, subcommand, key, group].compact, &formatter)
end
    #xlen(key)  ⇒ Integer 
  
Returns the number of entries inside a stream.
# File 'lib/redis/commands/streams.rb', line 172
# Sends XLEN for the given stream key.
#
# @param key [String] the stream key
# @return [Integer] per the documented signature
def xlen(key)
  command = [:xlen, key]
  send_command(command)
end
    #xpending(key, group, *args, idle: nil)  ⇒ Hash+ 
  
Fetches pending entries that have not yet been acknowledged.
# File 'lib/redis/commands/streams.rb', line 375
# Builds and sends an XPENDING command in either its summary form (no extra
# positional arguments) or its detailed form (3 or 4 extra arguments).
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param args [Array] either empty, or 3 or 4 positional arguments appended as-is
# @param idle [Object, nil] when given, sent as IDLE followed by Integer(idle)
# @raise [ArgumentError] when the number of extra arguments is not 0, 3 or 4
# @return [Hash, Array<Hash>] per the documented signature
def xpending(key, group, *args, idle: nil)
  command = [:xpending, key, group]
  # The idle conversion happens before arity validation, so an invalid idle
  # value raises first.
  command.push('IDLE', Integer(idle)) if idle

  unless [0, 3, 4].include?(args.size)
    raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
  end

  command.concat(args)
  formatter = args.empty? ? HashifyStreamPendings : HashifyStreamPendingDetails
  send_command(command, &formatter)
end
    #xrange(key, start = '-', range_end = '+', count: nil)  ⇒ Array<Array<String, Hash>> 
  
Fetches entries of the stream in ascending order.
# File 'lib/redis/commands/streams.rb', line 135
# Builds and sends an XRANGE command.
#
# @param key [String] the stream key
# @param start [String] first id of the range, `-` by default
# @param range_end [String] last id of the range, `+` by default
# @param count [Object, nil] when given, sent as a COUNT clause
# @return [Array<Array<String, Hash>>] per the documented signature
def xrange(key, start = '-', range_end = '+', count: nil)
  command = [:xrange, key, start, range_end]
  command.push('COUNT', count) if count
  send_command(command, &HashifyStreamEntries)
end
    #xread(keys, ids, count: nil, block: nil)  ⇒ Hash{String => Hash{String => Hash}} 
  
Fetches entries from one or multiple streams. Optionally blocking.
# File 'lib/redis/commands/streams.rb', line 193
# Builds the XREAD command prefix and delegates to _xread for the STREAMS
# clause and dispatch.
#
# @param keys [Array, Object] one stream key or an array of keys
# @param ids [Array, Object] one entry id or an array of ids
# @param count [Object, nil] when given, sent as a COUNT clause
# @param block [Object, nil] when given, sent as BLOCK followed by block.to_i;
#   the unconverted value is also handed to _xread as the blocking timeout
# @return [Hash{String => Hash{String => Hash}}] per the documented signature
def xread(keys, ids, count: nil, block: nil)
  prefix = [:xread]
  prefix.push('COUNT', count) if count
  prefix.push('BLOCK', block.to_i) if block
  _xread(prefix, keys, ids, block)
end
    #xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)  ⇒ Hash{String => Hash{String => Hash}} 
  
Fetches a subset of the entries from one or multiple streams related with the consumer group. Optionally blocking.
# File 'lib/redis/commands/streams.rb', line 251
# Builds the XREADGROUP command prefix and delegates to _xread for the
# STREAMS clause and dispatch.
#
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param keys [Array, Object] one stream key or an array of keys
# @param ids [Array, Object] one entry id or an array of ids
# @param count [Object, nil] when given, sent as a COUNT clause
# @param block [Object, nil] when given, sent as BLOCK followed by block.to_i;
#   the unconverted value is also handed to _xread as the blocking timeout
# @param noack [Boolean, nil] adds the NOACK flag when truthy
# @return [Hash{String => Hash{String => Hash}}] per the documented signature
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)
  prefix = [:xreadgroup, 'GROUP', group, consumer]
  prefix.push('COUNT', count) if count
  prefix.push('BLOCK', block.to_i) if block
  prefix.push('NOACK') if noack
  _xread(prefix, keys, ids, block)
end
    #xrevrange(key, range_end = '+', start = '-', count: nil)  ⇒ Array<Array<String, Hash>> 
  
Fetches entries of the stream in descending order.
# File 'lib/redis/commands/streams.rb', line 158
# Builds and sends an XREVRANGE command.
#
# @param key [String] the stream key
# @param range_end [String] id sent first (upper bound), `+` by default
# @param start [String] id sent second (lower bound), `-` by default
# @param count [Object, nil] when given, sent as a COUNT clause
# @return [Array<Array<String, Hash>>] per the documented signature
def xrevrange(key, range_end = '+', start = '-', count: nil)
  command = [:xrevrange, key, range_end, start]
  command.push('COUNT', count) if count
  send_command(command, &HashifyStreamEntries)
end
    
      #xtrim(key, maxlen, strategy: 'MAXLEN', approximate: false, limit: nil)  ⇒ Integer 
      #xtrim(key, minid, strategy: 'MINID', approximate: false, limit: nil)  ⇒ Integer 
    
  
Integer 
      #xtrim(key, minid, strategy: 'MINID', approximate: true)  ⇒ Integer 
    Trims older entries of the stream if needed.
# File 'lib/redis/commands/streams.rb', line 92
# Builds and sends an XTRIM command.
#
# @param key [String] the stream key
# @param len_or_id [Object] trim threshold, sent after the strategy
#   (and after `~` when approximate)
# @param strategy [String, Symbol] trim strategy; converted to an upper-case string
# @param approximate [Boolean] adds the `~` modifier when truthy (default false)
# @param limit [Object, nil] when given, sent as a LIMIT clause
# @return [Integer] per the documented signature
def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil)
  command = [:xtrim, key, strategy.to_s.upcase]
  command.push('~') if approximate
  command.push(len_or_id)
  command.push('LIMIT', limit) if limit
  send_command(command)
end