Skip to content

Commit

Permalink
fix (Core): Optimise and support messages >100000000 bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed Oct 16, 2024
1 parent 57d2a43 commit 2f5c0c2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
22 changes: 12 additions & 10 deletions src/DIRAC/Core/DISET/private/Transports/BaseTransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BaseTransport:
def __init__(self, stServerAddress, bServerMode=False, **kwargs):
self.bServerMode = bServerMode
self.extraArgsDict = kwargs
self.byteStream = b""
self.byteStream = bytearray()
self.packetSize = 1048576 # 1MiB
self.stServerAddress = stServerAddress
self.peerCredentials = {}
Expand Down Expand Up @@ -191,7 +191,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
maxBufferSize = max(maxBufferSize, 0)
try:
# Look either for message length of keep alive magic string
iSeparatorPosition = self.byteStream.find(b":", 0, 10)
iSeparatorPosition = self.byteStream.find(b":")
keepAliveMagicLen = len(BaseTransport.keepAliveMagic)
isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0
# While not found the message length or the ka, keep receiving
Expand All @@ -204,17 +204,18 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
if not retVal["Value"]:
return S_ERROR("Peer closed connection")
# New data!
self.byteStream += retVal["Value"]
# Look again for either message length of ka magic string
iSeparatorPosition = self.byteStream.find(b":", 0, 10)
self.byteStream.extend(retVal["Value"])

# Look again for either message length or keep alive magic string
iSeparatorPosition = self.byteStream.find(b":")
isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0
# Over the limit?
if maxBufferSize and len(self.byteStream) > maxBufferSize and iSeparatorPosition == -1:
return S_ERROR(f"Read limit exceeded ({maxBufferSize} chars)")
# Keep alive magic!
if isKeepAlive:
gLogger.debug("Received keep alive header")
# Remove the ka magic from the buffer and process the keep alive
# Remove the keep-alive magic from the buffer and process the keep-alive
self.byteStream = self.byteStream[keepAliveMagicLen:]
return self.__processKeepAlive(maxBufferSize, blockAfterKeepAlive)
# From here it must be a real message!
Expand All @@ -225,7 +226,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
if readSize >= pkgSize:
# If we already have all the data we need
data = pkgData[:pkgSize]
self.byteStream = pkgData[pkgSize:]
self.byteStream = self.byteStream[pkgSize + iSeparatorPosition + 1 :]
else:
# If we still need to read stuff
pkgMem = BytesIO()
Expand All @@ -245,11 +246,12 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
# Data is here! take it out from the bytestream, dencode and return
if readSize == pkgSize:
data = pkgMem.getvalue()
self.byteStream = b""
else: # readSize > pkgSize:
self.byteStream = bytearray() # Reset the byteStream
else:
pkgMem.seek(0, 0)
data = pkgMem.read(pkgSize)
self.byteStream = pkgMem.read()
self.byteStream = bytearray(pkgMem.read()) # store the rest in bytearray

try:
data = MixedEncode.decode(data)[0]
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Utilities/DEncode.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def decode(data):
if not data:
return data
# print("DECODE FUNCTION : %s" % g_dDecodeFunctions[ sStream [ iIndex ] ])
if not isinstance(data, bytes):
if not isinstance(data, (bytes, bytearray)):
raise NotImplementedError("This should never happen")
return g_dDecodeFunctions[data[0]](data, 0)

Expand Down

0 comments on commit 2f5c0c2

Please sign in to comment.