Skip to content

Commit

Permalink
feat: impl response
Browse files Browse the repository at this point in the history
  • Loading branch information
manzt committed Jun 2, 2022
1 parent 93c2400 commit a2a1027
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 88 deletions.
14 changes: 10 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Checkout
uses: actions/checkout@v3

- name: Install Node.js
uses: actions/setup-node@v3
with:
node-version: 18

- uses: pnpm/[email protected]
name: Install pnpm
with:
version: 6.23.6
version: 7

- run: pnpm install
- run: pnpm test
env:
CI: true
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"unzipit": "^1.3.6"
},
"devDependencies": {
"@types/node": "^16.10.2",
"@types/node": "^17.0.38",
"mkdist": "^0.3.5",
"tsm": "^2.2.1",
"typescript": "^4.6.2",
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/lib/hierarchy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ export class Array<
opts?: Parameters<Store["get"]>[1],
): Promise<Chunk<Dtype>> {
const chunk_key = this.chunk_key(chunk_coords);
const maybe_bytes = await this.store.get(chunk_key, opts);
if (!maybe_bytes) {
const response = await this.store.get(chunk_key, opts);
if (!response.ok) {
throw new KeyError(chunk_key);
}
const data = await decode_chunk(this, maybe_bytes);
let bytes = new Uint8Array(await response.arrayBuffer())
const data = await decode_chunk(this, bytes);
return {
data,
shape: this.chunk_shape.slice(),
Expand Down
13 changes: 4 additions & 9 deletions src/storage/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,14 @@ function resolve(root: string | URL, path: AbsolutePath): URL {
class FetchStore implements Async<Readable<RequestInit>> {
constructor(public url: string | URL) {}

async get(key: AbsolutePath, opts: RequestInit = {}): Promise<Uint8Array | undefined> {
async get(key: AbsolutePath, opts: RequestInit = {}) {
const { href } = resolve(this.url, key);
const res = await fetch(href, opts);
if (res.status === 404 || res.status === 403) {
return undefined;
}
const value = await res.arrayBuffer();
return new Uint8Array(value);
return fetch(href, opts);
}

has(key: AbsolutePath): Promise<boolean> {
has(key: AbsolutePath) {
// TODO: make parameter, use HEAD request if possible.
return this.get(key).then((res) => res !== undefined);
return this.get(key).then(res => res.ok);
}
}

Expand Down
41 changes: 25 additions & 16 deletions src/storage/fs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as fs from "node:fs";
import * as fsp from "node:fs/promises";
import * as path from "node:path";
import * as stream from "node:stream";
import { strip_prefix } from "./util";

import type {
Expand All @@ -11,34 +12,42 @@ import type {
Writeable,
} from "../types";


class FileSystemStore implements Async<ExtendedReadable & Writeable> {
constructor(public root: string) {}

get(key: AbsolutePath): Promise<Uint8Array | undefined> {
async get(key: AbsolutePath) {
const fp = path.join(this.root, strip_prefix(key));
return fs.promises.readFile(fp)
.then((buf) => new Uint8Array(buf.buffer))
.catch((err) => {
// return undefined is no file or directory
if (err.code === "ENOENT") return undefined;
throw err;
});
try {
let filehandle = await fsp.open(fp);
// Could use `file.readableWebStream()` but this doesn't close the
// underlying filehandle once the resource is read. The following
// allows us to create a response that closes the filehandle once read.
const readable = filehandle.createReadStream({ autoClose: true });
// @ts-expect-error `Readable.toWeb` is avaiable in Node v17 but not in `@types/node`
return new Response(stream.Readable.toWeb(readable));
} catch (err: any) {
if (err.code === "ENOENT") {
return new Response(null, { status: 404 })
}
throw err;
}
}

has(key: AbsolutePath): Promise<boolean> {
has(key: AbsolutePath) {
const fp = path.join(this.root, strip_prefix(key));
return fs.promises.access(fp).then(() => true).catch(() => false);
return fsp.access(fp).then(() => true).catch(() => false);
}

async set(key: AbsolutePath, value: Uint8Array): Promise<void> {
const fp = path.join(this.root, strip_prefix(key));
await fs.promises.mkdir(path.dirname(fp), { recursive: true });
await fs.promises.writeFile(fp, value, null);
await fsp.mkdir(path.dirname(fp), { recursive: true });
await fsp.writeFile(fp, value, null);
}

async delete(key: AbsolutePath): Promise<boolean> {
const fp = path.join(this.root, strip_prefix(key));
await fs.promises.unlink(fp);
await fsp.unlink(fp);
return true;
}

Expand All @@ -62,7 +71,7 @@ class FileSystemStore implements Async<ExtendedReadable & Writeable> {

const fp = path.join(this.root, prefix.slice(1));
try {
const dir = await fs.promises.readdir(fp, { withFileTypes: true });
const dir = await fsp.readdir(fp, { withFileTypes: true });
dir.forEach((d) => {
if (d.isFile()) contents.push(d.name);
if (d.isDirectory()) prefixes.push(d.name); // directory
Expand All @@ -78,7 +87,7 @@ class FileSystemStore implements Async<ExtendedReadable & Writeable> {
}

async function* walk(dir: string): AsyncGenerator<string> {
const dirents = await fs.promises.readdir(dir, { withFileTypes: true });
const dirents = await fsp.readdir(dir, { withFileTypes: true });
for (const dirent of dirents) {
const res = path.join(dir, dirent.name);
if (dirent.isDirectory()) {
Expand Down
9 changes: 8 additions & 1 deletion src/storage/mem.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import type { ExtendedReadable, PrefixPath, RootPath, Writeable } from "../types";
import type { ExtendedReadable, PrefixPath, RootPath, Writeable, AbsolutePath } from "../types";

class MemoryStore extends Map<string, Uint8Array> implements ExtendedReadable, Writeable {
get(path: AbsolutePath) {
return new Response(
super.get(path),
{ status: super.has(path) ? 200 : 404 },
);
}

list_prefix(prefix: RootPath | PrefixPath) {
const items = [];
for (const path of super.keys()) {
Expand Down
21 changes: 9 additions & 12 deletions src/storage/ref.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ class ReferenceStore implements Async<Readable<RequestInit>> {
async get(key: AbsolutePath, opts: RequestInit = {}) {
let ref = this.refs.get(strip_prefix(key));

if (!ref) return;
if (!ref) {
return new Response(null, { status: 404 });
}

if (typeof ref === "string") {
let data;
if (ref.startsWith("base64:")) {
return to_binary(ref.slice("base64:".length));
data = to_binary(ref.slice("base64:".length));
} else {
data = new TextEncoder().encode(ref);
}
return new TextEncoder().encode(ref);
return new Response(data, { status: 200 });
}

let [urlOrNull, offset, size] = ref;
Expand All @@ -32,15 +37,7 @@ class ReferenceStore implements Async<Readable<RequestInit>> {
throw Error(`No url for key ${key}, and no target url provided.`);
}

let res = await fetch_range({ url: uri2href(url), offset, size }, opts);

if (res.status === 200 || res.status === 206) {
return new Uint8Array(await res.arrayBuffer());
}

throw new Error(
`Request unsuccessful for key ${key}. Response status: ${res.status}.`,
);
return fetch_range({ url: uri2href(url), offset, size }, opts);
}

async has(key: AbsolutePath) {
Expand Down
6 changes: 4 additions & 2 deletions src/storage/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ class ZipFileStore<R extends Reader> implements Async<Readable> {

async get(key: AbsolutePath) {
let entry = (await this.info).entries[strip_prefix(key)];
if (!entry) return;
return new Uint8Array(await entry.arrayBuffer());
if (!entry) {
return new Response(null, { status: 404 });
}
return new Response(await entry.blob(), { status: 202 });
}

async has(key: AbsolutePath) {
Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export type Async<T extends Record<string, any>> = {
};

export interface Readable<Opts = any> {
get(key: AbsolutePath, opts?: Opts): Uint8Array | undefined;
get(key: AbsolutePath, opts?: Opts): Response;
}

export interface Writeable {
Expand Down
19 changes: 10 additions & 9 deletions src/v2.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Array as BaseArray, ArrayProps, Group as BaseGroup } from "./lib/hierarchy";
import { registry } from "./lib/codec-registry";
import { KeyError, NodeNotFoundError } from "./lib/errors";
import { is_dtype, json_decode_object, json_encode_object } from "./lib/util";
import { is_dtype, json_encode_object } from "./lib/util";
import type {
AbsolutePath,
Async,
Expand All @@ -23,8 +23,8 @@ async function get_attrs<Store extends Readable | Async<Readable>>(
store: Store,
path: AbsolutePath,
) {
const maybe_bytes = await store.get(attrs_key(path));
const attrs: Attrs = maybe_bytes ? json_decode_object(maybe_bytes) : {};
const response = await store.get(attrs_key(path));
const attrs: Attrs = response.ok ? await response.json() : {};
return attrs;
}

Expand Down Expand Up @@ -183,11 +183,11 @@ async function _get_array<
Path extends AbsolutePath,
>(store: Store, path: Path) {
const meta_key = array_meta_key(path);
const meta_doc = await store.get(meta_key);
if (!meta_doc) {
const response = await store.get(meta_key);
if (!response.ok) {
throw new NodeNotFoundError(path);
}
const meta: ArrayMetadata<DataType> = json_decode_object(meta_doc);
const meta: ArrayMetadata<DataType> = await response.json();
return new Array({
store: store,
path,
Expand Down Expand Up @@ -261,11 +261,12 @@ async function _get_group<
Path extends AbsolutePath,
>(store: Store, path: Path) {
const meta_key = group_meta_key(path);
const meta_doc = await store.get(meta_key);
if (!meta_doc) {
const response = await store.get(meta_key);
if (!response.ok) {
throw new NodeNotFoundError(path);
}
return new Group({ store, path });
const attrs = await response.json();
return new Group({ store, path, attrs });
}

/**
Expand Down
20 changes: 10 additions & 10 deletions src/v3.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Array as BaseArray, ArrayProps, Group } from "./lib/hierarchy";
import { registry } from "./lib/codec-registry";
import { assert, KeyError, NodeNotFoundError, NotImplementedError } from "./lib/errors";
import { is_dtype, json_decode_object, json_encode_object } from "./lib/util";
import { is_dtype, json_encode_object } from "./lib/util";

import type {
AbsolutePath,
Expand Down Expand Up @@ -224,13 +224,13 @@ export async function get_hierarchy<Store extends Readable | Async<Readable>>(
): Promise<Hierarchy<Store>> {
// retrieve and parse entry point metadata document
const meta_key = "/zarr.json";
const meta_doc = await store.get(meta_key);
const response = await store.get(meta_key);

if (!meta_doc) {
if (!response.ok) {
throw new NodeNotFoundError(meta_key);
}

const meta: RootMetadata = json_decode_object(meta_doc);
const meta: RootMetadata = await response.json();

// check protocol version
const segments = meta.zarr_format.split("/");
Expand Down Expand Up @@ -444,13 +444,13 @@ async function _get_array<
path: Path,
) {
const key = meta_key(path, hierarchy.meta_key_suffix, "array");
const meta_doc = await hierarchy.store.get(key);
const response = await hierarchy.store.get(key);

if (!meta_doc) {
if (!response.ok) {
throw new NodeNotFoundError(path);
}

const meta: ArrayMetadata<DataType> = json_decode_object(meta_doc);
const meta: ArrayMetadata<DataType> = await response.json();

// decode and check metadata
const {
Expand Down Expand Up @@ -527,11 +527,11 @@ async function _get_group<
path: Path,
) {
const key = meta_key(path, hierarchy.meta_key_suffix, "group");
const meta_doc = await hierarchy.store.get(key);
if (!meta_doc) {
const response = await hierarchy.store.get(key);
if (!response.ok) {
throw new NodeNotFoundError(path);
}
const meta: GroupMetadata = json_decode_object(meta_doc);
const meta: GroupMetadata = await response.json();
return new ExplicitGroup({
store: hierarchy.store,
owner: hierarchy,
Expand Down
Loading

0 comments on commit a2a1027

Please sign in to comment.