-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy path: avro.rb
134 lines (120 loc) · 3.96 KB
/
avro.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# encoding: utf-8
require "open-uri"
require "avro"
require "base64"
require "logstash/codecs/base"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
# Read serialized Avro records as Logstash events
#
# This plugin is used to serialize Logstash events as
# Avro datums, as well as deserializing Avro datums into
# Logstash events.
#
# ==== Encoding
#
# This codec is for serializing individual Logstash events
# as Avro datums that are Avro binary blobs. It does not encode
# Logstash events into an Avro file.
#
#
# ==== Decoding
#
# This codec is for deserializing individual Avro records. It is not for reading
# Avro files. Avro files have a unique format that must be handled upon input.
#
#
# ==== Usage
# Example usage with Kafka input.
#
# [source,ruby]
# ----------------------------------
# input {
# kafka {
# codec => avro {
# schema_uri => "/tmp/schema.avsc"
# }
# }
# }
# filter {
# ...
# }
# output {
# ...
# }
# ----------------------------------
class LogStash::Codecs::Avro < LogStash::Codecs::Base
  config_name "avro"

  include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
  include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck

  extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

  include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

  BINARY_ENCODING = "binary".freeze
  BASE64_ENCODING = "base64".freeze

  # Set encoding for Avro's payload.
  # Use `base64` (default) encoding to convert the raw binary bytes to a `base64` encoded string.
  # Set this option to `binary` to use the plain binary bytes.
  config :encoding, :validate => [BINARY_ENCODING, BASE64_ENCODING], :default => BASE64_ENCODING

  # schema path to fetch the schema from.
  # This can be a 'http' or 'file' scheme URI
  # example:
  #
  # * http - `http://example.com/schema.avsc`
  # * file - `/path/to/schema.avsc`
  config :schema_uri, :validate => :string, :required => true

  # tag events with `_avroparsefailure` when decode fails
  config :tag_on_failure, :validate => :boolean, :default => false

  # Defines a target field for placing decoded fields.
  # If this setting is omitted, data gets stored at the root (top level) of the event.
  #
  # NOTE: the target is only relevant while decoding data into a new event.
  config :target, :validate => :field_reference

  # Fetch the raw schema text from `uri_string`.
  # OpenURI handles both remote ('http'/'https') and local file URIs.
  #
  # @param uri_string [String] location of the Avro schema
  # @return [String] the schema document body
  def open_and_read(uri_string)
    URI.open(uri_string, &:read)
  end

  public
  def initialize(*params)
    super
    # In ECS mode the raw payload is preserved at [event][original];
    # with ECS disabled no copy of the original data is kept.
    @original_field = ecs_select[disabled: nil, v1: '[event][original]']
  end

  # Parse and cache the Avro schema once at plugin startup.
  # @raise [Avro::SchemaParseError] if the fetched document is not a valid schema
  def register
    @schema = Avro::Schema.parse(open_and_read(schema_uri))
  end

  public
  # Deserialize a single Avro datum into a Logstash event and yield it.
  #
  # @param data [String] one Avro-encoded record (optionally base64-wrapped)
  # @yield [LogStash::Event] the decoded event, or a `_avroparsefailure`-tagged
  #   event carrying the raw payload in `message` when `tag_on_failure` is set
  # @raise [StandardError] re-raises decode errors when `tag_on_failure` is false
  def decode(data)
    if encoding == BASE64_ENCODING
      # Accept both base64-wrapped and raw binary payloads: fall back to the
      # plain bytes ONLY when strict base64 decoding rejects the input.
      # Rescue is narrowed to ArgumentError (what Base64.strict_decode64
      # raises on malformed input) so unrelated errors are not masked.
      begin
        datum = StringIO.new(Base64.strict_decode64(data))
      rescue ArgumentError
        datum = StringIO.new(data)
      end
    else
      datum = StringIO.new(data)
    end
    decoder = Avro::IO::BinaryDecoder.new(datum)
    datum_reader = Avro::IO::DatumReader.new(@schema)
    event = targeted_event_factory.new_event(datum_reader.read(decoder))
    # Frozen copy so later pipeline stages cannot mutate the preserved original.
    event.set(@original_field, data.dup.freeze) if @original_field
    yield event
  rescue => e
    if tag_on_failure
      @logger.error("Avro parse error, original data now in message field", :error => e)
      yield event_factory.new_event("message" => data, "tags" => ["_avroparsefailure"])
    else
      raise e
    end
  end

  public
  # Serialize a Logstash event as a single Avro binary datum (not an Avro
  # container file) and hand it to the codec's on_event callback, base64-encoded
  # unless `encoding => binary` is configured.
  #
  # @param event [LogStash::Event] the event to serialize against @schema
  def encode(event)
    dw = Avro::IO::DatumWriter.new(@schema)
    buffer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    dw.write(event.to_hash, encoder)
    if encoding == BASE64_ENCODING
      @on_event.call(event, Base64.strict_encode64(buffer.string))
    else
      @on_event.call(event, buffer.string)
    end
  end
end