Skip to content

Commit 65cb45a

Browse files
authored
Merge pull request #105 from fluent/dynamic-database-name
out_mongo: Support dynamic database name by built-in placeholders
2 parents 978c159 + 58b66f4 commit 65cb45a

File tree

3 files changed

+63
-8
lines changed

3 files changed

+63
-8
lines changed

README.rdoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,30 @@ Use _mongo_ type in match.
5858

5959
For _connection_string_ parameter, see https://docs.mongodb.com/manual/reference/connection-string/ article for more detail.
6060

61+
===== built-in placeholders
62+
63+
fluent-plugin-mongo support built-in placeholders.
64+
_database_ and _collection_ parameters can handle them.
65+
66+
Here is an example to use built-in placeholders:
67+
68+
<match mongo.**>
69+
@type mongo
70+
71+
database ${tag[0]}
72+
73+
# collection name to insert
74+
collection ${tag[1]}-%Y%m%d
75+
76+
# Other buffer configurations here
77+
<buffer tag, time>
78+
@type memory
79+
timekey 3600
80+
</buffer>
81+
</match>
82+
83+
In more detail, please refer to the officilal document for built-in placeholders: https://docs.fluentd.org/v1.0/articles/buffer-section#placeholders
84+
6185
=== mongo(tag mapped mode)
6286

6387
Tag mapped to MongoDB collection automatically.

lib/fluent/plugin/out_mongo.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,17 @@ def multi_workers_ready?
173173

174174
def write(chunk)
175175
collection_name = extract_placeholders(@collection, chunk.metadata)
176-
operate(format_collection_name(collection_name), collect_records(chunk))
176+
database_name = extract_placeholders(@database, chunk.metadata)
177+
operate(database_name, format_collection_name(collection_name), collect_records(chunk))
177178
end
178179

179180
private
180181

181-
def client
182+
def client(database = @database)
182183
if @connection_string
183184
Mongo::Client.new(@connection_string)
184185
else
185-
@client_options[:database] = @database
186+
@client_options[:database] = database
186187
@client_options[:user] = @user if @user
187188
@client_options[:password] = @password if @password
188189
Mongo::Client.new(@nodes, @client_options)
@@ -227,7 +228,8 @@ def collection_exists?(name)
227228
end
228229
end
229230

230-
def get_collection(name, options)
231+
def get_collection(database, name, options)
232+
@client = client(database) if database && @database != database
231233
return @client[name] if @collections[name]
232234

233235
unless collection_exists?(name)
@@ -242,7 +244,7 @@ def forget_collection(name)
242244
@collections.delete(name)
243245
end
244246

245-
def operate(collection, records)
247+
def operate(database, collection, records)
246248
begin
247249
if @replace_dot_in_key_with
248250
records.map! do |r|
@@ -255,7 +257,7 @@ def operate(collection, records)
255257
end
256258
end
257259

258-
get_collection(collection, @collection_options).insert_many(records)
260+
get_collection(database, collection, @collection_options).insert_many(records)
259261
rescue Mongo::Error::BulkWriteError => e
260262
log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
261263
forget_collection(collection)

test/plugin/test_out_mongo.rb

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ def default_config
3636
]
3737
end
3838

39-
def setup_mongod
39+
def setup_mongod(database = database_name)
4040
options = {}
41-
options[:database] = database_name
41+
options[:database] = database
4242
@client = ::Mongo::Client.new(["localhost:#{port}"], options)
4343
end
4444

@@ -211,6 +211,35 @@ def test_write_with_collection_placeholder
211211
end
212212
end
213213

214+
class WriteWithDatabasePlaceholder < self
215+
def setup
216+
@tag = 'custom'
217+
setup_mongod(@tag)
218+
end
219+
220+
def teardown
221+
teardown_mongod
222+
end
223+
224+
def test_write_with_database_placeholder
225+
d = create_driver(%[
226+
@type mongo
227+
database ${tag}
228+
collection #{collection_name}
229+
include_time_key true
230+
])
231+
d.run(default_tag: @tag) do
232+
emit_documents(d)
233+
end
234+
235+
actual_documents = get_documents
236+
time = event_time("2011-01-02 13:14:15 UTC")
237+
expected = [{'a' => 1, d.instance.inject_config.time_key => Time.at(time).localtime},
238+
{'a' => 2, d.instance.inject_config.time_key => Time.at(time).localtime}]
239+
assert_equal(expected, actual_documents)
240+
end
241+
end
242+
214243
def test_write_at_enable_tag
215244
d = create_driver(default_config + %[
216245
include_tag_key true

0 commit comments

Comments
 (0)