objects first pass at multi from pathmeta
we need mdv6 from augpathlib meta for this to work, so at least in dev
there are checks in place to make sure the fs is sane; transitive
metadata has been updated to work with mdv6 as well
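
For context, the check that lands in objects.py below keys off the metadata
version that augpathlib encodes into the cache symlink target. A minimal
standalone sketch of that idea, assuming the fieldsep-delimited layout and the
'mdvN' version tag seen in the hunk; fieldsep is passed in rather than guessed,
since the real value lives in aug.meta._PathMetaAsSymlink.fieldsep:

import pathlib

def symlink_pathmeta_version(symlink_target, fieldsep):
    # the last component of the symlink target carries the encoded pathmeta fields
    pure_symlink = pathlib.PurePosixPath(symlink_target)
    local_name, *parts, data = pure_symlink.parts
    # second field is the version tag, e.g. 'mdv6' -> 6
    _, version, *suffixes = data.split(fieldsep)
    return int(version[3:])

def require_mdv6(symlink_target, fieldsep, dataset_id):
    # refuse to run against pre-multi metadata, mirroring the dev-only guard below
    vint = symlink_pathmeta_version(symlink_target, fieldsep)
    if vint < 6:
        msg = f'pathmeta version {vint} in {dataset_id} predates multi, aborting'
        raise ValueError(msg)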

haven't cleaned up the old codepaths yet, but e.g. multibads is no
longer needed

dev guide: pass kwargs to fetch_files so that --jobs is honored
cli: find --fetch now also honors --jobs
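
The cli.py hunk below implements this with a small dispatch on the jobs
option: run inline when --jobs is 1 or less, otherwise fan out through the
Async/deferred helpers the cli already uses. A minimal sketch of that pattern;
do_work, items, and the import path are stand-ins and assumptions, not names
taken from this diff:

from pyontutils.utils import Async, deferred  # assumed import path for the helpers used in cli.py

def do_work(item, **kwargs):
    # stand-in for path.remote.refresh or the wrapped fetch call
    return item

def run_all(items, jobs=1, rate=None, **kwargs):
    if jobs <= 1:
        # sequential: honors --jobs 1 and keeps tracebacks simple
        return [do_work(item, **kwargs) for item in items]
    # parallel, rate limited the same way find uses hz
    return Async(rate=rate)(deferred(do_work)(item, **kwargs) for item in items)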

pathmeta now also embeds remote errors so we can trace them
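
The pathmeta change itself is in a file not shown in this diff, so the
following is only a hypothetical illustration of the idea: keep the remote
error on the blob so later stages can trace it, rather than losing it to the
log; the 'errors' field name and shape here are invented for illustration, not
taken from sparcur:

def embed_remote_error(pathmeta_blob, error):
    # hypothetical field name and shape, for illustration only
    pathmeta_blob.setdefault('errors', []).append(
        {'source': 'remote', 'error': f'{type(error).__name__}: {error}'})
    return pathmeta_blob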
tgbugs committed May 5, 2024
1 parent 31dfc72 commit 4411879
Showing 7 changed files with 193 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/developer-guide.org
@@ -2702,7 +2702,7 @@ def main(id=None,
fetch_metadata_files(path=path, **kwargs) # FIXME symlink_to
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX fetch_metadata_files does NOT USE the extensions kwarg!
# fetch additional files
fetch_files(path=path)
fetch_files(path=path, **kwargs)

return path

20 changes: 14 additions & 6 deletions sparcur/cli.py
@@ -1525,9 +1525,14 @@ def find(self):
limit = self.options.limit
fetch = self.options.fetch
if self.options.refresh:
Async(rate=hz)(deferred(path.remote.refresh)(
update_cache=True, update_data=fetch, size_limit_mb=limit)
for path in paths)
if self.options.jobs <= 1:
[path.remote.refresh(
update_cache=True, update_data=fetch, size_limit_mb=limit)
for path in paths]
else:
Async(rate=hz)(deferred(path.remote.refresh)(
update_cache=True, update_data=fetch, size_limit_mb=limit)
for path in paths)
elif fetch:
def wrap(path):
def inner(*args, **kwargs):
@@ -1541,9 +1546,12 @@ def inner(*args, **kwargs):

return inner

Async(rate=hz)(deferred(wrap(path))(
size_limit_mb=limit)
for path in paths)
if self.options.jobs <= 1:
[wrap(path)(size_limit_mb=limit) for path in paths]
else:
Async(rate=hz)(deferred(wrap(path))(
size_limit_mb=limit)
for path in paths)

else:
self._print_paths(paths)
2 changes: 1 addition & 1 deletion sparcur/monkey.py
@@ -414,10 +414,10 @@ def restructure(j):
if isinstance(bfobject, self._dp_class):
bfobject.fake_files = []
bfobject._has_multiple_files = False
max_updated = _tsnorm(bfobject._json['content']['updatedAt'])
if 'objects' not in bfobject._json:
log.error(f'{bfobject} has no files!??!')
else:
max_updated = _tsnorm(bfobject._json['content']['updatedAt'])
for i, source in enumerate(
bfobject._json['objects']['source']):
# TODO package id?
157 changes: 122 additions & 35 deletions sparcur/objects.py
@@ -169,7 +169,7 @@ def test():
# bugs
'0c4fc5e8-c332-47d8-9fd8-1fbeb624078c', # dat norm values returning None??? also .DS_Store
#'0967e3b7-09db-4b91-9caf-1090c9d0c437', # jdec, log2(0) also giant 31k files XXX good bench for using sxpr ...
'41ca18b1-c991-4709-892e-8ae98907549b', # JEncode
#'41ca18b1-c991-4709-892e-8ae98907549b', # JEncode # bad filename that has ! and .
'a996cdac-2d00-4ad4-9699-8a75ce29c2f1', # dataset description no alt
'aa43eda8-b29a-4c25-9840-ecbd57598afc', # ft reva ultrasound xml format TODO, uuid v1 issues in this one
'c3072708-13f4-45ed-992c-b9a744f6a5f3', # no nums in name for entcomp
@@ -1672,11 +1672,13 @@ def pathmeta_objmeta(dataset_id, object_id, path, objkeep=tuple(), force=False,
blob['access_control_group_id'] = blob['parent_id']
blob['parent_id'] = object_id

objmeta_blob = objmeta_write_from_pathmeta(dataset_id, object_id, blob, keep=objkeep, force=force, do_write=do_write)
multi = 'multi' in blob and blob['multi']
_do_write = do_write and not multi
objmeta_blob = objmeta_write_from_pathmeta(dataset_id, object_id, blob, keep=objkeep, force=force, do_write=_do_write)

blob['type'] = 'pathmeta' # you can't actually invert everything back to Path because Path is only the name
blob['__pathmeta_version__'] = __pathmeta_version__
return blob, objmeta_blob
return multi, blob, objmeta_blob


def pathmeta_refresh(dataset_id, object_id, path, force=False):
@@ -1726,7 +1728,7 @@ def pathmeta_refresh(dataset_id, object_id, path, force=False):
return blob, changed


def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False, debug=False, subprocess=False, do_write=True):
def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False, debug=False, subprocess=False, do_write=True, mdv_check=tuple()):
# FIXME TODO probably wire this into sparcron more directly
# to avoid calling this via Async deferred
object_id = path.cache_identifier #RemoteId(path.cache_id)
@@ -1736,10 +1738,20 @@ def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False,
else:
updated = path.updated_cache_transitive() # something of a hack

#maybe_objmeta_blob = objmeta(dataset_id, object_id, path)
if mdv_check and mdv_check[0] and path.is_broken_symlink():
# XXX normally we won't need to check this at all but in dev
# we have old metadata versions lurking
pure_symlink = pathlib.PurePosixPath(path.readlink(raw=True))
local_name, *parts, data = pure_symlink.parts
_, version, *suffixes = data.split(aug.meta._PathMetaAsSymlink.fieldsep)
vint = int(version[3:])
if vint < 6:
msg = f'pathmeta version {vint} in {dataset_id} means we don\'t have multi, aborting'
raise ValueError(msg)
else:
mdv_check[0] = False

pathmeta_blob, objmeta_blob = pathmeta_objmeta(dataset_id, object_id, path, objkeep=objkeep, force=force, do_write=do_write)
#pathmeta_blob, pathmeta_changed = pathmeta_refresh(dataset_id, object_id, path) # FIXME TODO in point of fact this can run in parallel with extract because there is not any necessary interaction until combine
multi, pathmeta_blob, objmeta_blob = pathmeta_objmeta(dataset_id, object_id, path, objkeep=objkeep, force=force, do_write=do_write)

export_path = extract_export_path(dataset_id, object_id)
done = export_path.exists() # and not pathmeta_changed # FIXME we don't need to rerun the export if the pathmeta changed ... the id does not change
@@ -1754,16 +1766,17 @@ def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False,
success = 'already-done'
msg = f'{object_id} already done {path}'
log.log(9, msg)
return path, object_id, expex_type, success, updated, pathmeta_blob, objmeta_blob, None, None
return path, object_id, expex_type, success, updated, multi, pathmeta_blob, objmeta_blob, None, None

elif extract_fun is None:
success = 'no-extract-fun-and-objmeta-already-written'
msg = f'{object_id} objmeta already written and no extract_fun {path}'
log.log(9, msg)
return path, object_id, expex_type, success, updated, pathmeta_blob, objmeta_blob, None, None
return path, object_id, expex_type, success, updated, multi, pathmeta_blob, objmeta_blob, None, None

elif debug or not subprocess:
# TODO or raw expex_type in IMPORTANT i.e. stuff that should be in memory ... but that is an optimization
do_write = not multi
try:
blob, status, _e_path = extract(dataset_id.uuid, path.as_posix(), expex_type, extract_fun=extract_fun, force=force, do_write=do_write)
success = True
@@ -1773,7 +1786,7 @@ def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False,
status = None
log.exception(e)

return path, object_id, expex_type, success, updated, pathmeta_blob, objmeta_blob, blob, status
return path, object_id, expex_type, success, updated, multi, pathmeta_blob, objmeta_blob, blob, status

msg = "don't do this right now"
# likely not needed
@@ -1820,7 +1833,7 @@ def subprocess_extract(dataset_id, path, time_now, objkeep=tuple(), force=False,
log.exception(e)
success = False

return path, object_id, expex_type, success, updated, pathmeta_blob, None, None # FIXME include pathmeta in the return ...
return path, object_id, expex_type, success, updated, multi, pathmeta_blob, None, None, None # FIXME include pathmeta in the return ...


def from_dataset_path_get_path_to_fetch_and_extract():
@@ -1945,15 +1958,12 @@ def multibads(dataset_id, rchildren, time_now, debug=False, force=False):
poest_multi = sorted(poest_blobs, key=(lambda poe:poe[0]['remote_inode_id']))

if objdone and exdone and not force:
type_multi = [t for *_, t in poest_blobs]
pass
else:
type_multi = []
o_multi = []
e_multi = [] # hah, works out amusingly, if one fails they _all_ fail (derp)
ex_errors = []
for pm, om, em, s, t in poest_multi:
type_multi.append(t)
if not objdone or force:
o_multi.append(om)

@@ -2005,7 +2015,7 @@


def from_dataset_path_extract_object_metadata(dataset_path, time_now=None, force=False, debug=False,
debug_async=False, _Async=Async, _deferred=deferred):
debug_async=False, do_write=True, _Async=Async, _deferred=deferred):
""" given a dataset_path extract path-metadata and extract-object-metadata
this is a braindead implementation that ignores the existing way
@@ -2019,14 +2029,20 @@ def from_dataset_path_extract_object_metadata(dataset_path, time_now=None, force
if time_now is None: # FIXME this should be required?
time_now = utcnowtz()

if dataset_path.cache._fs_version() == 0:
msg = 'cannot run with fs version 0, need multi in meta'
raise ValueError(msg)
else:
mdv_check = [True]

dataset_id = dataset_path.cache_identifier
#rfiles = transitive_files(dataset_path) # XXX sadly can't use this because we have to review the symlinks
# but in a sense that is ok because we also want to pull the path metadata here as well since it will end up
# being more efficient to start from here
rchildren = transitive_paths(dataset_path)
safe_rchildren, bad_type_oids, bad_fsmeta_records, bad_pathmeta_blob_index, bad_updated_cache_transitive = multibads(
dataset_id, rchildren, time_now, debug=debug, force=force)
del rchildren
#safe_rchildren, bad_type_oids, bad_fsmeta_records, bad_pathmeta_blob_index, bad_updated_cache_transitive = multibads(
#dataset_id, rchildren, time_now, debug=debug, force=force)
#del rchildren

# TODO need to sort out the right way to do this when running in sparcron ... especially wrt when the step is actually "done"
# how do we handle the dataset itself (it would be nice to just put the metadata there ...)
@@ -2054,18 +2070,19 @@ def i(gen):
# it out of the spc export path is likely going to be part of the solution
# since that was also mostly just to get a quick and dirty entrypoint
deferred(subprocess_extract)
(dataset_id, path, time_now, force=force, debug=debug)
for path in chain((dataset_path,), safe_rchildren))
(dataset_id, path, time_now, force=force, debug=debug, do_write=do_write, mdv_check=mdv_check)
for path in chain((dataset_path,), rchildren))

updated_cache_transitive = bad_updated_cache_transitive
updated_cache_transitive = None #bad_updated_cache_transitive
bads = []
_type_oids = defaultdict(list)
good_fsmeta_records = []
good_pathmeta_blob_index = {}
parent_index = {}
id_drp = {}
for path, id, expex_type, ok, updated, pathmeta_blob, objmeta_blob, blob, status in results:
if id.type == 'dataset':
_multis = defaultdict(list)
for path, object_id, expex_type, ok, updated, multi, pathmeta_blob, objmeta_blob, blob, status in results:
if object_id.type == 'dataset':
# do not include dataset when calculating updated transitive
pass
elif updated_cache_transitive is None:
@@ -2079,7 +2096,7 @@ def i(gen):
# e.g. we don't not run export because there is no manifest ...
# but do update cache transitive because that expects to be
# calculated over all paths regardless
bads.append((path, id))
bads.append((path, object_id))
continue

parent = pathmeta_blob['parent_id']
@@ -2097,26 +2114,96 @@ def i(gen):
# betting that there is yet another potential source of error where package
# renames won't actually show up in our updated time because we use the file
# updated time ... which shouldn't even be a thing ??? WAT
fsmeta_safe_id = RemoteId(object_id.curie) if object_id.type == 'package' else object_id

_type_oids[expex_type].append((object_id, multi))
good_pathmeta_blob_index[object_id] = pathmeta_blob
parent_index[object_id] = parent
id_drp[object_id] = pathmeta_blob['dataset_relative_path']

fsmeta_safe_id = RemoteId(id.curie) if id.type == 'package' else id
good_fsmeta_records.append((fsmeta_safe_id, parent, updated, path.name))
if multi:
# for multi we need the combined name so we can append to fsmeta_records
# and we need the multi pathmeta blob so we can insert it into the index
# under fsmeta_safe_id, id_drp is essentially the only thing that is missing
# an entry for the curie only id, oh, and of course we need to write objmeta
# and extract blobs in order
_multis[fsmeta_safe_id].append((path, parent, expex_type, ok, updated, multi, pathmeta_blob, objmeta_blob, blob, status))
if fsmeta_safe_id not in parent_index:
parent_index[fsmeta_safe_id] = parent

_type_oids[expex_type].append((id, False))
#object_id_type.append((id, expex_type))
good_pathmeta_blob_index[id] = pathmeta_blob
parent_index[id] = parent
id_drp[id] = pathmeta_blob['dataset_relative_path']
else:
good_fsmeta_records.append((fsmeta_safe_id, parent, updated, path.name))

for fsmeta_safe_id, reslist in _multis.items():

export_objmeta_path = objmeta_version_export_path(dataset_id, fsmeta_safe_id)
objdone = export_objmeta_path.exists()
export_extract_path = extract_export_path(dataset_id, fsmeta_safe_id)
exdone = export_extract_path.exists()

if not (objdone and exdone and not force):
sres = sorted(reslist, key=(lambda peoumpobs: peoumpobs[6]['remote_inode_id']))

names = []
updateds = set() # should all be the same now that we use max_updated
p_multi = []
o_multi = []
e_multi = [] # hah, works out amusingly, if one fails they _all_ fail (derp)
ex_errors = []
for path, parent, t, _, u, _, pm, om, em, s in sres:
names.append(path.name)
updateds.add(u)
if not objdone or force:
o_multi.append(om)

if not exdone or force:
e_multi.append(em)
ex_errors.append(s)

assert len(updateds) == 1, updateds
updated = u # oh look the rare case where loop leaks are ... not completely disastrous?
name = '/' + '/'.join(names)
# FIXME not sure if want? we have the parts and this isn't really a path?
#good_pathmeta_blob_index[fsmeta_safe_id] = {'type': 'multi-pathmeta', 'remote_id': fsmeta_safe_id, 'multi': p_multi}
good_fsmeta_records.append((fsmeta_safe_id, parent, updated, name))
if not objdone or force:
blob = {
'type': 'multi-objmeta',
# FIXME do we include dataset_id in multi- too or what?
'remote_id': fsmeta_safe_id,
'multi': o_multi,
}
if do_write:
obj_dump_path = dump_objmeta_version_path(dataset_id, fsmeta_safe_id, blob, force=force)

if not exdone or force:
blob = {
'type': 'multi-extract-object-meta',
'remote_id': fsmeta_safe_id,
'multi': e_multi,
}
if (any(ex_errors) or # actual issues -> errors
# believe it or not, no data? also -> errors
[_ for _ in e_multi if _ is None]):
# where goes one so go we all :/
blob['error_status'] = ex_errors
if do_write:
err_dumped_path = dump_error_path(dataset_id, fsmeta_safe_id, blob)
else:
if do_write:
_blob, multi_status, ex_dumped_path = write_extract(dataset_id, fsmeta_safe_id, blob, force=force)

for k, v in bad_type_oids.items():
#for k, v in bad_type_oids.items():
# have to do this while still in default dict mode
# in the event good type oids didn't include a particular type
# if you want to know good vs bad just check if v[1] is true
# because multibad ^_^
_type_oids[k].extend(v)
#_type_oids[k].extend(v)

type_oids = {**_type_oids}
fsmeta_records = good_fsmeta_records + bad_fsmeta_records
pathmeta_blob_index = {**good_pathmeta_blob_index, **bad_pathmeta_blob_index}
fsmeta_records = good_fsmeta_records #+ bad_fsmeta_records
pathmeta_blob_index = {**good_pathmeta_blob_index#, **bad_pathmeta_blob_index
}
indicies = type_oids, pathmeta_blob_index, parent_index, id_drp
in_updated_cache_transitive = updated_cache_transitive # fsmeta may modify uct due to lud cases
updated_cache_transitive, fsmeta_blob = fsmeta(dataset_id, records=fsmeta_records, updated_cache_transitive=updated_cache_transitive, debug=debug)
