123456789_123456789_123456789_123456789_123456789_

Class: ActiveSupport::ContinuousIntegration::Group

Do not use. This class is for internal use only.
Relationships & Source Files
Namespace Children
Classes:
Inherits: Object
Defined in: activesupport/lib/active_support/continuous_integration/group.rb

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(ci, name, parallel:, &block) ⇒ Group

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 26

# Builds a group of CI tasks.
#
# ci       - object stored in @ci; other methods call fail_fast?,
#            failing_fast?, colorize and report_step on it.
# name     - label shown in the progress line.
# parallel - number of worker threads started by #run.
# block    - passed to TaskCollector, whose #tasks result becomes the work list.
def initialize(ci, name, parallel:, &block)
  @ci, @name, @parallel = ci, name, parallel
  @tasks = TaskCollector.new(&block).tasks

  # Timing and progress-line state.
  @start_time = Time.now.to_f
  @progress_visible = false

  # State shared between worker threads; @mutex guards updates to it.
  @mutex = Mutex.new
  @running = {}
  @log_files = []
end

Instance Attribute Details

#pty_available? ⇒ Boolean (readonly, private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 138

# Reports whether the "pty" library can be loaded.
#
# Returns true when `require "pty"` succeeds, false when it raises LoadError.
def pty_available?
  begin
    require "pty"
  rescue LoadError
    return false
  end

  true
end

Instance Method Details

#capture_output(command) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 111

# Runs +command+ through spawn_process and streams everything it prints into
# a temporary log file.
#
# command - array of command words (joined with spaces in the error message).
#
# Returns [success, log_path]. When a SystemCallError escapes, the error
# message is written to the log file instead and success is false.
def capture_output(command)
  # Pick a unique temp-file name; the block passed here is empty, so the file
  # itself is only written further down.
  log_path = Dir::Tmpname.create(["ci-", ".log"]) { }
  # Remember the path so it can be removed later even if it is never replayed.
  @mutex.synchronize { @log_files << log_path }

  success = spawn_process(command) do |output|
    File.open(log_path, "w") do |log|
      begin
        loop do
          chunk = output.readpartial(8192)
          log.write(chunk)
        end
      rescue EOFError, Errno::EIO
        # Expected when process exits
      end
    end
  end

  [success, log_path]
rescue SystemCallError => error
  File.write(log_path, "#{error.message}: #{command.join(" ")}\n")
  [false, log_path]
end

#clear_progress (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 182

# Erases the progress display if one is currently shown.
#
# Emits: carriage return, cursor up two lines, clear to end of screen. The two
# lines correspond to the "\n\n" that refresh_progress prints before drawing.
def clear_progress
  if @progress_visible
    print "\r\e[2A\e[J"
    @progress_visible = false
  end
end

#dequeue(queue) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 193

# Takes the next item off +queue+ without blocking.
#
# Returns the item, or nil when the non-blocking pop raises ThreadError
# (which is what an empty queue does).
def dequeue(queue)
  non_blocking = true
  begin
    queue.pop(non_blocking)
  rescue ThreadError
    nil
  end
end

#execute_group(name, block) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 99

# Runs the tasks collected from +block+ one after another.
#
# name  - not used by this method.
# block - handed to TaskCollector to obtain [type, title, payload] tasks.
#
# Returns true only if every executed task returned a truthy value. After a
# failure the remaining tasks are skipped when @ci.fail_fast? is true.
def execute_group(name, block)
  any_failed = false

  TaskCollector.new(&block).tasks.each do |type, title, payload|
    next if execute_task(type, title, payload)

    any_failed = true
    break if @ci.fail_fast?
  end

  !any_failed
end

#execute_step(title, command) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 82

# Runs a single step, then reports its captured output and duration.
#
# title   - key under which the step is tracked in @running while it executes.
# command - command passed to capture_output and @ci.report_step.
#
# The duration is measured with the monotonic clock so that wall-clock
# adjustments during the step cannot produce a negative or inflated value.
#
# Returns the success flag produced by capture_output.
def execute_step(title, command)
  @mutex.synchronize { @running[title] = Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  success, log_path = capture_output(command)

  # Reporting happens under the mutex so the replayed output of one step is
  # not interleaved with progress redraws or another step's report.
  @mutex.synchronize do
    started = @running.delete(title)
    clear_progress

    @ci.report_step(title, command) do
      replay_and_cleanup(log_path)
      [success, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
    end

    refresh_progress
  end
  success
end

#execute_task(type, title, payload) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 75

# Dispatches a task to the handler for its type.
#
# :step tasks go to execute_step and :group tasks to execute_group, each with
# (title, payload). Any other type does nothing and returns nil.
def execute_task(type, title, payload)
  if type == :step
    execute_step(title, payload)
  elsif type == :group
    execute_group(title, payload)
  end
end

#format_elapsed_brief(seconds) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 188

# Formats a duration in seconds as a compact label such as "5s" or "2m5s".
#
# The minutes part is omitted when it is zero; fractional seconds are
# truncated. Minutes are not folded into hours.
def format_elapsed_brief(seconds)
  minutes, remainder = seconds.divmod(60)

  label = "#{remainder.to_i}s"
  label = "#{minutes.to_i}m#{label}" if minutes > 0
  label
end

#refresh_progress (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 172

# Draws (or redraws) the one-line progress display while steps are running.
#
# Does nothing when @running is empty. On the first draw two newlines are
# printed first; every draw then returns to column 0, clears the line and
# prints "<name> (<elapsed>) - <running titles>..." via @ci.colorize.
def refresh_progress
  return if @running.empty?

  print "\n\n" unless @progress_visible

  elapsed = format_elapsed_brief(Time.now.to_f - @start_time)
  active_titles = @running.keys.join(' | ')
  status_line = @ci.colorize("#{@name} (#{elapsed}) - #{active_titles}...", :progress)

  print "\r\e[K#{status_line}"
  @progress_visible = true
  $stdout.flush
end

#replay_and_cleanup(log_path) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 163

# Prints the contents of the log file at +log_path+ in 8192-byte chunks,
# then deletes the file.
def replay_and_cleanup(log_path)
  File.open(log_path, "r") do |log|
    loop do
      chunk = log.read(8192)
      break if chunk.nil? # read returns nil at end of file

      print chunk
    end
  end

  File.delete(log_path)
end

#run

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 38

# Executes every task in the group using @parallel worker threads.
#
# While running, an INT handler aborts with a message naming the steps in
# progress. Workers pull tasks from a shared queue until it is empty or
# @ci.failing_fast? becomes true. with_progress wraps the workers.
#
# On exit (normal or not) the previous INT handler is restored and any log
# files still on disk are removed.
def run
  previous_trap = Signal.trap("INT") { abort @ci.colorize("\n❌ #{@running.keys.join(', ')} interrupted", :error) }

  queue = Queue.new
  @tasks.each { |task| queue << task }

  with_progress do
    @parallel.times.map do
      Thread.new do
        while (task = dequeue(queue))
          break if @ci.failing_fast?
          execute_task(*task)
        end
      end
    end.each(&:join)
  end
ensure
  # Signal.trap can return nil for the previous handler. Fall back to
  # "DEFAULT", a handler name Signal.trap recognizes; an unrecognized string
  # would be treated as a command to run when the signal fires.
  Signal.trap("INT", previous_trap || "DEFAULT")
  @log_files.each { |path| File.delete(path) if File.exist?(path) }
end

#spawn_process(command, &block) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 129

# Starts +command+ and yields its output stream to +block+.
#
# A PTY is preferred when available so the child keeps its output colors;
# otherwise Open3 is used.
def spawn_process(command, &block)
  return spawn_via_open3(command, &block) unless pty_available?

  spawn_via_pty(command, &block)
end

#spawn_via_open3(command) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 154

# Runs +command+ with Open3.popen2e and yields its combined stdout/stderr
# stream to the caller's block.
#
# The child's stdin is closed immediately. Returns true when the process
# exits successfully, false otherwise.
def spawn_via_open3(command)
  require "open3"

  Open3.popen2e(*command) do |stdin, merged_output, waiter|
    stdin.close
    yield merged_output

    exit_status = waiter.value
    exit_status.success?
  end
end

#spawn_via_pty(command) (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 145

# Runs +command+ attached to a pseudo-terminal and yields the terminal's
# read side to the caller's block.
#
# The write side is closed right away. Returns true when the process exits
# successfully, false otherwise (including when PTY::ChildExited is raised).
#
# Expects the PTY constant to be loaded already (pty_available? requires it).
def spawn_via_pty(command)
  output, input, pid = PTY.spawn(*command)
  input.close
  yield output
  Process.waitpid2(pid).last.success?
rescue PTY::ChildExited => e
  e.status.success?
ensure
  # PTY.spawn without a block leaves closing the IOs to the caller; without
  # this, every step would leak the terminal's file descriptor until GC.
  output&.close
end

#with_progress (private)

[ GitHub ]

  
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 60

# Runs the given block while a background thread redraws the progress line
# about every 0.1 seconds (under @mutex).
#
# The progress thread is always stopped and joined, even when the block
# raises; otherwise it would keep looping after the failure.
#
# Returns the (finished) progress thread.
def with_progress
  stop = false
  thread = Thread.new do
    until stop
      @mutex.synchronize { refresh_progress }
      sleep 0.1
    end
  end

  begin
    yield
  ensure
    stop = true
    thread.join
  end

  thread
end