Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions lib/data_layer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2059,39 +2059,46 @@ defmodule AshPostgres.DataLayer do
{Map.take(r, keys), r}
end)

ash_query =
resource
|> Ash.Query.do_filter(
or:
changesets
|> Enum.filter(fn changeset ->
not Map.has_key?(
results_by_identity,
Map.take(changeset.attributes, keys)
)
end)
|> Enum.map(fn changeset ->
changeset.attributes
|> Map.take(keys)
|> Keyword.new()
end)
)
|> then(fn
query when is_nil(identity) or is_nil(identity.where) -> query
query -> Ash.Query.do_filter(query, identity.where)
skipped_filter =
changesets
|> Enum.filter(fn changeset ->
not Map.has_key?(
results_by_identity,
Map.take(changeset.attributes, keys)
)
end)
|> Enum.map(fn changeset ->
changeset.attributes
|> Map.take(keys)
|> Keyword.new()
end)
|> Ash.Query.set_tenant(changeset.tenant)

skipped_upserts =
with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
{:ok, results} <- run_query(ecto_query, resource) do
results
|> Enum.map(fn result ->
Ash.Resource.put_metadata(result, :upsert_skipped, true)
end)
|> Enum.reduce(%{}, fn r, acc ->
Map.put(acc, Map.take(r, keys), r)
end)
case skipped_filter do
[] ->
# No skipped records to query for
%{}

skipped_filter ->
ash_query =
resource
|> Ash.Query.do_filter(or: skipped_filter)
|> then(fn
query when is_nil(identity) or is_nil(identity.where) -> query
query -> Ash.Query.do_filter(query, identity.where)
end)
|> Ash.Query.set_tenant(changeset.tenant)

with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
{:ok, results} <- run_query(ecto_query, resource) do
results
|> Enum.map(fn result ->
Ash.Resource.put_metadata(result, :upsert_skipped, true)
end)
|> Enum.reduce(%{}, fn r, acc ->
Map.put(acc, Map.take(r, keys), r)
end)
end
end

results =
Expand Down
28 changes: 28 additions & 0 deletions test/bulk_create_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,34 @@ defmodule AshPostgres.BulkCreateTest do
refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped)
end

# Exercises the return_skipped_upsert? option on the path where NO upsert is
# skipped: every record in the batch is written, so none of the returned
# records should carry the :upsert_skipped metadata flag.
# NOTE(review): this appears to be the companion to the skipped-upsert test
# above (L105 refutes :upsert_skipped on a no_conflict record) — it guards the
# new empty-skipped-filter branch in the data layer; confirm against that diff.
test "bulk upsert with return_skipped_upsert? when all records are not skipped" do
  %Ash.BulkResult{records: records, errors: errors} =
    Ash.bulk_create!(
      [
        # NOTE(review): presumably uniq_if_contains_foo is the identity key
        # used for conflict detection on :upsert_with_no_filter — confirm
        # against the Post resource definition.
        %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 100},
        %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 200}
      ],
      Post,
      :upsert_with_no_filter,
      # Only perform the upsert when the incoming price differs from the
      # price already stored on the conflicting row.
      upsert_condition: expr(price != upsert_conflict(:price)),
      return_errors?: true,
      return_records?: true,
      return_skipped_upsert?: true
    )

  # No record in the batch may fail.
  assert errors == []

  # Both inputs come back as records (assumes insertion order is preserved
  # in the returned list — TODO confirm this is guaranteed by bulk_create!).
  assert [row1, row2] = records

  assert row1.title == "fredfoo"
  assert row1.price == 100
  # Nothing was skipped, so the skipped-upsert marker must be absent.
  refute Ash.Resource.get_metadata(row1, :upsert_skipped)

  assert row2.title == "georgefoo"
  assert row2.price == 200
  refute Ash.Resource.get_metadata(row2, :upsert_skipped)
end

# confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert.
# leaving this test here for posterity
# test "bulk creates can upsert with id" do
Expand Down
Loading