-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4 from d70-t/car_reference_fs
Car reference fs
- Loading branch information
Showing
9 changed files
with
258 additions
and
83 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
""" | ||
CAR handling functions. | ||
""" | ||
|
||
from typing import List, Optional, Tuple, Union, Iterator, BinaryIO | ||
import dataclasses | ||
|
||
import dag_cbor | ||
from multiformats import CID, varint, multicodec, multihash | ||
|
||
from .utils import is_cid_list, StreamLike, ensure_stream | ||
|
||
DagPbCodec = multicodec.get("dag-pb") | ||
Sha256Hash = multihash.get("sha2-256") | ||
|
||
@dataclasses.dataclass
class CARBlockLocation:
    """
    Sizes and position of a single block within a CAR stream.

    A block consists of three consecutive parts: a length varint, the CID
    and the payload. All values are given in bytes.
    """
    varint_size: int  # size of the leading length varint
    cid_size: int  # size of the encoded CID
    payload_size: int  # size of the payload following the CID
    offset: int = 0  # position of the first byte of the block (its length varint)

    @property
    def cid_offset(self) -> int:
        """Position of the first byte of the CID."""
        return self.varint_size + self.offset

    @property
    def payload_offset(self) -> int:
        """Position of the first byte of the payload."""
        return self.cid_offset + self.cid_size

    @property
    def size(self) -> int:
        """Total size of the block, including length varint, CID and payload."""
        return sum((self.varint_size, self.cid_size, self.payload_size))
|
||
|
||
def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]:
    """
    Decodes a CAR header and returns the list of contained roots.

    Parameters
    ----------
    stream : BinaryIO
        Stream positioned at the first byte of the CAR

    Returns
    -------
    roots : List[CID]
        Roots as given by the CAR header
    header_size : int
        Number of bytes consumed from the stream (length varint + header)

    Raises
    ------
    ValueError
        If the header is truncated, is not a mapping, is not version 1
        or doesn't contain a list of CIDs as roots.
    """
    header_size, visize, _ = varint.decode_raw(stream)  # type: ignore [call-overload] # varint uses BufferedIOBase
    raw_header = stream.read(header_size)
    if len(raw_header) != header_size:
        raise ValueError(
            f"CAR header is truncated: expected {header_size} bytes, got {len(raw_header)}"
        )
    header = dag_cbor.decode(raw_header)
    if not isinstance(header, dict):
        raise ValueError("no valid CAR header found")
    # use .get() so that a missing key is reported as ValueError like any
    # other malformed header, instead of leaking a KeyError
    if header.get("version") != 1:
        raise ValueError("CAR is not version 1")
    roots = header.get("roots")
    if not isinstance(roots, list):
        raise ValueError("CAR header doesn't contain roots")
    if not is_cid_list(roots):
        raise ValueError("CAR roots do not only contain CIDs")
    return roots, visize + header_size
|
||
|
||
def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]:
    """
    Decodes the next block of a CAR from a stream and verifies its payload.

    Parameters
    ----------
    stream : BinaryIO
        Stream positioned at the length varint of a block

    Returns
    -------
    block : Optional[Tuple[CID, bytes, CARBlockLocation]]
        CID, payload and size information of the block, or ``None`` if no
        further length varint could be decoded from the stream.
        The ``offset`` of the returned location is left at its default, as
        the position of the block within the stream is not tracked here.

    Raises
    ------
    ValueError
        If the block is truncated or too short to contain a CID, if the CID
        version is not supported or if the payload doesn't match its CID.
    """
    try:
        block_size, visize, _ = varint.decode_raw(stream)  # type: ignore [call-overload] # varint uses BufferedIOBase
    except ValueError:
        # stream has likely been consumed entirely
        return None

    data = stream.read(block_size)
    if len(data) != block_size:
        raise ValueError(
            f"CAR is truncated: expected block of {block_size} bytes, got {len(data)}"
        )
    if block_size < 2:
        # the CID detection below inspects the first two bytes
        raise ValueError(f"CAR block of {block_size} bytes is too short to contain a CID")

    # as the size of the CID is variable but not explicitly given in
    # the CAR format, we need to partially decode each CID to determine
    # its size and the location of the payload data
    if data[0] == 0x12 and data[1] == 0x20:
        # this is CIDv0
        cid_version = 0
        default_base = "base58btc"
        cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
        hash_codec: Union[int, multihash.Multihash] = Sha256Hash
        cid_digest = data[2:34]
        data = data[34:]
    else:
        # this is CIDv1(+)
        cid_version, _, data = varint.decode_raw(data)
        if cid_version != 1:
            raise ValueError(f"CIDv{cid_version} is currently not supported")
        default_base = "base32"
        cid_codec, _, data = multicodec.unwrap_raw(data)
        hash_codec, _, data = varint.decode_raw(data)
        digest_size, _, data = varint.decode_raw(data)
        cid_digest = data[:digest_size]
        data = data[digest_size:]
    cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))

    if not cid.hashfun.digest(data) == cid.digest:
        raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")

    # whatever has been stripped from the front of the block is the CID,
    # the remainder is the payload
    return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data))
|
||
|
||
def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]:
    """
    Reads a CAR.

    The header is decoded immediately, the blocks are decoded lazily while
    the returned iterator is consumed.

    Parameters
    ----------
    stream_or_bytes: StreamLike
        Stream (or bytes) to read the CAR from

    Returns
    -------
    roots : List[CID]
        Roots as given by the CAR header
    blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]]
        Iterator over all blocks contained in the CAR. The location of each
        block carries its offset, counted from the start of the CAR header.
    """
    stream = ensure_stream(stream_or_bytes)
    roots, header_size = decode_car_header(stream)

    def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]:
        # the first block starts right behind the header
        position = header_size
        while True:
            entry = decode_raw_car_block(stream)
            if entry is None:
                return
            cid, payload, location = entry
            yield cid, payload, dataclasses.replace(location, offset=position)
            position += location.size

    return roots, blocks()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import json | ||
from typing import Dict, Any, Iterator, Tuple | ||
|
||
import dag_cbor | ||
from multiformats import CID, multicodec | ||
|
||
from .car import read_car, CARBlockLocation | ||
from .ipldstore import inline_objects | ||
from .utils import StreamLike | ||
|
||
def collect_tree_objects(stream_or_bytes: StreamLike) -> Tuple[CID, Dict[CID, Any], Dict[CID, CARBlockLocation]]:
    """
    Reads a single-rooted CAR and indexes its blocks by CID.

    Parameters
    ----------
    stream_or_bytes : StreamLike
        Stream to read CAR from

    Returns
    -------
    root : CID
        The only root of the CAR
    cbor_objects : Dict[CID, Any]
        Still encoded payload of every block whose CID uses the dag-cbor codec
    object_locations : Dict[CID, CARBlockLocation]
        Location of every block of the CAR

    Raises
    ------
    ValueError
        If the CAR doesn't have exactly one root.
    """
    dag_cbor_codec = multicodec.get("dag-cbor")

    roots, blocks = read_car(stream_or_bytes)
    if len(roots) != 1:
        raise ValueError("need single-rooted car")
    (root,) = roots

    locations: Dict[CID, CARBlockLocation] = {}
    encoded_nodes: Dict[CID, Any] = {}
    for cid, payload, location in blocks:
        locations[cid] = location
        if cid.codec == dag_cbor_codec:
            encoded_nodes[cid] = payload

    return root, encoded_nodes, locations
|
||
|
||
def car2reference_fs_refs(stream_or_bytes: StreamLike, stream_name: str) -> Dict[str, Any]:
    """
    Builds a flat mapping of references from the root object of a CAR.

    The root object is decoded and its nested mappings are flattened into
    "/"-separated keys. Mappings stored under a key whose last path component
    is listed in ``inline_objects`` are not descended into but kept as one value.

    Parameters
    ----------
    stream_or_bytes : StreamLike
        Stream to read CAR from
    stream_name : str
        Name to put into each reference for locating the CAR

    Returns
    -------
    refs : Dict[str, Any]
        For values which are CIDs: ``[stream_name, payload_offset, payload_size]``
        of the referenced block. For all other values: the value encoded as JSON string.

    Raises
    ------
    ValueError
        If the root object of the CAR is not a mapping.
    KeyError
        If the root block or a referenced block is not found in the CAR.
    """
    root, cbor_objects, object_locations = collect_tree_objects(stream_or_bytes)

    tree = dag_cbor.decode(cbor_objects[root])
    # explicit check instead of assert: asserts are stripped when running with -O
    if not isinstance(tree, dict):
        raise ValueError("root object of the CAR is not a mapping")
    sep = "/"

    def iter_nested(prefix: str, mapping: Dict[str, Any]) -> Iterator[Tuple[str, Any]]:
        for key, value in mapping.items():
            is_inline = key.split(sep)[-1] in inline_objects
            if isinstance(value, dict) and not is_inline:
                yield from iter_nested(prefix + key + sep, value)
            else:
                # leaf value or inline object, both are emitted as they are
                yield prefix + key, value

    refs: Dict[str, Any] = {}
    for key, content in iter_nested("", tree):
        if isinstance(content, CID):
            loc = object_locations[content]
            refs[key] = [stream_name, loc.payload_offset, loc.payload_size]
        else:
            refs[key] = json.dumps(content)

    return refs
|
||
|
||
def car2reference_fs(filename: str) -> Dict[str, Any]:
    """
    Builds a version 1 reference description for a CAR file.

    Parameters
    ----------
    filename : str
        Path of the CAR file to read

    Returns
    -------
    reference_fs : Dict[str, Any]
        Mapping with the keys "version", "templates" and "refs". The references
        name the file through the template "a", which is set to ``filename``.
    """
    with open(filename, "rb") as car_file:
        references = car2reference_fs_refs(car_file, "{{a}}")

    reference_fs: Dict[str, Any] = {
        "version": 1,
        "templates": {"a": filename},
        "refs": references,
    }
    return reference_fs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
""" | ||
Some utilities. | ||
""" | ||
|
||
from io import BytesIO | ||
from typing import List, Union, BinaryIO | ||
|
||
from multiformats import CID | ||
from typing_extensions import TypeGuard | ||
|
||
# anything a CAR can be read from: a binary stream or an in-memory buffer
StreamLike = Union[BinaryIO, bytes, bytearray, memoryview]


def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO:
    """
    Returns a binary stream for the given input.

    In-memory buffers (``bytes``, ``bytearray``, ``memoryview``) are wrapped
    into a new ``BytesIO``, anything else is assumed to already be a stream
    and is returned unchanged.
    """
    if isinstance(stream_or_bytes, (bytes, bytearray, memoryview)):
        return BytesIO(stream_or_bytes)
    return stream_or_bytes
|
||
|
||
def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
    """
    Checks if every element of the given list is a CID.

    An empty list is accepted as a list of CIDs.
    """
    for candidate in os:
        if not isinstance(candidate, CID):
            return False
    return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from io import BytesIO | ||
|
||
import ipldstore.car as car | ||
|
||
import pytest | ||
|
||
def test_car_reject_v2():
    """Decoding a CAR header which declares version 2 must raise a ValueError."""
    # length varint 0x0a, followed by the 10 byte DAG-CBOR map {"version": 2}
    header_bytes = bytes.fromhex("0aa16776657273696f6e02")
    with pytest.raises(ValueError):
        car.decode_car_header(BytesIO(header_bytes))
Oops, something went wrong.