Skip to content

Commit

Permalink
Support multirange in the driver
Browse files Browse the repository at this point in the history
  • Loading branch information
scotttrinh committed Oct 4, 2023
1 parent 0b97399 commit 73df1c7
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 87 deletions.
15 changes: 9 additions & 6 deletions packages/driver/src/codecs/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
* limitations under the License.
*/

import { ICodec, Codec, ScalarCodec, uuid, CodecKind } from "./ifaces";
import type { ICodec, uuid, CodecKind } from "./ifaces";
import { Codec, ScalarCodec } from "./ifaces";
import { WriteBuffer, ReadBuffer } from "../primitives/buffer";
import { TupleCodec } from "./tuple";
import { RangeCodec } from "./range";
import { MultiRangeCodec, RangeCodec } from "./range";
import { InvalidArgumentError, ProtocolError } from "../errors";
import { NamedTupleCodec } from "./namedtuple";

Expand All @@ -39,7 +40,8 @@ export class ArrayCodec extends Codec implements ICodec {
this.subCodec instanceof ScalarCodec ||
this.subCodec instanceof TupleCodec ||
this.subCodec instanceof NamedTupleCodec ||
this.subCodec instanceof RangeCodec
this.subCodec instanceof RangeCodec ||
this.subCodec instanceof MultiRangeCodec
)
) {
throw new InvalidArgumentError(
Expand All @@ -48,7 +50,9 @@ export class ArrayCodec extends Codec implements ICodec {
}

if (!Array.isArray(obj) && !isTypedArray(obj)) {
throw new InvalidArgumentError("an array was expected");
throw new InvalidArgumentError(
`an array or multirange was expected (got type ${obj.constructor.name})`
);
}

const subCodec = this.subCodec;
Expand All @@ -60,8 +64,7 @@ export class ArrayCodec extends Codec implements ICodec {
throw new InvalidArgumentError("too many elements in array");
}

for (let i = 0; i < objLen; i++) {
const item = obj[i];
for (const item of obj) {
if (item == null) {
elemData.writeInt32(-1);
} else {
Expand Down
3 changes: 2 additions & 1 deletion packages/driver/src/codecs/ifaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ export type CodecKind =
| "set"
| "scalar"
| "sparse_object"
| "range";
| "range"
| "multirange";

export interface ICodec {
readonly tid: uuid;
Expand Down
194 changes: 146 additions & 48 deletions packages/driver/src/codecs/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
* limitations under the License.
*/

import { ICodec, Codec, uuid, CodecKind } from "./ifaces";
import type { ICodec, uuid, CodecKind } from "./ifaces";
import { Codec } from "./ifaces";
import { WriteBuffer, ReadBuffer } from "../primitives/buffer";
import { Range } from "../datatypes/range";
import { InvalidArgumentError } from "../errors";
import { MultiRange, Range } from "../datatypes/range";
import { InvalidArgumentError, ProtocolError } from "../errors";

enum RangeFlags {
EMPTY = 1 << 0,
Expand All @@ -29,6 +30,68 @@ enum RangeFlags {
EMPTY_UPPER = 1 << 4,
}

const MAXINT32 = 0x7fffffff;

function encodeRange(buf: WriteBuffer, obj: any, subCodec: ICodec): void {
if (!(obj instanceof Range)) {
throw new InvalidArgumentError("a Range was expected");
}

const elemData = new WriteBuffer();

if (obj.lower !== null) {
subCodec.encode(elemData, obj.lower);
}
if (obj.upper !== null) {
subCodec.encode(elemData, obj.upper);
}

const elemBuf = elemData.unwrap();

buf.writeInt32(1 + elemBuf.length);
buf.writeUInt8(
obj.isEmpty
? RangeFlags.EMPTY
: (obj.incLower ? RangeFlags.INC_LOWER : 0) |
(obj.incUpper ? RangeFlags.INC_UPPER : 0) |
(obj.lower === null ? RangeFlags.EMPTY_LOWER : 0) |
(obj.upper === null ? RangeFlags.EMPTY_UPPER : 0)
);
buf.writeBuffer(elemBuf);
}

function decodeRange(buf: ReadBuffer, subCodec: ICodec): any {
const flags = buf.readUInt8();

if (flags & RangeFlags.EMPTY) {
return Range.empty();
}

const elemBuf = ReadBuffer.alloc();

let lower: any = null;
let upper: any = null;

if (!(flags & RangeFlags.EMPTY_LOWER)) {
buf.sliceInto(elemBuf, buf.readInt32());
lower = subCodec.decode(elemBuf);
elemBuf.finish();
}

if (!(flags & RangeFlags.EMPTY_UPPER)) {
buf.sliceInto(elemBuf, buf.readInt32());
upper = subCodec.decode(elemBuf);
elemBuf.finish();
}

return new Range(
lower,
upper,
!!(flags & RangeFlags.INC_LOWER),
!!(flags & RangeFlags.INC_UPPER)
);
}

export class RangeCodec extends Codec implements ICodec {
private subCodec: ICodec;

Expand All @@ -37,73 +100,108 @@ export class RangeCodec extends Codec implements ICodec {
this.subCodec = subCodec;
}

encode(buf: WriteBuffer, obj: any) {
return encodeRange(buf, obj, this.subCodec);
}

decode(buf: ReadBuffer): any {
return decodeRange(buf, this.subCodec);
}

getSubcodecs(): ICodec[] {
return [this.subCodec];
}

getKind(): CodecKind {
return "range";
}
}

export class MultiRangeCodec extends Codec implements ICodec {
private subCodec: ICodec;

constructor(tid: uuid, subCodec: ICodec) {
super(tid);
this.subCodec = subCodec;
}

encode(buf: WriteBuffer, obj: any): void {
if (!(obj instanceof Range)) {
throw new InvalidArgumentError("a Range was expected");
if (!(obj instanceof MultiRange)) {
throw new TypeError(
`a MultiRange expected (got type ${obj.constructor.name})`
);
}

const subCodec = this.subCodec;
const elemData = new WriteBuffer();
const objLen = obj.length;
if (objLen > MAXINT32) {
throw new InvalidArgumentError("too many elements in array");
}

if (obj.lower !== null) {
subCodec.encode(elemData, obj.lower);
const elemData = new WriteBuffer();
for (const item of obj) {
try {
encodeRange(elemData, item, this.subCodec);
} catch (e) {
if (e instanceof InvalidArgumentError) {
throw new InvalidArgumentError(
`invalid multirange element: ${e.message}`
);
} else {
throw e;
}
}
}
if (obj.upper !== null) {
subCodec.encode(elemData, obj.upper);

const elemBuf = elemData.unwrap()
const elemDataLen = elemBuf.length;
if (elemDataLen > MAXINT32 - 4) {
throw new InvalidArgumentError(
`size of encoded multirange datum exceeds the maximum allowed ${
MAXINT32 - 4
} bytes`
);
}

const elemBuf = elemData.unwrap();

buf.writeInt32(1 + elemBuf.length);
buf.writeUInt8(
obj.isEmpty
? RangeFlags.EMPTY
: (obj.incLower ? RangeFlags.INC_LOWER : 0) |
(obj.incUpper ? RangeFlags.INC_UPPER : 0) |
(obj.lower === null ? RangeFlags.EMPTY_LOWER : 0) |
(obj.upper === null ? RangeFlags.EMPTY_UPPER : 0)
);
// Datum length
buf.writeInt32(4 + elemDataLen);

// Number of elements in multirange
buf.writeInt32(objLen);
buf.writeBuffer(elemBuf);
}

decode(buf: ReadBuffer): any {
const flags = buf.readUInt8();

if (flags & RangeFlags.EMPTY) {
return Range.empty();
}

const elemCount = buf.readInt32();
const result = new Array(elemCount);
const elemBuf = ReadBuffer.alloc();
const subCodec = this.subCodec;

let lower: any = null;
let upper: any = null;

if (!(flags & RangeFlags.EMPTY_LOWER)) {
buf.sliceInto(elemBuf, buf.readInt32());
lower = subCodec.decode(elemBuf);
elemBuf.finish();
for (let i = 0; i < elemCount; i++) {
const elemLen = buf.readInt32();
if (elemLen === -1) {
throw new ProtocolError("unexpected NULL element in multirange value");
} else {
buf.sliceInto(elemBuf, elemLen);
const elem = decodeRange(elemBuf, subCodec);
if (elemBuf.length) {
throw new ProtocolError(
`unexpected trailing data in buffer after multirange element decoding: ${elemBuf.length}`
);
}

result[i] = elem;
elemBuf.finish();
}
}

if (!(flags & RangeFlags.EMPTY_UPPER)) {
buf.sliceInto(elemBuf, buf.readInt32());
upper = subCodec.decode(elemBuf);
elemBuf.finish();
}

return new Range(
lower,
upper,
!!(flags & RangeFlags.INC_LOWER),
!!(flags & RangeFlags.INC_UPPER)
);
return new MultiRange(result);
}

getSubcodecs(): ICodec[] {
return [this.subCodec];
}

getKind(): CodecKind {
return "range";
return "multirange";
}
}
15 changes: 14 additions & 1 deletion packages/driver/src/codecs/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { NamedTupleCodec } from "./namedtuple";
import { EnumCodec } from "./enum";
import { ObjectCodec } from "./object";
import { SetCodec } from "./set";
import { RangeCodec } from "./range";
import { MultiRangeCodec, RangeCodec } from "./range";
import { ProtocolVersion } from "../ifaces";
import { versionGreaterThanOrEqual } from "../utils";
import { SparseObjectCodec } from "./sparseObject";
Expand All @@ -49,6 +49,7 @@ const CTYPE_ARRAY = 6;
const CTYPE_ENUM = 7;
const CTYPE_INPUT_SHAPE = 8;
const CTYPE_RANGE = 9;
const CTYPE_MULTIRANGE = 12;

export interface CustomCodecSpec {
int64_bigint?: boolean;
Expand Down Expand Up @@ -446,6 +447,18 @@ export class CodecsRegistry {
res = new RangeCodec(tid, subCodec);
break;
}

case CTYPE_MULTIRANGE: {
const pos = frb.readUInt16();
const subCodec = cl[pos];
if (subCodec == null) {
throw new ProtocolError(
"could not build range codec: missing subcodec"
);
}
res = new MultiRangeCodec(tid, subCodec);
break;
}
}

if (res == null) {
Expand Down
24 changes: 23 additions & 1 deletion packages/driver/src/datatypes/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

import { Duration, LocalDate, LocalDateTime } from "./datetime";
import type { Duration, LocalDate, LocalDateTime } from "./datetime";

export class Range<
T extends number | Date | LocalDate | LocalDateTime | Duration
Expand Down Expand Up @@ -63,3 +63,25 @@ export class Range<
};
}
}

export class MultiRange<
T extends number | Date | LocalDate | LocalDateTime | Duration
> {
constructor(private readonly _ranges: Range<T>[] = []) {}

get length() {
return this._ranges.length;
}

*[Symbol.iterator]() {
for (const range of this._ranges) {
yield range;
}
}

toJSON() {
return {
ranges: this._ranges,
};
}
}
4 changes: 2 additions & 2 deletions packages/driver/src/index.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ export {
DateDuration,
} from "./datatypes/datetime";
export { ConfigMemory } from "./datatypes/memory";
export { Range } from "./datatypes/range";
export { Range, MultiRange } from "./datatypes/range";

export type { Executor } from "./ifaces";

export * from "./errors";

/* Private APIs */
import * as codecs from "./codecs/ifaces";
import type * as codecs from "./codecs/ifaces";
import * as reg from "./codecs/registry";
import * as buf from "./primitives/buffer";
export const _CodecsRegistry = reg.CodecsRegistry;
Expand Down
Loading

0 comments on commit 73df1c7

Please sign in to comment.