Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sakit"
version = "14.1.8"
version = "14.1.9"
description = "Solana Agent Kit"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
license = "MIT"
Expand Down
88 changes: 60 additions & 28 deletions sakit/privy_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
from solders.transaction import VersionedTransaction # type: ignore
from solders.message import to_bytes_versioned # type: ignore

from sakit.utils.trigger import JupiterTrigger
from sakit.utils.trigger import (
JupiterTrigger,
replace_blockhash_in_transaction,
get_fresh_blockhash,
)
from sakit.utils.wallet import send_raw_transaction_with_priority

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -342,14 +346,45 @@ async def _sign_and_execute( # pragma: no cover
transaction_base64: str,
request_id: str,
) -> Dict[str, Any]:
"""Sign with payer (if configured) and Privy, then execute."""
"""
Replace blockhash, sign with payer (if configured) and Privy, then send.

Jupiter's /execute endpoint always times out with 504, so we:
1. Get a fresh blockhash from our RPC
2. Replace the blockhash in Jupiter's transaction
3. Sign with our keys
4. Send directly via our RPC
"""
try:
tx_to_sign = transaction_base64
# RPC URL is required - Jupiter's execute doesn't work
if not self._rpc_url:
return {
"status": "error",
"message": "rpc_url must be configured for trigger orders. Jupiter's execute endpoint is broken.",
}

# Step 1: Get fresh blockhash from our RPC
blockhash_result = await get_fresh_blockhash(self._rpc_url)
if "error" in blockhash_result:
return {
"status": "error",
"message": f"Failed to get blockhash: {blockhash_result['error']}",
}

# If payer is configured, sign with payer first
fresh_blockhash = blockhash_result["blockhash"]
logger.info(f"Got fresh blockhash: {fresh_blockhash}")

# Step 2: Replace blockhash in the transaction
tx_with_new_blockhash = replace_blockhash_in_transaction(
transaction_base64, fresh_blockhash
)

tx_to_sign = tx_with_new_blockhash

# Step 3: If payer is configured, sign with payer first
if self._payer_private_key:
payer_keypair = Keypair.from_base58_string(self._payer_private_key)
tx_bytes = base64.b64decode(transaction_base64)
tx_bytes = base64.b64decode(tx_with_new_blockhash)
transaction = VersionedTransaction.from_bytes(tx_bytes)
message_bytes = to_bytes_versioned(transaction.message)
payer_signature = payer_keypair.sign_message(message_bytes)
Expand All @@ -361,7 +396,7 @@ async def _sign_and_execute( # pragma: no cover
)
tx_to_sign = base64.b64encode(bytes(partially_signed)).decode("utf-8")

# Sign with Privy using the official SDK
# Step 4: Sign with Privy using the official SDK
signed_tx = await _privy_sign_transaction(
privy_client,
wallet_id,
Expand All @@ -375,28 +410,25 @@ async def _sign_and_execute( # pragma: no cover
"message": "Failed to sign transaction via Privy.",
}

# Send via RPC or fallback to Jupiter execute
if self._rpc_url:
# Use configured RPC (Helius recommended) instead of Jupiter's execute endpoint
tx_bytes = base64.b64decode(signed_tx)
send_result = await send_raw_transaction_with_priority(
rpc_url=self._rpc_url,
tx_bytes=tx_bytes,
)
if not send_result.get("success"):
return {
"status": "error",
"message": send_result.get(
"error", "Failed to send transaction"
),
}
return {"status": "success", "signature": send_result.get("signature")}
else:
# Fallback to Jupiter execute if no RPC configured
exec_result = await trigger.execute(signed_tx, request_id)
if not exec_result.success:
return {"status": "error", "message": exec_result.error}
return {"status": "success", "signature": exec_result.signature}
# Step 5: Send via our RPC
tx_bytes = base64.b64decode(signed_tx)
send_result = await send_raw_transaction_with_priority(
rpc_url=self._rpc_url,
tx_bytes=tx_bytes,
skip_confirmation=False, # Now we can wait - blockhash is from our RPC
confirm_timeout=30.0,
)

if not send_result.get("success"):
return {
"status": "error",
"message": send_result.get("error", "Failed to send transaction"),
}

return {
"status": "success",
"signature": send_result.get("signature"),
}

except Exception as e:
logger.exception(f"Failed to sign and execute: {str(e)}")
Expand Down
165 changes: 139 additions & 26 deletions sakit/utils/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import httpx

from solders.transaction import VersionedTransaction # type: ignore
from solders.message import to_bytes_versioned # type: ignore
from solders.message import to_bytes_versioned, MessageV0 # type: ignore
from solders.hash import Hash # type: ignore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -289,51 +290,82 @@ async def execute( # pragma: no cover
self,
signed_transaction: str,
request_id: str,
max_retries: int = 3,
) -> TriggerExecuteResponse:
"""
Execute a signed trigger order transaction.

Args:
signed_transaction: Base64 encoded signed transaction
request_id: Request ID from create/cancel response
max_retries: Number of retries on 504 timeout (default 3)

Returns:
TriggerExecuteResponse with execution result
"""
import asyncio

payload = {
"signedTransaction": signed_transaction,
"requestId": request_id,
}

try:
# Use longer timeout for execute - Jupiter waits for tx confirmation
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{self.base_url}/execute",
json=payload,
headers=self._headers,
)
last_error = None
for attempt in range(max_retries + 1):
try:
# Use longer timeout for execute - Jupiter waits for tx confirmation
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{self.base_url}/execute",
json=payload,
headers=self._headers,
)

# Retry on 504 Gateway Timeout
if response.status_code == 504:
last_error = f"504 Gateway Timeout (attempt {attempt + 1}/{max_retries + 1})"
logger.warning(f"Jupiter execute timed out: {last_error}")
if attempt < max_retries:
await asyncio.sleep(2**attempt) # Exponential backoff
continue
return TriggerExecuteResponse(
success=False,
error=f"Jupiter execute endpoint timed out after {max_retries + 1} attempts",
)

if response.status_code != 200:
return TriggerExecuteResponse(
success=False,
error=f"Failed to execute trigger order: {response.status_code} - {response.text}",
)

data = response.json()
status = data.get("status", "")

if response.status_code != 200:
return TriggerExecuteResponse(
success=False,
error=f"Failed to execute trigger order: {response.status_code} - {response.text}",
success=status.lower() == "success",
status=status,
signature=data.get("signature"),
error=data.get("error"),
code=data.get("code", 0),
raw_response=data,
)

data = response.json()
status = data.get("status", "")

return TriggerExecuteResponse(
success=status.lower() == "success",
status=status,
signature=data.get("signature"),
error=data.get("error"),
code=data.get("code", 0),
raw_response=data,
except httpx.TimeoutException as e:
last_error = (
f"Timeout: {str(e)} (attempt {attempt + 1}/{max_retries + 1})"
)
except Exception as e:
logger.exception("Failed to execute trigger order")
return TriggerExecuteResponse(success=False, error=str(e))
logger.warning(f"Jupiter execute request timed out: {last_error}")
if attempt < max_retries:
await asyncio.sleep(2**attempt)
continue
except Exception as e:
logger.exception("Failed to execute trigger order")
return TriggerExecuteResponse(success=False, error=str(e))

return TriggerExecuteResponse(
success=False,
error=last_error or "Failed after all retries",
)

async def get_orders( # pragma: no cover
self,
Expand Down Expand Up @@ -393,6 +425,87 @@ async def get_orders( # pragma: no cover
return {"success": False, "error": str(e), "orders": []}


def replace_blockhash_in_transaction( # pragma: no cover
transaction_base64: str,
new_blockhash: str,
) -> str:
"""
Replace the blockhash in a versioned transaction with a fresh one.

This is necessary when the transaction was created by an external service
(like Jupiter) using their RPC, but we need to send it via a different RPC.
The blockhash must be recent and recognized by the sending RPC.

Args:
transaction_base64: Base64 encoded unsigned transaction
new_blockhash: Fresh blockhash string from our RPC

Returns:
Base64 encoded transaction with replaced blockhash (still unsigned)
"""
transaction_bytes = base64.b64decode(transaction_base64)
transaction = VersionedTransaction.from_bytes(transaction_bytes)

# Get the message and replace blockhash
old_message = transaction.message

# Create new message with updated blockhash
new_message = MessageV0(
header=old_message.header,
account_keys=old_message.account_keys,
recent_blockhash=Hash.from_string(new_blockhash),
instructions=old_message.instructions,
address_table_lookups=old_message.address_table_lookups,
)

# Create new unsigned transaction with new message
# Use default signatures (all zeros) since it's unsigned
new_transaction = VersionedTransaction.populate(
new_message,
transaction.signatures, # Keep placeholder signatures
)

return base64.b64encode(bytes(new_transaction)).decode("utf-8")


async def get_fresh_blockhash(rpc_url: str) -> dict: # pragma: no cover
"""
Get a fresh blockhash from the RPC.

Args:
rpc_url: The RPC endpoint URL

Returns:
Dict with 'blockhash' and 'lastValidBlockHeight' on success,
or 'error' on failure.
"""
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getLatestBlockhash",
"params": [{"commitment": "confirmed"}],
}

try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(rpc_url, json=payload)
if response.status_code != 200:
return {"error": f"RPC error: {response.status_code}"}

data = response.json()
if "error" in data:
return {"error": f"RPC error: {data['error']}"}

result = data.get("result", {}).get("value", {})
return {
"blockhash": result.get("blockhash"),
"lastValidBlockHeight": result.get("lastValidBlockHeight"),
}
except Exception as e:
logger.exception("Failed to get fresh blockhash")
return {"error": str(e)}


def sign_trigger_transaction( # pragma: no cover
transaction_base64: str,
sign_message_func,
Expand Down
31 changes: 25 additions & 6 deletions sakit/utils/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ async def send_raw_transaction_with_priority( # pragma: no cover
tx_bytes: bytes,
skip_preflight: bool = True,
max_retries: int = 5,
confirm_timeout: float = 30.0,
skip_confirmation: bool = False,
) -> Dict[str, any]:
"""
Send a raw transaction to Solana RPC, with Helius priority fee logging if applicable.
Expand All @@ -132,10 +134,14 @@ async def send_raw_transaction_with_priority( # pragma: no cover
tx_bytes: The serialized signed transaction bytes
skip_preflight: Skip preflight simulation (default True for pre-signed txs)
max_retries: Number of retries for the RPC call
confirm_timeout: Max seconds to wait for confirmation (default 30s)
skip_confirmation: Skip waiting for confirmation entirely (default False)

Returns:
Dict with 'success' and 'signature' on success, or 'error' on failure.
"""
import asyncio

try:
client = AsyncClient(rpc_url)
try:
Expand Down Expand Up @@ -183,19 +189,32 @@ async def send_raw_transaction_with_priority( # pragma: no cover
signature = str(result.value)
logger.info(f"Transaction sent: {signature}")

# Confirm the transaction
# Skip confirmation if requested (useful for pre-signed txs with external blockhashes)
if skip_confirmation:
return {"success": True, "signature": signature}

# Confirm the transaction with timeout
try:
confirmation = await client.confirm_transaction(
result.value,
commitment=Confirmed,
sleep_seconds=0.5,
last_valid_block_height=None,
confirmation = await asyncio.wait_for(
client.confirm_transaction(
result.value,
commitment=Confirmed,
sleep_seconds=0.5,
last_valid_block_height=None,
),
timeout=confirm_timeout,
)
if confirmation.value and confirmation.value[0].err:
return {
"success": False,
"error": f"Transaction failed: {confirmation.value[0].err}",
}
except asyncio.TimeoutError:
logger.warning(
f"Transaction confirmation timed out after {confirm_timeout}s. "
f"Transaction may still land. Signature: {signature}"
)
# Return success anyway - tx was sent, just not confirmed in time
except Exception as confirm_error:
logger.debug(f"Could not confirm transaction: {confirm_error}")
# Still return success since transaction was sent
Expand Down
Loading