Skip to content

Commit

Permalink
add profiler to http_cache_fs
Browse files Browse the repository at this point in the history
Signed-off-by: KantaTamura <[email protected]>
  • Loading branch information
KantaTamura committed Oct 21, 2024
1 parent d1b6042 commit f29ff19
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 88 deletions.
6 changes: 6 additions & 0 deletions pfio/v2/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class FS(abc.ABC):

def __init__(self):
self.pid = os.getpid()
self.trace = False

@property
def cwd(self):
Expand Down Expand Up @@ -154,6 +155,11 @@ def is_forked(self):
assert hasattr(self, 'pid')
return self.pid != os.getpid()

@property
def is_trace(self):
assert hasattr(self, 'trace')
return self.trace

def close(self) -> None:
pass

Expand Down
184 changes: 96 additions & 88 deletions pfio/v2/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from types import TracebackType
from typing import Any, Iterator, Optional, Type, Union

from pfio._profiler import record
from pfio.cache import HTTPConnector

from .fs import FS, FileStat
Expand Down Expand Up @@ -53,8 +54,9 @@ def __init__(self,
super().__init__()

self.fs = fs
self.trace = self.fs.is_trace
self.max_cache_size = max_cache_size
self.conn = HTTPConnector(url, bearer_token_path)
self.conn = HTTPConnector(url, bearer_token_path, trace=self.trace)
if url.endswith("/"):
self.url = url
else:
Expand All @@ -64,12 +66,13 @@ def open(self,
file_path: str,
mode: str = 'rb',
*args, **kwargs) -> io.IOBase:
if 'r' in mode:
kwargs['mode'] = mode
return _HTTPCacheIOBase(file_path, self.conn, self.fs,
self.max_cache_size, args, kwargs)
else:
return self.fs.open(file_path, mode, *args, **kwargs)
with record("pfio.v2.http_cache:open", trace=self.trace):
if 'r' in mode:
kwargs['mode'] = mode
return _HTTPCacheIOBase(file_path, self.conn, self.fs,
self.max_cache_size, args, kwargs)
else:
return self.fs.open(file_path, mode, *args, **kwargs)

def _reset(self):
self.fs._reset()
Expand Down Expand Up @@ -126,70 +129,73 @@ def __init__(self,
self.whole_file: Optional[bytes] = None
self.pos: Optional[int] = None
self.fp: Optional[io.RawIOBase] = None
self.trace = self.fs.is_trace

self._closed = False

def _load_file(self):
if self.whole_file is not None:
return

if self.fp is not None:
return

# Try HTTPCache.
data = self.conn.get(self.cache_path)
if data is not None:
self.whole_file = data
self.pos = 0
return

# Check size in underlying fs.
stat = self.fs.stat(self.file_path)
if stat.size < self.max_cache_size:
# The filesize is smaller than max_cache_size so let's cache it

# Read whole file
with self.fs.open(self.file_path,
*self.open_args, **self.open_kwargs) as fp:
self.whole_file = fp.read(-1)
self.pos = 0

# Put it to HTTPCache.
self.conn.put(self.cache_path, self.whole_file)
else:
# The file is larger than max_cache_size
print(
"HTTPCachedFS: Too big data ({} bytes), skipping cache".format(
stat.size
with record("pfio.v2.http_cache:load_file", trace=self.trace):
if self.whole_file is not None:
return

if self.fp is not None:
return

# Try HTTPCache.
data = self.conn.get(self.cache_path)
if data is not None:
self.whole_file = data
self.pos = 0
return

# Check size in underlying fs.
stat = self.fs.stat(self.file_path)
if stat.size < self.max_cache_size:
# The filesize is smaller than max_cache_size so let's cache it

# Read whole file
with self.fs.open(self.file_path,
*self.open_args, **self.open_kwargs) as fp:
self.whole_file = fp.read(-1)
self.pos = 0

# Put it to HTTPCache.
self.conn.put(self.cache_path, self.whole_file)
else:
# The file is larger than max_cache_size
print(
"HTTPCachedFS: Too big data ({} bytes), skipping cache".format(
stat.size
)
)
)

# Access through underlying filesystem
self.fp = self.fs.open(self.file_path,
*self.open_args, **self.open_kwargs)
# Access through underlying filesystem
self.fp = self.fs.open(self.file_path,
*self.open_args, **self.open_kwargs)

def read(self, size=-1) -> bytes:
self._load_file()
if self.whole_file is not None:
assert self.pos is not None

if len(self.whole_file) <= self.pos:
return b''
elif size <= 0:
data = self.whole_file[self.pos:]
else:
end = min(self.pos + size, len(self.whole_file))
data = self.whole_file[self.pos:end]

self.pos += len(data)
return data
elif self.fp is not None:
data_from_fp = self.fp.read(size)
if data_from_fp is not None:
return data_from_fp

print("HTTPCachedFS: failed to read from backend fs")
return b''
with record("pfio.v2.http_cache:read", trace=self.trace):
self._load_file()
if self.whole_file is not None:
assert self.pos is not None

if len(self.whole_file) <= self.pos:
return b''
elif size <= 0:
data = self.whole_file[self.pos:]
else:
end = min(self.pos + size, len(self.whole_file))
data = self.whole_file[self.pos:end]

self.pos += len(data)
return data
elif self.fp is not None:
data_from_fp = self.fp.read(size)
if data_from_fp is not None:
return data_from_fp

print("HTTPCachedFS: failed to read from backend fs")
return b''

def readline(self):
raise NotImplementedError()
Expand Down Expand Up @@ -224,38 +230,40 @@ def seekable(self):
return True

def tell(self):
self._load_file()
with record("pfio.v2.http_cache:tell", trace=self.trace):
self._load_file()

if self.pos is not None:
return self.pos
else:
assert self.fp is not None
return self.fp.tell()
if self.pos is not None:
return self.pos
else:
assert self.fp is not None
return self.fp.tell()

def truncate(self, size=None):
raise io.UnsupportedOperation('truncate')

def seek(self, pos, whence=io.SEEK_SET):
self._load_file()

if self.pos is not None:
if whence in [0, io.SEEK_SET]:
pass
elif whence in [1, io.SEEK_CUR]:
pos += self.pos
elif whence in [2, io.SEEK_END]:
pos += len(self.whole_file)
with record("pfio.v2.http_cache:seek", trace=self.trace):
self._load_file()

if self.pos is not None:
if whence in [0, io.SEEK_SET]:
pass
elif whence in [1, io.SEEK_CUR]:
pos += self.pos
elif whence in [2, io.SEEK_END]:
pos += len(self.whole_file)
else:
raise ValueError('Wrong whence value: {}'.format(whence))

if pos < 0:
raise OSError(22, "[Errno 22] Invalid argument")

self.pos = pos
return self.pos
else:
raise ValueError('Wrong whence value: {}'.format(whence))

if pos < 0:
raise OSError(22, "[Errno 22] Invalid argument")

self.pos = pos
return self.pos
else:
assert self.fp is not None
return self.fp.seek(pos, whence)
assert self.fp is not None
return self.fp.seek(pos, whence)

def writable(self):
return False
Expand Down

0 comments on commit f29ff19

Please sign in to comment.