Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nx/lib/nx/serving.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ defmodule Nx.Serving do
end

defp distributed_batched_run_with_retries!(name, input, retries) do
case :pg.get_members(Nx.Serving.PG, __MODULE__) do
case :pg.get_members(Nx.Serving.PG, name) do
[] ->
exit({:noproc, {__MODULE__, :distributed_batched_run, [name, input, [retries: retries]]}})

Expand Down Expand Up @@ -1332,7 +1332,7 @@ defmodule Nx.Serving do
)

serving_weight = max(1, weight * partitions_count)
:pg.join(Nx.Serving.PG, __MODULE__, List.duplicate(self(), serving_weight))
:pg.join(Nx.Serving.PG, name, List.duplicate(self(), serving_weight))

for batch_key <- batch_keys do
stack_init(batch_key)
Expand Down
17 changes: 11 additions & 6 deletions nx/test/nx/serving_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,8 @@ defmodule Nx.ServingTest do
]

Node.spawn_link(:"secondary@127.0.0.1", DistributedServings, :multiply, [parent, opts])
assert_receive {_, :join, Nx.Serving, _}
assert_receive {_, :join, name, _}
assert name == config.test

batch = Nx.Batch.concatenate([Nx.tensor([1, 2])])

Expand Down Expand Up @@ -1327,14 +1328,16 @@ defmodule Nx.ServingTest do
opts2 = Keyword.put(opts, :distribution_weight, 4)

Node.spawn_link(:"secondary@127.0.0.1", DistributedServings, :multiply, [parent, opts])
assert_receive {_, :join, Nx.Serving, pids}
assert_receive {_, :join, name, pids}
assert length(pids) == 1
assert name == config.test

Node.spawn_link(:"tertiary@127.0.0.1", DistributedServings, :multiply, [parent, opts2])
assert_receive {_, :join, Nx.Serving, pids}
assert_receive {_, :join, name, pids}
assert length(pids) == 4
assert name == config.test

members = :pg.get_members(Nx.Serving.PG, Nx.Serving)
members = :pg.get_members(Nx.Serving.PG, config.test)
assert length(members) == 5
end

Expand All @@ -1356,7 +1359,8 @@ defmodule Nx.ServingTest do

args = [parent, opts]
Node.spawn_link(:"secondary@127.0.0.1", DistributedServings, :add_five_round_about, args)
assert_receive {_, :join, Nx.Serving, _}
assert_receive {_, :join, name, _}
assert name == config.test

batch = Nx.Batch.concatenate([Nx.tensor([1, 2])])

Expand Down Expand Up @@ -1412,7 +1416,8 @@ defmodule Nx.ServingTest do
]

Node.spawn_link(:"tertiary@127.0.0.1", DistributedServings, :multiply, [parent, opts])
assert_receive {_, :join, Nx.Serving, _}
assert_receive {_, :join, name, _}
assert name == config.test

batch = Nx.Batch.concatenate([Nx.tensor([1, 2])])

Expand Down
Loading