Skip to content

Commit

Permalink
Merge pull request #43 from SmithSamuelM/main
Browse files Browse the repository at this point in the history
Add basic support for UDP and UXD  Peer non blocking io classes.
  • Loading branch information
SmithSamuelM authored Jan 21, 2025
2 parents 43a19c5 + ef17e13 commit 93764ce
Show file tree
Hide file tree
Showing 13 changed files with 789 additions and 79 deletions.
10 changes: 6 additions & 4 deletions src/hio/base/doing.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ def __call__(self, **kwa):
"""
Returns generator
Does not advance to first yield.
The advance to first yield effectively invodes the enter or open context
The advance to first yield effectively invokes the enter or open context
on the generator.
To enter either call .next or .send(None) on generator
"""
Expand Down Expand Up @@ -567,8 +567,8 @@ def do(self, tymth, *, tock=0.0, **opts):
tyme = (yield (self.tock)) # yields .tock then waits for next send
self.done = self.recur(tyme=tyme)

except GeneratorExit: # close context, forced exit due to .close
self.close()
except GeneratorExit: # close context, forced exit due to .close on generator
self.close() # close method on instance not generator

except Exception as ex: # abort context, forced exit due to uncaught exception
self.abort(ex=ex)
Expand All @@ -581,7 +581,9 @@ def do(self, tymth, *, tock=0.0, **opts):
self.exit()

# return value of yield from or StopIteration.value indicates completion
return self.done # Only returns done state if normal return not close or abort raise
# python 3.13 gh-104770: If a generator returns a value upon being
# closed, the value is now returned by generator.close().
return self.done # Only returns done state if normal return or close not abort raise


def enter(self):
Expand Down
11 changes: 11 additions & 0 deletions src/hio/core/serial/serialing.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ def get(self, bs=None):
return line


def service(self):
"""
Service puts and gets
"""


class ConsoleDoer(doing.Doer):
"""
Basic Console Doer. Wraps console in doer context so opens and closes console
Expand Down Expand Up @@ -194,6 +200,11 @@ def enter(self):
result = self.console.reopen()


def recur(self, tyme):
""""""
self.console.service()


def exit(self):
""""""
self.console.close()
Expand Down
20 changes: 4 additions & 16 deletions src/hio/core/tcp/clienting.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __init__(self,
super(Client, self).__init__(**kwa)
self.tymeout = tymeout if tymeout is not None else self.Tymeout
self.tymer = tyming.Tymer(tymth=self.tymth, duration=self.tymeout) # reconnect retry timer
self.reinitHostPort(ha=ha, hostname=host, port=port)

self.ha = ha or (host, port)
host, port = self.ha
self.hostname = host # host domain name
Expand Down Expand Up @@ -188,20 +188,6 @@ def wind(self, tymth):
self.tymer.wind(tymth)


def reinitHostPort(self, ha=None, hostname=u'127.0.0.1', port=56000):
"""
Reinit self.ha and self.hostname from ha = (host, port) or hostname port
self.ha is of form (host, port) where host is either dns name or ip address
self.hostname is hostname as dns name
host eventually is host ip address output from normalizeHost()
"""
self.ha = ha or (hostname, port)
hostname, port = self.ha
self.hostname = hostname # host domain name
host = coring.normalizeHost(hostname) # ip host address
self.ha = (host, port)


def actualBufSizes(self):
"""
Returns duple of the the actual socket send and receive buffer size
Expand Down Expand Up @@ -559,6 +545,8 @@ def __init__(self,
if context is None: # create context
if not version: # use default context
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
context.verify_flags &= ~ssl.VERIFY_X509_STRICT # XXXX new with python 3.13
# XXXX ToDo create new test certificates that are RFC 5280 compliant
hostify = hostify if hostify is not None else context.check_hostname
context.check_hostname = hostify
certify = certify if certify is not None else context.verify_mode
Expand Down Expand Up @@ -759,7 +747,7 @@ def send(self, data):

class ClientDoer(doing.Doer):
"""
Basic TCP Client
Basic TCP Client Doer
See Doer for inherited attributes, properties, and methods.
Expand Down
2 changes: 2 additions & 0 deletions src/hio/core/tcp/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ def initServerContext(context=None,
if context is None: # create context
if not version: # use default context with default protocol version
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.verify_flags &= ~ssl.VERIFY_X509_STRICT # XXXX new with python 3.13
# XXXX ToDo create new test certificates that are RFC 5280 compliant
context.verify_mode = certify if certify is not None else ssl.CERT_REQUIRED

else: # create context with specified protocol version
Expand Down
185 changes: 161 additions & 24 deletions src/hio/core/udp/udping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
hio.core.udping Module
"""
import sys
import platform
import os
import errno
import socket

from contextlib import contextmanager

from ... import help
from ...base import tyming, doing
from .. import coring

logger = help.ogler.getLogger()

Expand All @@ -19,18 +22,52 @@
UDP_MAX_PACKET_SIZE = min(1024, UDP_MAX_DATAGRAM_SIZE) # assumes IPV6 capable equipment


class Peer(object):

@contextmanager
def openPeer(cls=None, **kwa):
"""
Class to manage non blocking I/O on UDP socket.
Wrapper to create and open UDP Peer instances
When used in with statement block, calls .close() on exit of with block
Parameters:
cls is Class instance of subclass instance
Usage:
with openPeer() as peer0:
peer0.receive()
with openPeer(cls=PeerBig) as peer0:
peer0.receive()
"""
if cls is None:
cls = Peer
try:
peer = cls(**kwa)
peer.reopen()

yield peer

finally:
peer.close()


def __init__(self,

class Peer(tyming.Tymee):
"""Class to manage non blocking I/O on UDP socket.
SubClass of Tymee to enable support for retry tymers as UDP is unreliable.
"""
Tymeout = 0.0 # tymeout in seconds, tymeout of 0.0 means ignore tymeout

def __init__(self, *,
tymeout=None,
ha=None,
host='',
port=55000,
bufsize=1024,
wl=None,
bcast=False):
bcast=False,
**kwa):
"""
Initialization method for instance.
Expand All @@ -42,17 +79,66 @@ def __init__(self,
wl = WireLog instance ref for debug logging or over the wire tx and rx
bcast = Flag if True enables sending to broadcast addresses on socket
"""
super(Peer, self).__init__(**kwa)
self.tymeout = tymeout if tymeout is not None else self.Tymeout
#self.tymer = tyming.Tymer(tymth=self.tymth, duration=self.tymeout) # retry tymer

self.ha = ha or (host, port) # ha = host address duple (host, port)
host, port = self.ha
host = coring.normalizeHost(host) # ip host address
self.ha = (host, port)

self.bs = bufsize
self.wl = wl
self.bcast = bcast

self.ss = None #server's socket needs to be opened
self.ss = None # server's socket needs to be opened
self.opened = False

def actualBufSizes(self):
@property
def host(self):
"""
Property that returns host in .ha duple
"""
return self.ha[0]


@host.setter
def host(self, value):
"""
setter for host property
"""
self.ha = (value, self.port)


@property
def port(self):
"""
Property that returns port in .ha duple
"""
return self.ha[1]


@port.setter
def port(self, value):
"""
setter for port property
"""
self.ha = (self.host, value)



def wind(self, tymth):
"""
Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
Updates winds .tymer .tymth
"""
Returns duple of the the actual socket send and receive buffer size
super(Peer, self).wind(tymth)
#self.tymer.wind(tymth)


def actualBufSizes(self):
"""Returns duple of the the actual socket send and receive buffer size
(send, receive)
"""
if not self.ss:
Expand All @@ -62,8 +148,7 @@ def actualBufSizes(self):
self.ss.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))

def open(self):
"""
Opens socket in non blocking mode.
"""Opens socket in non blocking mode.
if socket not closed properly, binding socket gets error
OSError: (48, 'Address already in use')
Expand All @@ -79,11 +164,14 @@ def open(self):
# make socket address and port reusable. doesn't seem to have an effect.
# the SO_REUSEADDR flag tells the kernel to reuse a local socket in
# TIME_WAIT state, without waiting for its natural timeout to expire.
# may want to look at SO_REUSEPORT
# Also use SO_REUSEPORT on linux and darwin
# https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ

self.ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if platform.system() != 'Windows':
self.ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

# setup buffers
if self.ss.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) < self.bs:
self.ss.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.bs)
if self.ss.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) < self.bs:
Expand All @@ -102,28 +190,26 @@ def open(self):
return True

def reopen(self):
"""
Idempotently open socket
"""Idempotently open socket
"""
self.close()
return self.open()

def close(self):
"""
Closes socket and logs if any
"""Closes socket.
"""
if self.ss:
self.ss.close() #close socket
self.ss = None
self.opened = False

def receive(self):
"""
Perform non blocking read on socket.
"""Perform non blocking read on socket.
returns tuple of form (data, sa)
if no data then returns (b'',None)
but always returns a tuple with two elements
Returns:
tuple of form (data, sa)
if no data then returns (b'',None)
but always returns a tuple with two elements
"""
try:
data, sa = self.ss.recvfrom(self.bs) # sa is source (host, port)
Expand All @@ -136,16 +222,15 @@ def receive(self):
return (b'', None) #receive has nothing empty string for data
else:
logger.error("Error receive on UDP %s\n %s\n", self.ha, ex)
raise #re raise exception ex1
raise #re raise exception ex

if self.wl: # log over the wire receive
self.wl.writeRx(data, who=sa)

return (data, sa)

def send(self, data, da):
"""
Perform non blocking send on socket.
"""Perform non blocking send on socket.
data is string in python2 and bytes in python3
da is destination address tuple (destHost, destPort)
Expand All @@ -163,3 +248,55 @@ def send(self, data, da):
return result


def service(self):
"""
Service sends and receives
"""


class PeerDoer(doing.Doer):
"""
Basic UDP Peer Doer
See Doer for inherited attributes, properties, and methods.
Attributes:
.peer is UDP Peer instance
"""

def __init__(self, peer, **kwa):
"""
Initialize instance.
Parameters:
peer is UDP Peer instance
"""
super(PeerDoer, self).__init__(**kwa)
self.peer = peer
if self.tymth:
self.peer.wind(self.tymth)


def wind(self, tymth):
"""
Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
Updates winds .tymer .tymth
"""
super(PeerDoer, self).wind(tymth)
self.peer.wind(tymth)


def enter(self):
""""""
self.peer.reopen()


def recur(self, tyme):
""""""
self.peer.service()


def exit(self):
""""""
self.peer.close()
Loading

0 comments on commit 93764ce

Please sign in to comment.