Skip to content

Commit

Permalink
Single-locale Zarr IO + single chunk updates (#25361)
Browse files Browse the repository at this point in the history
This PR adds support for single-locale reading/writing of Zarr stores,
which was a user request. It also adds an `updateZarrChunk` method to
allow for writing individual chunks to storage rather than writing the
whole array. This is in anticipation of adding more support for partial
IO on Zarr stores.

Testing: CHPL_COMM={none,gasnet} x machines={mac, horizon}

Reviewed by: @jabraham17
  • Loading branch information
brandon-neth authored Jul 2, 2024
2 parents 2b0e7d4 + 2658104 commit 5421963
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 20 deletions.
190 changes: 170 additions & 20 deletions modules/packages/Zarr.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,16 @@ module Zarr {
}


private proc buildChunkPath(directoryPath: string, delimiter: string, const chunkIndices: ?dimCount * int) {
private proc buildChunkPath(directoryPath: string, delimiter: string, const chunkIndex: ?dimCount * int) {
var indexStrings: dimCount*string;
for i in 0..<dimCount do indexStrings[i] = chunkIndices[i] : string;
for i in 0..<dimCount do indexStrings[i] = chunkIndex[i] : string;
return joinPath(directoryPath, delimiter.join(indexStrings));
}
/* Builds the path of a chunk in a one-dimensional store from an integer
   chunk index. The `delimiter` argument is not used by this overload: with
   a single index there is nothing to join. */
private proc buildChunkPath(directoryPath: string, delimiter: string, chunkIndex: int) {
  const chunkName = chunkIndex: string;
  return joinPath(directoryPath, chunkName);
}

/* Returns the domain of chunks that the calling locale is responsible for */
/* Returns the indices of the chunks that contain elements present in a subdomain of the array. */
proc getLocalChunks(D: domain(?), localD: domain(?), chunkShape: ?dimCount*int): domain(dimCount) {

const totalShape = D.shape;
Expand All @@ -164,18 +164,18 @@ module Zarr {
}


/* Returns the domain of the `chunkIndices`-th chunk for chunks of size `chunkShape` */
private proc getChunkDomain(chunkShape: ?dimCount*int, chunkIndices: dimCount*int) {
/* Returns the domain of the `chunkIndex`-th chunk for chunks of size `chunkShape` */
private proc getChunkDomain(chunkShape: ?dimCount*int, chunkIndex: dimCount*int) {
var thisChunkRange: dimCount*range(int);
for i in 0..<dimCount {
const start = chunkIndices[i] * chunkShape[i];
const start = chunkIndex[i] * chunkShape[i];
thisChunkRange[i] = start..<start+chunkShape[i];
}
const thisChunkDomain: domain(dimCount) = thisChunkRange;
return thisChunkDomain;
}
private proc getChunkDomain(chunkShape: ?dimCount*int, chunkIndices: int) {
return getChunkDomain(chunkShape, (chunkIndices,));
private proc getChunkDomain(chunkShape: ?dimCount*int, chunkIndex: int) {
return getChunkDomain(chunkShape, (chunkIndex,));
}


Expand Down Expand Up @@ -294,10 +294,10 @@ module Zarr {
}

/*
Reads a v2.0 zarr store from storage, returning a block distributed array.
Each locale reads and decompresses the chunks with elements in its
subdomain. This method assumes a shared filesystem where all nodes can
access the store directory.
Reads a v2.0 zarr store from storage using all locales, returning a
block distributed array. Each locale reads and decompresses the chunks
with elements in its subdomain. This method assumes a shared filesystem
where all nodes can access the store directory.
:arg directoryPath: Relative or absolute path to the root of the zarr
store. The store is expected to contain a '.zarray' metadata file
Expand Down Expand Up @@ -339,11 +339,11 @@ module Zarr {
ref hereA = A[hereD];

const localChunks = getLocalChunks(D, hereD, chunkShape);
forall chunkIndices in localChunks {
forall chunkIndex in localChunks {

const chunkPath = buildChunkPath(directoryPath, ".", chunkIndices);
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);

const thisChunkDomain = getChunkDomain(chunkShape, chunkIndices);
const thisChunkDomain = getChunkDomain(chunkShape, chunkIndex);
const thisChunkHere = hereD[thisChunkDomain];

ref thisChunkSlice = hereA.localSlice(thisChunkHere);
Expand All @@ -354,6 +354,8 @@ module Zarr {
return A;
}



/*
Writes an array to storage as a v2.0 zarr store. The array metadata and
chunks will be stored within the `directoryPath` directory, which is created
Expand Down Expand Up @@ -418,17 +420,165 @@ module Zarr {
// Identify the range of chunks this locale will contribute to
const localChunks = getLocalChunks(normA.domain, hereD, chunkShape);

forall chunkIndices in localChunks {
forall chunkIndex in localChunks {
// Get the part of the array that contributes to this chunk
const thisChunkDomain = getChunkDomain(chunkShape, chunkIndices);
const thisChunkDomain = getChunkDomain(chunkShape, chunkIndex);
const thisChunkHere = hereD[thisChunkDomain];
ref thisChunkSlice = hereA.localSlice(thisChunkHere);
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndices);
locks[chunkIndices].writeEF(true);
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);
locks[chunkIndex].writeEF(true);
writeChunk(dimCount, chunkPath, thisChunkDomain, thisChunkSlice, bloscLevel=bloscLevel);
locks[chunkIndices].readFE();
locks[chunkIndex].readFE();
}
}
}


/*
  Reads a v2.0 zarr store from storage using a single locale, returning a
  locally allocated array. This method assumes a shared filesystem
  where the current locale can access the store directory.

  :arg directoryPath: Relative or absolute path to the root of the zarr
    store. The store is expected to contain a '.zarray' metadata file
  :arg dtype: Chapel type of the store's data
  :arg dimCount: Dimensionality of the zarr array
  :arg bloscThreads: The number of threads to use during decompression
    (default=1)
  :returns: A zero-indexed array of `dtype` whose shape is taken from the
    store's metadata
*/
proc readZarrArrayLocal(directoryPath: string, type dtype, param dimCount: int, bloscThreads: int(32) = 1) throws {
  var md = getMetadata(directoryPath);
  validateMetadata(md, dtype, dimCount);

  // Derive the array extents and the number of chunks along each dimension
  var totalShape, chunkShape : dimCount*int;
  var chunkCounts: dimCount*int;
  var totalRanges,chunkRanges: dimCount*range(int);
  for i in 0..<dimCount {
    totalShape[i] = md.shape[i];
    chunkShape[i] = md.chunks[i];
    // Round up so a partial chunk at the edge is still counted
    chunkCounts[i] = ceil(totalShape[i]:real / chunkShape[i]:real) : int;
    totalRanges[i] = 0..<totalShape[i];
    chunkRanges[i] = 0..<chunkCounts[i];
  }
  const fullChunkDomain: domain(dimCount) = chunkRanges;

  // Initialize the domain and array
  const D : domain(dimCount) = totalRanges;
  var A: [D] dtype;

  blosc_init();
  // Tear blosc down on every exit path, including when a chunk read throws
  defer { blosc_destroy(); }
  blosc_set_nthreads(bloscThreads);

  forall chunkIndex in fullChunkDomain {
    const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);
    // Clip the chunk's nominal bounds to the array so edge chunks stay in range
    const thisChunkDomain = D[getChunkDomain(chunkShape, chunkIndex)];
    ref thisChunkSlice = A[thisChunkDomain];
    readChunk(dimCount, chunkPath, thisChunkDomain, thisChunkSlice);
  }
  return A;
}

/*
  Writes an array to storage as a v2.0 zarr store using a single locale. The
  array metadata and chunks will be stored within the `directoryPath`
  directory, which is created if it does not yet exist. If the directory
  already exists it is removed first. The chunks will have the dimensions
  given in the `chunkShape` argument.

  :arg directoryPath: Relative or absolute path to the root of the zarr store.
    The directory and all necessary parent directories will be created if it
    does not exist.
  :arg A: The array to write to storage.
  :arg chunkShape: The dimension extents to use when breaking A into chunks.
  :arg bloscThreads: The number of threads to use during compression (default=1)
  :arg bloscLevel: Compression level to use. 0 indicates no compression,
    9 (default) indicates maximum compression.
*/
proc writeZarrArrayLocal(directoryPath: string, ref A: [?domainType] ?dtype, chunkShape: ?dimCount*int, bloscThreads: int(32) = 1, bloscLevel: int(32) = 9) throws {

  // Create the metadata record that is written before the chunks
  var shape, chunks: list(int);
  for size in A.shape do shape.pushBack(size);
  for size in chunkShape do chunks.pushBack(size);
  const md: zarrMetadataV2 = new zarrMetadataV2(2, chunks, dtypeString(dtype), shape);

  // Clear the directory before writing
  if exists(directoryPath) then rmTree(directoryPath);
  mkdir(directoryPath, parents=true);

  // Write the metadata
  const metadataPath = joinPath(directoryPath, ".zarray");
  const w = openWriter(metadataPath, serializer = new jsonSerializer(),locking=true);
  w.writef("%?\n", md);

  // Normalize the array's domain to be zero-indexed
  var normalizedRanges: dimCount*range(int);
  for i in 0..<dimCount do
    normalizedRanges[i] = 0..<shape[i];
  const D: domain(dimCount) = normalizedRanges;
  ref normA = A.reindex(D);

  blosc_init();
  // Tear blosc down on every exit path, including when a chunk write throws
  defer { blosc_destroy(); }
  blosc_set_nthreads(bloscThreads);

  // The whole array is local here, so every chunk overlapping D is written
  const localChunks = getLocalChunks(D, D, chunkShape);

  forall chunkIndex in localChunks {
    // Get the part of the array that contributes to this chunk
    const chunkBounds = getChunkDomain(chunkShape, chunkIndex);
    const chunkForDomain = D[chunkBounds];
    ref chunkData = normA[chunkForDomain];
    const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);
    writeChunk(dimCount, chunkPath, chunkData.domain, chunkData, bloscLevel=bloscLevel);
  }
}

/*
  Updates a single chunk within a Zarr store with the data in `A`. The
  Zarr store and the associated metadata file must already exist. `A` is
  the full array (its extents must match the shape recorded in the store's
  metadata); only the elements that fall inside the selected chunk are
  written.

  :arg directoryPath: Relative or absolute path to the root of the zarr
    store. This directory should exist and contain a '.zarray' metadata file.
  :arg A: The array to update the chunk with.
  :arg chunkIndex: The index of the chunk to update.
  :arg bloscThreads: The number of threads to use during compression (default=1)
  :throws Error: If the extents of `A` differ from the store's shape, or if
    `chunkIndex` does not name a chunk of the store.
*/
proc updateZarrChunk(directoryPath: string, ref A: [?domainType] ?dtype, chunkIndex: ?dimCount*int, bloscThreads: int(32) = 1) throws {
  var md = getMetadata(directoryPath);
  validateMetadata(md, dtype, dimCount);
  var chunkShape: dimCount*int;
  for i in 0..<dimCount {
    chunkShape[i] = md.chunks[i];
  }

  // Reject inputs that cannot correspond to a chunk of this store before
  // reindexing: a mismatched array would not line up with the store's
  // domain, and an out-of-grid index would select no elements at all.
  for i in 0..<dimCount {
    if A.shape[i] != md.shape[i] then
      throw new Error("Array extent " + A.shape[i]:string +
                      " in dimension " + i:string +
                      " does not match the store extent " + md.shape[i]:string);
    if chunkIndex[i] < 0 || chunkIndex[i] * chunkShape[i] >= md.shape[i] then
      throw new Error("Chunk index " + chunkIndex[i]:string +
                      " in dimension " + i:string +
                      " is outside of the store's chunk grid");
  }

  // Normalize the array's domain to be zero-indexed
  var normalizedRanges: dimCount*range(int);
  for i in 0..<dimCount do
    normalizedRanges[i] = 0..<md.shape[i];
  const D: domain(dimCount) = normalizedRanges;
  ref normA = A.reindex(D);

  // Clip the chunk's nominal bounds to the array so edge chunks stay in range
  ref chunkData = normA[D[getChunkDomain(chunkShape, chunkIndex)]];
  const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);

  blosc_init();
  // Tear blosc down on every exit path, including when the chunk write throws
  defer { blosc_destroy(); }
  blosc_set_nthreads(bloscThreads);
  writeChunk(dimCount, chunkPath, chunkData.domain, chunkData);
}

/* Convenience overload for one-dimensional stores: wraps the integer chunk
   index in a 1-tuple and forwards to the tuple-indexed `updateZarrChunk`. */
proc updateZarrChunk(directoryPath: string, ref A: [?domainType] ?dtype, chunkIndex: int, bloscThreads: int(32) = 1) throws {
  const indexAsTuple = (chunkIndex,);
  updateZarrChunk(directoryPath, A, indexAsTuple, bloscThreads);
}
}
1 change: 1 addition & 0 deletions test/library/packages/Zarr/CLEANFILES
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ ReindexStore
Test1D
Test2D
Test3D
LocalIOStore_*
64 changes: 64 additions & 0 deletions test/library/packages/Zarr/ZarrTest.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,68 @@ proc reindexTest(type dtype) {

}

/* Round-trips arrays through the single-locale write and read routines.
   Every locale writes a store of its own, reads it back and compares the
   result element by element. */
proc localIOTest(type dtype) {
  const N = 100;

  // One-dimensional store, one per locale
  coforall loc in Locales do on loc {
    const storePath = "LocalIOStore_%?".format(loc.id);
    const dom = {0..<N};
    var written: [dom] dtype;
    fillRandom(written);
    if exists(storePath) then rmTree(storePath);
    writeZarrArrayLocal(storePath, written, (7,));
    var readBack = readZarrArrayLocal(storePath, dtype, 1);
    forall idx in dom {
      assert(written[idx] == readBack[idx], "Mismatch on indices %?. Written: %?.\n Read: %?.\nFailure on locale %?\n".format(idx, written[idx], readBack[idx], loc.id));
    }
  }

  // Two-dimensional store whose dimensions start at different low bounds
  coforall loc in Locales do on loc {
    const storePath = "LocalIOStore_%?".format(loc.id);
    const dom = {0..<N,1..N};
    var written: [dom] dtype;
    fillRandom(written);
    if exists(storePath) then rmTree(storePath);
    writeZarrArrayLocal(storePath, written, (7,22));
    var readBack = readZarrArrayLocal(storePath, dtype, 2);
    // View the array that was read through the original index set
    ref readBackView = readBack.reindex(dom);
    forall idx in dom {
      assert(written[idx] == readBackView[idx], "Mismatch on indices %?. Written: %?.\n Read: %?.\nFailure on locale %?\n".format(idx, written[idx], readBackView[idx], loc.id));
    }
  }
}

/* Checks that `updateZarrChunk` rewrites a single chunk of an existing
   store: an element is changed in memory, the chunk containing it is
   updated on disk, and the store is read back to confirm the new value. */
proc updateChunkTest(type dtype) {
  // 1D case: with chunks of 7 elements, index 10 lies in chunk 1
  const N1 = 100;
  const D1: domain(1) dmapped new blockDist({0..<N1}) = {0..<N1};
  var A1: [D1] dtype;
  for i in D1 do A1[i] = (i + 3):dtype;
  if (isDir("Test1D")) then rmTree("Test1D");
  writeZarrArray("Test1D", A1, (7,));

  A1[10] = -1;
  updateZarrChunk("Test1D", A1, 1);

  var B1 = readZarrArray("Test1D", dtype, 1);

  assert(B1[10] == -1, "Failed to update chunk in 1D array");

  rmTree("Test1D");

  // 2D case: with chunks of shape (7,18), element (10,10) lies in chunk (1,0)
  const N2 = 100;
  const D2: domain(2) dmapped new blockDist({0..<N2,0..<N2}) = {0..<N2,0..<N2};
  var A2: [D2] dtype;
  fillRandom(A2);
  if (exists("Test2D")) then rmTree("Test2D");
  writeZarrArray("Test2D", A2, (7,18));

  A2[10,10] *= 3;
  updateZarrChunk("Test2D", A2, (1,0));

  var B2 = readZarrArray("Test2D", dtype, 2);
  assert(B2[10,10] == A2[10,10], "Failed to update chunk in 2D array");

  // Remove the store, matching the cleanup done for the 1D case
  rmTree("Test2D");
}

proc main() {
testGetLocalChunks();
Expand All @@ -114,6 +176,8 @@ proc main() {
writeln("Testing ", dtype:string);
smallTest(dtype);
reindexTest(dtype);
localIOTest(dtype);
updateChunkTest(dtype);
}
writeln("Pass");
}

0 comments on commit 5421963

Please sign in to comment.