Skip to content

Commit

Permalink
fix python
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jul 8, 2024
1 parent a37c1ad commit ede9b71
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
30 changes: 22 additions & 8 deletions python-client/pypegasus/base/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ def __init__(self):
def is_valid(self):
return self.address == 0

def from_string(self, host_port):
host, port = host_port.split(':')
self.address = socket.ntohl(struct.unpack("I", socket.inet_aton(host))[0])
def from_string(self, ip_port):
ip, port = ip_port.split(':')
self.address = socket.ntohl(struct.unpack("I", socket.inet_aton(ip))[0])
self.address = (self.address << 32) + (int(port) << 16) + 1 # TODO why + 1?
return True

def to_host_port(self):
def to_ip_port(self):
s = []
address = self.address
port = (address >> 16) & 0xFFFF
Expand Down Expand Up @@ -307,44 +307,58 @@ def __ne__(self, other):


# TODO(yingchun): host_port is now just a place holder and not well implemented, need improve it
class host_port_types(Enum):
kHostTypeInvalid = 0
kHostTypeIpv4 = 1
kHostTypeGroup = 2


class host_port:

thrift_spec = (
(1, TType.STRING, 'host', None, None, ), # 1
(2, TType.I16, 'port', None, None, ), # 2
(3, TType.I08, 'type', None, None, ), # 3
)

def __init__(self):
self.host = ""
self.port = 0
self.type = host_port_types.kHostTypeInvalid

def is_valid(self):
return self.port != 0
return self.type != host_port_types.kHostTypeInvalid

def from_string(self, host_port):
host_and_port = host_port.split(':')
def from_string(self, host_port_str):
host_and_port = host_port_str.split(':')
if len(host_and_port) != 2:
return False
self.host = host_and_port[0]
self.port = int(host_and_port[1])
# TODO(yingchun): Maybe it's not true, improve it
self.type = host_port_types.kHostTypeIpv4
return True

def to_host_port(self):
if not self.is_valid():
return None, None
return self.host, self.port

def read(self, iprot):
self.host = iprot.readString()
self.port = iprot.readI16()
self.type = iprot.readByte()

def write(self, oprot):
oprot.writeString(self.host)
oprot.writeI16(self.port)
oprot.writeByte(self.type)

def validate(self):
return

def __hash__(self):
return hash(self.host) ^ self.port
return hash(self.host) ^ self.port ^ self.type

def __repr__(self):
L = ['%s=%r' % (key, value)
Expand Down
22 changes: 11 additions & 11 deletions python-client/pypegasus/pgclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ def __init__(self, table_name, timeout):
def add_meta_server(self, meta_addr):
rpc_addr = rpc_address()
if rpc_addr.from_string(meta_addr):
host_port_list = meta_addr.split(':')
if not len(host_port_list) == 2:
ip_port = meta_addr.split(':')
if not len(ip_port) == 2:
return False

host, port = host_port_list[0], int(host_port_list[1])
self.addr_list.append((host, port))
ip, port = ip_port[0], int(ip_port[1])
self.addr_list.append((ip, port))

return True
else:
Expand All @@ -281,9 +281,9 @@ def got_results(self, res):

def query(self):
ds = []
for (host, port) in self.addr_list:
for (ip, port) in self.addr_list:
rpc_addr = rpc_address()
rpc_addr.from_string(host + ':' + str(port))
rpc_addr.from_string(ip + ':' + str(port))
if rpc_addr in self.session_dict:
self.session_dict[rpc_addr].close()

Expand All @@ -294,7 +294,7 @@ def query(self):
None,
self,
self.timeout
).connectTCP(host, port, self.timeout)
).connectTCP(ip, port, self.timeout)
d.addCallbacks(self.got_conn, self.got_err)
d.addCallbacks(self.query_one, self.got_err)
ds.append(d)
Expand Down Expand Up @@ -345,7 +345,7 @@ def update_cfg(self, resp):
if rpc_addr in connected_rpc_addrs or rpc_addr.address == 0:
continue

host, port = rpc_addr.to_host_port()
ip, port = rpc_addr.to_ip_port()
if rpc_addr in self.session_dict:
self.session_dict[rpc_addr].close()

Expand All @@ -356,7 +356,7 @@ def update_cfg(self, resp):
None,
self.container,
self.timeout
).connectTCP(host, port, self.timeout)
).connectTCP(ip, port, self.timeout)
connected_rpc_addrs[rpc_addr] = 1
d.addCallbacks(self.got_conn, self.got_err)
ds.append(d)
Expand Down Expand Up @@ -642,8 +642,8 @@ def __init__(self, meta_addrs=None, table_name='',
self.table = Table(table_name, self, timeout)
self.meta_session_manager = MetaSessionManager(table_name, timeout)
if isinstance(meta_addrs, list):
for host_port in meta_addrs:
self.meta_session_manager.add_meta_server(host_port)
for meta_addr in meta_addrs:
self.meta_session_manager.add_meta_server(meta_addr)
PegasusHash.populate_table()
self.timeout_times = 0
self.update_partition = False
Expand Down

0 comments on commit ede9b71

Please sign in to comment.