Skip to content

Commit

Permalink
Merge pull request #356 from KantaTamura/httpcache-profile
Browse files Browse the repository at this point in the history
Support PPE profiling for HTTPCache / HTTPCachedFS
  • Loading branch information
k5342 authored Oct 21, 2024
2 parents 09e15ed + 02f90b8 commit 3078de4
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 153 deletions.
141 changes: 78 additions & 63 deletions pfio/cache/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urllib.parse
from typing import Any, Dict, Generator, List, Optional

from pfio._profiler import record
from pfio.cache import Cache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,7 +74,8 @@ class HTTPConnector(object):
def __init__(self,
url: str,
bearer_token_path: Optional[str] = None,
timeout: int = 3):
timeout: int = 3,
trace: bool = False):
parsed = urllib.parse.urlparse(url)
if parsed.scheme == "":
url = "http://" + url
Expand Down Expand Up @@ -105,57 +107,63 @@ def __init__(self,
if self.bearer_token_path is not None:
self._token_read_now()

self.trace = trace

def put(self, suffix: str, data: bytes) -> bool:
conn: http.client.HTTPConnection
with self.conn.get(self.host) as conn:
try:
conn.request(
"PUT",
url=self.path + suffix,
body=data,
headers=self._header_with_token()
)
res = conn.getresponse()
res.read()

if res.status == 201:
return True
else:
logger.warning(
"put: unexpected status code {}".format(res.status)
)
with record("pfio.cache.http.conn:put", trace=self.trace):
conn: http.client.HTTPConnection
with self.conn.get(self.host) as conn:
try:
with record("pfio.cache.http.conn:put:request", trace=self.trace):
conn.request(
"PUT",
url=self.path + suffix,
body=data,
headers=self._header_with_token()
)
res = conn.getresponse()
res.read()

if res.status == 201:
return True
else:
logger.warning(
"put: unexpected status code {}".format(res.status)
)
return False

except (http.client.HTTPException, OSError) as e:
logger.warning("put: {} {}".format(e, type(e)))
conn.close() # fix error state
return False

except (http.client.HTTPException, OSError) as e:
logger.warning("put: {} {}".format(e, type(e)))
conn.close() # fix error state
return False

def get(self, suffix: str) -> Optional[bytes]:
conn: http.client.HTTPConnection
with self.conn.get(self.host) as conn:
try:
conn.request(
"GET",
url=self.path + suffix,
headers=self._header_with_token()
)
res = conn.getresponse()
data = res.read()

if res.status == 200:
return data
elif res.status == 404:
with record("pfio.cache.http.conn:get", trace=self.trace):
conn: http.client.HTTPConnection
with self.conn.get(self.host) as conn:
try:
with record("pfio.cache.http.conn:get:request", trace=self.trace):
conn.request(
"GET",
url=self.path + suffix,
headers=self._header_with_token()
)
res = conn.getresponse()
data = res.read()

if res.status == 200:
return data
elif res.status == 404:
return None
else:
logger.warning(
"get: unexpected status code {}".format(res.status)
)
return None
except (http.client.HTTPException, OSError) as e:
logger.warning("get: {} {}".format(e, type(e)))
conn.close() # fix error state
return None
else:
logger.warning(
"get: unexpected status code {}".format(res.status)
)
return None
except (http.client.HTTPException, OSError) as e:
logger.warning("get: {} {}".format(e, type(e)))
conn.close() # fix error state
return None

def _header_with_token(self) -> dict:
if self.bearer_token_path is None:
Expand Down Expand Up @@ -203,6 +211,9 @@ class HTTPCache(Cache):
do_pickle (bool):
Do automatic pickle and unpickle inside the cache.
trace (bool):
Enable PPE Profiler.
.. note:: This feature is experimental.
"""
Expand All @@ -211,13 +222,15 @@ def __init__(self,
length: int,
url: str,
bearer_token_path=None,
do_pickle=False):
do_pickle=False, trace=False):
super().__init__()

self.length = length
assert self.length > 0

self.connector = HTTPConnector(url, bearer_token_path)
self.trace = trace
self.connector = HTTPConnector(
url, bearer_token_path, trace=self.trace)
self.do_pickle = do_pickle

def __len__(self):
Expand All @@ -232,22 +245,24 @@ def multithread_safe(self):
return True

def put(self, i: int, data: Any):
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))
if self.do_pickle:
data = pickle.dumps(data)
with record("pfio.cache.http:put", trace=self.trace):
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))
if self.do_pickle:
data = pickle.dumps(data)

return self.connector.put(str(i), data)
return self.connector.put(str(i), data)

def get(self, i: int) -> Any:
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))
with record("pfio.cache.http:get", trace=self.trace):
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))

data = self.connector.get(str(i))
data = self.connector.get(str(i))

if self.do_pickle and data is not None:
return pickle.loads(data)
else:
return data
if self.do_pickle and data is not None:
return pickle.loads(data)
else:
return data
3 changes: 2 additions & 1 deletion pfio/cache/multiprocess_file_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ def _put(self, i, data):

index_entry = pack('Qq', data_pos, len(data))
with record("pfio.cache.multiprocessfile:put:write_index", trace=self.trace):
assert os.pwrite(self.cache_fd, index_entry, index_ofst) == self.buflen
assert os.pwrite(self.cache_fd, index_entry,
index_ofst) == self.buflen
with record("pfio.cache.multiprocessfile:put:write_data", trace=self.trace):
assert os.pwrite(self.cache_fd, data, data_pos) == len(data)
with record("pfio.cache.multiprocessfile:put:sync", trace=self.trace):
Expand Down
6 changes: 6 additions & 0 deletions pfio/v2/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class FS(abc.ABC):

def __init__(self):
self.pid = os.getpid()
self.trace = False

@property
def cwd(self):
Expand Down Expand Up @@ -154,6 +155,11 @@ def is_forked(self):
assert hasattr(self, 'pid')
return self.pid != os.getpid()

@property
def is_traced(self):
assert hasattr(self, 'trace')
return self.trace

def close(self) -> None:
pass

Expand Down
Loading

0 comments on commit 3078de4

Please sign in to comment.