Skip to content

Commit

Permalink
feat: add withConsolidated store helper (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
manzt authored Jan 14, 2024
1 parent 7a1a056 commit 4d177d8
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 141 deletions.
28 changes: 28 additions & 0 deletions .changeset/popular-glasses-nail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@zarrita/indexing": minor
"zarrita": minor
"@zarrita/core": minor
---

feat: Add `withConsolidated` store utility

**BREAKING**: Replaces [`openConsolidated`](https://github.com/manzt/zarrita.js/pull/91)
to provide a consistent interface for accessing consolidated and non-consolidated stores.

```javascript
import * as zarr from "zarrita";

// non-consolidated
let store = new zarr.FetchStore("https://localhost:8080/data.zarr");
let grp = await zarr.open(store); // network request for .zgroup/.zattrs
let foo = await zarr.open(grp.resolve("/foo"), { kind: array }); // network request for .zarray/.zattrs

// consolidated
let store = new zarr.FetchStore("https://localhost:8080/data.zarr");
let consolidatedStore = await zarr.withConsolidated(store); // opens ./zmetadata
let contents = consolidatedStore.contents(); // [ {path: "/", kind: "group" }, { path: "/foo", kind: "array" }, ...]
let grp = await zarr.open(consolidatedStore); // no network request
let foo = await zarr.open(grp.resolve(contents[1].path), {
kind: contents[1].kind,
}); // no network request
```
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"test": "vitest --api",
"fmt": "dprint fmt",
"lint": "dprint check",
"publint": "pnpm --recursive --filter=\"./packages/**\" exec publint"
"publint": "pnpm --recursive --filter=\"./packages/**\" exec publint",
"errors": "vim -c \"copen\" -c \"cexpr system('npx build')\" -c \"wincmd p\""
},
"devDependencies": {
"@changesets/cli": "^2.27.1",
Expand Down
26 changes: 17 additions & 9 deletions packages/core/__tests__/consolidated.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ import * as path from "node:path";
import * as url from "node:url";

import { FileSystemStore } from "@zarrita/storage";
import { openConsolidated } from "../src/consolidated.js";
import { withConsolidated } from "../src/consolidated.js";
import { open } from "../src/open.js";
import { Array as ZarrArray } from "../src/hierarchy.js";

let __dirname = path.dirname(url.fileURLToPath(import.meta.url));

describe("openConsolidated", () => {
describe("withConsolidated", () => {
it("loads consolidated metadata", async () => {
let root = path.join(__dirname, "../../../fixtures/v2/data.zarr");
let h = await openConsolidated(new FileSystemStore(root));
let store = await withConsolidated(new FileSystemStore(root));
let map = new Map(
[...h.contents.values()].map((entry) => [entry.path, entry.kind]),
store.contents().map((x) => [x.path, x.kind]),
);
expect(map).toMatchInlineSnapshot(`
Map {
Expand Down Expand Up @@ -49,8 +51,14 @@ describe("openConsolidated", () => {

it("loads chunk data from underlying store", async () => {
let root = path.join(__dirname, "../../../fixtures/v2/data.zarr");
let h = await openConsolidated(new FileSystemStore(root));
let arr = h.open("/3d.chunked.mixed.i2.C", { kind: "array" });
let store = await withConsolidated(new FileSystemStore(root));
let entry = store.contents().find((x) =>
x.path === "/3d.chunked.mixed.i2.C"
)!;
let grp = await open(store, { kind: "group" });
let arr = await open(grp.resolve(entry.path), { kind: entry.kind });
expect(arr).toBeInstanceOf(ZarrArray);
// @ts-expect-error - we know this is an array
expect(await arr.getChunk([0, 0, 0])).toMatchInlineSnapshot(`
{
"data": Int16Array [
Expand Down Expand Up @@ -80,10 +88,10 @@ describe("openConsolidated", () => {

it("loads and navigates from root", async () => {
let path_root = path.join(__dirname, "../../../fixtures/v2/data.zarr");
let h = await openConsolidated(new FileSystemStore(path_root));
let grp = h.root();
let store = await withConsolidated(new FileSystemStore(path_root));
let grp = await open(store, { kind: "group" });
expect(grp.kind).toBe("group");
let arr = h.open(grp.resolve("1d.chunked.i2"), { kind: "array" });
let arr = await open(grp.resolve("1d.chunked.i2"), { kind: "array" });
expect(arr.kind).toBe("array");
});
});
164 changes: 82 additions & 82 deletions packages/core/src/consolidated.ts
Original file line number Diff line number Diff line change
@@ -1,102 +1,102 @@
import type { AbsolutePath, Readable } from "@zarrita/storage";

import { Array, Group, Location } from "./hierarchy.js";
import {
json_decode_object,
v2_to_v3_array_metadata,
v2_to_v3_group_metadata,
} from "./util.js";
import type { ArrayMetadataV2, DataType, GroupMetadataV2 } from "./metadata.js";
import { NodeNotFoundError } from "./errors.js";
import { type AbsolutePath, type Readable } from "@zarrita/storage";
import { json_decode_object, json_encode_object } from "./util.js";
import { KeyError, NodeNotFoundError } from "./errors.js";
import type {
ArrayMetadata,
ArrayMetadataV2,
Attributes,
GroupMetadata,
GroupMetadataV2,
} from "./metadata.js";

type ConsolidatedMetadata = {
metadata: Record<string, any>;
metadata: Record<string, ArrayMetadataV2 | GroupMetadataV2>;
zarr_consolidated_format: 1;
};

type Listable<Store extends Readable> = {
get: Store["get"];
contents(): { path: AbsolutePath; kind: "array" | "group" }[];
};

async function get_consolidated_metadata(
store: Readable,
): Promise<ConsolidatedMetadata> {
let bytes = await store.get("/.zmetadata");
if (!bytes) throw new Error("No consolidated metadata found.");
if (!bytes) {
throw new NodeNotFoundError("v2 consolidated metadata", {
cause: new KeyError("/.zmetadata"),
});
}
let meta: ConsolidatedMetadata = json_decode_object(bytes);
if (meta.zarr_consolidated_format !== 1) {
throw new Error("Unsupported consolidated format.");
}
return meta;
}

/** Proxies requests to the underlying store. */
export async function openConsolidated<Store extends Readable>(
type Metadata =
| ArrayMetadataV2
| GroupMetadataV2
| ArrayMetadata
| GroupMetadata
| Attributes;

function is_meta_key(key: string): boolean {
return (
key.endsWith(".zarray") ||
key.endsWith(".zgroup") ||
key.endsWith(".zattrs") ||
key.endsWith("zarr.json")
);
}

function is_v3(meta: Metadata): meta is ArrayMetadata | GroupMetadata {
return "zarr_format" in meta && meta.zarr_format === 3;
}

export async function withConsolidated<Store extends Readable>(
store: Store,
) {
let { metadata } = await get_consolidated_metadata(store);
let meta_nodes = Object
.entries(metadata)
.reduce(
(acc, [path, content]) => {
let parts = path.split("/");
let file_name = parts.pop()!;
let key: AbsolutePath = `/${parts.join("/")}`;
if (!acc[key]) acc[key] = {};
if (file_name === ".zarray") {
acc[key].meta = content;
} else if (file_name === ".zgroup") {
acc[key].meta = content;
} else if (file_name === ".zattrs") {
acc[key].attrs = content;
}
return acc;
},
{} as Record<
AbsolutePath,
{
meta?: ArrayMetadataV2 | GroupMetadataV2;
attrs?: Record<string, any>;
): Promise<Listable<Store>> {
let known_meta: Record<AbsolutePath, Metadata> =
await get_consolidated_metadata(store)
.then((meta) => {
let new_meta: Record<AbsolutePath, Metadata> = {};
for (let [key, value] of Object.entries(meta.metadata)) {
new_meta[`/${key}`] = value;
}
>,
);
let nodes = new Map<AbsolutePath, Array<DataType, Store> | Group<Store>>();
for (let [path, { meta, attrs }] of Object.entries(meta_nodes)) {
if (!meta) throw new Error("missing metadata");
let node: Array<DataType, Store> | Group<Store>;
if ("shape" in meta) {
let metadata = v2_to_v3_array_metadata(meta, attrs);
node = new Array(store, path as AbsolutePath, metadata);
} else {
let metadata = v2_to_v3_group_metadata(meta, attrs);
node = new Group(store, path as AbsolutePath, metadata);
}
nodes.set(path as AbsolutePath, node);
}
return new ConsolidatedHierarchy(nodes);
}
return new_meta;
})
.catch(() => ({}));

class ConsolidatedHierarchy<Store extends Readable> {
constructor(
public contents: Map<AbsolutePath, Array<DataType, Store> | Group<Store>>,
) {}
open(
where: AbsolutePath | Location<unknown>,
options: { kind: "group" },
): Group<Store>;
open(
where: AbsolutePath | Location<unknown>,
options: { kind: "array" },
): Array<DataType, Store>;
open(
where: AbsolutePath | Location<unknown>,
): Array<DataType, Store> | Group<Store>;
open(
where: AbsolutePath | Location<unknown>,
options: { kind?: "array" | "group" } = {},
) {
let path = typeof where === "string" ? where : where.path;
let node = this.contents.get(path);
if (node && (!options.kind || options.kind == node.kind)) return node;
throw new NodeNotFoundError(path);
}
root() {
return this.open("/", { kind: "group" });
}
return {
async get(
...args: Parameters<Store["get"]>
): Promise<Uint8Array | undefined> {
let [key, opts] = args;
if (known_meta[key]) {
return json_encode_object(known_meta[key]);
}
let maybe_bytes = await store.get(key, opts);
if (is_meta_key(key) && maybe_bytes) {
let meta = json_decode_object(maybe_bytes);
known_meta[key] = meta;
}
return maybe_bytes;
},
contents(): { path: AbsolutePath; kind: "array" | "group" }[] {
let contents: { path: AbsolutePath; kind: "array" | "group" }[] = [];
for (let [key, value] of Object.entries(known_meta)) {
let parts = key.split("/");
let filename = parts.pop()!;
let path = (parts.join("/") || "/") as AbsolutePath;
if (filename === ".zarray") contents.push({ path, kind: "array" });
if (filename === ".zgroup") contents.push({ path, kind: "group" });
if (is_v3(value)) {
contents.push({ path, kind: value.node_type });
}
}
return contents;
},
};
}
8 changes: 4 additions & 4 deletions packages/core/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
export class NodeNotFoundError extends Error {
constructor(msg: string) {
super(msg);
constructor(context: string, options: { cause?: Error } = {}) {
super(`Node not found: ${context}`, options);
this.name = "NodeNotFoundError";
}
}

export class KeyError extends Error {
constructor(msg: string) {
super(msg);
constructor(path: string) {
super(`Missing key: ${path}`);
this.name = "KeyError";
}
}
1 change: 0 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ export {
export { open } from "./open.js";
export { create } from "./create.js";
export { registry } from "./codecs.js";
export { openConsolidated } from "./consolidated.js";
export type * from "./metadata.js";
25 changes: 18 additions & 7 deletions packages/core/src/open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import type {
GroupMetadata,
} from "./metadata.js";
import { Array, Group, Location } from "./hierarchy.js";
import { NodeNotFoundError } from "./errors.js";
import { KeyError, NodeNotFoundError } from "./errors.js";
import {
json_decode_object,
v2_to_v3_array_metadata,
v2_to_v3_group_metadata,
} from "./util.js";

let VERSION_COUNTER = create_version_counter();
function create_version_counter() {
let version_counts = new WeakMap<Readable, { v2: number; v3: number }>();
function get_counts(store: Readable) {
Expand All @@ -30,7 +31,6 @@ function create_version_counter() {
},
};
}
let VERSION_COUNTER = create_version_counter();

async function load_attrs(
location: Location<Readable>,
Expand Down Expand Up @@ -77,7 +77,9 @@ async function open_array_v2<Store extends Readable>(
let { path } = location.resolve(".zarray");
let meta = await location.store.get(path);
if (!meta) {
throw new NodeNotFoundError(path);
throw new NodeNotFoundError("v2 array", {
cause: new KeyError(path),
});
}
VERSION_COUNTER.increment(location.store, "v2");
return new Array(
Expand All @@ -94,7 +96,9 @@ async function open_group_v2<Store extends Readable>(
let { path } = location.resolve(".zgroup");
let meta = await location.store.get(path);
if (!meta) {
throw new NodeNotFoundError(path);
throw new NodeNotFoundError("v2 group", {
cause: new KeyError(path),
});
}
VERSION_COUNTER.increment(location.store, "v2");
return new Group(
Expand All @@ -110,7 +114,9 @@ async function _open_v3<Store extends Readable>(
let { store, path } = location.resolve("zarr.json");
let meta = await location.store.get(path);
if (!meta) {
throw new NodeNotFoundError(path);
throw new NodeNotFoundError("v3 array or group", {
cause: new KeyError(path),
});
}
let meta_doc: ArrayMetadata<DataType> | GroupMetadata = json_decode_object(
meta,
Expand Down Expand Up @@ -169,6 +175,11 @@ export function open<Store extends Readable>(
options: { kind: "array" },
): Promise<Array<DataType, Store>>;

export async function open<Store extends Readable>(
location: Location<Store> | Store,
options: { kind?: "array" | "group" },
): Promise<Array<DataType, Store> | Group<Store>>;

export function open<Store extends Readable>(
location: Location<Store> | Store,
): Promise<Array<DataType, Store> | Group<Store>>;
Expand All @@ -181,8 +192,8 @@ export async function open<Store extends Readable>(
location: Location<Store> | Store,
options: { kind?: "array" | "group" } = {},
): Promise<Array<DataType, Store> | Group<Store>> {
const store = "store" in location ? location.store : location;
const version_max = VERSION_COUNTER.version_max(store);
let store = "store" in location ? location.store : location;
let version_max = VERSION_COUNTER.version_max(store);
// Use the open function for the version with the most successful opens.
// Note that here we use the dot syntax to access the open functions
// because this enables us to use vi.spyOn during testing.
Expand Down
Loading

0 comments on commit 4d177d8

Please sign in to comment.