Class: ActiveSupport::ContinuousIntegration::Group
Do not use. This class is for internal use only.
| Relationships & Source Files | |
| Namespace Children | |
|
Classes:
| |
| Inherits: | Object |
| Defined in: | activesupport/lib/active_support/continuous_integration/group.rb |
Class Method Summary
- .new(ci, name, parallel:, &block) ⇒ Group constructor
Instance Attribute Summary
- #pty_available? ⇒ Boolean readonly private
Instance Method Summary
- #run
- #capture_output(command) private
- #clear_progress private
- #dequeue(queue) private
- #execute_group(name, block) private
- #execute_step(title, command) private
- #execute_task(type, title, payload) private
- #format_elapsed_brief(seconds) private
- #refresh_progress private
- #replay_and_cleanup(log_path) private
- #spawn_process(command, &block) private
- #spawn_via_open3(command) private
- #spawn_via_pty(command) private
- #with_progress private
Constructor Details
.new(ci, name, parallel:, &block) ⇒ Group
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 26
def initialize(ci, name, parallel:, &block) @ci = ci @name = name @parallel = parallel @tasks = TaskCollector.new(&block).tasks @start_time = Time.now.to_f @mutex = Mutex.new @running = {} @progress_visible = false @log_files = [] end
Instance Attribute Details
#pty_available? ⇒ Boolean (readonly, private)
[ GitHub ]
# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 138
def pty_available? require "pty" true rescue LoadError false end
Instance Method Details
#capture_output(command) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 111
def capture_output(command) log_path = Dir::Tmpname.create(["ci-", ".log"]) { } @mutex.synchronize { @log_files << log_path } success = spawn_process(command) do |output| File.open(log_path, "w") do |f| loop { f.write(output.readpartial(8192)) } rescue EOFError, Errno::EIO # Expected when process exits end end [success, log_path] rescue SystemCallError => e File.write(log_path, "#{e.}: #{command.join(" ")}\n") [false, log_path] end
#clear_progress (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 182
def clear_progress return unless @progress_visible print "\r\e[2A\e[J" @progress_visible = false end
#dequeue(queue) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 193
def dequeue(queue) queue.pop(true) rescue ThreadError nil end
#execute_group(name, block) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 99
def execute_group(name, block) all_success = true TaskCollector.new(&block).tasks.each do |type, title, payload| unless execute_task(type, title, payload) all_success = false break if @ci.fail_fast? end end all_success end
#execute_step(title, command) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 82
def execute_step(title, command) @mutex.synchronize { @running[title] = Time.now.to_f } success, log_path = capture_output(command) @mutex.synchronize do started = @running.delete(title) clear_progress @ci.report_step(title, command) do replay_and_cleanup(log_path) [success, Time.now.to_f - started] end refresh_progress end success end
#execute_task(type, title, payload) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 75
def execute_task(type, title, payload) case type when :step then execute_step(title, payload) when :group then execute_group(title, payload) end end
#format_elapsed_brief(seconds) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 188
def format_elapsed_brief(seconds) min, sec = seconds.divmod(60) "#{"#{min.to_i}m" if min > 0}#{sec.to_i}s" end
#refresh_progress (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 172
def refresh_progress if @running.any? print "\n\n" unless @progress_visible elapsed = format_elapsed_brief(Time.now.to_f - @start_time) print "\r\e[K#{@ci.colorize("#{@name} (#{elapsed}) - #{@running.keys.join(' | ')}...", :progress)}" @progress_visible = true $stdout.flush end end
#replay_and_cleanup(log_path) (private)
[ GitHub ]#run
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 38
def run previous_trap = Signal.trap("INT") { abort @ci.colorize("\n❌ #{@running.keys.join(', ')} interrupted", :error) } queue = Queue.new @tasks.each { |task| queue << task } with_progress do @parallel.times.map do Thread.new do while (task = dequeue(queue)) break if @ci.failing_fast? execute_task(*task) end end end.each(&:join) end ensure Signal.trap("INT", previous_trap || "-") @log_files.each { |path| File.delete(path) if File.exist?(path) } end
#spawn_process(command, &block) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 129
def spawn_process(command, &block) # Prefer PTY if available to retain output colors if pty_available? spawn_via_pty(command, &block) else spawn_via_open3(command, &block) end end
#spawn_via_open3(command) (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 154
def spawn_via_open3(command) require "open3" Open3.popen2e(*command) do |input, output, wait_thr| input.close yield output wait_thr.value.success? end end
#spawn_via_pty(command) (private)
[ GitHub ]#with_progress (private)
[ GitHub ]# File 'activesupport/lib/active_support/continuous_integration/group.rb', line 60
def with_progress stop = false thread = Thread.new do until stop @mutex.synchronize { refresh_progress } sleep 0.1 end end yield stop = true thread.join end