pipeline
# Pipeline startup: compile the configuration, start one thread per input,
# register outputs and filters, then start the worker threads that drain the
# input queue in batches.

# Parse the configuration string and compile it to Ruby source.
@config = grammar.parse(configstr)
code = @config.compile
# NOTE(review): eval executes the generated source in the current scope;
# presumably this is what defines @inputs, @filters, @outputs, filter_func and
# output_func used below -- confirm. configstr must come from a trusted source,
# because whatever it compiles to is executed here.
eval(code)

@input_queue = LogStash::Util::WrappedSynchronousQueue.new
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")

# Register each input, then run it on its own thread, handing it the shared
# input queue.
@inputs.each do |input|
  input.register
  @input_threads << Thread.new do
    LogStash::Util.set_thread_name("[#{pipeline_id}]<#{input.class.config_name}")
    # The original called `plugin.run`, but no `plugin` exists in this scope;
    # the plugin being started is the block parameter `input`.
    input.run(@input_queue)
  end
end

@outputs.each(&:register)
@filters.each(&:register)

# Start the configured number of worker threads. Each one loops forever:
# collect a batch, filter it, group the surviving events by output, and
# deliver each group with a single multi_receive call.
@settings[:pipeline_workers].times do |worker_index|
  @worker_threads << Thread.new do
    LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{worker_index}")

    loop do
      # The first slot uses take; every later slot uses poll with batch_delay.
      input_batch = []
      batch_size.times do |slot|
        event = slot.zero? ? @input_queue.take : @input_queue.poll(batch_delay)
        # NOTE(review): poll is given a delay, so it presumably returns nil
        # when nothing arrives in time -- confirm. Such empty slots are skipped
        # so that nil is never passed to filter_func.
        input_batch << event unless event.nil?
      end

      # Run every event through the filters and drop cancelled results.
      filtered_events = input_batch.each_with_object([]) do |event, kept|
        filter_func(event).each { |fe| kept << fe unless fe.cancelled? }
      end

      # Group events by the outputs that should receive them; output_func may
      # return nil, which is treated as "no outputs".
      events_by_output = Hash.new { |hash, key| hash[key] = [] }
      filtered_events.each do |event|
        (output_func(event) || []).each { |output| events_by_output[output] << event }
      end

      events_by_output.each { |output, events| output.multi_receive(events) }
    end
  end
end
# "Last updated" -- stray trailing text that was fused onto the closing `end`
# above in the original; kept here as a comment.