Skip to content

Commit f69969c

Browse files
committed
dlq-logging: reintroduce structured payload where possible
1 parent 2bc99f6 commit f69969c

File tree

6 files changed

+60
-32
lines changed

6 files changed

+60
-32
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.15.2
2+
- Restores DLQ logging behavior from 11.8.x to include the action-tuple as structured [#1105](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1105)
3+
14
## 11.15.1
25
- Move async finish_register to bottom of register to avoid race condition [#1125](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1125)
36

lib/logstash/outputs/elasticsearch.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ def handle_event_mapping_errors(event_mapping_errors)
405405

406406
event_mapping_errors.each do |event_mapping_error|
407407
detailed_message = "#{event_mapping_error.message}; event: `#{event_mapping_error.event.to_hash_with_metadata}`"
408-
handle_dlq_status(event_mapping_error.event, :warn, detailed_message)
408+
@dlq_writer ? @dlq_writer.write(event_mapping_error.event, detailed_message) : @logger.warn(detailed_message)
409409
end
410410
@document_level_metrics.increment(:non_retryable_failures, event_mapping_errors.size)
411411
end
@@ -422,7 +422,7 @@ def safe_interpolation_map_events(events)
422422
successful_events << @event_mapper.call(event)
423423
rescue EventMappingError => ie
424424
event_mapping_errors << FailedEventMapping.new(event, ie.message)
425-
end
425+
end
426426
end
427427
MapEventsResult.new(successful_events, event_mapping_errors)
428428
end

lib/logstash/plugin_mixins/elasticsearch/common.rb

+7-13
Original file line numberDiff line numberDiff line change
@@ -227,22 +227,16 @@ def next_sleep_interval(current_interval)
227227
end
228228

229229
def handle_dlq_response(message, action, status, response)
230-
_, action_params = action.event, [action[0], action[1], action[2]]
230+
event, action_params = action.event, [action[0], action[1], action[2]]
231231

232-
# TODO: Change this to send a map with { :status => status, :action => action } in the future
233-
detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}"
234-
235-
log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn
236-
237-
handle_dlq_status(action.event, log_level, detailed_message)
238-
end
239-
240-
def handle_dlq_status(event, log_level, message)
241-
# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
242232
if @dlq_writer
243-
@dlq_writer.write(event, "#{message}")
233+
# TODO: Change this to send a map with { :status => status, :action => action } in the future
234+
detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}"
235+
@dlq_writer.write(event, "#{detailed_message}")
244236
else
245-
@logger.send log_level, message
237+
log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn
238+
239+
@logger.public_send(log_level, message, status: status, action: action_params, response: response)
246240
end
247241
end
248242

spec/es_spec_helper.rb

+12-7
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,24 @@ def self.es_version
6767
end
6868

6969
RSpec::Matchers.define :have_hits do |expected|
70+
hits_count_path = ESHelper.es_version_satisfies?(">=7") ? %w(hits total value) : %w(hits total)
71+
7072
match do |actual|
71-
if ESHelper.es_version_satisfies?(">=7")
72-
expected == actual['hits']['total']['value']
73-
else
74-
expected == actual['hits']['total']
75-
end
73+
@actual_hits_count = actual&.dig(*hits_count_path)
74+
values_match? expected, @actual_hits_count
75+
end
76+
failure_message do |actual|
77+
"expected that #{actual} with #{@actual_hits_count || "UNKNOWN" } hits would have #{expected} hits"
7678
end
7779
end
7880

7981
RSpec::Matchers.define :have_index_pattern do |expected|
8082
match do |actual|
81-
test_against = Array(actual['index_patterns'].nil? ? actual['template'] : actual['index_patterns'])
82-
test_against.include?(expected)
83+
@actual_index_pattterns = Array(actual['index_patterns'].nil? ? actual['template'] : actual['index_patterns'])
84+
@actual_index_pattterns.any? { |v| values_match? expected, v }
85+
end
86+
failure_message do |actual|
87+
"expected that #{actual} with index patterns #{@actual_index_pattterns} would have included `#{expected}`"
8388
end
8489
end
8590

spec/integration/outputs/templates_spec.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
elasticsearch_client.indices.delete_template(:name => '*')
2525
# This can fail if there are no indexes, ignore failure.
26-
elasticsearch_client.indices.delete(:index => '*') rescue nil
26+
elasticsearch_client.indices.delete(:index => '*') rescue puts("DELETE INDICES ERROR: #{$!}")
27+
# Since we are pinned to ES client 7.x, we need to delete data streams the hard way...
28+
elasticsearch_client.perform_request("DELETE", "/_data_stream/*") rescue puts("DELETE DATA STREAMS ERROR: #{$!}")
2729
end
2830

2931
context 'with ecs_compatibility => disabled' do

spec/unit/outputs/elasticsearch_spec.rb

+33-9
Original file line numberDiff line numberDiff line change
@@ -1121,41 +1121,63 @@
11211121
end if LOGSTASH_VERSION > '6.0'
11221122

11231123
context 'handling elasticsearch document-level status meant for the DLQ' do
1124+
let(:es_api_action) { "CUSTOM_ACTION" }
1125+
let(:es_api_params) { Hash['_index' => 'MY_INDEX'] }
1126+
11241127
let(:options) { { "manage_template" => false, "data_stream" => 'false' } }
1125-
let(:action) { LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, LogStash::Event.new("foo" => "bar")) }
1128+
let(:action) { LogStash::Outputs::ElasticSearch::EventActionTuple.new(es_api_action, es_api_params, LogStash::Event.new("foo" => "bar")) }
1129+
1130+
let(:logger) { double('logger').as_null_object }
1131+
before(:each) { subject.instance_variable_set(:@logger, logger) }
11261132

11271133
context 'when @dlq_writer is nil' do
11281134
before { subject.instance_variable_set '@dlq_writer', nil }
1129-
let(:action) { LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, LogStash::Event.new("foo" => "bar")) }
11301135

11311136
context 'resorting to previous behaviour of logging the error' do
11321137
context 'getting an invalid_index_name_exception' do
11331138
it 'should log at ERROR level' do
1134-
subject.instance_variable_set(:@logger, double("logger").as_null_object)
1139+
# logger = double("logger").as_null_object
1140+
# subject.instance_variable_set(:@logger, logger)
1141+
11351142
mock_response = { 'index' => { 'error' => { 'type' => 'invalid_index_name_exception' } } }
11361143
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
1144+
1145+
expect(logger).to have_received(:error).with(a_string_including("Could not index event to Elasticsearch"),
1146+
a_hash_including(:status => :some_status,
1147+
:action => [es_api_action, es_api_params, action.event.to_hash],
1148+
:response => mock_response))
11371149
end
11381150
end
11391151

11401152
context 'when getting any other exception' do
11411153
it 'should log at WARN level' do
1142-
logger = double("logger").as_null_object
1143-
subject.instance_variable_set(:@logger, logger)
1144-
expect(logger).to receive(:warn).with(a_string_including "Could not index event to Elasticsearch. status: some_status, action: [:action, :params, {")
1154+
# logger = double("logger").as_null_object
1155+
# subject.instance_variable_set(:@logger, logger)
1156+
11451157
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
11461158
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
1159+
1160+
expect(logger).to have_received(:warn).with(a_string_including("Could not index event to Elasticsearch"),
1161+
a_hash_including(:status => :some_status,
1162+
:action => [es_api_action, es_api_params, action.event.to_hash],
1163+
:response => mock_response))
11471164
end
11481165
end
11491166

11501167
context 'when the response does not include [error]' do
11511168
it 'should not fail, but just log a warning' do
1152-
logger = double("logger").as_null_object
1153-
subject.instance_variable_set(:@logger, logger)
1154-
expect(logger).to receive(:warn).with(a_string_including "Could not index event to Elasticsearch. status: some_status, action: [:action, :params, {")
1169+
# logger = double("logger").as_null_object
1170+
# subject.instance_variable_set(:@logger, logger)
1171+
11551172
mock_response = { 'index' => {} }
11561173
expect do
11571174
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
11581175
end.to_not raise_error
1176+
1177+
expect(logger).to have_received(:warn).with(a_string_including("Could not index event to Elasticsearch"),
1178+
a_hash_including(:status => :some_status,
1179+
:action => [es_api_action, es_api_params, action.event.to_hash],
1180+
:response => mock_response))
11591181
end
11601182
end
11611183
end
@@ -1175,6 +1197,8 @@
11751197
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
11761198
action = LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, event)
11771199
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, 404, mock_response)
1200+
1201+
expect(logger).to_not have_received(:warn).with(a_string_including("Could not index event to Elasticsearch"))
11781202
end
11791203
end
11801204

0 commit comments

Comments
 (0)