Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: object and op deletion backend + basic sdk + basic fe #2319

Draft
wants to merge 78 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
fbb11a8
wip
gtarpenning Sep 5, 2024
83ac645
wip
gtarpenning Sep 5, 2024
021e07f
Merge branch 'master' into griffin/objs-delete
gtarpenning Sep 5, 2024
6c14fff
wip, working??
gtarpenning Sep 5, 2024
52abf53
obj.delete()
gtarpenning Sep 5, 2024
e78dc87
Merge branch 'master' into griffin/objs-delete
gtarpenning Sep 6, 2024
29e2376
add test
gtarpenning Sep 6, 2024
52839eb
wip, now with configurable include_deleted flag
gtarpenning Sep 6, 2024
ff13b45
wip
gtarpenning Sep 6, 2024
99fc486
print
gtarpenning Sep 6, 2024
50c5728
wip
gtarpenning Sep 9, 2024
6cb4446
insert with full data payload
gtarpenning Sep 9, 2024
c8b20aa
retain version #
gtarpenning Sep 9, 2024
4d3194e
query magic
gtarpenning Sep 9, 2024
d553d21
merge
gtarpenning Sep 9, 2024
1fb0080
working with ops
gtarpenning Sep 9, 2024
f5a9c90
wip simple object deletion fe
gtarpenning Sep 10, 2024
07da651
refetch on delete
gtarpenning Sep 10, 2024
7569b84
simpler
gtarpenning Sep 10, 2024
49c559b
add order by
gtarpenning Sep 10, 2024
4182a24
merge
gtarpenning Sep 30, 2024
54ec61e
lint
gtarpenning Sep 30, 2024
18105cc
lint
gtarpenning Sep 30, 2024
92106bc
NotFoundError
gtarpenning Sep 30, 2024
c96b5cd
add back sqlite handling
gtarpenning Sep 30, 2024
04cd07e
undo
gtarpenning Sep 30, 2024
b76724a
cleanup
gtarpenning Oct 1, 2024
c3fe08b
merge
gtarpenning Oct 1, 2024
6a1a037
Merge branch 'master' into griffin/objs-delete
gtarpenning Oct 1, 2024
d381dfd
better error handling
gtarpenning Oct 1, 2024
fd09f90
sort by asc default
gtarpenning Oct 1, 2024
b92153c
merge
gtarpenning Oct 2, 2024
07917cf
add support for purging object values
gtarpenning Oct 2, 2024
41191fd
merge
gtarpenning Oct 6, 2024
4715f41
wip
gtarpenning Oct 7, 2024
17c8cd7
better
gtarpenning Oct 7, 2024
939621f
Merge branch 'master' into griffin/objs-delete
gtarpenning Oct 7, 2024
8bfd8c7
lint
gtarpenning Oct 7, 2024
f0cb066
merge
gtarpenning Oct 11, 2024
d10a9b7
now working with identical objects
gtarpenning Oct 11, 2024
3772edd
limit-1
gtarpenning Oct 14, 2024
72112cf
merge
gtarpenning Oct 30, 2024
d12d667
Merge branch 'master' into griffin/objs-delete
gtarpenning Oct 30, 2024
667ccf5
Merge branch 'master' into griffin/objs-delete
gtarpenning Oct 30, 2024
b6c2a16
use groupby
gtarpenning Oct 30, 2024
1731d8f
refactor delete and add op delete
gtarpenning Oct 30, 2024
3d80f49
object viewer support for deleted refs
gtarpenning Oct 30, 2024
fe1c7d4
better hack
gtarpenning Oct 30, 2024
32b466e
wip recursive
gtarpenning Oct 30, 2024
38c4074
simpler
gtarpenning Nov 7, 2024
226e1e3
merge
gtarpenning Nov 15, 2024
d42d1ec
merge
gtarpenning Nov 26, 2024
c1629fa
fix
gtarpenning Nov 26, 2024
8ae0003
fixes, working
gtarpenning Nov 26, 2024
d3c63f9
costmetic code refactor
gtarpenning Dec 3, 2024
4f1fc99
whitespace
gtarpenning Dec 3, 2024
e433562
Merge branch 'master' into griffin/objs-delete
gtarpenning Dec 3, 2024
3bcc8af
cleanup
gtarpenning Dec 3, 2024
6d4aa7e
fix test
gtarpenning Dec 3, 2024
7d0d151
merge
gtarpenning Dec 4, 2024
964a4a9
fix-sqlite-dupes
gtarpenning Dec 4, 2024
827b1ee
with lock
gtarpenning Dec 4, 2024
05bf534
lint
gtarpenning Dec 4, 2024
f56870a
cleaner
gtarpenning Dec 5, 2024
6a22665
Merge branch 'master' into griffin/objs-delete
gtarpenning Dec 5, 2024
5b37bcf
w
gtarpenning Dec 5, 2024
72a25c9
WIP
gtarpenning Dec 9, 2024
b795953
obj_delete with optional digests list
gtarpenning Dec 9, 2024
feef47e
lint
gtarpenning Dec 9, 2024
a8c7e95
merge
gtarpenning Dec 9, 2024
c343ec0
fix
gtarpenning Dec 9, 2024
add641f
lint
gtarpenning Dec 9, 2024
014d108
update
gtarpenning Dec 9, 2024
f0015a3
fixtest
gtarpenning Dec 9, 2024
e7ced6c
fix button spacing and delete modal autosizing
gtarpenning Dec 11, 2024
3e634d5
cache handling fe
gtarpenning Dec 11, 2024
f3e513b
dont do table delete on object delete
gtarpenning Dec 11, 2024
c30ef52
add back importand val_dump step
gtarpenning Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions weave/tests/trace/test_weave_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,3 +1384,77 @@ def f(a):
calls = list(calls)
assert len(calls) == 1
assert calls[0].output["table"] == o.table.table_ref.uri()


def test_object_deletion(client):
    """Deleting object versions hides them from reads and from version queries."""
    deleted_error = weave.trace_server.errors.ObjectDeletedError

    def live_version_count():
        # Number of versions of "my-obj" the server reports by default.
        res = client.server.objs_query(
            req=tsi.ObjQueryReq(
                project_id=client._project_id(),
                names=["my-obj"],
            )
        )
        return len(res.objs)

    # Simplest case: one version, removed through the client API.
    obj = {"a": 5}
    first = client.save(obj, "my-obj")
    assert client.get(first.ref) == obj

    client.delete_object(first.ref)
    with pytest.raises(deleted_error):
        client.get(first.ref)

    # Publish three further versions by mutating the same dict in place.
    saved = []
    for value in (6, 7, 8):
        obj["a"] = value
        saved.append(client.save(obj, "my-obj"))
    weave_obj2, weave_obj3, weave_obj4 = saved

    # Remove the middle one through the method on the saved object itself.
    weave_obj3.delete()

    # The deleted version must no longer be readable.
    with pytest.raises(deleted_error):
        client.get(weave_obj3.ref)

    # Only the two surviving versions are listed.
    assert live_version_count() == 2

    # The surviving versions are still readable.
    assert client.get(weave_obj4.ref)
    assert client.get(weave_obj2.ref)

    weave_obj4.delete()
    weave_obj2.delete()

    # Nothing is left once every version has been deleted.
    assert live_version_count() == 0


def test_recursive_object_deletion(client):
    """Deleting a referenced object leaves ``None`` where the ref used to resolve."""
    deleted_error = weave.trace_server.errors.ObjectDeletedError

    # Build a chain of references: obj3 -> obj2 -> obj1.
    obj1 = {"a": 5}
    obj1_ref = client.save(obj1, "obj1").ref

    obj2 = {"b": obj1_ref}
    obj2_ref = client.save(obj2, "obj2").ref

    obj3 = {"c": obj2_ref}
    obj3_ref = client.save(obj3, "obj3").ref

    # Delete the object at the bottom of the chain.
    fetched_obj1 = client.get(obj1_ref)
    fetched_obj1.delete()

    # obj1 itself can no longer be fetched.
    with pytest.raises(deleted_error):
        client.get(obj1_ref)

    # obj2 is still readable, but its ref to obj1 now comes back as None.
    assert client.get(obj2_ref) == {"b": None}
    # obj3 should store the ref to obj2, as instantiated.
    assert client.get(obj3_ref) == {"c": obj2}
35 changes: 28 additions & 7 deletions weave/trace/vals.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
)
from weave.trace.serialize import from_json
from weave.trace.table import Table
from weave.trace_server.errors import ObjectDeletedError
from weave.trace_server.trace_server_interface import (
ObjDeleteReq,
ObjReadReq,
TableQueryReq,
TableRowFilter,
Expand Down Expand Up @@ -119,6 +121,20 @@ def save(self) -> ObjectRef:
raise NotImplementedError("Traceable.save not implemented")
# return self.server.mutate(self.ref, mutations)

def delete(self) -> None:
    """Delete the saved object version this instance points at.

    Builds an ``ObjDeleteReq`` from the ref's entity, project, name and
    digest and sends it to ``self.server``.

    Raises:
        ValueError: If the instance has no ref, or its ref is not an
            ``ObjectRef``.
    """
    target = self.ref
    if target is None:
        raise ValueError("Cannot delete object that is not saved")
    if not isinstance(target, ObjectRef):
        raise ValueError("Cannot delete non-object ref")

    request = ObjDeleteReq(
        project_id=f"{target.entity}/{target.project}",
        object_id=target.name,
        digest=target.digest,
    )
    self.server.obj_delete(request)


def pydantic_getattribute(self: BaseModel, name: str) -> Any:
attribute = object.__getattribute__(self, name)
Expand Down Expand Up @@ -488,14 +504,19 @@ def make_trace_obj(
if isinstance(val, ObjectRef):
new_ref = val
extra = val.extra
read_res = server.obj_read(
ObjReadReq(
project_id=f"{val.entity}/{val.project}",
object_id=val.name,
digest=val.digest,
try:
read_res = server.obj_read(
ObjReadReq(
project_id=f"{val.entity}/{val.project}",
object_id=val.name,
digest=val.digest,
)
)
)
val = from_json(read_res.obj.val, val.entity + "/" + val.project, server)
val = from_json(read_res.obj.val, val.entity + "/" + val.project, server)
except ObjectDeletedError:
# Catch error case where an object has been deleted. Val here is likely
# part of a nested object, return None to indicate a deleted object.
val = None

if isinstance(val, Table):
val_ref = val.ref
Expand Down
10 changes: 10 additions & 0 deletions weave/trace/weave_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
CostQueryReq,
EndedCallSchemaForInsert,
ObjCreateReq,
ObjDeleteReq,
ObjectVersionFilter,
ObjQueryReq,
ObjReadReq,
Expand Down Expand Up @@ -774,6 +775,15 @@ def feedback(
query=query, reaction=reaction, offset=offset, limit=limit
)

def delete_object(self, object: ObjectRef) -> None:
    """Ask the trace server to delete the object version behind ``object``.

    The request uses this client's project (``self._project_id()``) together
    with the ref's name and digest; the ref's own entity/project are not
    consulted.
    """
    request = ObjDeleteReq(
        project_id=self._project_id(),
        object_id=object.name,
        digest=object.digest,
    )
    self.server.obj_delete(request)

def add_costs(self, costs: Dict[str, CostCreateInput]) -> CostCreateRes:
"""Add costs to the current project.
The cost object will be created with the effective date of the date of insertion `datetime.datetime.now(ZoneInfo("UTC"))` if no effective_date is provided.
Expand Down
6 changes: 6 additions & 0 deletions weave/trace_server/clickhouse_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ class ObjCHInsertable(BaseModel):
_refs = field_validator("refs")(validation.refs_list_validator)


class ObjDeleteCHInsertable(ObjCHInsertable):
    """Insertable object row that additionally carries deletion metadata.

    Extends ``ObjCHInsertable`` with two explicit timestamps.
    """

    # Time at which the deletion was recorded.
    deleted_at: datetime.datetime
    # Creation timestamp to store on the inserted row.
    created_at: datetime.datetime


class SelectableCHObjSchema(BaseModel):
project_id: str
object_id: str
Expand All @@ -151,3 +156,4 @@ class SelectableCHObjSchema(BaseModel):
digest: str
version_index: int
is_latest: int
deleted_at: typing.Optional[datetime.datetime]
65 changes: 56 additions & 9 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
HardCodedFilter,
combine_conditions,
)
from weave.trace_server.errors import NotFoundError, ObjectDeletedError
from weave.trace_server.ids import generate_id
from weave.trace_server.trace_server_common import make_derived_summary_fields

Expand All @@ -67,6 +68,7 @@
CallStartCHInsertable,
CallUpdateCHInsertable,
ObjCHInsertable,
ObjDeleteCHInsertable,
SelectableCHCallSchema,
SelectableCHObjSchema,
)
Expand Down Expand Up @@ -105,10 +107,6 @@
MAX_DELETE_CALLS_COUNT = 100


class NotFoundError(Exception):
pass


CallCHInsertable = Union[
CallStartCHInsertable,
CallEndCHInsertable,
Expand Down Expand Up @@ -583,11 +581,20 @@ def obj_read(self, req: tsi.ObjReadReq) -> tsi.ObjReadRes:
conds.append("digest = {version_digest: String}")
parameters["version_digest"] = req.digest
objs = self._select_objs_query(
req.project_id, conditions=conds, parameters=parameters
req.project_id,
conditions=conds,
parameters=parameters,
include_deleted=True,
)
if len(objs) == 0:
raise NotFoundError(f"Obj {req.object_id}:{req.digest} not found")

if objs[0].deleted_at is not None:
# this does not get propagated to the client
raise ObjectDeletedError(
f"Obj {req.object_id}:{req.digest} was deleted at {objs[0].deleted_at}"
)

return tsi.ObjReadRes(obj=_ch_obj_to_obj_schema(objs[0]))

def objs_query(self, req: tsi.ObjQueryReq) -> tsi.ObjQueryRes:
Expand Down Expand Up @@ -618,6 +625,36 @@ def objs_query(self, req: tsi.ObjQueryReq) -> tsi.ObjQueryRes:

return tsi.ObjQueryRes(objs=[_ch_obj_to_obj_schema(obj) for obj in objs])

def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes:
    """Soft-delete one object version by inserting a row with ``deleted_at`` set.

    The existing version is read back first and a new ``object_versions`` row
    is inserted that repeats its payload and original ``created_at`` while
    adding the deletion timestamp.

    Args:
        req: Identifies the version to delete via project id, object id and
            digest.

    Returns:
        An empty ``tsi.ObjDeleteRes``.

    Raises:
        NotFoundError: Propagated from ``obj_read`` when no matching version
            exists.
        ObjectDeletedError: Propagated from ``obj_read`` when the version is
            already marked as deleted.
    """
    # To ensure no data is deleted, read obj from db first, then
    # create payload with deleted_at and insert
    db_obj = self.obj_read(
        tsi.ObjReadReq(
            project_id=req.project_id,
            object_id=req.object_id,
            digest=req.digest,
        )
    ).obj

    # Use an aware UTC timestamp: a naive now() records the server's local
    # wall-clock time, which is ambiguous once stored.
    deleted_at = datetime.datetime.now(datetime.timezone.utc)
    ch_obj = ObjDeleteCHInsertable(
        project_id=req.project_id,
        object_id=req.object_id,
        digest=req.digest,
        kind=get_kind(db_obj.val),
        # Review note (collaborator): if this value were something like
        # DELETED, it would never need to be cleaned up afterwards.
        # Review note (author): a "hard delete" flag could be added here to
        # set that.
        val_dump=json.dumps(db_obj.val),
        refs=extract_refs_from_values(db_obj.val),
        base_object_class=get_base_object_class(db_obj.val),
        deleted_at=deleted_at,
        created_at=db_obj.created_at,
    )
    # Values and column names are both derived from the model so they stay
    # aligned with each other.
    self._insert(
        "object_versions",
        data=[list(ch_obj.model_dump().values())],
        column_names=list(ch_obj.model_fields.keys()),
    )
    return tsi.ObjDeleteRes()

def table_create(self, req: tsi.TableCreateReq) -> tsi.TableCreateRes:
insert_rows = []
for r in req.table.rows:
Expand Down Expand Up @@ -925,6 +962,7 @@ def get_object_refs_root_val(
parameters[version_param_key] = ref.version

if len(conds) > 0:
conds += ["deleted_at IS NULL"]
conditions = [combine_conditions(conds, "OR")]
objs = self._select_objs_query(
project_id=project_id_scope,
Expand Down Expand Up @@ -1367,9 +1405,12 @@ def _select_objs_query(
conditions: Optional[list[str]] = None,
limit: Optional[int] = None,
parameters: Optional[Dict[str, Any]] = None,
include_deleted: bool = False,
) -> list[SelectableCHObjSchema]:
if not conditions:
conditions = ["1 = 1"]
if not include_deleted:
conditions.append("deleted_at IS NULL")

conditions_part = combine_conditions(conditions, "AND")

Expand All @@ -1386,6 +1427,7 @@ def _select_objs_query(
project_id,
object_id,
created_at,
deleted_at,
kind,
base_object_class,
refs,
Expand All @@ -1400,21 +1442,25 @@ def _select_objs_query(
SELECT project_id,
object_id,
created_at,
deleted_at,
kind,
base_object_class,
refs,
val_dump,
digest,
if (kind = 'op', 1, 0) AS is_op,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id
PARTITION BY project_id, kind, object_id
ORDER BY created_at ASC
) AS _version_index_plus_1,
_version_index_plus_1 - 1 AS version_index,
row_number() OVER (
PARTITION BY project_id, kind, object_id
ORDER BY (deleted_at IS NOT NULL), created_at ASC
) AS alive_version_index,
gtarpenning marked this conversation as resolved.
Show resolved Hide resolved
count(*) OVER (PARTITION BY project_id, kind, object_id) as version_count,
if(_version_index_plus_1 = version_count, 1, 0) AS is_latest
COUNT(*) OVER (PARTITION BY project_id, kind, object_id ORDER BY (deleted_at is not NULL)) AS deleted_count,
gtarpenning marked this conversation as resolved.
Show resolved Hide resolved
if(alive_version_index = (version_count - deleted_count), 1, 0) AS is_latest
FROM (
SELECT *,
row_number() OVER (
Expand Down Expand Up @@ -1445,6 +1491,7 @@ def _select_objs_query(
"project_id",
"object_id",
"created_at",
"deleted_at",
"kind",
"base_object_class",
"refs",
Expand Down
12 changes: 12 additions & 0 deletions weave/trace_server/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ class InvalidRequest(Error):
"""Raised when a request is invalid."""

pass


class ObjectDeletedError(Error):
    """Signals that the requested object exists but has been deleted."""


class NotFoundError(Error):
    """Signals that a requested resource could not be found."""
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ def objs_query(self, req: tsi.ObjQueryReq) -> tsi.ObjQueryRes:
obj.project_id = original_project_id
return res

def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes:
    """Translate the external project id and forward the delete request.

    ``req.project_id`` is overwritten in place with the internal id before
    the request is handed to the internal trace server via ``_ref_apply``.
    """
    internal_project_id = self._idc.ext_to_int_project_id(req.project_id)
    req.project_id = internal_project_id
    return self._ref_apply(self._internal_trace_server.obj_delete, req)

def table_create(self, req: tsi.TableCreateReq) -> tsi.TableCreateRes:
req.table.project_id = self._idc.ext_to_int_project_id(req.table.project_id)
return self._ref_apply(self._internal_trace_server.table_create, req)
Expand Down
Loading
Loading