Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Car reference fs #4

Merged
merged 7 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions ipldstore/car.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
CAR handling functions.
"""

from typing import List, Optional, Tuple, Union, Iterator, BinaryIO
import dataclasses

import dag_cbor
from multiformats import CID, varint, multicodec, multihash

from .utils import is_cid_list, StreamLike, ensure_stream

DagPbCodec = multicodec.get("dag-pb")
Sha256Hash = multihash.get("sha2-256")

@dataclasses.dataclass
class CARBlockLocation:
varint_size: int
cid_size: int
payload_size: int
offset: int = 0

@property
def cid_offset(self) -> int:
return self.offset + self.varint_size

@property
def payload_offset(self) -> int:
return self.offset + self.varint_size + self.cid_size

@property
def size(self) -> int:
return self.varint_size + self.cid_size + self.payload_size


def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]:
"""
Decodes a CAR header and returns the list of contained roots.
"""
header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
header = dag_cbor.decode(stream.read(header_size))
if not isinstance(header, dict):
raise ValueError("no valid CAR header found")
if header["version"] != 1:
raise ValueError("CAR is not version 1")
roots = header["roots"]
if not isinstance(roots, list):
raise ValueError("CAR header doesn't contain roots")
if not is_cid_list(roots):
raise ValueError("CAR roots do not only contain CIDs")
return roots, visize + header_size


def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]:
try:
block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
except ValueError:
# stream has likely been consumed entirely
return None

data = stream.read(block_size)
# as the size of the CID is variable but not explicitly given in
# the CAR format, we need to partially decode each CID to determine
# its size and the location of the payload data
if data[0] == 0x12 and data[1] == 0x20:
# this is CIDv0
cid_version = 0
default_base = "base58btc"
cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
hash_codec: Union[int, multihash.Multihash] = Sha256Hash
cid_digest = data[2:34]
data = data[34:]
else:
# this is CIDv1(+)
cid_version, _, data = varint.decode_raw(data)
if cid_version != 1:
raise ValueError(f"CIDv{cid_version} is currently not supported")
default_base = "base32"
cid_codec, _, data = multicodec.unwrap_raw(data)
hash_codec, _, data = varint.decode_raw(data)
digest_size, _, data = varint.decode_raw(data)
cid_digest = data[:digest_size]
data = data[digest_size:]
cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))

if not cid.hashfun.digest(data) == cid.digest:
raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")

return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data))


def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]:
"""
Reads a CAR.

Parameters
----------
stream_or_bytes: StreamLike
Stream to read CAR from

Returns
-------
roots : List[CID]
Roots as given by the CAR header
blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]]
Iterator over all blocks contained in the CAR
"""
stream = ensure_stream(stream_or_bytes)
roots, header_size = decode_car_header(stream)
def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]:
offset = header_size
while (next_block := decode_raw_car_block(stream)) is not None:
cid, data, sizes = next_block
yield cid, data, dataclasses.replace(sizes, offset=offset)
offset += sizes.size
return roots, blocks()
60 changes: 60 additions & 0 deletions ipldstore/car_reference_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
from typing import Dict, Any, Iterator, Tuple

import dag_cbor
from multiformats import CID, multicodec

from .car import read_car, CARBlockLocation
from .ipldstore import inline_objects
from .utils import StreamLike

def collect_tree_objects(stream_or_bytes: StreamLike) -> Tuple[CID, Dict[CID, Any], Dict[CID, CARBlockLocation]]:
DagCborCodec = multicodec.get("dag-cbor")

roots, blocks = read_car(stream_or_bytes)
if len(roots) != 1:
raise ValueError("need single-rooted car")
root = roots[0]

object_locations = {}
cbor_objects = {}
for cid, data, location in blocks:
object_locations[cid] = location
if cid.codec == DagCborCodec:
cbor_objects[cid] = data

return root, cbor_objects, object_locations


def car2reference_fs_refs(stream_or_bytes: StreamLike, stream_name: str) -> Dict[str, Any]:
root, cbor_objects, object_locations = collect_tree_objects(stream_or_bytes)

tree = dag_cbor.decode(cbor_objects[root])
assert isinstance(tree, dict)
sep = "/"

def iter_nested(prefix: str, mapping: Dict[str, Any]) -> Iterator[Tuple[str, Any]]:
for key, value in mapping.items():
key_parts = key.split(sep)
if key_parts[-1] in inline_objects:
yield prefix + key, value
elif isinstance(value, dict):
yield from iter_nested(prefix + key + sep, value)
else:
yield prefix + key, value

refs: Dict[str, Any] = {}
for key, content in iter_nested("", tree):
if isinstance(content, CID):
loc = object_locations[content]
refs[key] = [stream_name, loc.payload_offset, loc.payload_size]
else:
refs[key] = json.dumps(content)

return refs


def car2reference_fs(filename: str) -> Dict[str, Any]:
with open(filename, "rb") as stream:
refs = car2reference_fs_refs(stream, "{{a}}")
return {"version": 1, "templates": {"a": filename}, "refs": refs}
83 changes: 9 additions & 74 deletions ipldstore/contentstore.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
from abc import ABC, abstractmethod
from typing import MutableMapping, Optional, Union, overload, Iterator, MutableSet, List, Tuple
from typing import MutableMapping, Optional, Union, overload, Iterator, MutableSet, List
from io import BufferedIOBase, BytesIO

from typing_extensions import TypeGuard

from multiformats import CID, multicodec, multibase, multihash, varint
import dag_cbor
from dag_cbor.encoding import EncodableType as DagCborEncodable
from typing_validation import validate

import requests

from .car import read_car
from .utils import StreamLike


ValueType = Union[bytes, DagCborEncodable]

RawCodec = multicodec.get("raw")
DagCborCodec = multicodec.get("dag-cbor")
DagPbCodec = multicodec.get("dag-pb")
Sha256Hash = multihash.get("sha2-256")


def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
return all(isinstance(o, CID) for o in os)


class ContentAddressableStore(ABC):
Expand Down Expand Up @@ -59,7 +54,7 @@ def put(self, value: ValueType) -> CID:
else:
return self.put_raw(dag_cbor.encode(value), DagCborCodec)

def normalize_cid(self, cid: CID) -> CID: # pylint: disable=no-self-use
def normalize_cid(self, cid: CID) -> CID: # pylint: disable=no-self-use
return cid

@overload
Expand Down Expand Up @@ -115,76 +110,16 @@ def _to_car(self,
bytes_written += self._to_car(child, stream, already_written)
return bytes_written

def import_car(self, stream_or_bytes: Union[BufferedIOBase, bytes]) -> List[CID]:
validate(stream_or_bytes, Union[BufferedIOBase, bytes])
if isinstance(stream_or_bytes, bytes):
stream: BufferedIOBase = BytesIO(stream_or_bytes)
else:
stream = stream_or_bytes

roots = [self.normalize_cid(root) for root in decode_car_header(stream)]
def import_car(self, stream_or_bytes: StreamLike) -> List[CID]:
roots, blocks = read_car(stream_or_bytes)
roots = [self.normalize_cid(root) for root in roots]

while (next_block := decode_raw_car_block(stream)) is not None:
cid, data = next_block
for cid, data, _ in blocks:
self.put_raw(bytes(data), cid.codec)

return roots


def decode_car_header(stream: BufferedIOBase) -> List[CID]:
"""
Decodes a CAR header and returns the list of contained roots.
"""
header_size = varint.decode(stream)
header = dag_cbor.decode(stream.read(header_size))
if not isinstance(header, dict):
raise ValueError("no valid CAR header found")
roots = header["roots"]
if not isinstance(roots, list):
raise ValueError("CAR header doesn't contain roots")
if not is_cid_list(roots):
raise ValueError("CAR roots do not only contain CIDs")
return roots


def decode_raw_car_block(stream: BufferedIOBase) -> Optional[Tuple[CID, bytes]]:
try:
block_size = varint.decode(stream)
except ValueError:
# stream has likely been consumed entirely
return None

data = stream.read(block_size)
# as the size of the CID is variable but not explicitly given in
# the CAR format, we need to partially decode each CID to determine
# its size and the location of the payload data
if data[0] == 0x12 and data[1] == 0x20:
# this is CIDv0
cid_version = 0
default_base = "base58btc"
cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
hash_codec: Union[int, multihash.Multihash] = Sha256Hash
cid_digest = data[2:34]
data = data[34:]
else:
# this is CIDv1(+)
cid_version, _, data = varint.decode_raw(data)
if cid_version != 1:
raise ValueError(f"CIDv{cid_version} is currently not supported")
default_base = "base32"
cid_codec, _, data = multicodec.unwrap_raw(data)
hash_codec, _, data = varint.decode_raw(data)
digest_size, _, data = varint.decode_raw(data)
cid_digest = data[:digest_size]
data = data[digest_size:]
cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))

if not cid.hashfun.digest(data) == cid.digest:
raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")

return cid, bytes(data)


class MappingCAStore(ContentAddressableStore):
def __init__(self,
mapping: Optional[MutableMapping[str, bytes]] = None,
Expand Down
17 changes: 9 additions & 8 deletions ipldstore/ipldstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@
from io import BufferedIOBase
from collections.abc import MutableMapping
import sys
if sys.version_info >= (3, 9):
MutableMappingT = MutableMapping
MutableMappingSB = MutableMapping[str, bytes]
else:
from typing import MutableMapping as MutableMappingT
MutableMappingSB = MutableMapping
from dataclasses import dataclass
from typing import Optional, Callable, Any, TypeVar, Union, Iterator, overload, List, Dict
import json
Expand All @@ -20,7 +14,14 @@
from numcodecs.compat import ensure_bytes # type: ignore

from .contentstore import ContentAddressableStore, MappingCAStore
from .utils import StreamLike

if sys.version_info >= (3, 9):
MutableMappingT = MutableMapping
MutableMappingSB = MutableMapping[str, bytes]
else:
from typing import MutableMapping as MutableMappingT
MutableMappingSB = MutableMapping

@dataclass
class InlineCodec:
Expand Down Expand Up @@ -120,14 +121,14 @@ def to_car(self, stream: None = None) -> bytes:
def to_car(self, stream: Optional[BufferedIOBase] = None) -> Union[int, bytes]:
return self._store.to_car(self.freeze(), stream)

def import_car(self, stream: Union[BufferedIOBase, bytes]) -> None:
def import_car(self, stream: StreamLike) -> None:
roots = self._store.import_car(stream)
if len(roots) != 1:
raise ValueError(f"CAR must have a single root, the given CAR has {len(roots)} roots!")
self.set_root(roots[0])

@classmethod
def from_car(cls, stream: Union[BufferedIOBase, bytes]) -> "IPLDStore":
def from_car(cls, stream: StreamLike) -> "IPLDStore":
instance = cls()
instance.import_car(stream)
return instance
Expand Down
21 changes: 21 additions & 0 deletions ipldstore/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Some utilities.
"""

from io import BytesIO
from typing import List, Union, BinaryIO

from multiformats import CID
from typing_extensions import TypeGuard

StreamLike = Union[BinaryIO, bytes]

def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO:
if isinstance(stream_or_bytes, bytes):
return BytesIO(stream_or_bytes)
else:
return stream_or_bytes


def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
return all(isinstance(o, CID) for o in os)
11 changes: 11 additions & 0 deletions test/test_car.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from io import BytesIO

import ipldstore.car as car

import pytest

def test_car_reject_v2():
v2_start = bytes.fromhex("0aa16776657273696f6e02")
stream = BytesIO(v2_start)
with pytest.raises(ValueError):
car.decode_car_header(stream)
Loading