Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 33 additions & 92 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"log"
"net"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -19,7 +17,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -42,57 +40,7 @@ func shutdown(nodes []Node) {
}
}

type portsAdress struct {
grpc int
raft int
redis int
dynamo int
grpcAddress string
raftAddress string
redisAddress string
dynamoAddress string
}

const (
// raft and the grpc requested by the client use grpc and are received on the same port
grpcPort = 50000
raftPort = 50000

redisPort = 63790
dynamoPort = 28000
)

var mu sync.Mutex
var portGrpc atomic.Int32
var portRaft atomic.Int32
var portRedis atomic.Int32
var portDynamo atomic.Int32

func init() {
portGrpc.Store(raftPort)
portRaft.Store(grpcPort)
portRedis.Store(redisPort)
portDynamo.Store(dynamoPort)
}

func portAssigner() portsAdress {
mu.Lock()
defer mu.Unlock()
gp := portGrpc.Add(1)
rp := portRaft.Add(1)
rd := portRedis.Add(1)
dn := portDynamo.Add(1)
return portsAdress{
grpc: int(gp),
raft: int(rp),
redis: int(rd),
dynamo: int(dn),
grpcAddress: net.JoinHostPort("localhost", strconv.Itoa(int(gp))),
raftAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rp))),
redisAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rd))),
dynamoAddress: net.JoinHostPort("localhost", strconv.Itoa(int(dn))),
}
}
// Node groups the servers and addresses used in tests.

type Node struct {
grpcAddress string
Expand Down Expand Up @@ -131,43 +79,38 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
waitInterval = 100 * time.Millisecond
)

cfg := raft.Configuration{}
ports := make([]portsAdress, n)

ctx := context.Background()
var lc net.ListenConfig

// port assign
// allocate listeners for gRPC/raft in advance so ports are reserved
grpcListeners := make([]net.Listener, n)
cfg := raft.Configuration{}
for i := 0; i < n; i++ {
ports[i] = portAssigner()
}
l, err := net.Listen("tcp", "127.0.0.1:0")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)

require.NoError(t, err)
grpcListeners[i] = l
addr := l.Addr().String()
grpcAdders = append(grpcAdders, addr)

// build raft node config
for i := 0; i < n; i++ {
var suffrage raft.ServerSuffrage
if i == 0 {
suffrage = raft.Voter
} else {
suffrage = raft.Nonvoter
}

server := raft.Server{
cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: suffrage,
ID: raft.ServerID(strconv.Itoa(i)),
Address: raft.ServerAddress(ports[i].raftAddress),
}
cfg.Servers = append(cfg.Servers, server)
Address: raft.ServerAddress(addr),
})
}

for i := 0; i < n; i++ {
st := store.NewRbMemoryStore()
trxSt := store.NewMemoryStoreDefaultTTL()
fsm := kv.NewKvFSM(st, trxSt)

port := ports[i]

r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg)
assert.NoError(t, err)
r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
require.NoError(t, err)

s := grpc.NewServer()
trx := kv.NewTransaction(r)
Expand All @@ -181,34 +124,32 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)

grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
assert.NoError(t, err)

grpcAdders = append(grpcAdders, port.grpcAddress)
redisAdders = append(redisAdders, port.redisAddress)
go func() {
assert.NoError(t, s.Serve(grpcSock))
}()
go func(l net.Listener) {
require.NoError(t, s.Serve(l))
}(grpcListeners[i])

l, err := lc.Listen(ctx, "tcp", port.redisAddress)
assert.NoError(t, err)
l, err := net.Listen("tcp", "127.0.0.1:0")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)

require.NoError(t, err)
redisAddr := l.Addr().String()
redisAdders = append(redisAdders, redisAddr)
rd := NewRedisServer(l, st, coordinator)
go func() {
assert.NoError(t, rd.Run())
require.NoError(t, rd.Run())
}()

dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
assert.NoError(t, err)
dl, err := net.Listen("tcp", "127.0.0.1:0")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)

require.NoError(t, err)
dynamoAddr := dl.Addr().String()
ds := NewDynamoDBServer(dl, st, coordinator)
go func() {
assert.NoError(t, ds.Run())
require.NoError(t, ds.Run())
}()

nodes = append(nodes, newNode(
port.grpcAddress,
port.raftAddress,
port.redisAddress,
port.dynamoAddress,
grpcAdders[i],
grpcAdders[i],
redisAddr,
dynamoAddr,
r,
tm,
s,
Expand All @@ -219,7 +160,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {

d := &net.Dialer{Timeout: time.Second}
for _, n := range nodes {
assert.Eventually(t, func() bool {
require.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
Expand All @@ -239,7 +180,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
}, waitTimeout, waitInterval)
}

assert.Eventually(t, func() bool {
require.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)

Expand Down
11 changes: 8 additions & 3 deletions kv/shard_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,25 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re
}

var max uint64
var errs error
for gid, rs := range grouped {
g, ok := s.getGroup(gid)
if !ok {
return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
err := errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
errs = errors.CombineErrors(errs, err)
continue
}
r, err := fn(g, rs)
if err != nil {
return nil, errors.WithStack(err)
errs = errors.CombineErrors(errs, errors.WithStack(err))
continue
}
Comment on lines 71 to 74

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When an error occurs during the fn call, the response r is nil. The current logic correctly continues to the next iteration. However, if fn were to return a non-nil response along with an error, its CommitIndex would be ignored. While the current implementations of Transactional.Commit seem to return a nil response on error, it might be safer to handle the response even when an error is returned, just in case future implementations behave differently.

if r.CommitIndex > max {
max = r.CommitIndex
}
}
return &TransactionResponse{CommitIndex: max}, nil
resp := &TransactionResponse{CommitIndex: max}
return resp, errs

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
error returned from external package is unwrapped: sig: func github.com/cockroachdb/errors.CombineErrors(err error, otherErr error) error (wrapcheck)

}

func (s *ShardRouter) getGroup(id uint64) (*routerGroup, bool) {
Expand Down
30 changes: 30 additions & 0 deletions kv/shard_router_test.go

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
calculated cyclomatic complexity for function newTestRaft is 14, max is 10 (cyclop)

func newTestRaft(t *testing.T, id string, fsm raft.FSM) (*raft.Raft, func()) {

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kv
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -253,3 +254,32 @@ func TestShardRouterCommitFailure(t *testing.T) {
t.Fatalf("unexpected abort on successful group")
}
}

func TestShardRouterCommitMultipleFailures(t *testing.T) {
e := distribution.NewEngine()
e.UpdateRoute([]byte("a"), []byte("m"), 1)
e.UpdateRoute([]byte("m"), nil, 2)

router := NewShardRouter(e)

fail1 := &fakeTM{commitErr: true}
fail2 := &fakeTM{commitErr: true}
router.Register(1, fail1, nil)
router.Register(2, fail2, nil)

reqs := []*pb.Request{
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
}

_, err := router.Commit(reqs)
if err == nil {
t.Fatalf("expected error")
}
if c := strings.Count(fmt.Sprintf("%+v", err), "commit fail"); c < 2 {
t.Fatalf("expected combined errors, got %d: %+v", c, err)
}
Comment on lines +279 to +281

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check is a bit brittle because it relies on the verbose string formatting of the error (%+v), which includes stack traces. Using err.Error() is more robust as it typically only contains the error messages, making the assertion less likely to break due to changes in stack trace formatting. Additionally, checking for exactly 2 errors (c != 2) would make the test more precise about its expectations.

Suggested change
if c := strings.Count(fmt.Sprintf("%+v", err), "commit fail"); c < 2 {
t.Fatalf("expected combined errors, got %d: %+v", c, err)
}
if c := strings.Count(err.Error(), "commit fail"); c != 2 {
t.Fatalf("expected 2 combined errors, got %d: %v", c, err)
}

if fail1.commitCalls == 0 || fail2.commitCalls == 0 {
t.Fatalf("expected commits on both groups")
}
}
Loading