diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..0f2642c --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,177 @@ +name: Release Pipeline + +on: + push: + branches: + - master + - latest + +jobs: + beta-release: + name: Beta Release + runs-on: ubuntu-latest + if: github.ref == 'refs/heads/master' + outputs: + version: ${{ steps.get_version.outputs.version }} + status: ${{ job.status }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + pip install --quiet build twine toml + + - name: Modify package name for beta + run: | + sed -i -E 's/^(name *= *")superstream-clients(")/\1superstream-clients-beta\2/' pyproject.toml + + - name: Build package + run: python -m build + + - name: Publish to PyPI + env: + TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + twine upload dist/* + + - name: Get version + if: always() + id: get_version + run: | + VERSION=$(python3 -c "import toml; print(toml.load('pyproject.toml')['project']['version'])") + echo "version=$VERSION" >> $GITHUB_OUTPUT + + prod-release: + name: Production Release + runs-on: ubuntu-latest + if: github.ref == 'refs/heads/latest' + outputs: + version: ${{ steps.get_version.outputs.version }} + status: ${{ job.status }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + pip install --quiet build twine toml + + - name: Build package + run: python -m build + + - name: Publish to PyPI + env: + TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + twine upload dist/* + + - name: Get version from pyproject.toml + if: always() + id: get_version + run: | + VERSION=$(python3 -c "import toml; print(toml.load('pyproject.toml')['project']['version'])") + echo "version=$VERSION" >> $GITHUB_OUTPUT + echo "Version: $VERSION" + + - name: Create Git tag + run: | + git config user.email "github-actions@superstream.ai" + git config user.name "GitHub Actions" + git tag -a ${{ steps.get_version.outputs.version }} -m "${{ steps.get_version.outputs.version }}" + git push origin ${{ steps.get_version.outputs.version }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Create GitHub Release + uses: softprops/action-gh-release@v1 + with: + tag_name: ${{ steps.get_version.outputs.version }} + files: dist/superstream_clients-${{ steps.get_version.outputs.version }}.tar.gz + generate_release_notes: true + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + notify: + name: Send Notifications + runs-on: ubuntu-latest + needs: prod-release + if: always() && github.ref == 'refs/heads/latest' + + steps: + - name: Send Slack notification on success + if: needs.prod-release.result == 'success' + uses: slackapi/slack-github-action@v1 + with: + payload: | + { + "text": "superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release SUCCESSFUL ✅", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release SUCCESSFUL* ✅\n*Branch:* ${{ github.ref_name }}\n*Commit:* <${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}|${{ github.sha }}>\n*Author:* ${{ github.event.head_commit.author.name }}\n*Release:* <${{ github.server_url }}/${{ github.repository }}/releases/tag/${{ needs.prod-release.outputs.version }}|View Release>" + } + } + ] + } + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + continue-on-error: true + + - name: Send Slack notification on failure + if: needs.prod-release.result == 'failure' + uses: slackapi/slack-github-action@v1 + with: + payload: | + { + "text": "superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release FAILED ❌", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release FAILED* ❌\n*Branch:* ${{ github.ref_name }}\n*Commit:* <${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}|${{ github.sha }}>\n*Author:* ${{ github.event.head_commit.author.name }}\n*Workflow Run:* <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Details>" + } + } + ] + } + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + continue-on-error: true + + - name: Send Slack notification on cancellation + if: needs.prod-release.result == 'cancelled' + uses: slackapi/slack-github-action@v1 + with: + payload: | + { + "text": "superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release ABORTED ⚠️", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*superstream-python-clients v${{ needs.prod-release.outputs.version }} Production Release ABORTED* ⚠️\n*Branch:* ${{ github.ref_name }}\n*Commit:* <${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}|${{ github.sha }}>\n*Author:* ${{ github.event.head_commit.author.name }}" + } + } + ] + } + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + continue-on-error: true diff --git a/Jenkinsfile b/Jenkinsfile deleted file mode 100644 index d5ffe87..0000000 --- a/Jenkinsfile +++ /dev/null @@ -1,178 +0,0 @@ -@Library('shared-library') _ - -pipeline { - - agent { - docker { - label 'memphis-jenkins-big-fleet,' - image 'python:3.11.9-bullseye' - args '-u root' - } - } - - environment { - HOME = '/tmp' - SLACK_CHANNEL = '#jenkins-events' - } - - stages { - - stage('Prepare Environment') { - when { - anyOf { - allOf { - branch 'master' - triggeredBy 'UserIdCause' // Manual trigger on master - } - allOf { - branch 'latest' - } - } - } - steps { - script { - sh 'git config --global --add safe.directory $(pwd)' - env.GIT_AUTHOR = sh(script: 'git log -1 --pretty=%an', returnStdout: true).trim() - env.COMMIT_MESSAGE = sh(script: 'git log -1 --pretty=%B', returnStdout: true).trim() - def triggerCause = currentBuild.getBuildCauses().find { it._class == 'hudson.model.Cause$UserIdCause' } - env.TRIGGERED_BY = triggerCause ? triggerCause.userId : 'Commit' - } - sh """ - export DEBIAN_FRONTEND=noninteractive - apt update -y - apt install -y python3 python3-pip python3-dev gcc make - # wget -qO - https://packages.confluent.io/deb/7.0/archive.key | apt-key add - - # add-apt-repository "deb https://packages.confluent.io/clients/deb \$(lsb_release -cs) main" - apt update - pip install --user pdm - pip install requests - """ - } - } - - - stage('Beta Release') { - when { - allOf { - branch 'master' - triggeredBy 'UserIdCause' // Manual "Build Now" - } - } - steps { - sh ''' - sed -i -E 's/^(name *= *")superstream-clients(")/\\1superstream-clients-beta\\2/' pyproject.toml - ''' - sh 'pip install --quiet build twine' - sh 'python -m build' - withCredentials([usernamePassword(credentialsId: 'superstream-pypi', usernameVariable: 'USR', passwordVariable: 'PSW')]) { - sh """ - twine upload dist/* \ - -u $USR \ - -p $PSW - """ - } - } - } - stage('Prod Release') { - when { - branch 'latest' - } - steps { - sh 'pip install --quiet build twine' - sh 'python -m build' - withCredentials([usernamePassword(credentialsId: 'superstream-pypi', usernameVariable: 'USR', passwordVariable: 'PSW')]) { - sh """ - twine upload dist/* \ - -u $USR \ - -p $PSW - """ - } - } - } - stage('Create Release'){ - when { - branch 'latest' - } - steps { - sh 'pip install toml' - script { - def version = sh( - script: ''' - python3 -c "import toml; print(toml.load('pyproject.toml')['project']['version'])" - ''', - returnStdout: true - ).trim() - - env.versionTag = version - } - sh """ - curl -L https://github.com/cli/cli/releases/download/v2.40.0/gh_2.40.0_linux_amd64.tar.gz -o gh.tar.gz - tar -xvf gh.tar.gz - mv gh_2.40.0_linux_amd64/bin/gh /usr/local/bin - rm -rf gh_2.40.0_linux_amd64 gh.tar.gz - """ - withCredentials([sshUserPrivateKey(keyFileVariable:'check',credentialsId: 'main-github')]) { - sh """ - GIT_SSH_COMMAND='ssh -i $check -o StrictHostKeyChecking=no' git config --global user.email "jenkins@superstream.ai" - GIT_SSH_COMMAND='ssh -i $check -o StrictHostKeyChecking=no' git config --global user.name "Jenkins" - GIT_SSH_COMMAND='ssh -i $check -o StrictHostKeyChecking=no' git tag -a $versionTag -m "$versionTag" - GIT_SSH_COMMAND='ssh -i $check -o StrictHostKeyChecking=no' git push origin $versionTag - """ - } - withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) { - sh """ - gh release create $versionTag dist/superstream_clients-${env.versionTag}.tar.gz --generate-notes - """ - } - } - } - } - post { - always { - cleanWs() - } - success { - script { - if (env.GIT_BRANCH == 'latest') { - sendSlackNotification('SUCCESS') - notifySuccessful() - } - } - } - - failure { - script { - if (env.GIT_BRANCH == 'latest') { - sendSlackNotification('FAILURE') - notifyFailed() - } - } - } - aborted { - script { - if (env.BRANCH_NAME == 'latest') { - sendSlackNotification('ABORTED') - } - // Get the build log to check for the specific exception and retry job - AgentOfflineException() - } - } - } -} - -def notifySuccessful() { - emailext ( - subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'", - body: """SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]': - Check console output and connection attributes at ${env.BUILD_URL}""", - to: 'tech-leads@superstream.ai' - ) -} -def notifyFailed() { - emailext ( - subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'", - body: """FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]': - Check console output at ${env.BUILD_URL}""", - to: 'tech-leads@superstream.ai' - ) -} diff --git a/examples/aiokafka/apache.py b/examples/aiokafka/apache.py index 08cc2c2..64ddab2 100644 --- a/examples/aiokafka/apache.py +++ b/examples/aiokafka/apache.py @@ -21,7 +21,7 @@ async def create_producer(client_id): await producer.start() return producer -async def send_messages_to_topics(producer, topics, producer_name, num_messages=50): +async def send_messages_to_topics(producer, topics, producer_name, num_messages=5): """Send random JSON messages to specified Kafka topics""" successful = 0 @@ -36,7 +36,7 @@ async def send_messages_to_topics(producer, topics, producer_name, num_messages= # Send message to each topic for topic in topics: - result = await producer.send_and_wait(topic, message) + result = await producer.send(topic, message) successful += 1 @@ -55,7 +55,7 @@ async def main(): try: # Create two separate producers producer1 = await create_producer('aiokafka-producer-1') - producer2 = await create_producer('aiokafka-producer-2') + # producer2 = await create_producer('aiokafka-producer-2') # First producer sends to test-topic and test-topic-1 topics1 = ['test-topic', 'test-topic-1'] @@ -63,22 +63,21 @@ async def main(): # Second producer sends to test-topic-2 and test-topic-3 topics2 = ['test-topic-2', 'test-topic-3'] - await send_messages_to_topics(producer2, topics2, 'aiokafka-producer-2') - + # await send_messages_to_topics(producer2, topics2, 'aiokafka-producer-2') + # Sleep for 10 minutes at the end + print("Sleeping for 10 minutes...") + await asyncio.sleep(600) + print("Sleep completed") except Exception as e: print(f"Error: {e}") finally: if producer1: await producer1.stop() print("Producer 1 closed") - if producer2: - await producer2.stop() - print("Producer 2 closed") + # if producer2: + # await producer2.stop() + # print("Producer 2 closed") - # Sleep for 10 minutes at the end - print("Sleeping for 10 minutes...") - await asyncio.sleep(600) - print("Sleep completed") if __name__ == "__main__": # Run the async main function diff --git a/examples/confluent_kafka/aiven.py b/examples/confluent_kafka/aiven.py index e94e57b..70921da 100644 --- a/examples/confluent_kafka/aiven.py +++ b/examples/confluent_kafka/aiven.py @@ -42,7 +42,7 @@ def create_producer(client_id): 'ssl.ca.location': SSL_CAFILE, 'ssl.certificate.location': SSL_CERTFILE, 'ssl.key.location': SSL_KEYFILE, - 'batch.num.messages': BATCH_SIZE, + 'batch.size': BATCH_SIZE, 'linger.ms': LINGER_MS }) diff --git a/examples/confluent_kafka/apache.py b/examples/confluent_kafka/apache.py index 96069ef..2fa7dba 100644 --- a/examples/confluent_kafka/apache.py +++ b/examples/confluent_kafka/apache.py @@ -56,9 +56,8 @@ def send_messages_to_topics(producer, topics, producer_name, num_messages=50): print(f"Failed to send message {i+1}: {e}") # Small delay between messages (optional) - time.sleep(0.01) + time.sleep(2) - producer.flush(timeout=30) print(f"\n{producer_name} Summary: {successful} successful, {failed} failed") def main(): @@ -67,7 +66,7 @@ def main(): try: # Create two separate producers producer1 = create_producer('confluent-kafka-producer-1') - producer2 = create_producer('confluent-kafka-producer-2') + # producer2 = create_producer('confluent-kafka-producer-2') # First producer sends to test-topic and test-topic-1 topics1 = ['test-topic', 'test-topic-1'] @@ -75,15 +74,23 @@ def main(): # Second producer sends to test-topic-2 and test-topic-3 topics2 = ['test-topic-2', 'test-topic-3'] - send_messages_to_topics(producer2, topics2, 'confluent-kafka-producer-2') - + # send_messages_to_topics(producer2, topics2, 'confluent-kafka-producer-2') + print("Sleeping...") + time.sleep(30) + print("Sleep completed") except Exception as e: print(f"Error: {e}") - # Sleep for 10 minutes at the end - print("Sleeping for 10 minutes...") - time.sleep(600) - print("Sleep completed") + # Explicitly set producers to None to force garbage collection + print("Setting producers to None to trigger Superstream cleanup...") + producer1 = None + # producer2 = None + + # Force garbage collection to trigger __del__ immediately + import gc + print("Forcing garbage collection...") + gc.collect() + print("Garbage collection completed") if __name__ == "__main__": main() \ No newline at end of file diff --git a/examples/confluent_kafka/confluent.py b/examples/confluent_kafka/confluent.py index 55fa956..5c588c6 100644 --- a/examples/confluent_kafka/confluent.py +++ b/examples/confluent_kafka/confluent.py @@ -40,7 +40,7 @@ def create_producer(client_id): 'sasl.password': SASL_PASSWORD, 'client.id': client_id, 'linger.ms': LINGER_MS, - 'batch.num.messages': BATCH_SIZE + 'batch.size': BATCH_SIZE }) def send_messages_to_topics(producer, topics, producer_name, num_messages=50): diff --git a/examples/kafkapy/apache.py b/examples/kafkapy/apache.py index 86d55e3..21e630e 100644 --- a/examples/kafkapy/apache.py +++ b/examples/kafkapy/apache.py @@ -6,7 +6,49 @@ import time from kafka import KafkaProducer from kafka.errors import KafkaError -from json_generator import generate_random_json + +import random +import string +from datetime import datetime + +def generate_random_json(min_size_kb=1): + """Generate a random JSON object of at least min_size_kb size""" + base_data = { + "timestamp": datetime.now().isoformat(), + "event_id": f"evt_{random.randint(100000, 999999)}", + "user_id": f"user_{random.randint(1000, 9999)}", + "session_id": f"session_{random.randint(10000, 99999)}", + "event_type": random.choice(["click", "view", "purchase", "login", "logout"]), + "device_type": random.choice(["mobile", "desktop", "tablet"]), + "os": random.choice(["Windows", "macOS", "Linux", "iOS", "Android"]), + "browser": random.choice(["Chrome", "Firefox", "Safari", "Edge"]), + "country": random.choice(["US", "UK", "DE", "FR", "JP", "BR", "IN"]), + "metrics": { + "load_time": round(random.uniform(0.1, 5.0), 3), + "response_time": round(random.uniform(0.01, 1.0), 3), + "cpu_usage": round(random.uniform(0, 100), 2), + "memory_usage": round(random.uniform(0, 100), 2) + } + } + + # Calculate current size + current_json = json.dumps(base_data) + current_size = len(current_json.encode('utf-8')) + target_size = min_size_kb * 1024 + + # Add padding data if needed to reach target size + if current_size < target_size: + padding_size = target_size - current_size + # Generate random string data for padding + padding_data = { + "additional_data": { + f"field_{i}": ''.join(random.choices(string.ascii_letters + string.digits, k=50)) + for i in range(padding_size // 50) + } + } + base_data.update(padding_data) + + return base_data def create_producer(client_id): """Create and configure Kafka producer""" @@ -19,7 +61,7 @@ def create_producer(client_id): value_serializer=lambda v: json.dumps(v).encode('utf-8'), ) -def send_messages_to_topics(producer, topics, producer_name, num_messages=50): +def send_messages_to_topics(producer, topics, producer_name, num_messages=5): """Send random JSON messages to specified Kafka topics""" successful = 0 @@ -57,15 +99,18 @@ def main(): try: # Create two separate producers producer1 = create_producer('kafka-python-producer-1') - producer2 = create_producer('kafka-python-producer-2') + # producer2 = create_producer('kafka-python-producer-2') # First producer sends to test-topic and test-topic-1 topics1 = ['test-topic', 'test-topic-1'] send_messages_to_topics(producer1, topics1, 'kafka-python-producer-1') - + # Sleep for 10 minutes at the end + print("Sleeping for 10 minutes...") + time.sleep(600) + print("Sleep completed") # Second producer sends to test-topic-2 and test-topic-3 topics2 = ['test-topic-2', 'test-topic-3'] - send_messages_to_topics(producer2, topics2, 'kafka-python-producer-2') + # send_messages_to_topics(producer2, topics2, 'kafka-python-producer-2') except Exception as e: print(f"Error: {e}") @@ -73,14 +118,9 @@ def main(): if producer1: producer1.close() print("Producer 1 closed") - if producer2: - producer2.close() - print("Producer 2 closed") - - # Sleep for 10 minutes at the end - print("Sleeping for 10 minutes...") - time.sleep(600) - print("Sleep completed") + # if producer2: + # producer2.close() + # print("Producer 2 closed") if __name__ == "__main__": main() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8923521..7f0ba1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "superstream-clients" -version = "1.0.1" +version = "1.0.3" description = "Superstream optimisation library for Kafka producers" authors = [{name = "Superstream Labs", email = "support@superstream.ai"}] license = "Apache-2.0"