Skip to content

Commit 803821a

Browse files
committed
out_mongo: Support dynamic database name by built-in placeholders
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
1 parent 978c159 commit 803821a

File tree

2 files changed

+39
-8
lines changed

2 files changed

+39
-8
lines changed

lib/fluent/plugin/out_mongo.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,17 @@ def multi_workers_ready?
173173

174174
def write(chunk)
175175
collection_name = extract_placeholders(@collection, chunk.metadata)
176-
operate(format_collection_name(collection_name), collect_records(chunk))
176+
database_name = extract_placeholders(@database, chunk.metadata)
177+
operate(database_name, format_collection_name(collection_name), collect_records(chunk))
177178
end
178179

179180
private
180181

181-
def client
182+
def client(database = @database)
182183
if @connection_string
183184
Mongo::Client.new(@connection_string)
184185
else
185-
@client_options[:database] = @database
186+
@client_options[:database] = database
186187
@client_options[:user] = @user if @user
187188
@client_options[:password] = @password if @password
188189
Mongo::Client.new(@nodes, @client_options)
@@ -227,7 +228,8 @@ def collection_exists?(name)
227228
end
228229
end
229230

230-
def get_collection(name, options)
231+
def get_collection(database, name, options)
232+
@client = client(database) if database && @database != database
231233
return @client[name] if @collections[name]
232234

233235
unless collection_exists?(name)
@@ -242,7 +244,7 @@ def forget_collection(name)
242244
@collections.delete(name)
243245
end
244246

245-
def operate(collection, records)
247+
def operate(database, collection, records)
246248
begin
247249
if @replace_dot_in_key_with
248250
records.map! do |r|
@@ -255,7 +257,7 @@ def operate(collection, records)
255257
end
256258
end
257259

258-
get_collection(collection, @collection_options).insert_many(records)
260+
get_collection(database, collection, @collection_options).insert_many(records)
259261
rescue Mongo::Error::BulkWriteError => e
260262
log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
261263
forget_collection(collection)

test/plugin/test_out_mongo.rb

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ def default_config
3636
]
3737
end
3838

39-
def setup_mongod
39+
def setup_mongod(database = database_name)
4040
options = {}
41-
options[:database] = database_name
41+
options[:database] = database
4242
@client = ::Mongo::Client.new(["localhost:#{port}"], options)
4343
end
4444

@@ -211,6 +211,35 @@ def test_write_with_collection_placeholder
211211
end
212212
end
213213

214+
class WriteWithDatabasePlaceholder < self
215+
def setup
216+
@tag = 'custom'
217+
setup_mongod(@tag)
218+
end
219+
220+
def teardown
221+
teardown_mongod
222+
end
223+
224+
def test_write_with_database_placeholder
225+
d = create_driver(%[
226+
@type mongo
227+
database ${tag}
228+
collection #{collection_name}
229+
include_time_key true
230+
])
231+
d.run(default_tag: @tag) do
232+
emit_documents(d)
233+
end
234+
235+
actual_documents = get_documents
236+
time = event_time("2011-01-02 13:14:15 UTC")
237+
expected = [{'a' => 1, d.instance.inject_config.time_key => Time.at(time).localtime},
238+
{'a' => 2, d.instance.inject_config.time_key => Time.at(time).localtime}]
239+
assert_equal(expected, actual_documents)
240+
end
241+
end
242+
214243
def test_write_at_enable_tag
215244
d = create_driver(default_config + %[
216245
include_tag_key true

0 commit comments

Comments
 (0)