From 9cd6dbf971a4ec9801339561b866e2269763f650 Mon Sep 17 00:00:00 2001 From: Tim Middleton Date: Thu, 1 May 2025 12:30:40 +0800 Subject: [PATCH] Add streaming tests --- .../workflows/examples-jakarta-v1.2.2.yaml | 6 +- .github/workflows/examples-jakarta.yaml | 5 +- .github/workflows/examples-v1.2.2.yaml | 6 +- .github/workflows/examples.yaml | 8 +- ...resolver-clusters-compatability-tests.yaml | 7 +- .../resolver-compatability-tests.yaml | 7 +- .github/workflows/streaming-jakarta.yaml | 79 ++++++ .github/workflows/streaming.yaml | 80 ++++++ Makefile | 8 + scripts/run-compat-ce.sh | 7 +- test/e2e/streaming/streaming_test.go | 237 ++++++++++++++++++ test/e2e/streaming/suite_test.go | 17 ++ 12 files changed, 442 insertions(+), 25 deletions(-) create mode 100644 .github/workflows/streaming-jakarta.yaml create mode 100644 .github/workflows/streaming.yaml create mode 100644 test/e2e/streaming/streaming_test.go create mode 100644 test/e2e/streaming/suite_test.go diff --git a/.github/workflows/examples-jakarta-v1.2.2.yaml b/.github/workflows/examples-jakarta-v1.2.2.yaml index 7abf5d0..f7fe4a9 100644 --- a/.github/workflows/examples-jakarta-v1.2.2.yaml +++ b/.github/workflows/examples-jakarta-v1.2.2.yaml @@ -21,12 +21,10 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 24.09.3 - 25.03 - - 25.03.1-SNAPSHOT + - 25.03.1 + - 25.03.2-SNAPSHOT go-version: - - 1.19.x - - 1.20.x - 1.21.x - 1.22.x - 1.23.x diff --git a/.github/workflows/examples-jakarta.yaml b/.github/workflows/examples-jakarta.yaml index ec2667d..1dce1d3 100644 --- a/.github/workflows/examples-jakarta.yaml +++ b/.github/workflows/examples-jakarta.yaml @@ -20,9 +20,8 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 24.09.3 - - 25.03 - - 25.03.1-SNAPSHOT + - 25.03.1 + - 25.03.2-SNAPSHOT go-version: - 1.23.x - 1.24.x diff --git a/.github/workflows/examples-v1.2.2.yaml b/.github/workflows/examples-v1.2.2.yaml index c4310a4..938f7b2 100644 --- a/.github/workflows/examples-v1.2.2.yaml +++ b/.github/workflows/examples-v1.2.2.yaml @@ -21,11 +21,9 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 22.06.12-SNAPSHOT - - 22.06.11 + - 22.06.13-SNAPSHOT + - 22.06.12 go-version: - - 1.19.x - - 1.20.x - 1.21.x - 1.22.x - 1.23.x diff --git a/.github/workflows/examples.yaml b/.github/workflows/examples.yaml index c9c5ef1..8da60e7 100644 --- a/.github/workflows/examples.yaml +++ b/.github/workflows/examples.yaml @@ -20,10 +20,10 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 22.06.12-SNAPSHOT - - 22.06.11 - - 14.1.2-0-1 - - 14.1.2-0-2-SNAPSHOT + - 22.06.13-SNAPSHOT + - 22.06.12 + - 14.1.2-0-2 + - 14.1.2-0-3-SNAPSHOT go-version: - 1.23.x - 1.24.x diff --git a/.github/workflows/resolver-clusters-compatability-tests.yaml b/.github/workflows/resolver-clusters-compatability-tests.yaml index 07b0229..1c3efc7 100644 --- a/.github/workflows/resolver-clusters-compatability-tests.yaml +++ b/.github/workflows/resolver-clusters-compatability-tests.yaml @@ -20,10 +20,9 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 22.06.11 - - 24.09.3 - - 25.03 - - 14.1.2-0-1 + - 22.06.12 + - 25.03.1 + - 14.1.2-0-2 go-version: - 1.23.x - 1.24.x diff --git a/.github/workflows/resolver-compatability-tests.yaml b/.github/workflows/resolver-compatability-tests.yaml index 2b530ee..792701c 100644 --- a/.github/workflows/resolver-compatability-tests.yaml +++ b/.github/workflows/resolver-compatability-tests.yaml @@ -20,10 +20,9 @@ jobs: fail-fast: false matrix: coherenceVersion: - - 22.06.11 - - 24.09.3 - - 25.03 - - 14.1.2-0-1 + - 22.06.12 + - 25.03.1 + - 14.1.2-0-2 go-version: - 1.23.x - 1.24.x diff --git a/.github/workflows/streaming-jakarta.yaml b/.github/workflows/streaming-jakarta.yaml new file mode 100644 index 0000000..1a42720 --- /dev/null +++ b/.github/workflows/streaming-jakarta.yaml @@ -0,0 +1,79 @@ +# Copyright 2025 Oracle Corporation and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. + +# --------------------------------------------------------------------------- +# Coherence Go Client GitHub Actions test streaming against v23.03+ +# --------------------------------------------------------------------------- +name: CI-Examples Jakarta + +on: + workflow_dispatch: + push: + branches: + - '*' + +jobs: + build: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + coherenceVersion: + - 25.03.1 + - 25.03.2-SNAPSHOT + go-version: + - 1.23.x + - 1.24.x + +# Checkout the source, we need a depth of zero to fetch all of the history otherwise +# the copyright check cannot work out the date of the files from Git. + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Get Docker Images + shell: bash + run: | + docker pull gcr.io/distroless/java17-debian12 + uname -a + + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'zulu' + + - name: Cache Go Modules + uses: actions/cache@v4 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-mods-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go-mods- + + - name: Cache Maven packages + uses: actions/cache@v4 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '${{ matrix.go-version }}' + + - name: Verify Examples + shell: bash + run: | + go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 + COHERENCE_BASE_IMAGE=gcr.io/distroless/java17-debian12 COHERENCE_VERSION=${{ matrix.coherenceVersion }} PROFILES=,jakarta,-javax make clean generate-proto generate-proto-v1 build-test-images test-cluster-startup test-e2e-streaming + make test-cluster-shutdown || true + + - uses: actions/upload-artifact@v4 + if: failure() + with: + name: test-output-${{ matrix.go-version }}-${{ matrix.coherenceVersion }} + path: build/_output/test-logs diff --git a/.github/workflows/streaming.yaml b/.github/workflows/streaming.yaml new file mode 100644 index 0000000..370ac41 --- /dev/null +++ b/.github/workflows/streaming.yaml @@ -0,0 +1,80 @@ +# Copyright 2025 Oracle Corporation and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. + +# --------------------------------------------------------------------------- +# Coherence Go Client GitHub Actions test streaming against v22.06 versions +# --------------------------------------------------------------------------- +name: CI-Streaming v22.06 + +on: + workflow_dispatch: + push: + branches: + - '*' + +jobs: + build: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + coherenceVersion: + - 22.06.13-SNAPSHOT + - 22.06.12 + - 14.1.2-0-2 + - 14.1.2-0-3-SNAPSHOT + go-version: + - 1.23.x + - 1.24.x + +# Checkout the source, we need a depth of zero to fetch all of the history otherwise +# the copyright check cannot work out the date of the files from Git. + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Get Docker Images + shell: bash + run: | + docker pull gcr.io/distroless/java17-debian12 + + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'zulu' + + - name: Cache Go Modules + uses: actions/cache@v4 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-mods-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go-mods- + + - name: Cache Maven packages + uses: actions/cache@v4 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '${{ matrix.go-version }}' + + - name: Verify Examples + shell: bash + run: | + go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 + COHERENCE_BASE_IMAGE=gcr.io/distroless/java17-debian12 COHERENCE_VERSION=${{ matrix.coherenceVersion }} PROFILES=,-jakarta,javax make clean generate-proto generate-proto-v1 build-test-images test-cluster-startup test-e2e-streaming + make test-cluster-shutdown || true + + - uses: actions/upload-artifact@v4 + if: failure() + with: + name: test-output-${{ matrix.go-version }}-${{ matrix.coherenceVersion }} + path: build/_output/test-logs diff --git a/Makefile b/Makefile index 91766fa..33a0df4 100644 --- a/Makefile +++ b/Makefile @@ -284,6 +284,14 @@ test-e2e-standalone: test-clean test gotestsum $(BUILD_PROPS) ## Run e2e tests w -- $(GO_TEST_FLAGS) -v -coverprofile=$(COVERAGE_DIR)/cover-functional.out -v ./test/e2e/standalone/... -coverpkg=github.com/oracle/coherence-go-client/v2/coherence/... go tool cover -func=$(COVERAGE_DIR)/cover-functional.out | grep -v '0.0%' +# ---------------------------------------------------------------------------------------------------------------------- +# Executes the Go streaming tests for standalone Coherence +# ---------------------------------------------------------------------------------------------------------------------- +.PHONY: test-e2e-streaming +test-e2e-streaming: test-clean test gotestsum $(BUILD_PROPS) ## Run e2e tests with Coherence + CGO_ENABLED=0 $(GOTESTSUM) --format testname --junitfile $(TEST_LOGS_DIR)/go-streaming-test.xml \ + -- $(GO_TEST_FLAGS) -v -coverprofile=$(COVERAGE_DIR)/cover-functional.out -v ./test/e2e/streaming/... -coverpkg=github.com/oracle/coherence-go-client/v2/coherence/... + # ---------------------------------------------------------------------------------------------------------------------- # Executes the Go end to end tests for standalone Coherence with Scope set # ---------------------------------------------------------------------------------------------------------------------- diff --git a/scripts/run-compat-ce.sh b/scripts/run-compat-ce.sh index 393f34d..431765a 100755 --- a/scripts/run-compat-ce.sh +++ b/scripts/run-compat-ce.sh @@ -15,8 +15,11 @@ set -e echo "Coherence CE 22.06.10" COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone -echo "Coherence CE 14.1.2-0-0" -COHERENCE_BASE_IMAGE=gcr.io/distroless/java17-debian12 COHERENCE_VERSION=14.1.2-0-0 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone +echo "Coherence CE 14.1.2-0-2" +COHERENCE_BASE_IMAGE=gcr.io/distroless/java17-debian12 COHERENCE_VERSION=14.1.2-0-2 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone + +echo "Coherence CE 14.1.2-0-2 Streaming" +COHERENCE_BASE_IMAGE=gcr.io/distroless/java17-debian12 COHERENCE_VERSION=14.1.2-0-2 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-streaming echo "Coherence CE 22.06.10 with scope" COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax,scope make clean generate-proto build-test-images test-e2e-standalone-scope diff --git a/test/e2e/streaming/streaming_test.go b/test/e2e/streaming/streaming_test.go new file mode 100644 index 0000000..58bff54 --- /dev/null +++ b/test/e2e/streaming/streaming_test.go @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. + * Licensed under the Universal Permissive License v 1.0 as shown at + * https://oss.oracle.com/licenses/upl. + */ + +package streaming + +import ( + "context" + "fmt" + "github.com/onsi/gomega" + "github.com/oracle/coherence-go-client/v2/coherence" + "github.com/oracle/coherence-go-client/v2/coherence/aggregators" + "github.com/oracle/coherence-go-client/v2/coherence/extractors" + "github.com/oracle/coherence-go-client/v2/coherence/filters" + "github.com/oracle/coherence-go-client/v2/test/utils" + "log" + "math/rand" + "os" + "strconv" + "sync" + "testing" + "time" +) + +const ( + testTypeEntrySet = "entrySet" + testTypeEntrySetFilter = "entrySetFilter" + testTypeKeySet = "keySet" + testTypeKeySetFilter = "keySetFilter" + testTypeValues = "values" + testTypeValuesFilter = "valuesFilter" + + defaultCacheCount = 200_000 +) + +var ( + rnd = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec + extractor = extractors.Extract[string]("department") + filterIT = filters.Equal[string](extractor, "IT") + ctx = context.Background() + actualCacheCount = defaultCacheCount +) + +// TestStreaming runs various streaming concurrent tests to ensure that all the streaming +// operations are thread safe. +func TestStreamingConcurrency(t *testing.T) { + g := gomega.NewWithT(t) + + _ = os.Setenv("COHERENCE_CLIENT_REQUEST_TIMEOUT", "300000") + + osCacheCount := os.Getenv("COHERENCE_CACHE_COUNT") + if os.Getenv(osCacheCount) != "" { + v, err := strconv.Atoi(osCacheCount) + if err != nil { + actualCacheCount = v + } + } + + session, err := utils.GetSession() + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer session.Close() + + cache := utils.GetNamedMap[int, utils.Person](g, session, "streaming-people") + + // clear and re-populate + g.Expect(cache.Clear(ctx)).ShouldNot(gomega.HaveOccurred()) + + g.Expect(coherence.AddIndex[int, utils.Person](ctx, cache, extractor, true)).ShouldNot(gomega.HaveOccurred()) + + g.Expect(populateCache(cache, actualCacheCount)).ShouldNot(gomega.HaveOccurred()) + + // get the count of people in IT + itCount, err := coherence.AggregateFilter[int, utils.Person](ctx, cache, filterIT, aggregators.Count()) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + size, err := cache.Size(ctx) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + g.Expect(size).To(gomega.Equal(actualCacheCount)) + + log.Println("Cache Size", size) + + testCases := []struct { + testName string + test func(t *testing.T, namedCache coherence.NamedMap[int, utils.Person], testType string, expectedITCount int64, fltr filters.Filter) + testType string + fltr filters.Filter + }{ + {getTestName("FilterIT", testTypeEntrySetFilter), RunTestStreaming, testTypeEntrySetFilter, filterIT}, + {getTestName("NoFilter", testTypeEntrySet), RunTestStreaming, testTypeEntrySet, nil}, + {getTestName("FilterIT", testTypeKeySetFilter), RunTestStreaming, testTypeKeySetFilter, filterIT}, + {getTestName("NoFilter", testTypeKeySet), RunTestStreaming, testTypeKeySet, nil}, + {getTestName("FilterIT", testTypeValuesFilter), RunTestStreaming, testTypeValuesFilter, filterIT}, + {getTestName("NoFilter", testTypeValues), RunTestStreaming, testTypeValues, nil}, + } + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + tc.test(t, cache, tc.testType, *itCount, tc.fltr) + }) + } +} + +func getTestName(prefix, testType string) string { + return fmt.Sprint("StreamingTest_", prefix, testType) +} + +func RunTestStreaming(t *testing.T, namedMap coherence.NamedMap[int, utils.Person], testType string, expectedCount int64, fltr filters.Filter) { + var ( + g = gomega.NewWithT(t) + wg sync.WaitGroup + threadCount = 20 + ) + + if fltr == nil { + // nil filter means we should get all entries + expectedCount = int64(actualCacheCount) + } + + log.Printf("testType=%s, expectedCount=%d\n", testType, expectedCount) + + wg.Add(threadCount) + for i := 0; i < threadCount; i++ { + go func(t int) { + defer wg.Done() + actualCount := runStreamingTest(g, namedMap, testType, fltr) + log.Printf("thread=%d, expectedCount=%d, actualCount=%d\n", t, expectedCount, actualCount) + g.Expect(actualCount).To(gomega.Equal(int(expectedCount))) + }(i) + } + wg.Wait() +} + +func runStreamingTest(g *gomega.WithT, namedMap coherence.NamedMap[int, utils.Person], testType string, fltr filters.Filter) int { + var ( + count = 0 + finalFilter = filters.Always() + chEntry <-chan *coherence.StreamedEntry[int, utils.Person] + chKey <-chan *coherence.StreamedKey[int] + chValue <-chan *coherence.StreamedValue[utils.Person] + ) + + if fltr != nil { + finalFilter = fltr + } + + if testType == testTypeEntrySetFilter { + chEntry = namedMap.EntrySetFilter(ctx, finalFilter) + } else if testType == testTypeEntrySet { + chEntry = namedMap.EntrySet(ctx) + } else if testType == testTypeKeySetFilter { + chKey = namedMap.KeySetFilter(ctx, finalFilter) + } else if testType == testTypeKeySet { + chKey = namedMap.KeySet(ctx) + } else if testType == testTypeValuesFilter { + chValue = namedMap.ValuesFilter(ctx, finalFilter) + } else if testType == testTypeValues { + chValue = namedMap.Values(ctx) + } else { + g.Fail(fmt.Sprintf("unknown testType=%s", testType)) + } + + if testType == testTypeEntrySetFilter || testType == testTypeEntrySet { + for ch := range chEntry { + g.Expect(ch.Err).ShouldNot(gomega.HaveOccurred()) + if ch.Err != nil { + continue + } + _ = ch.Value + count++ + } + return count + } + + if testType == testTypeKeySetFilter || testType == testTypeKeySet { + for ch := range chKey { + g.Expect(ch.Err).ShouldNot(gomega.HaveOccurred()) + if ch.Err != nil { + continue + } + _ = ch.Key + count++ + } + return count + } + + if testType == testTypeValuesFilter || testType == testTypeValues { + for ch := range chValue { + g.Expect(ch.Err).ShouldNot(gomega.HaveOccurred()) + if ch.Err != nil { + continue + } + _ = ch.Value + count++ + } + return count + } + + return 0 +} + +func populateCache(cache coherence.NamedMap[int, utils.Person], count int) error { + var ( + buffer = make(map[int]utils.Person) + err error + departments = []string{"IT", "HR", "FINANCE", "MARKETING", "DEV"} + batchSize = 1000 + ) + + for i := 1; i <= count; i++ { + buffer[i] = utils.Person{ + ID: i, + Name: fmt.Sprintf("person%d", i), + Age: i%100 + 10, + Department: randomize(departments), + } + if i%batchSize == 0 { + err = cache.PutAll(ctx, buffer) + if err != nil { + return err + } + buffer = make(map[int]utils.Person) + } + } + if len(buffer) > 0 { + return cache.PutAll(ctx, buffer) + } + + return nil +} + +func randomize(arr []string) string { + if len(arr) == 0 { + return "" + } + return arr[rnd.Intn(len(arr))] +} diff --git a/test/e2e/streaming/suite_test.go b/test/e2e/streaming/suite_test.go new file mode 100644 index 0000000..5fce8fe --- /dev/null +++ b/test/e2e/streaming/suite_test.go @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2025 Oracle and/or its affiliates. + * Licensed under the Universal Permissive License v 1.0 as shown at + * https://oss.oracle.com/licenses/upl. + */ + +package streaming + +import ( + "github.com/oracle/coherence-go-client/v2/test/utils" + "testing" +) + +// The entry point for the test suite +func TestMain(m *testing.M) { + utils.RunTest(m, 1408, 30000, 8080, true) +}