Skip to content

Commit aa34423

Browse files
author
Tim Middleton
committed
Updates to resolver
1 parent 798d483 commit aa34423

File tree

7 files changed

+63
-19
lines changed

7 files changed

+63
-19
lines changed

coherence/common.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ const (
4444
// envResolverDebug enables resolver debug messages to be displayed.
4545
envResolverDebug = "COHERENCE_RESOLVER_DEBUG"
4646

47+
// envResolverDebug sets the number of retries when the resolver fails.
48+
envResolverRetries = "COHERENCE_RESOLVER_RETRIES"
49+
4750
// envResolverDebug enables randomization of addresses returned by resolver
4851
envResolverRandomize = "COHERENCE_RESOLVER_RANDOMIZE"
4952

coherence/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1219,7 +1219,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
12191219
bc.filterListenersV1 = make(map[filters.Filter]*listenerGroupV1[K, V], 0)
12201220
bc.filterIDToGroupV1 = make(map[int64]*listenerGroupV1[K, V], 0)
12211221

1222-
// re-ensure all the caches as the connected has gone and so has the gRPC Proxy
1222+
// re-ensure all the caches as the connection has gone and so has the gRPC Proxy
12231223
for _, c := range cacheNames {
12241224
cacheID, err3 := bc.session.v1StreamManagerCache.ensureCache(context.Background(), c)
12251225
if err3 != nil {

coherence/resolver.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import (
1212
"github.com/oracle/coherence-go-client/v2/coherence/discovery"
1313
"google.golang.org/grpc/resolver"
1414
"math/rand"
15+
"strconv"
1516
"strings"
1617
"sync"
1718
"time"
1819
)
1920

2021
const (
21-
nsLookupScheme = "coherence"
22+
nsLookupScheme = "coherence"
23+
defaultRetries = 20
24+
defaultResolverDelay = 1000 // ms
2225
)
2326

2427
var (
@@ -42,16 +45,27 @@ func (b *nsLookupResolverBuilder) Build(target resolver.Target, cc resolver.Clie
4245
}
4346
checkResolverDebug()
4447

48+
// set the number of resolver retried
49+
retries := getStringValueFromEnvVarOrDefault(envResolverRetries, "20")
50+
retriesValue, err := strconv.Atoi(retries)
51+
if err != nil {
52+
retriesValue = defaultRetries
53+
}
54+
55+
resolverDebug("resolver retries=%v", retriesValue)
56+
r.resolverRetries = retriesValue
57+
4558
r.start()
4659
return r, nil
4760
}
4861
func (*nsLookupResolverBuilder) Scheme() string { return nsLookupScheme }
4962

5063
type nsLookupResolver struct {
51-
target resolver.Target
52-
cc resolver.ClientConn
53-
mutex sync.Mutex
54-
addrStore map[string][]string
64+
target resolver.Target
65+
cc resolver.ClientConn
66+
mutex sync.Mutex
67+
addrStore map[string][]string
68+
resolverRetries int
5569
}
5670

5771
func (r *nsLookupResolver) resolve() {
@@ -60,10 +74,10 @@ func (r *nsLookupResolver) resolve() {
6074
defer r.mutex.Unlock()
6175

6276
if len(grpcEndpoints) == 0 {
63-
// try 8 times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
64-
for i := 0; i < 8; i++ {
65-
resolverDebug("retrying NSLookup attempt", i)
66-
time.Sleep(time.Duration(250) * time.Millisecond)
77+
// try r.resolverRetries; times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
78+
for i := 1; i <= r.resolverRetries; i++ {
79+
resolverDebug("retrying NSLookup attempt: %v", i)
80+
time.Sleep(time.Duration(defaultResolverDelay) * time.Millisecond)
6781
grpcEndpoints = generateNSAddresses(r.target.Endpoint())
6882
if len(grpcEndpoints) != 0 {
6983
break

coherence/session.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"google.golang.org/grpc/credentials"
2020
"google.golang.org/grpc/credentials/insecure"
2121
"google.golang.org/grpc/resolver"
22+
"google.golang.org/grpc/status"
2223
"log"
2324
"os"
2425
"reflect"
@@ -510,7 +511,14 @@ func (s *Session) ensureConnection() error {
510511
s.v1StreamManagerCache = manager
511512
apiMessage = fmt.Sprintf(" %v", manager)
512513
} else {
513-
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
514+
// check if this is a gRPC status error
515+
if sts, ok := status.FromError(err1); ok {
516+
if sts.Message() == "Method not found: coherence.proxy.v1.ProxyService/subChannel" {
517+
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
518+
} else {
519+
s.debug("received a different gRPC error: %v", err1)
520+
}
521+
}
514522
}
515523

516524
logMessage(INFO, "Session [%s] connected to [%s]%s", s.sessionID, s.sessOpts.Address, apiMessage)
@@ -547,14 +555,14 @@ func (s *Session) ensureConnection() error {
547555
return
548556
}
549557

550-
if newState == connectivity.Ready || newState == connectivity.Idle {
558+
if newState == connectivity.Ready { //|| newState == connectivity.Idle {
551559
if !firstConnect && !connected {
552-
// Reconnect
560+
// Reconnected
553561
disconnectTime = 0
554562
session.closed = false
555563
connected = true
556564

557-
logMessage(INFO, "Session [%s] re-connected to address %s", session.sessionID, session.sessOpts.Address)
565+
logMessage(INFO, "Session [%s] re-connected to address %s (%v)", session.sessionID, session.sessOpts.Address, newState)
558566
session.dispatch(Reconnected, func() SessionLifecycleEvent {
559567
return newSessionLifecycleEvent(session, Reconnected)
560568
})

coherence/session_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
33
* Licensed under the Universal Permissive License v 1.0 as shown at
44
* https://oss.oracle.com/licenses/upl.
55
*/
@@ -9,6 +9,7 @@ package coherence
99
import (
1010
"context"
1111
"github.com/onsi/gomega"
12+
"os"
1213
"strconv"
1314
"testing"
1415
"time"
@@ -21,6 +22,8 @@ func TestSessionValidation(t *testing.T) {
2122
ctx = context.Background()
2223
)
2324

25+
os.Setenv("COHERENCE_SESSION_DEBUG", "true")
26+
2427
_, err = NewSession(ctx, WithFormat("not-json"))
2528
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))
2629

java/coherence-go-queues/pom.xml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@
5050
<profile>
5151
<id>javax</id>
5252
<activation>
53-
<!-- This is a work-around for the fact that activeByDefault does not do what you'd think it should -->
54-
<file>
55-
<exists>.</exists>
56-
</file>
53+
<activeByDefault>false</activeByDefault>
5754
</activation>
5855
<dependencies>
5956
<dependency>

scripts/run-checkin-test.sh

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/bin/bash
2+
3+
#
4+
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
5+
# Licensed under the Universal Permissive License v 1.0 as shown at
6+
# https://oss.oracle.com/licenses/upl.
7+
#
8+
9+
# This script runs some tests that should succeed to be sure we can push
10+
set -e
11+
12+
echo "Coherence CE 24.09 All Tests gRPC v1"
13+
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone
14+
15+
echo "Coherence CE 24.09 with queues"
16+
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax,queues COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone-queues
17+
18+
echo "Coherence CE 22.06.10"
19+
COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone

0 commit comments

Comments
 (0)