-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
90 lines (80 loc) · 2.94 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import json
import time
import base58
import requests
from config import RPC, PUB_KEY, client
from solana.transaction import Signature
def find_data(data, field):
if isinstance(data, dict):
if field in data:
return data[field]
else:
for value in data.values():
result = find_data(value, field)
if result is not None:
return result
elif isinstance(data, list):
for item in data:
result = find_data(item, field)
if result is not None:
return result
return None
def get_token_balance(base_mint: str):
try:
headers = {"accept": "application/json", "content-type": "application/json"}
payload = {
"id": 1,
"jsonrpc": "2.0",
"method": "getTokenAccountsByOwner",
"params": [
PUB_KEY,
{"mint": base_mint},
{"encoding": "jsonParsed"},
],
}
response = requests.post(RPC, json=payload, headers=headers)
ui_amount = find_data(response.json(), "uiAmount")
return float(ui_amount)
except Exception as e:
return None
def get_coin_data(mint_str):
url = f"https://frontend-api.pump.fun/coins/{mint_str}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"If-None-Match": 'W/"41b-5sP6oeDs1tG//az0nj9tRYbL22A"',
"Priority": "u=4"
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
return None
except Exception as e:
return None
def confirm_txn(txn_sig, max_retries=20, retry_interval=3):
retries = 0
if isinstance(txn_sig, str):
txn_sig = Signature.from_string(txn_sig)
while retries < max_retries:
try:
txn_res = client.get_transaction(txn_sig, encoding="json", commitment="confirmed", max_supported_transaction_version=0)
txn_json = json.loads(txn_res.value.transaction.meta.to_json())
if txn_json['err'] is None:
print("Transaction confirmed... try count:", retries+1)
return True
print("Error: Transaction not confirmed. Retrying...")
if txn_json['err']:
print("Transaction failed.")
return False
except Exception as e:
print("Awaiting confirmation... try count:", retries+1)
retries += 1
time.sleep(retry_interval)
print("Max retries reached. Transaction confirmation failed.")
return None