Skip to content

Commit

Permalink
more work on time hinting
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansev committed Nov 7, 2023
1 parent e499041 commit 9edcc4c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
51 changes: 25 additions & 26 deletions alienpy/wb_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
import logging
import traceback
from pathlib import Path
from typing import Union
from typing import Callable, Optional, TYPE_CHECKING, Union
import time

try:
import websockets.exceptions as wb_exceptions
except Exception:
print("websockets module could not be imported! Make sure you can do:\npython3 -c 'import websockets.exceptions as wb_exceptions'", file = sys.stderr, flush = True)
sys.exit(1)
from websockets import WebSocketClientProtocol

#from .global_vars import * # nosec PYL-W0614
from .data_structs import RET, XrdCpArgs
from .global_vars import ALIENPY_GLOBAL_WB, AlienSessionInfo, DEBUG, DEBUG_FILE, DEBUG_TIMING, TIME_CONNECT, TMPDIR, specs_split
from .data_structs import RET
from .global_vars import ALIENPY_GLOBAL_WB, AlienSessionInfo, DEBUG, DEBUG_FILE, DEBUG_TIMING, TIME_CONNECT, TMPDIR
from .setup_logging import print_err, print_out
from .async_tools import syncify
from .wb_async import IsWbConnected, wb_close, wb_create, wb_sendmsg, wb_sendmsg_multi
Expand All @@ -28,7 +28,7 @@
from .connect_ssl import get_certs_names


def wb_create_tryout(host: str, port: Union[str, int], path: str = '/', use_usercert: bool = False, localConnect: bool = False):
def wb_create_tryout(host: str, port: Union[str, int], path: str = '/', use_usercert: bool = False, localConnect: bool = False) -> WebSocketClientProtocol:
"""WebSocket creation with tryouts (configurable by env ALIENPY_CONNECT_TRIES and ALIENPY_CONNECT_TRIES_INTERVAL)"""
wb = None
nr_tries = 0
Expand Down Expand Up @@ -62,7 +62,7 @@ def wb_create_tryout(host: str, port: Union[str, int], path: str = '/', use_user
return wb


def AlienConnect(wb = None, token_args: Union[None, list] = None, use_usercert: bool = False, localConnect: bool = False):
def AlienConnect(wb: Optional[WebSocketClientProtocol] = None, token_args: Optional[list] = None, use_usercert: bool = False, localConnect: bool = False) -> WebSocketClientProtocol:
"""Create a websocket connection to AliEn services either directly to alice-jcentral.cern.ch or trough a local found jbox instance"""
if not token_args: token_args = []
init_begin = time.perf_counter() if (TIME_CONNECT or DEBUG) else None
Expand Down Expand Up @@ -109,11 +109,11 @@ def AlienConnect(wb = None, token_args: Union[None, list] = None, use_usercert:
return wb


def InitConnection(wb = None, token_args: Union[None, list] = None, use_usercert: bool = False, localConnect: bool = False, cmdlist_func = None):
def InitConnection(wb: Optional[WebSocketClientProtocol] = None, token_args: Optional[list] = None, use_usercert: bool = False, localConnect: bool = False, cmdlist_func: Optional[Callable] = None) -> WebSocketClientProtocol:
"""Create a session to AliEn services, including session globals and token regeneration"""
global ALIENPY_GLOBAL_WB

wb = AlienConnect(wb, token_args, use_usercert, localConnect)
wb = AlienConnect(wb, token_args, use_usercert, localConnect) # Always valid, as the program will exit if connection could not be established
ALIENPY_GLOBAL_WB = wb

# NO MATTER WHAT BEFORE ENYTHING ELSE SESSION MUST BE INITIALIZED !!!!!!!!!!!!!!!!
Expand Down Expand Up @@ -146,7 +146,7 @@ def InitConnection(wb = None, token_args: Union[None, list] = None, use_usercert
return wb


def SendMsg(wb, cmdline: str, args: Union[None, list] = None, opts: str = '') -> RET:
def SendMsg(wb: WebSocketClientProtocol, cmdline: str, args: Optional[list] = None, opts: str = '') -> RET:
"""Send a json message to the specified websocket; it will return the server answer"""
if not wb:
msg = "SendMsg:: websocket not initialized"
Expand Down Expand Up @@ -197,7 +197,7 @@ def SendMsg(wb, cmdline: str, args: Union[None, list] = None, opts: str = '') ->
return ret_obj # noqa: R504


def SendMsgMulti(wb, cmds_list: list, opts: str = '') -> list:
def SendMsgMulti(wb: WebSocketClientProtocol, cmds_list: list, opts: str = '') -> list:
"""Send a json message to the specified websocket; it will return the server answer"""
if not wb:
msg = "SendMsg:: websocket not initialized"
Expand Down Expand Up @@ -331,7 +331,7 @@ def retf_print(ret_obj: RET, opts: str = '') -> int:
return ret_obj.exitcode


def token(wb, args: Union[None, list] = None) -> int:
def token(wb: WebSocketClientProtocol, args: Optional[list] = None) -> int:
"""(Re)create the tokencert and tokenkey files"""
if not wb: return 1
if not args: args = []
Expand Down Expand Up @@ -378,7 +378,7 @@ def token(wb, args: Union[None, list] = None) -> int:
return int(0)


def token_regen(wb, args: Union[None, list] = None):
def token_regen(wb: WebSocketClientProtocol, args: Optional[list] = None) -> WebSocketClientProtocol:
"""Do the disconnect, connect with user cert, generate token, re-connect with token procedure"""
wb_usercert = None
if not args: args = []
Expand All @@ -405,7 +405,7 @@ def token_regen(wb, args: Union[None, list] = None):
return wb_token_new


def cd(wb, args: Union[str, list, None] = None, opts: str = '') -> RET:
def cd(wb: WebSocketClientProtocol, args: Union[str, list, None] = None, opts: str = '') -> RET:
"""Override cd to add to home and to prev functions"""
if args is None: args = []
if isinstance(args, str): args = args.split()
Expand All @@ -416,47 +416,46 @@ def cd(wb, args: Union[str, list, None] = None, opts: str = '') -> RET:
return SendMsg(wb, 'cd', args, opts)


def get_list_entries(wb, lfn, fullpath: bool = False) -> list:
def get_list_entries(wb: WebSocketClientProtocol, lfn: str = '', fullpath: bool = False) -> list:
"""return a list of entries of the lfn argument, full paths if 2nd arg is True"""
if not lfn: return []
key = 'path' if fullpath else 'name'
ret_obj = SendMsg(wb, 'ls', ['-nomsg', '-a', '-F', os.path.normpath(lfn)])
if ret_obj.exitcode != 0: return []
return [item[key] for item in ret_obj.ansdict['results']]


def lfn_list(wb, lfn: str = ''):
def lfn_list(wb: WebSocketClientProtocol, lfn: str = '') -> list:
"""Completer function : for a given lfn return all options for latest leaf"""
if not wb: return []
if not lfn: lfn = '.' # AlienSessionInfo['currentdir']
list_lfns = []
lfn_path = Path(lfn)
base_dir = '/' if lfn_path.parent.as_posix() == '/' else f'{lfn_path.parent.as_posix()}/'
name = f'{lfn_path.name}/' if lfn.endswith('/') else lfn_path.name

def item_format(base_dir, name, item):
# print_out(f'\nbase_dir: {base_dir} ; name: {name} ; item: {item}')
def item_format(base_dir: str, name: str, item: str) -> str:
if name.endswith('/') and name != '/':
return f'{name}{item}' if base_dir == './' else f'{base_dir}{name}{item}'
return item if base_dir == './' else f'{base_dir}{item}'

if lfn.endswith('/'):
listing = get_list_entries(wb, lfn)
list_lfns = [item_format(base_dir, name, item) for item in listing]
else:
listing = get_list_entries(wb, base_dir)
list_lfns = [item_format(base_dir, name, item) for item in listing if item.startswith(name)]
return list_lfns
return [item_format(base_dir, name, item) for item in listing]

# we gave an initial name
listing = get_list_entries(wb, base_dir)
return [item_format(base_dir, name, item) for item in listing if item.startswith(name)]


def wb_ping(wb) -> float:
def wb_ping(wb: WebSocketClientProtocol) -> float:
"""Websocket ping function, it will return rtt in ms"""
init_begin = time.perf_counter()
if IsWbConnected(wb):
return float(deltat_ms_perf(init_begin))
return float(-1)


def get_help_srv(wb, cmd: str = '') -> RET:
def get_help_srv(wb: WebSocketClientProtocol, cmd: str = '') -> RET:
"""Return the help option for server-side known commands"""
if not cmd: return RET(1, '', 'No command specified for help request')
return SendMsg(wb, f'{cmd} -h')
Expand Down Expand Up @@ -490,7 +489,7 @@ def msgstr(self) -> str:
def __call__(self) -> tuple:
return (self.cmd, self.args, self.opts)

def __bool__(self):
def __bool__(self) -> bool:
return bool(self.cmd)


Expand Down
16 changes: 8 additions & 8 deletions alienpy/wb_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import socket
import logging
import time
from typing import Union
from typing import Optional, TYPE_CHECKING, Union

try:
import websockets.client as wb_client
Expand All @@ -15,6 +15,7 @@
except Exception:
print("websockets module could not be imported! Make sure you can do:\npython3 -c 'import websockets.client as wb_client'", file = sys.stderr, flush = True)
sys.exit(1)
from websockets import WebSocketClientProtocol

if not os.getenv('ALIENPY_NO_STAGGER'):
try:
Expand All @@ -24,8 +25,7 @@
sys.exit(1)

from .version import ALIENPY_VERSION_STR
from .data_structs import RET, XrdCpArgs
from .global_vars import DEBUG, DEBUG_FILE, DEBUG_TIMING, TMPDIR, specs_split
from .global_vars import DEBUG, DEBUG_FILE, DEBUG_TIMING, TMPDIR
from .tools_nowb import deltat_ms_perf
from .setup_logging import print_err
from .connect_ssl import create_ssl_context, renewCredFilesInfo
Expand All @@ -40,7 +40,7 @@


@syncify
async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', path: str = '/', use_usercert: bool = False, localConnect: bool = False):
async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', path: str = '/', use_usercert: bool = False, localConnect: bool = False) -> Optional[WebSocketClientProtocol]:
"""Create a websocket to wss://host:port/path (it is implied a SSL context)"""
if not host:
msg = 'wb_create:: provided host argument is empty'
Expand Down Expand Up @@ -163,7 +163,7 @@ async def wb_create(host: str = 'localhost', port: Union[str, int] = '8097', pat


@syncify
async def IsWbConnected(wb) -> bool:
async def IsWbConnected(wb: WebSocketClientProtocol) -> bool:
"""Check if websocket is connected with the protocol ping/pong"""
time_begin = time.perf_counter() if DEBUG_TIMING else None
if DEBUG:
Expand All @@ -179,7 +179,7 @@ async def IsWbConnected(wb) -> bool:


@syncify
async def wb_close(wb, code, reason):
async def wb_close(wb: WebSocketClientProtocol, code, reason):
"""Send close to websocket"""
try:
await wb.close(code = code, reason = reason)
Expand All @@ -188,7 +188,7 @@ async def wb_close(wb, code, reason):


@syncify
async def wb_sendmsg(wb, jsonmsg: str) -> str:
async def wb_sendmsg(wb: WebSocketClientProtocol, jsonmsg: str) -> str:
"""The low level async function for send/receive"""
time_begin = time.perf_counter() if DEBUG_TIMING else None
await wb.send(jsonmsg)
Expand All @@ -198,7 +198,7 @@ async def wb_sendmsg(wb, jsonmsg: str) -> str:


@syncify
async def wb_sendmsg_multi(wb, jsonmsg_list: list) -> list:
async def wb_sendmsg_multi(wb: WebSocketClientProtocol, jsonmsg_list: list) -> list:
"""The low level async function for send/receive multiple messages once"""
if not jsonmsg_list: return []
time_begin = time.perf_counter() if DEBUG_TIMING else None
Expand Down

0 comments on commit 9edcc4c

Please sign in to comment.