Skip to content

Commit c80be59

Browse files
update (#24)
1 parent 0e5c237 commit c80be59

File tree

7 files changed

+277
-14
lines changed

7 files changed

+277
-14
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "sakit"
3-
version = "13.3.4"
3+
version = "13.3.5"
44
description = "Solana Agent Kit"
55
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
66
license = "MIT"

sakit/privy_recurring.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from solana_agent import AutoTool, ToolRegistry
1313
from privy import AsyncPrivyAPI
1414
from privy.lib.authorization_signatures import get_authorization_signature
15+
from cryptography.hazmat.primitives import serialization
1516
from solders.keypair import Keypair # type: ignore
1617
from solders.transaction import VersionedTransaction # type: ignore
1718
from solders.message import to_bytes_versioned # type: ignore
@@ -21,6 +22,60 @@
2122
logger = logging.getLogger(__name__)
2223

2324

25+
def _convert_key_to_pkcs8_pem(key_string: str) -> str:
26+
"""Convert a private key to PKCS#8 PEM format for the Privy SDK."""
27+
private_key_string = key_string.replace("wallet-auth:", "")
28+
29+
# Try loading as PKCS#8 PEM format first
30+
try:
31+
private_key_pem = f"-----BEGIN PRIVATE KEY-----\n{private_key_string}\n-----END PRIVATE KEY-----"
32+
serialization.load_pem_private_key(
33+
private_key_pem.encode("utf-8"), password=None
34+
)
35+
return private_key_string
36+
except (ValueError, TypeError):
37+
pass
38+
39+
# Try as EC PRIVATE KEY (SEC1) format
40+
try:
41+
ec_key_pem = f"-----BEGIN EC PRIVATE KEY-----\n{private_key_string}\n-----END EC PRIVATE KEY-----"
42+
private_key = serialization.load_pem_private_key(
43+
ec_key_pem.encode("utf-8"), password=None
44+
)
45+
pkcs8_bytes = private_key.private_bytes(
46+
encoding=serialization.Encoding.PEM,
47+
format=serialization.PrivateFormat.PKCS8,
48+
encryption_algorithm=serialization.NoEncryption(),
49+
)
50+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
51+
lines = pkcs8_pem.strip().split("\n")
52+
return "".join(lines[1:-1])
53+
except (ValueError, TypeError):
54+
pass
55+
56+
# Try loading as raw DER bytes
57+
try:
58+
der_bytes = base64.b64decode(private_key_string)
59+
try:
60+
private_key = serialization.load_der_private_key(der_bytes, password=None)
61+
except (ValueError, TypeError):
62+
from cryptography.hazmat.primitives.asymmetric import ec
63+
64+
private_key = ec.derive_private_key(
65+
int.from_bytes(der_bytes, "big"), ec.SECP256R1()
66+
)
67+
pkcs8_bytes = private_key.private_bytes(
68+
encoding=serialization.Encoding.PEM,
69+
format=serialization.PrivateFormat.PKCS8,
70+
encryption_algorithm=serialization.NoEncryption(),
71+
)
72+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
73+
lines = pkcs8_pem.strip().split("\n")
74+
return "".join(lines[1:-1])
75+
except (ValueError, TypeError) as e:
76+
raise ValueError(f"Could not load private key: {e}")
77+
78+
2479
async def _get_privy_embedded_wallet(
2580
privy_client: AsyncPrivyAPI, user_id: str
2681
) -> Optional[Dict[str, str]]:
@@ -82,6 +137,9 @@ async def _privy_sign_transaction(
82137
) -> Optional[str]:
83138
"""Sign a Solana transaction via Privy using the official SDK."""
84139
try:
140+
# Convert the key to PKCS#8 format expected by the SDK
141+
pkcs8_key = _convert_key_to_pkcs8_pem(signing_key)
142+
85143
url = f"https://api.privy.io/v1/wallets/{wallet_id}/rpc"
86144
body = {
87145
"method": "signTransaction",
@@ -93,7 +151,7 @@ async def _privy_sign_transaction(
93151
body=body,
94152
method="POST",
95153
app_id=privy_client.app_id,
96-
private_key=signing_key,
154+
private_key=pkcs8_key,
97155
)
98156

99157
result = await privy_client.wallets.rpc(

sakit/privy_transfer.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from solana_agent import AutoTool, ToolRegistry
55
from privy import AsyncPrivyAPI
66
from privy.lib.authorization_signatures import get_authorization_signature
7+
from cryptography.hazmat.primitives import serialization
78
from sakit.utils.wallet import SolanaWalletClient
89
from sakit.utils.transfer import TokenTransferManager
910

@@ -14,6 +15,60 @@
1415
TOKEN_2022_PROGRAM_ID = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb"
1516

1617

18+
def _convert_key_to_pkcs8_pem(key_string: str) -> str:
19+
"""Convert a private key to PKCS#8 PEM format for the Privy SDK."""
20+
private_key_string = key_string.replace("wallet-auth:", "")
21+
22+
# Try loading as PKCS#8 PEM format first
23+
try:
24+
private_key_pem = f"-----BEGIN PRIVATE KEY-----\n{private_key_string}\n-----END PRIVATE KEY-----"
25+
serialization.load_pem_private_key(
26+
private_key_pem.encode("utf-8"), password=None
27+
)
28+
return private_key_string
29+
except (ValueError, TypeError):
30+
pass
31+
32+
# Try as EC PRIVATE KEY (SEC1) format
33+
try:
34+
ec_key_pem = f"-----BEGIN EC PRIVATE KEY-----\n{private_key_string}\n-----END EC PRIVATE KEY-----"
35+
private_key = serialization.load_pem_private_key(
36+
ec_key_pem.encode("utf-8"), password=None
37+
)
38+
pkcs8_bytes = private_key.private_bytes(
39+
encoding=serialization.Encoding.PEM,
40+
format=serialization.PrivateFormat.PKCS8,
41+
encryption_algorithm=serialization.NoEncryption(),
42+
)
43+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
44+
lines = pkcs8_pem.strip().split("\n")
45+
return "".join(lines[1:-1])
46+
except (ValueError, TypeError):
47+
pass
48+
49+
# Try loading as raw DER bytes
50+
try:
51+
der_bytes = base64.b64decode(private_key_string)
52+
try:
53+
private_key = serialization.load_der_private_key(der_bytes, password=None)
54+
except (ValueError, TypeError):
55+
from cryptography.hazmat.primitives.asymmetric import ec
56+
57+
private_key = ec.derive_private_key(
58+
int.from_bytes(der_bytes, "big"), ec.SECP256R1()
59+
)
60+
pkcs8_bytes = private_key.private_bytes(
61+
encoding=serialization.Encoding.PEM,
62+
format=serialization.PrivateFormat.PKCS8,
63+
encryption_algorithm=serialization.NoEncryption(),
64+
)
65+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
66+
lines = pkcs8_pem.strip().split("\n")
67+
return "".join(lines[1:-1])
68+
except (ValueError, TypeError) as e:
69+
raise ValueError(f"Could not load private key: {e}")
70+
71+
1772
async def get_privy_embedded_wallet(
1873
user_id: str, privy_client: AsyncPrivyAPI
1974
) -> Optional[Dict[str, str]]:
@@ -65,25 +120,26 @@ async def privy_sign_and_send(
65120
) -> Dict[str, Any]:
66121
"""Sign and send a transaction using Privy SDK."""
67122
try:
123+
# Convert key to PKCS#8 PEM format for SDK compatibility
124+
pem_key = _convert_key_to_pkcs8_pem(privy_auth_key)
125+
68126
# Generate authorization signature using SDK
69127
url = f"https://api.privy.io/v1/wallets/{wallet_id}/rpc"
70128
body = {
71129
"method": "signAndSendTransaction",
72-
"caip2": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp",
73130
"params": {"transaction": encoded_tx, "encoding": "base64"},
74131
}
75132
auth_signature = get_authorization_signature(
76133
url=url,
77134
body=body,
78135
privy_app_id=privy_client.app_id,
79-
privy_authorization_key=privy_auth_key,
136+
privy_authorization_key=pem_key,
80137
)
81138

82-
# Use SDK's wallets.rpc method with caip2 for signAndSendTransaction
139+
# Use SDK's wallets.rpc method for signAndSendTransaction
83140
result = await privy_client.wallets.rpc(
84141
wallet_id=wallet_id,
85142
method="signAndSendTransaction",
86-
caip2="solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp",
87143
params={"transaction": encoded_tx, "encoding": "base64"},
88144
privy_authorization_signature=auth_signature,
89145
)

sakit/privy_trigger.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from solana_agent import AutoTool, ToolRegistry
1313
from privy import AsyncPrivyAPI
1414
from privy.lib.authorization_signatures import get_authorization_signature
15+
from cryptography.hazmat.primitives import serialization
1516
from solders.keypair import Keypair # type: ignore
1617
from solders.transaction import VersionedTransaction # type: ignore
1718
from solders.message import to_bytes_versioned # type: ignore
@@ -21,6 +22,60 @@
2122
logger = logging.getLogger(__name__)
2223

2324

25+
def _convert_key_to_pkcs8_pem(key_string: str) -> str:
26+
"""Convert a private key to PKCS#8 PEM format for the Privy SDK."""
27+
private_key_string = key_string.replace("wallet-auth:", "")
28+
29+
# Try loading as PKCS#8 PEM format first
30+
try:
31+
private_key_pem = f"-----BEGIN PRIVATE KEY-----\n{private_key_string}\n-----END PRIVATE KEY-----"
32+
serialization.load_pem_private_key(
33+
private_key_pem.encode("utf-8"), password=None
34+
)
35+
return private_key_string
36+
except (ValueError, TypeError):
37+
pass
38+
39+
# Try as EC PRIVATE KEY (SEC1) format
40+
try:
41+
ec_key_pem = f"-----BEGIN EC PRIVATE KEY-----\n{private_key_string}\n-----END EC PRIVATE KEY-----"
42+
private_key = serialization.load_pem_private_key(
43+
ec_key_pem.encode("utf-8"), password=None
44+
)
45+
pkcs8_bytes = private_key.private_bytes(
46+
encoding=serialization.Encoding.PEM,
47+
format=serialization.PrivateFormat.PKCS8,
48+
encryption_algorithm=serialization.NoEncryption(),
49+
)
50+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
51+
lines = pkcs8_pem.strip().split("\n")
52+
return "".join(lines[1:-1])
53+
except (ValueError, TypeError):
54+
pass
55+
56+
# Try loading as raw DER bytes
57+
try:
58+
der_bytes = base64.b64decode(private_key_string)
59+
try:
60+
private_key = serialization.load_der_private_key(der_bytes, password=None)
61+
except (ValueError, TypeError):
62+
from cryptography.hazmat.primitives.asymmetric import ec
63+
64+
private_key = ec.derive_private_key(
65+
int.from_bytes(der_bytes, "big"), ec.SECP256R1()
66+
)
67+
pkcs8_bytes = private_key.private_bytes(
68+
encoding=serialization.Encoding.PEM,
69+
format=serialization.PrivateFormat.PKCS8,
70+
encryption_algorithm=serialization.NoEncryption(),
71+
)
72+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
73+
lines = pkcs8_pem.strip().split("\n")
74+
return "".join(lines[1:-1])
75+
except (ValueError, TypeError) as e:
76+
raise ValueError(f"Could not load private key: {e}")
77+
78+
2479
async def _get_privy_embedded_wallet(
2580
privy_client: AsyncPrivyAPI, user_id: str
2681
) -> Optional[Dict[str, str]]:
@@ -82,6 +137,9 @@ async def _privy_sign_transaction(
82137
) -> Optional[str]:
83138
"""Sign a Solana transaction via Privy using the official SDK."""
84139
try:
140+
# Convert the key to PKCS#8 format expected by the SDK
141+
pkcs8_key = _convert_key_to_pkcs8_pem(signing_key)
142+
85143
url = f"https://api.privy.io/v1/wallets/{wallet_id}/rpc"
86144
body = {
87145
"method": "signTransaction",
@@ -93,7 +151,7 @@ async def _privy_sign_transaction(
93151
body=body,
94152
method="POST",
95153
app_id=privy_client.app_id,
96-
private_key=signing_key,
154+
private_key=pkcs8_key,
97155
)
98156

99157
result = await privy_client.wallets.rpc(

sakit/privy_ultra.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from solana_agent import AutoTool, ToolRegistry
1313
from privy import AsyncPrivyAPI
1414
from privy.lib.authorization_signatures import get_authorization_signature
15+
from cryptography.hazmat.primitives import serialization
1516
from solders.keypair import Keypair
1617
from solders.transaction import VersionedTransaction
1718
from solders.message import to_bytes_versioned
@@ -21,6 +22,81 @@
2122
logger = logging.getLogger(__name__)
2223

2324

25+
def convert_key_to_pkcs8_pem(key_string: str) -> str:
26+
"""Convert a private key to PKCS#8 PEM format for the Privy SDK.
27+
28+
The SDK expects keys in PKCS#8 PEM format (-----BEGIN PRIVATE KEY-----).
29+
This function handles keys in various formats:
30+
- Already in PKCS#8 PEM base64
31+
- SEC1 EC format (-----BEGIN EC PRIVATE KEY-----)
32+
- Raw DER bytes (PKCS#8 or SEC1)
33+
34+
Returns the base64 content (without headers) in PKCS#8 format.
35+
"""
36+
# Strip wallet-auth: prefix if present
37+
private_key_string = key_string.replace("wallet-auth:", "")
38+
39+
# Try loading as PKCS#8 PEM format first
40+
try:
41+
private_key_pem = f"-----BEGIN PRIVATE KEY-----\n{private_key_string}\n-----END PRIVATE KEY-----"
42+
private_key = serialization.load_pem_private_key(
43+
private_key_pem.encode("utf-8"), password=None
44+
)
45+
# Already in correct format, return as-is
46+
return private_key_string
47+
except (ValueError, TypeError):
48+
pass
49+
50+
# Try as EC PRIVATE KEY (SEC1) format
51+
try:
52+
ec_key_pem = f"-----BEGIN EC PRIVATE KEY-----\n{private_key_string}\n-----END EC PRIVATE KEY-----"
53+
private_key = serialization.load_pem_private_key(
54+
ec_key_pem.encode("utf-8"), password=None
55+
)
56+
# Convert to PKCS#8 format
57+
pkcs8_bytes = private_key.private_bytes(
58+
encoding=serialization.Encoding.PEM,
59+
format=serialization.PrivateFormat.PKCS8,
60+
encryption_algorithm=serialization.NoEncryption(),
61+
)
62+
# Extract base64 content (remove headers and newlines)
63+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
64+
lines = pkcs8_pem.strip().split("\n")
65+
base64_content = "".join(lines[1:-1])
66+
return base64_content
67+
except (ValueError, TypeError):
68+
pass
69+
70+
# Try loading as raw DER bytes
71+
try:
72+
der_bytes = base64.b64decode(private_key_string)
73+
# Try PKCS#8 DER
74+
try:
75+
private_key = serialization.load_der_private_key(der_bytes, password=None)
76+
except (ValueError, TypeError):
77+
# Try SEC1 DER
78+
from cryptography.hazmat.primitives.asymmetric import ec
79+
80+
private_key = ec.derive_private_key(
81+
int.from_bytes(der_bytes, "big"), ec.SECP256R1()
82+
)
83+
# Convert to PKCS#8 PEM format
84+
pkcs8_bytes = private_key.private_bytes(
85+
encoding=serialization.Encoding.PEM,
86+
format=serialization.PrivateFormat.PKCS8,
87+
encryption_algorithm=serialization.NoEncryption(),
88+
)
89+
pkcs8_pem = pkcs8_bytes.decode("utf-8")
90+
lines = pkcs8_pem.strip().split("\n")
91+
base64_content = "".join(lines[1:-1])
92+
return base64_content
93+
except (ValueError, TypeError) as e:
94+
raise ValueError(
95+
f"Could not load private key. Expected base64-encoded PKCS#8 or SEC1 format. "
96+
f"Generate with: openssl genpkey -algorithm EC -pkeyopt ec_paramgen_curve:P-256. Error: {e}"
97+
)
98+
99+
24100
async def get_privy_embedded_wallet(
25101
privy_client: AsyncPrivyAPI, user_id: str
26102
) -> Optional[Dict[str, str]]:
@@ -116,6 +192,9 @@ async def privy_sign_transaction(
116192
Uses the wallets.rpc method with method="signTransaction" for Solana.
117193
The SDK handles authorization signature generation automatically when provided.
118194
"""
195+
# Convert the key to PKCS#8 format expected by the SDK
196+
pkcs8_key = convert_key_to_pkcs8_pem(signing_key)
197+
119198
# Generate the authorization signature using the SDK's utility
120199
url = f"https://api.privy.io/v1/wallets/{wallet_id}/rpc"
121200
body = {
@@ -129,7 +208,7 @@ async def privy_sign_transaction(
129208
body=body,
130209
method="POST",
131210
app_id=privy_client.app_id,
132-
private_key=signing_key,
211+
private_key=pkcs8_key,
133212
)
134213

135214
# Call the SDK's rpc method for Solana signTransaction

0 commit comments

Comments
 (0)