From 83bd3944424b56675065f44a9ef1dc2167fdf83e Mon Sep 17 00:00:00 2001 From: truemagic-coder Date: Thu, 4 Dec 2025 11:43:17 -0800 Subject: [PATCH] update --- pyproject.toml | 2 +- sakit/privy_trigger.py | 88 +++++++++++------ sakit/utils/trigger.py | 165 ++++++++++++++++++++++++++----- sakit/utils/wallet.py | 31 ++++-- tests/test_privy_trigger_tool.py | 78 +++++++++++---- 5 files changed, 285 insertions(+), 79 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5a2a1a1..2ccd637 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "sakit" -version = "14.1.8" +version = "14.1.9" description = "Solana Agent Kit" authors = ["Bevan Hunt "] license = "MIT" diff --git a/sakit/privy_trigger.py b/sakit/privy_trigger.py index 71418eb..64c98d1 100644 --- a/sakit/privy_trigger.py +++ b/sakit/privy_trigger.py @@ -18,7 +18,11 @@ from solders.transaction import VersionedTransaction # type: ignore from solders.message import to_bytes_versioned # type: ignore -from sakit.utils.trigger import JupiterTrigger +from sakit.utils.trigger import ( + JupiterTrigger, + replace_blockhash_in_transaction, + get_fresh_blockhash, +) from sakit.utils.wallet import send_raw_transaction_with_priority logger = logging.getLogger(__name__) @@ -342,14 +346,45 @@ async def _sign_and_execute( # pragma: no cover transaction_base64: str, request_id: str, ) -> Dict[str, Any]: - """Sign with payer (if configured) and Privy, then execute.""" + """ + Replace blockhash, sign with payer (if configured) and Privy, then send. + + Jupiter's /execute endpoint always times out with 504, so we: + 1. Get a fresh blockhash from our RPC + 2. Replace the blockhash in Jupiter's transaction + 3. Sign with our keys + 4. Send directly via our RPC + """ try: - tx_to_sign = transaction_base64 + # RPC URL is required - Jupiter's execute doesn't work + if not self._rpc_url: + return { + "status": "error", + "message": "rpc_url must be configured for trigger orders. Jupiter's execute endpoint is broken.", + } + + # Step 1: Get fresh blockhash from our RPC + blockhash_result = await get_fresh_blockhash(self._rpc_url) + if "error" in blockhash_result: + return { + "status": "error", + "message": f"Failed to get blockhash: {blockhash_result['error']}", + } - # If payer is configured, sign with payer first + fresh_blockhash = blockhash_result["blockhash"] + logger.info(f"Got fresh blockhash: {fresh_blockhash}") + + # Step 2: Replace blockhash in the transaction + tx_with_new_blockhash = replace_blockhash_in_transaction( + transaction_base64, fresh_blockhash + ) + + tx_to_sign = tx_with_new_blockhash + + # Step 3: If payer is configured, sign with payer first if self._payer_private_key: payer_keypair = Keypair.from_base58_string(self._payer_private_key) - tx_bytes = base64.b64decode(transaction_base64) + tx_bytes = base64.b64decode(tx_with_new_blockhash) transaction = VersionedTransaction.from_bytes(tx_bytes) message_bytes = to_bytes_versioned(transaction.message) payer_signature = payer_keypair.sign_message(message_bytes) @@ -361,7 +396,7 @@ async def _sign_and_execute( # pragma: no cover ) tx_to_sign = base64.b64encode(bytes(partially_signed)).decode("utf-8") - # Sign with Privy using the official SDK + # Step 4: Sign with Privy using the official SDK signed_tx = await _privy_sign_transaction( privy_client, wallet_id, @@ -375,28 +410,25 @@ async def _sign_and_execute( # pragma: no cover "message": "Failed to sign transaction via Privy.", } - # Send via RPC or fallback to Jupiter execute - if self._rpc_url: - # Use configured RPC (Helius recommended) instead of Jupiter's execute endpoint - tx_bytes = base64.b64decode(signed_tx) - send_result = await send_raw_transaction_with_priority( - rpc_url=self._rpc_url, - tx_bytes=tx_bytes, - ) - if not send_result.get("success"): - return { - "status": "error", - "message": send_result.get( - "error", "Failed to send transaction" - ), - } - return {"status": "success", "signature": send_result.get("signature")} - else: - # Fallback to Jupiter execute if no RPC configured - exec_result = await trigger.execute(signed_tx, request_id) - if not exec_result.success: - return {"status": "error", "message": exec_result.error} - return {"status": "success", "signature": exec_result.signature} + # Step 5: Send via our RPC + tx_bytes = base64.b64decode(signed_tx) + send_result = await send_raw_transaction_with_priority( + rpc_url=self._rpc_url, + tx_bytes=tx_bytes, + skip_confirmation=False, # Now we can wait - blockhash is from our RPC + confirm_timeout=30.0, + ) + + if not send_result.get("success"): + return { + "status": "error", + "message": send_result.get("error", "Failed to send transaction"), + } + + return { + "status": "success", + "signature": send_result.get("signature"), + } except Exception as e: logger.exception(f"Failed to sign and execute: {str(e)}") diff --git a/sakit/utils/trigger.py b/sakit/utils/trigger.py index 41e391e..07ee1be 100644 --- a/sakit/utils/trigger.py +++ b/sakit/utils/trigger.py @@ -13,7 +13,8 @@ import httpx from solders.transaction import VersionedTransaction # type: ignore -from solders.message import to_bytes_versioned # type: ignore +from solders.message import to_bytes_versioned, MessageV0 # type: ignore +from solders.hash import Hash # type: ignore logger = logging.getLogger(__name__) @@ -289,6 +290,7 @@ async def execute( # pragma: no cover self, signed_transaction: str, request_id: str, + max_retries: int = 3, ) -> TriggerExecuteResponse: """ Execute a signed trigger order transaction. @@ -296,44 +298,74 @@ async def execute( # pragma: no cover Args: signed_transaction: Base64 encoded signed transaction request_id: Request ID from create/cancel response + max_retries: Number of retries on 504 timeout (default 3) Returns: TriggerExecuteResponse with execution result """ + import asyncio + payload = { "signedTransaction": signed_transaction, "requestId": request_id, } - try: - # Use longer timeout for execute - Jupiter waits for tx confirmation - async with httpx.AsyncClient(timeout=120.0) as client: - response = await client.post( - f"{self.base_url}/execute", - json=payload, - headers=self._headers, - ) + last_error = None + for attempt in range(max_retries + 1): + try: + # Use longer timeout for execute - Jupiter waits for tx confirmation + async with httpx.AsyncClient(timeout=120.0) as client: + response = await client.post( + f"{self.base_url}/execute", + json=payload, + headers=self._headers, + ) + + # Retry on 504 Gateway Timeout + if response.status_code == 504: + last_error = f"504 Gateway Timeout (attempt {attempt + 1}/{max_retries + 1})" + logger.warning(f"Jupiter execute timed out: {last_error}") + if attempt < max_retries: + await asyncio.sleep(2**attempt) # Exponential backoff + continue + return TriggerExecuteResponse( + success=False, + error=f"Jupiter execute endpoint timed out after {max_retries + 1} attempts", + ) + + if response.status_code != 200: + return TriggerExecuteResponse( + success=False, + error=f"Failed to execute trigger order: {response.status_code} - {response.text}", + ) + + data = response.json() + status = data.get("status", "") - if response.status_code != 200: return TriggerExecuteResponse( - success=False, - error=f"Failed to execute trigger order: {response.status_code} - {response.text}", + success=status.lower() == "success", + status=status, + signature=data.get("signature"), + error=data.get("error"), + code=data.get("code", 0), + raw_response=data, ) - - data = response.json() - status = data.get("status", "") - - return TriggerExecuteResponse( - success=status.lower() == "success", - status=status, - signature=data.get("signature"), - error=data.get("error"), - code=data.get("code", 0), - raw_response=data, + except httpx.TimeoutException as e: + last_error = ( + f"Timeout: {str(e)} (attempt {attempt + 1}/{max_retries + 1})" ) - except Exception as e: - logger.exception("Failed to execute trigger order") - return TriggerExecuteResponse(success=False, error=str(e)) + logger.warning(f"Jupiter execute request timed out: {last_error}") + if attempt < max_retries: + await asyncio.sleep(2**attempt) + continue + except Exception as e: + logger.exception("Failed to execute trigger order") + return TriggerExecuteResponse(success=False, error=str(e)) + + return TriggerExecuteResponse( + success=False, + error=last_error or "Failed after all retries", + ) async def get_orders( # pragma: no cover self, @@ -393,6 +425,87 @@ async def get_orders( # pragma: no cover return {"success": False, "error": str(e), "orders": []} +def replace_blockhash_in_transaction( # pragma: no cover + transaction_base64: str, + new_blockhash: str, +) -> str: + """ + Replace the blockhash in a versioned transaction with a fresh one. + + This is necessary when the transaction was created by an external service + (like Jupiter) using their RPC, but we need to send it via a different RPC. + The blockhash must be recent and recognized by the sending RPC. + + Args: + transaction_base64: Base64 encoded unsigned transaction + new_blockhash: Fresh blockhash string from our RPC + + Returns: + Base64 encoded transaction with replaced blockhash (still unsigned) + """ + transaction_bytes = base64.b64decode(transaction_base64) + transaction = VersionedTransaction.from_bytes(transaction_bytes) + + # Get the message and replace blockhash + old_message = transaction.message + + # Create new message with updated blockhash + new_message = MessageV0( + header=old_message.header, + account_keys=old_message.account_keys, + recent_blockhash=Hash.from_string(new_blockhash), + instructions=old_message.instructions, + address_table_lookups=old_message.address_table_lookups, + ) + + # Create new unsigned transaction with new message + # Use default signatures (all zeros) since it's unsigned + new_transaction = VersionedTransaction.populate( + new_message, + transaction.signatures, # Keep placeholder signatures + ) + + return base64.b64encode(bytes(new_transaction)).decode("utf-8") + + +async def get_fresh_blockhash(rpc_url: str) -> dict: # pragma: no cover + """ + Get a fresh blockhash from the RPC. + + Args: + rpc_url: The RPC endpoint URL + + Returns: + Dict with 'blockhash' and 'lastValidBlockHeight' on success, + or 'error' on failure. + """ + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "getLatestBlockhash", + "params": [{"commitment": "confirmed"}], + } + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.post(rpc_url, json=payload) + if response.status_code != 200: + return {"error": f"RPC error: {response.status_code}"} + + data = response.json() + if "error" in data: + return {"error": f"RPC error: {data['error']}"} + + result = data.get("result", {}).get("value", {}) + return { + "blockhash": result.get("blockhash"), + "lastValidBlockHeight": result.get("lastValidBlockHeight"), + } + except Exception as e: + logger.exception("Failed to get fresh blockhash") + return {"error": str(e)} + + def sign_trigger_transaction( # pragma: no cover transaction_base64: str, sign_message_func, diff --git a/sakit/utils/wallet.py b/sakit/utils/wallet.py index 400af7f..d38d2a1 100644 --- a/sakit/utils/wallet.py +++ b/sakit/utils/wallet.py @@ -120,6 +120,8 @@ async def send_raw_transaction_with_priority( # pragma: no cover tx_bytes: bytes, skip_preflight: bool = True, max_retries: int = 5, + confirm_timeout: float = 30.0, + skip_confirmation: bool = False, ) -> Dict[str, any]: """ Send a raw transaction to Solana RPC, with Helius priority fee logging if applicable. @@ -132,10 +134,14 @@ async def send_raw_transaction_with_priority( # pragma: no cover tx_bytes: The serialized signed transaction bytes skip_preflight: Skip preflight simulation (default True for pre-signed txs) max_retries: Number of retries for the RPC call + confirm_timeout: Max seconds to wait for confirmation (default 30s) + skip_confirmation: Skip waiting for confirmation entirely (default False) Returns: Dict with 'success' and 'signature' on success, or 'error' on failure. """ + import asyncio + try: client = AsyncClient(rpc_url) try: @@ -183,19 +189,32 @@ async def send_raw_transaction_with_priority( # pragma: no cover signature = str(result.value) logger.info(f"Transaction sent: {signature}") - # Confirm the transaction + # Skip confirmation if requested (useful for pre-signed txs with external blockhashes) + if skip_confirmation: + return {"success": True, "signature": signature} + + # Confirm the transaction with timeout try: - confirmation = await client.confirm_transaction( - result.value, - commitment=Confirmed, - sleep_seconds=0.5, - last_valid_block_height=None, + confirmation = await asyncio.wait_for( + client.confirm_transaction( + result.value, + commitment=Confirmed, + sleep_seconds=0.5, + last_valid_block_height=None, + ), + timeout=confirm_timeout, ) if confirmation.value and confirmation.value[0].err: return { "success": False, "error": f"Transaction failed: {confirmation.value[0].err}", } + except asyncio.TimeoutError: + logger.warning( + f"Transaction confirmation timed out after {confirm_timeout}s. " + f"Transaction may still land. Signature: {signature}" + ) + # Return success anyway - tx was sent, just not confirmed in time except Exception as confirm_error: logger.debug(f"Could not confirm transaction: {confirm_error}") # Still return success since transaction was sent diff --git a/tests/test_privy_trigger_tool.py b/tests/test_privy_trigger_tool.py index daeafa0..76df38f 100644 --- a/tests/test_privy_trigger_tool.py +++ b/tests/test_privy_trigger_tool.py @@ -34,6 +34,7 @@ def privy_trigger_tool(): "app_secret": "test-app-secret", "signing_key": f"wallet-auth:{TEST_EC_KEY_SEC1}", "jupiter_api_key": "test-api-key", + "rpc_url": "https://mainnet.helius-rpc.com/?api-key=test", } } } @@ -437,6 +438,11 @@ async def test_create_order_success(self, privy_trigger_tool): mock_create_result.request_id = "req-123" mock_create_result.order = "order-pubkey-123" + # Valid base64 that decodes to some bytes (for base64.b64decode to work) + mock_signed_tx = ( + "dGVzdC1zaWduZWQtdHJhbnNhY3Rpb24=" # "test-signed-transaction" in base64 + ) + with ( patch( "sakit.privy_trigger._get_privy_embedded_wallet", @@ -447,17 +453,29 @@ async def test_create_order_success(self, privy_trigger_tool): patch( "sakit.privy_trigger._privy_sign_transaction", new_callable=AsyncMock, - return_value="signed-tx-base64", + return_value=mock_signed_tx, + ), + patch( + "sakit.privy_trigger.get_fresh_blockhash", + new_callable=AsyncMock, + return_value={ + "blockhash": "FreshBlockhash123", + "lastValidBlockHeight": 12345, + }, + ), + patch( + "sakit.privy_trigger.replace_blockhash_in_transaction", + return_value="tx-with-new-blockhash-base64", + ), + patch( + "sakit.privy_trigger.send_raw_transaction_with_priority", + new_callable=AsyncMock, + return_value={"success": True, "signature": "tx-sig-456"}, ), ): mock_instance = MockTrigger.return_value mock_instance.create_order = AsyncMock(return_value=mock_create_result) - mock_exec_result = MagicMock() - mock_exec_result.success = True - mock_exec_result.signature = "tx-sig-456" - mock_instance.execute = AsyncMock(return_value=mock_exec_result) - result = await privy_trigger_tool.execute( user_id="did:privy:user123", action="create", @@ -627,7 +645,24 @@ async def test_cancel_order_success(self, privy_trigger_tool): patch( "sakit.privy_trigger._privy_sign_transaction", new_callable=AsyncMock, - return_value="signed-cancel-tx-base64", + return_value="dGVzdC1zaWduZWQtdHJhbnNhY3Rpb24=", # valid base64 + ), + patch( + "sakit.privy_trigger.get_fresh_blockhash", + new_callable=AsyncMock, + return_value={ + "blockhash": "FreshBlockhash123", + "lastValidBlockHeight": 12345, + }, + ), + patch( + "sakit.privy_trigger.replace_blockhash_in_transaction", + return_value="tx-with-new-blockhash-base64", + ), + patch( + "sakit.privy_trigger.send_raw_transaction_with_priority", + new_callable=AsyncMock, + return_value={"success": True, "signature": "cancel-sig-789"}, ), ): mock_instance = MockTrigger.return_value @@ -639,11 +674,6 @@ async def test_cancel_order_success(self, privy_trigger_tool): ) mock_instance.cancel_order = AsyncMock(return_value=mock_cancel_result) - mock_exec_result = MagicMock() - mock_exec_result.success = True - mock_exec_result.signature = "cancel-sig-789" - mock_instance.execute = AsyncMock(return_value=mock_exec_result) - result = await privy_trigger_tool.execute( user_id="did:privy:user123", action="cancel", @@ -711,7 +741,24 @@ async def test_cancel_all_success(self, privy_trigger_tool): patch( "sakit.privy_trigger._privy_sign_transaction", new_callable=AsyncMock, - return_value="signed-tx", + return_value="dGVzdC1zaWduZWQtdHJhbnNhY3Rpb24=", # valid base64 + ), + patch( + "sakit.privy_trigger.get_fresh_blockhash", + new_callable=AsyncMock, + return_value={ + "blockhash": "FreshBlockhash123", + "lastValidBlockHeight": 12345, + }, + ), + patch( + "sakit.privy_trigger.replace_blockhash_in_transaction", + return_value="tx-with-new-blockhash-base64", + ), + patch( + "sakit.privy_trigger.send_raw_transaction_with_priority", + new_callable=AsyncMock, + return_value={"success": True, "signature": "sig-123"}, ), ): mock_instance = MockTrigger.return_value @@ -723,11 +770,6 @@ async def test_cancel_all_success(self, privy_trigger_tool): ) mock_instance.cancel_orders = AsyncMock(return_value=mock_cancel_result) - mock_exec_result = MagicMock() - mock_exec_result.success = True - mock_exec_result.signature = "sig-123" - mock_instance.execute = AsyncMock(return_value=mock_exec_result) - result = await privy_trigger_tool.execute( user_id="did:privy:user123", action="cancel_all",