diff --git a/adapter/test_util.go b/adapter/test_util.go
index 95ea22c..3164b5d 100644
--- a/adapter/test_util.go
+++ b/adapter/test_util.go
@@ -5,8 +5,6 @@ import (
 	"log"
 	"net"
 	"strconv"
-	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -19,7 +17,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/raft"
-	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 )
@@ -42,57 +40,7 @@ func shutdown(nodes []Node) {
 	}
 }
 
-type portsAdress struct {
-	grpc          int
-	raft          int
-	redis         int
-	dynamo        int
-	grpcAddress   string
-	raftAddress   string
-	redisAddress  string
-	dynamoAddress string
-}
-
-const (
-	// raft and the grpc requested by the client use grpc and are received on the same port
-	grpcPort = 50000
-	raftPort = 50000
-
-	redisPort  = 63790
-	dynamoPort = 28000
-)
-
-var mu sync.Mutex
-var portGrpc atomic.Int32
-var portRaft atomic.Int32
-var portRedis atomic.Int32
-var portDynamo atomic.Int32
-
-func init() {
-	portGrpc.Store(raftPort)
-	portRaft.Store(grpcPort)
-	portRedis.Store(redisPort)
-	portDynamo.Store(dynamoPort)
-}
-
-func portAssigner() portsAdress {
-	mu.Lock()
-	defer mu.Unlock()
-	gp := portGrpc.Add(1)
-	rp := portRaft.Add(1)
-	rd := portRedis.Add(1)
-	dn := portDynamo.Add(1)
-	return portsAdress{
-		grpc:          int(gp),
-		raft:          int(rp),
-		redis:         int(rd),
-		dynamo:        int(dn),
-		grpcAddress:   net.JoinHostPort("localhost", strconv.Itoa(int(gp))),
-		raftAddress:   net.JoinHostPort("localhost", strconv.Itoa(int(rp))),
-		redisAddress:  net.JoinHostPort("localhost", strconv.Itoa(int(rd))),
-		dynamoAddress: net.JoinHostPort("localhost", strconv.Itoa(int(dn))),
-	}
-}
 
+// Node groups the servers and addresses used in tests.
 type Node struct {
 	grpcAddress   string
@@ -131,32 +79,30 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
 		waitInterval = 100 * time.Millisecond
 	)
 
-	cfg := raft.Configuration{}
-	ports := make([]portsAdress, n)
-
-	ctx := context.Background()
-	var lc net.ListenConfig
-	// port assign
+	// allocate listeners for gRPC/raft in advance so ports are reserved
+	grpcListeners := make([]net.Listener, n)
+	cfg := raft.Configuration{}
+	ctx := context.Background() // still needed by the DialContext checks below
 	for i := 0; i < n; i++ {
-		ports[i] = portAssigner()
-	}
+		l, err := net.Listen("tcp", "127.0.0.1:0")
+		require.NoError(t, err)
+		grpcListeners[i] = l
+		addr := l.Addr().String()
+		grpcAdders = append(grpcAdders, addr)
 
-	// build raft node config
-	for i := 0; i < n; i++ {
 		var suffrage raft.ServerSuffrage
 		if i == 0 {
 			suffrage = raft.Voter
 		} else {
 			suffrage = raft.Nonvoter
 		}
-
-		server := raft.Server{
+		cfg.Servers = append(cfg.Servers, raft.Server{
 			Suffrage: suffrage,
 			ID:       raft.ServerID(strconv.Itoa(i)),
-			Address:  raft.ServerAddress(ports[i].raftAddress),
-		}
-		cfg.Servers = append(cfg.Servers, server)
+			Address:  raft.ServerAddress(addr),
+		})
 	}
 
 	for i := 0; i < n; i++ {
@@ -164,10 +110,8 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
 		trxSt := store.NewMemoryStoreDefaultTTL()
 		fsm := kv.NewKvFSM(st, trxSt)
 
-		port := ports[i]
-
-		r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg)
-		assert.NoError(t, err)
+		r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
+		require.NoError(t, err)
 
 		s := grpc.NewServer()
 		trx := kv.NewTransaction(r)
@@ -181,34 +125,32 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
 		leaderhealth.Setup(r, s, []string{"Example"})
 		raftadmin.Register(s, r)
 
-		grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
-		assert.NoError(t, err)
-
-		grpcAdders = append(grpcAdders, port.grpcAddress)
-		redisAdders = append(redisAdders, port.redisAddress)
-		go func() {
-			assert.NoError(t, s.Serve(grpcSock))
-		}()
+		go func(l net.Listener) {
+			require.NoError(t, s.Serve(l))
+		}(grpcListeners[i])
 
-		l, err := lc.Listen(ctx, "tcp", port.redisAddress)
-		assert.NoError(t, err)
+		l, err := net.Listen("tcp", "127.0.0.1:0")
+		require.NoError(t, err)
+		redisAddr := l.Addr().String()
+		redisAdders = append(redisAdders, redisAddr)
 		rd := NewRedisServer(l, st, coordinator)
 		go func() {
-			assert.NoError(t, rd.Run())
+			require.NoError(t, rd.Run())
 		}()
 
-		dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
-		assert.NoError(t, err)
+		dl, err := net.Listen("tcp", "127.0.0.1:0")
+		require.NoError(t, err)
+		dynamoAddr := dl.Addr().String()
 		ds := NewDynamoDBServer(dl, st, coordinator)
 		go func() {
-			assert.NoError(t, ds.Run())
+			require.NoError(t, ds.Run())
 		}()
 
 		nodes = append(nodes, newNode(
-			port.grpcAddress,
-			port.raftAddress,
-			port.redisAddress,
-			port.dynamoAddress,
+			grpcAdders[i],
+			grpcAdders[i],
+			redisAddr,
+			dynamoAddr,
 			r,
 			tm,
 			s,
@@ -219,7 +161,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
 
 	d := &net.Dialer{Timeout: time.Second}
 	for _, n := range nodes {
-		assert.Eventually(t, func() bool {
+		require.Eventually(t, func() bool {
 			conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
 			if err != nil {
 				return false
@@ -239,7 +181,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
 		}, waitTimeout, waitInterval)
 	}
 
-	assert.Eventually(t, func() bool {
+	require.Eventually(t, func() bool {
 		return nodes[0].raft.State() == raft.Leader
 	}, waitTimeout, waitInterval)
 
diff --git a/kv/shard_router.go b/kv/shard_router.go
index 556860d..f8c6d66 100644
--- a/kv/shard_router.go
+++ b/kv/shard_router.go
@@ -59,20 +59,25 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re
 	}
 
 	var max uint64
+	var errs error
 	for gid, rs := range grouped {
 		g, ok := s.getGroup(gid)
 		if !ok {
-			return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
+			err := errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
+			errs = errors.CombineErrors(errs, err)
+			continue
 		}
 		r, err := fn(g, rs)
 		if err != nil {
-			return nil, errors.WithStack(err)
+			errs = errors.CombineErrors(errs, errors.WithStack(err))
+			continue
 		}
 		if r.CommitIndex > max {
 			max = r.CommitIndex
 		}
 	}
-	return &TransactionResponse{CommitIndex: max}, nil
+	resp := &TransactionResponse{CommitIndex: max}
+	return resp, errs
 }
 
 func (s *ShardRouter) getGroup(id uint64) (*routerGroup, bool) {
diff --git a/kv/shard_router_test.go b/kv/shard_router_test.go
index 3f87850..7512b9b 100644
--- a/kv/shard_router_test.go
+++ b/kv/shard_router_test.go
@@ -3,6 +3,7 @@ package kv
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
 	"time"
 
@@ -253,3 +254,32 @@ func TestShardRouterCommitFailure(t *testing.T) {
 		t.Fatalf("unexpected abort on successful group")
 	}
 }
+
+func TestShardRouterCommitMultipleFailures(t *testing.T) {
+	e := distribution.NewEngine()
+	e.UpdateRoute([]byte("a"), []byte("m"), 1)
+	e.UpdateRoute([]byte("m"), nil, 2)
+
+	router := NewShardRouter(e)
+
+	fail1 := &fakeTM{commitErr: true}
+	fail2 := &fakeTM{commitErr: true}
+	router.Register(1, fail1, nil)
+	router.Register(2, fail2, nil)
+
+	reqs := []*pb.Request{
+		{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
+		{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
+	}
+
+	_, err := router.Commit(reqs)
+	if err == nil {
+		t.Fatalf("expected error")
+	}
+	if c := strings.Count(fmt.Sprintf("%+v", err), "commit fail"); c < 2 {
+		t.Fatalf("expected combined errors, got %d: %+v", c, err)
+	}
+	if fail1.commitCalls == 0 || fail2.commitCalls == 0 {
+		t.Fatalf("expected commits on both groups")
+	}
+}