|
13 | 13 | import httpx |
14 | 14 |
|
15 | 15 | from solders.transaction import VersionedTransaction # type: ignore |
16 | | -from solders.message import to_bytes_versioned # type: ignore |
| 16 | +from solders.message import to_bytes_versioned, MessageV0 # type: ignore |
| 17 | +from solders.hash import Hash # type: ignore |
17 | 18 |
|
18 | 19 | logger = logging.getLogger(__name__) |
19 | 20 |
|
@@ -289,51 +290,82 @@ async def execute( # pragma: no cover |
289 | 290 | self, |
290 | 291 | signed_transaction: str, |
291 | 292 | request_id: str, |
| 293 | + max_retries: int = 3, |
292 | 294 | ) -> TriggerExecuteResponse: |
293 | 295 | """ |
294 | 296 | Execute a signed trigger order transaction. |
295 | 297 |
|
296 | 298 | Args: |
297 | 299 | signed_transaction: Base64 encoded signed transaction |
298 | 300 | request_id: Request ID from create/cancel response |
| 301 | + max_retries: Number of retries on 504 timeout (default 3) |
299 | 302 |
|
300 | 303 | Returns: |
301 | 304 | TriggerExecuteResponse with execution result |
302 | 305 | """ |
| 306 | + import asyncio |
| 307 | + |
303 | 308 | payload = { |
304 | 309 | "signedTransaction": signed_transaction, |
305 | 310 | "requestId": request_id, |
306 | 311 | } |
307 | 312 |
|
308 | | - try: |
309 | | - # Use longer timeout for execute - Jupiter waits for tx confirmation |
310 | | - async with httpx.AsyncClient(timeout=120.0) as client: |
311 | | - response = await client.post( |
312 | | - f"{self.base_url}/execute", |
313 | | - json=payload, |
314 | | - headers=self._headers, |
315 | | - ) |
| 313 | + last_error = None |
| 314 | + for attempt in range(max_retries + 1): |
| 315 | + try: |
| 316 | + # Use longer timeout for execute - Jupiter waits for tx confirmation |
| 317 | + async with httpx.AsyncClient(timeout=120.0) as client: |
| 318 | + response = await client.post( |
| 319 | + f"{self.base_url}/execute", |
| 320 | + json=payload, |
| 321 | + headers=self._headers, |
| 322 | + ) |
| 323 | + |
| 324 | + # Retry on 504 Gateway Timeout |
| 325 | + if response.status_code == 504: |
| 326 | + last_error = f"504 Gateway Timeout (attempt {attempt + 1}/{max_retries + 1})" |
| 327 | + logger.warning(f"Jupiter execute timed out: {last_error}") |
| 328 | + if attempt < max_retries: |
| 329 | + await asyncio.sleep(2**attempt) # Exponential backoff |
| 330 | + continue |
| 331 | + return TriggerExecuteResponse( |
| 332 | + success=False, |
| 333 | + error=f"Jupiter execute endpoint timed out after {max_retries + 1} attempts", |
| 334 | + ) |
| 335 | + |
| 336 | + if response.status_code != 200: |
| 337 | + return TriggerExecuteResponse( |
| 338 | + success=False, |
| 339 | + error=f"Failed to execute trigger order: {response.status_code} - {response.text}", |
| 340 | + ) |
| 341 | + |
| 342 | + data = response.json() |
| 343 | + status = data.get("status", "") |
316 | 344 |
|
317 | | - if response.status_code != 200: |
318 | 345 | return TriggerExecuteResponse( |
319 | | - success=False, |
320 | | - error=f"Failed to execute trigger order: {response.status_code} - {response.text}", |
| 346 | + success=status.lower() == "success", |
| 347 | + status=status, |
| 348 | + signature=data.get("signature"), |
| 349 | + error=data.get("error"), |
| 350 | + code=data.get("code", 0), |
| 351 | + raw_response=data, |
321 | 352 | ) |
322 | | - |
323 | | - data = response.json() |
324 | | - status = data.get("status", "") |
325 | | - |
326 | | - return TriggerExecuteResponse( |
327 | | - success=status.lower() == "success", |
328 | | - status=status, |
329 | | - signature=data.get("signature"), |
330 | | - error=data.get("error"), |
331 | | - code=data.get("code", 0), |
332 | | - raw_response=data, |
| 353 | + except httpx.TimeoutException as e: |
| 354 | + last_error = ( |
| 355 | + f"Timeout: {str(e)} (attempt {attempt + 1}/{max_retries + 1})" |
333 | 356 | ) |
334 | | - except Exception as e: |
335 | | - logger.exception("Failed to execute trigger order") |
336 | | - return TriggerExecuteResponse(success=False, error=str(e)) |
| 357 | + logger.warning(f"Jupiter execute request timed out: {last_error}") |
| 358 | + if attempt < max_retries: |
| 359 | + await asyncio.sleep(2**attempt) |
| 360 | + continue |
| 361 | + except Exception as e: |
| 362 | + logger.exception("Failed to execute trigger order") |
| 363 | + return TriggerExecuteResponse(success=False, error=str(e)) |
| 364 | + |
| 365 | + return TriggerExecuteResponse( |
| 366 | + success=False, |
| 367 | + error=last_error or "Failed after all retries", |
| 368 | + ) |
337 | 369 |
|
338 | 370 | async def get_orders( # pragma: no cover |
339 | 371 | self, |
@@ -393,6 +425,87 @@ async def get_orders( # pragma: no cover |
393 | 425 | return {"success": False, "error": str(e), "orders": []} |
394 | 426 |
|
395 | 427 |
|
| 428 | +def replace_blockhash_in_transaction( # pragma: no cover |
| 429 | + transaction_base64: str, |
| 430 | + new_blockhash: str, |
| 431 | +) -> str: |
| 432 | + """ |
| 433 | + Replace the blockhash in a versioned transaction with a fresh one. |
| 434 | +
|
| 435 | + This is necessary when the transaction was created by an external service |
| 436 | + (like Jupiter) using their RPC, but we need to send it via a different RPC. |
| 437 | + The blockhash must be recent and recognized by the sending RPC. |
| 438 | +
|
| 439 | + Args: |
| 440 | + transaction_base64: Base64 encoded unsigned transaction |
| 441 | + new_blockhash: Fresh blockhash string from our RPC |
| 442 | +
|
| 443 | + Returns: |
| 444 | + Base64 encoded transaction with replaced blockhash (still unsigned) |
| 445 | + """ |
| 446 | + transaction_bytes = base64.b64decode(transaction_base64) |
| 447 | + transaction = VersionedTransaction.from_bytes(transaction_bytes) |
| 448 | + |
| 449 | + # Get the message and replace blockhash |
| 450 | + old_message = transaction.message |
| 451 | + |
| 452 | + # Create new message with updated blockhash |
| 453 | + new_message = MessageV0( |
| 454 | + header=old_message.header, |
| 455 | + account_keys=old_message.account_keys, |
| 456 | + recent_blockhash=Hash.from_string(new_blockhash), |
| 457 | + instructions=old_message.instructions, |
| 458 | + address_table_lookups=old_message.address_table_lookups, |
| 459 | + ) |
| 460 | + |
| 461 | + # Create new unsigned transaction with new message |
| 462 | + # Use default signatures (all zeros) since it's unsigned |
| 463 | + new_transaction = VersionedTransaction.populate( |
| 464 | + new_message, |
| 465 | + transaction.signatures, # Keep placeholder signatures |
| 466 | + ) |
| 467 | + |
| 468 | + return base64.b64encode(bytes(new_transaction)).decode("utf-8") |
| 469 | + |
| 470 | + |
| 471 | +async def get_fresh_blockhash(rpc_url: str) -> dict: # pragma: no cover |
| 472 | + """ |
| 473 | + Get a fresh blockhash from the RPC. |
| 474 | +
|
| 475 | + Args: |
| 476 | + rpc_url: The RPC endpoint URL |
| 477 | +
|
| 478 | + Returns: |
| 479 | + Dict with 'blockhash' and 'lastValidBlockHeight' on success, |
| 480 | + or 'error' on failure. |
| 481 | + """ |
| 482 | + payload = { |
| 483 | + "jsonrpc": "2.0", |
| 484 | + "id": 1, |
| 485 | + "method": "getLatestBlockhash", |
| 486 | + "params": [{"commitment": "confirmed"}], |
| 487 | + } |
| 488 | + |
| 489 | + try: |
| 490 | + async with httpx.AsyncClient(timeout=10.0) as client: |
| 491 | + response = await client.post(rpc_url, json=payload) |
| 492 | + if response.status_code != 200: |
| 493 | + return {"error": f"RPC error: {response.status_code}"} |
| 494 | + |
| 495 | + data = response.json() |
| 496 | + if "error" in data: |
| 497 | + return {"error": f"RPC error: {data['error']}"} |
| 498 | + |
| 499 | + result = data.get("result", {}).get("value", {}) |
| 500 | + return { |
| 501 | + "blockhash": result.get("blockhash"), |
| 502 | + "lastValidBlockHeight": result.get("lastValidBlockHeight"), |
| 503 | + } |
| 504 | + except Exception as e: |
| 505 | + logger.exception("Failed to get fresh blockhash") |
| 506 | + return {"error": str(e)} |
| 507 | + |
| 508 | + |
396 | 509 | def sign_trigger_transaction( # pragma: no cover |
397 | 510 | transaction_base64: str, |
398 | 511 | sign_message_func, |
|
0 commit comments