Skip to content

Commit

Permalink
Add new apiComm.py helper module
Browse files Browse the repository at this point in the history
  • Loading branch information
ikagod authored Apr 13, 2024
1 parent ae9ebba commit 0457f63
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 37 deletions.
5 changes: 0 additions & 5 deletions ikabot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
t = gettext.translation("config", localedir, languages=languages, fallback=True)
_ = t.gettext

# only use common browsers
if random.randint(0, 1) == 0:
default_user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
else:
default_user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/110.0"

update_msg = ""

Expand Down
7 changes: 2 additions & 5 deletions ikabot/function/autoPirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ikabot.helpers.pedirInfo import *
from ikabot.helpers.process import run, set_child_mode
from ikabot.helpers.varios import timeStringToSec, wait
from ikabot.helpers.dns import getAddress
from ikabot.helpers.apiComm import getPiratesCaptchaSolution

t = gettext.translation("buyResources", localedir, languages=languages, fallback=True)
_ = t.gettext
Expand Down Expand Up @@ -274,10 +274,7 @@ def resolveCaptcha(session, picture):
"decaptcha" not in session_data
or session_data["decaptcha"]["name"] == "default"
):
address = getAddress(session, publicAPIServerDomain)

files = {"upload_file": picture}
captcha = requests.post(address, files=files).text
captcha = getPiratesCaptchaSolution(session, picture)
return captcha
elif session_data["decaptcha"]["name"] == "custom":
files = {"upload_file": picture}
Expand Down
77 changes: 77 additions & 0 deletions ikabot/helpers/apiComm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-

import traceback
from requests import get, post
from ikabot.helpers.dns import getAddress
from ikabot.config import *


def getNewBlackBoxToken(session):
"""This function returns a newly generated blackbox token from the API
Parameters
----------
session : ikabot.web.session.Session
Session object
Returns
-------
token : str
blackbox token
"""
address = getAddress(session, publicAPIServerDomain) + '/v1/token' + '?user_agent=' + session.user_agent
response = get(address, verify=do_ssl_verify, timeout=900)
assert response.status_code == 200, 'API response code is not OK: ' + str(response.status_code) + '\n' + response.text
response = response.json()
if 'status' in response and response['status'] == 'error':
raise Exception(response['message'])
return 'tra:' + response

def getInteractiveCaptchaSolution(session, text_image, drag_icons):
"""This function returns the solution of the interactive captcha
Parameters
----------
session : ikabot.web.session.Session
Session object
drag_icons : bytes
the image that contains 4 other images
text_image : bytes
the image that contaisn the text
Returns
-------
solution : str
solution of the captcha
"""
address = getAddress(session, publicAPIServerDomain) + '/v1/decaptcha/lobby'
files = {'text_image': text_image, 'drag_icons': drag_icons}
response = post(address, files=files, verify=do_ssl_verify, timeout=900)
assert response.status_code == 200, 'API response code is not OK: ' + str(response.status_code) + '\n' + response.text
response = response.json()
if 'status' in response and response['status'] == 'error':
raise Exception(response['message'])
return response

def getPiratesCaptchaSolution(session, image):
"""This function returns the solution of the pirates captcha
Parameters
----------
session : ikabot.web.session.Session
Session object
image : bytes
the image to be solved
Returns
-------
solution : str
solution of the captcha
"""
address = getAddress(session, publicAPIServerDomain) + '/v1/decaptcha/pirate'
files = {'image': image}
response = post(address, files=files, verify=do_ssl_verify, timeout=900)
assert response.status_code == 200, 'API response code is not OK: ' + str(response.status_code) + '\n' + response.text
response = response.json()
if 'status' in response and response['status'] == 'error':
raise Exception(response['message'])
return response

4 changes: 2 additions & 2 deletions ikabot/helpers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ def getAddress(session = type('test', (object,), {'writeLog': lambda *args, **kw
try:
address = getAddressWithSocket(session, domain)
assert '.' in address or ':' in address.replace('http://', ''), "Bad server address: " + address
return address
return address.replace('/ikagod/ikabot','')
except Exception as e:
session.writeLog("Failed to obtain public API address from socket, falling back to nslookup: " + str(e), level=logLevels.WARN, module= __name__, logTraceback=True)
try:
address = getAddressWithNSlookup(session, domain)
assert '.' in address or ':' in address.replace('http://', ''), "Bad server address: " + address #address is either hostname, IPv4 or IPv6
return address
return address.replace('/ikagod/ikabot','')
except Exception as e:
session.writeLog("Failed to obtain public API address from both socket and nslookup: " + str(e), level=logLevels.ERROR, module= __name__, logTraceback=True)
raise e
Expand Down
22 changes: 7 additions & 15 deletions ikabot/web/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ikabot.helpers.gui import banner
from ikabot.helpers.pedirInfo import read
from ikabot.helpers.varios import getDateTime
from ikabot.helpers.dns import getAddress
from ikabot.helpers.apiComm import getInteractiveCaptchaSolution, getNewBlackBoxToken

t = gettext.translation("session", localedir, languages=languages, fallback=True)
_ = t.gettext
Expand Down Expand Up @@ -272,11 +272,7 @@ def __load_new_blackbox_token(self):
try:
if self.padre:
print("Obtaining new blackbox token, please wait...")
address = getAddress(self, publicAPIServerDomain).replace("/ikagod/ikabot", "/token")
blackbox_token = (
"tra:"
+ requests.get(address, verify=config.do_ssl_verify, timeout=900).text
)
blackbox_token = getNewBlackBoxToken(self)
assert any(
c.isupper() for c in blackbox_token
), "The token must contain uppercase letters."
Expand All @@ -298,8 +294,10 @@ def __load_new_blackbox_token(self):
if not self.padre: # only exit if running in a child process because user won't be looking at the console to provide the cookies
sys.exit('Failed to regenerate blackbox token')
self.blackbox = 'tra:JVqc1fosb5TG-E2h5Ak7bZLEB23OOq0SN2ms0QM1fOFErx5DdafM_kFmmMoXeNsAMmKxBClbi-MIOmyRwwYrXY_VPrAVe-pih7nrEEKFqtwOVcQzmgZrkMLyO6kMOl-Rw-gaXZG26CtcgbP2G01_5FJ_xggtX5G26CtQgrToToO2GX3eD0eq3D6fADCTw_YrX8EjWL73WIvwU7UaT7QYSXvcPnLWDnOk1TeY0Ac6nc81ZpfI_i9hkcMlXpLH7B5QdafqD0FzuimY_2vQ9SdXoA5xn8T2Jk6VBHPaRqvU-Stur_1EkNX6LFyEyzqpEHzhBjh7oNICWM05pAVzmMr6K1mMuuoPQXGZ7GPMMqb5YcImi_0iVITILaMMb9T5K1uD1kutJ4z-bZa77R1Fde0dTX2t8CBkqdL7IFKVuuwcb-ZPtSl85EWpDoCl1wdr3Ua8IZO84RNFapzfBDZonAEyZMf_ZZX2WLvyI4e58CKEue1Ti8H6XsEih7gZSYDjGlGHt-kiVIS981aIuBx-4hR1rOBFqNwUet8XTX6zFjttn8T2OV6QwiOIwPgxZpv_M2fI-y5gkPIqW4_EKovA8SVdjfMjicHxI1uQwvpglcX-M5b6MWmc0TRs0QNmy_9hmvswZp3RNm6TxfccTpHC9CxcgbP2LV-PtOYpW4-05ilOgLIXeKoNQHncEXbZOm6m3kCk20Cl1TZnywJlygBil839LZHyJ43A9leO8SmK6yNcviKH6RlQiL73J43yKmOYyC1jiLrsEUOGq90PQ3iu5BxTuOxRgbHjHE1_tBdHqeATeNxCda7fRabWCG2iBDx1pdtApts9oNM0l8krXpL3Wb73KoztHVG05RhIqc4AMleJzPEjVYnvJFe6Hn-w6Et930Ch0TRkl8wAYsT5X5j5LJH0VrvwVbnqHH3fE3evFEV22DlxqNs-cNYHOGmf0AIyZMb_M2iNv_EWSIuw4hRIgbPrHoO05BV6quJDd63gQqjcDHCp2j-kBmug0Qk_ctM0bNIIOG6k3BF0rBFGf7cbf7EWerLrJInB-ChelPotUoS22w1QgbPnFUV5rOAXTH616htRgbjsEUOGq90Pc6beQXKl2AhpzwU5m9ADaZwCZpvUBDWW9ylav_YsXI_G9luLw_Qmh7_0KI7G_DNqotsUSn7iRaraEUN4rxRNr9QGOF2P0gQ1bp7SAjZrosf5PGGTxfcnWY266h1KfLUJOnGWyQo6bpPGBztvndQFPJa77R9EdrneEEKw4E2CuyWX0D-pF4sDdKcMP3Op3haN-W6gC3GWyPofUZTF_DBmi70AJVeJuuoYSX6j1QcsXqHG-Cp4vSBYuwJcofUmng5fzDeeAESazTGr7TiR-mrgRInbH3jhL2O6I1i9D3rqQpDoOoztRbTpS7kdg-VSh9E0eLnuP4LQA0-U-yBSmPpo3BBdpfpvxzJz3y2EuC6R_mWdAmu5K4XKOnzQSn_EF2qs9UOv5EuY6xxu0RdhyBxkqhF14i9luCVeuAx1ti589juzAXvIQI_TOrMCVrkxVoi63xFUeavdKpkTfOhUtdoMUoe15Qo8bJThQqUOfPBf0jpfktT5K1ukEobrV3yu3iuM7xRGdsUYPW-f9xxOfq_fDT5zmMsNMmSUBnyh1BVGdq_dDTZbjb0Eacw3psv9Q3Wl1gY2Z5fI7R9Plf5w1TuqIkd5v_AhWIa22w0_ZJbZ_jBilMT2KleHuucZUqbXDjNmp9cLMGOk2Aw6aprKJEl7rdIER7UqlgInXKA'
print('Failed to obtain new blackbox token from API: ' + str(e))
print('Using fallback blackbox token: ' + self.blackbox) # using expired fallback token here so that user can insert cookie manually since blackbox generation failed at this point
print(f'{bcolors.RED}[ERROR]{bcolors.ENDC} Failed to obtain new blackbox token from API: ' + str(e)) # using expired fallback token here so that user can insert cookie manually since blackbox generation failed at this point
print('Please report this issue to developers on github or the discord server!!')
print('Using fallback blackbox token: ' + self.blackbox)
enter()

def __login(self, retries=0):
if not self.logged:
Expand Down Expand Up @@ -611,13 +609,7 @@ def __login(self, retries=0):
).content
data = {}
try:

address = getAddress(self, publicAPIServerDomain)

files = {"text_image": text_image, "drag_icons": drag_icons}
captcha = self.s.post(
address, files=files
).text
captcha = getInteractiveCaptchaSolution(self, text_image, drag_icons)
if not captcha.isnumeric():
raise Exception(
"Failed to resolve interactive captcha automatically. Server returned bad data: {}".format(
Expand Down
4 changes: 1 addition & 3 deletions tests/api/test_decaptcha_lobby_route.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
@pytest.fixture
def base_url():
address = getAddress(domain = publicAPIServerDomain)
# Remove "/ikagod/ikabot" from the URL
base_url = address.replace("/ikagod/ikabot", "")
return base_url
return base_url + '/v1'


def test_decaptcha_without_data_should_return_error(base_url):
Expand Down
4 changes: 1 addition & 3 deletions tests/api/test_decaptcha_pirate_route.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
@pytest.fixture
def base_url():
address = getAddress(domain = publicAPIServerDomain)
# Remove "/ikagod/ikabot" from the URL
base_url = address.replace("/ikagod/ikabot", "")
return base_url
return base_url + '/v1'


def test_decaptcha_without_data_should_return_error(base_url):
Expand Down
7 changes: 3 additions & 4 deletions tests/api/test_token_route.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
from ikabot.helpers.dns import getAddress
from ikabot.config import *

valid_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.2'

@pytest.fixture
def base_url():
address = getAddress(domain = publicAPIServerDomain)
# Remove "/ikagod/ikabot" from the URL
base_url = address.replace("/ikagod/ikabot", "")
return base_url
return base_url + '/v1'


def test_get_token_should_return_status_code_ok(base_url):
response = requests.get(f"{base_url}/token")
response = requests.get(f"{base_url}/token?user_agent=" + valid_user_agent)
assert response.status_code == 200

0 comments on commit 0457f63

Please sign in to comment.