Refactor ArrowResultConverter - cleanup and make it skip empty batches
Signed-off-by: Levko Kravets <[email protected]>
kravets-levko committed Mar 21, 2024
1 parent f88ecc6 commit 374af38
Showing 2 changed files with 70 additions and 58 deletions.
95 changes: 60 additions & 35 deletions lib/result/ArrowResultConverter.ts
@@ -30,9 +30,14 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>

private readonly schema: Array<TColumnDesc>;

private reader?: IterableIterator<RecordBatch<TypeMap>>;
private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;

private pendingRecordBatch?: RecordBatch<TypeMap>;
// This is the next (!!) record batch to be read. It is unset only in two cases:
// - prior to the first call to `fetchNext`
// - when no more data is available
// This field is primarily used by `hasMore`, so it can tell whether the next `fetchNext`
// will actually return a non-empty result
private prefetchedRecordBatch?: RecordBatch<TypeMap>;

constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
this.context = context;
@@ -44,7 +49,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
if (this.schema.length === 0) {
return false;
}
if (this.pendingRecordBatch) {
if (this.prefetchedRecordBatch) {
return true;
}
return this.source.hasMore();
@@ -55,47 +60,67 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
return [];
}

// eslint-disable-next-line no-constant-condition
while (true) {
// It's not possible to know if iterator has more items until trying
// to get the next item. But we need to know if iterator is empty right
// after getting the next item. Therefore, after creating the iterator,
// we get one item more and store it in `pendingRecordBatch`. Next time,
// we use that stored item, and prefetch the next one. Prefetched item
// is therefore the next item we are going to return, so it can be used
// to know if we actually can return anything next time
const recordBatch = this.pendingRecordBatch;
this.pendingRecordBatch = this.prefetch();

if (recordBatch) {
const table = new Table(recordBatch);
return this.getRows(table.schema, table.toArray());
}
// It's not possible to know whether the iterator has more items without trying to get the next one.
// So each time we read one batch ahead and store it, and process the batch prefetched on the
// previous `fetchNext` call. Because we already have the next item in hand, it's easy
// to tell whether a subsequent `fetchNext` will be able to read anything, and the `hasMore` logic
// becomes trivial

// eslint-disable-next-line no-await-in-loop
const { batches } = await this.source.fetchNext(options);
if (batches.length === 0) {
this.reader = undefined;
break;
}
// This prefetch handles the first call to `fetchNext`, when the internal fields are not initialized yet.
// On subsequent calls to `fetchNext` it does nothing
await this.prefetch(options);

if (this.prefetchedRecordBatch) {
// Here we consume the record batch fetched during the previous call to `fetchNext`, and prefetch the next batch
const previousRecordBatch = this.prefetchedRecordBatch;
this.prefetchedRecordBatch = undefined;
await this.prefetch(options);

const reader = RecordBatchReader.from<TypeMap>(batches);
this.reader = reader[Symbol.iterator]();
this.pendingRecordBatch = this.prefetch();
const table = new Table(previousRecordBatch);
return this.getRows(table.schema, table.toArray());
}

return [];
}

private prefetch(): RecordBatch<TypeMap> | undefined {
const item = this.reader?.next() ?? { done: true, value: undefined };
// This method tries to read one more record batch and store it in the `prefetchedRecordBatch` field.
// If `prefetchedRecordBatch` is already set, the method does nothing.
// It pulls the next item from the source if needed, initializes a record batch reader and
// gets the next item from it - until it either reaches the end of data or finds a non-empty record batch
private async prefetch(options: ResultsProviderFetchNextOptions) {
// This loop runs until the next non-empty record batch is retrieved
// Another implicit loop condition (end of data) is checked in the loop body
while (!this.prefetchedRecordBatch) {
// First, try to fetch the next item from the source and initialize a record batch reader.
// If the source has no more data - exit early
if (!this.recordBatchReader) {
const sourceHasMore = await this.source.hasMore(); // eslint-disable-line no-await-in-loop
if (!sourceHasMore) {
return;
}

const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop
if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
const reader = RecordBatchReader.from<TypeMap>(arrowBatch.batches);
this.recordBatchReader = reader[Symbol.iterator]();
}
}

if (item.done || item.value === undefined) {
this.reader = undefined;
return undefined;
// Try to get the next item from the current record batch reader. The reader may be unavailable at this point -
// in that case we fall back to a "done" state, and the `while` loop will do one more iteration, attempting
// to create a new reader. Eventually it will either succeed or reach the end of the source. This scenario also
// handles readers that are already empty
const item = this.recordBatchReader?.next() ?? { done: true, value: undefined };
if (item.done || item.value === undefined) {
this.recordBatchReader = undefined;
} else {
// Skip empty batches
// eslint-disable-next-line no-lonely-if
if (item.value.numRows > 0) {
this.prefetchedRecordBatch = item.value;
}
}
}

return item.value;
}

private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
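The prefetch-one-ahead pattern introduced in this file is easier to see in isolation. Below is a minimal, self-contained sketch, not the library's actual code: the `Batch` type, the `PrefetchingReader` class and its eager first prefetch are all assumptions for illustration (the real converter prefetches lazily and asynchronously from a source provider). The point it shows is the one the new comments describe - if the next non-empty batch is always held in hand, `hasMore` becomes trivial and empty batches are skipped transparently.

```ts
// Hypothetical stand-in for a RecordBatch: only the fields the sketch needs.
interface Batch {
  numRows: number;
  rows: unknown[];
}

class PrefetchingReader {
  private prefetched?: Batch;

  constructor(private source: Iterator<Batch>) {
    this.advance(); // eagerly prefetch the first non-empty batch (the real class does this lazily)
  }

  // Trivial: data is available exactly when a non-empty batch is already prefetched.
  hasMore(): boolean {
    return this.prefetched !== undefined;
  }

  // Return the batch prefetched earlier, then immediately prefetch the next one.
  fetchNext(): unknown[] {
    const batch = this.prefetched;
    this.prefetched = undefined;
    this.advance();
    return batch ? batch.rows : [];
  }

  // Pull items from the source, skipping empty batches, until a non-empty one
  // is found or the source is exhausted.
  private advance(): void {
    while (!this.prefetched) {
      const item = this.source.next();
      if (item.done) {
        return;
      }
      if (item.value.numRows > 0) {
        this.prefetched = item.value;
      }
    }
  }
}

// Usage: the empty batch in the middle is skipped transparently.
const batches: Batch[] = [
  { numRows: 2, rows: ['a', 'b'] },
  { numRows: 0, rows: [] },
  { numRows: 1, rows: ['c'] },
];
const reader = new PrefetchingReader(batches[Symbol.iterator]());
while (reader.hasMore()) {
  console.log(reader.fetchNext()); // ['a', 'b'], then ['c']
}
```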
33 changes: 10 additions & 23 deletions tests/unit/result/compatibility.test.js
@@ -2,7 +2,6 @@ const { expect } = require('chai');
const ArrowResultHandler = require('../../../dist/result/ArrowResultHandler').default;
const ArrowResultConverter = require('../../../dist/result/ArrowResultConverter').default;
const JsonResultHandler = require('../../../dist/result/JsonResultHandler').default;
const ResultSlicer = require('../../../dist/result/ResultSlicer').default;

const { fixArrowResult } = require('../../fixtures/compatibility');
const fixtureColumn = require('../../fixtures/compatibility/column');
@@ -15,24 +14,18 @@ describe('Result handlers compatibility tests', () => {
it('colum-based data', async () => {
const context = {};
const rowSetProvider = new ResultsProviderMock(fixtureColumn.rowSets);
const result = new ResultSlicer(
context,
new JsonResultHandler(context, rowSetProvider, { schema: fixtureColumn.schema }),
);
const result = new JsonResultHandler(context, rowSetProvider, { schema: fixtureColumn.schema });
const rows = await result.fetchNext({ limit: 10000 });
expect(rows).to.deep.equal(fixtureColumn.expected);
});

it('arrow-based data without native types', async () => {
const context = {};
const rowSetProvider = new ResultsProviderMock(fixtureArrow.rowSets);
const result = new ResultSlicer(
const result = new ArrowResultConverter(
context,
new ArrowResultConverter(
context,
new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrow.arrowSchema }),
{ schema: fixtureArrow.schema },
),
new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrow.arrowSchema }),
{ schema: fixtureArrow.schema },
);
const rows = await result.fetchNext({ limit: 10000 });
expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected);
@@ -41,13 +34,10 @@
it('arrow-based data with native types', async () => {
const context = {};
const rowSetProvider = new ResultsProviderMock(fixtureArrowNT.rowSets);
const result = new ResultSlicer(
const result = new ArrowResultConverter(
context,
new ArrowResultConverter(
context,
new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrowNT.arrowSchema }),
{ schema: fixtureArrowNT.schema },
),
new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrowNT.arrowSchema }),
{ schema: fixtureArrowNT.schema },
);
const rows = await result.fetchNext({ limit: 10000 });
expect(fixArrowResult(rows)).to.deep.equal(fixtureArrowNT.expected);
@@ -56,13 +46,10 @@
it('should infer arrow schema from thrift schema', async () => {
const context = {};
const rowSetProvider = new ResultsProviderMock(fixtureArrow.rowSets);
const result = new ResultSlicer(
const result = new ArrowResultConverter(
context,
new ArrowResultConverter(
context,
new ArrowResultHandler(context, rowSetProvider, { schema: fixtureArrow.schema }),
{ schema: fixtureArrow.schema },
),
new ArrowResultHandler(context, rowSetProvider, { schema: fixtureArrow.schema }),
{ schema: fixtureArrow.schema },
);
const rows = await result.fetchNext({ limit: 10000 });
expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected);
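The test changes work because every stage in the result pipeline implements the same small provider contract, so a stage such as `ResultSlicer` can be dropped without touching the callers. The sketch below is a rough illustration of that chaining pattern, with simplified signatures and an assumed `{ limit }` option shape; it is not the library's actual interface definition.

```ts
// Minimal provider contract assumed for illustration: each stage exposes the same
// fetchNext/hasMore pair, so stages can be stacked or removed freely.
interface IResultsProvider<T> {
  fetchNext(options: { limit: number }): Promise<T>;
  hasMore(): Promise<boolean>;
}

// A generic stage that wraps another provider and transforms its output,
// the same shape as ArrowResultConverter wrapping ArrowResultHandler above.
class MappingProvider<TIn, TOut> implements IResultsProvider<TOut> {
  constructor(
    private source: IResultsProvider<TIn>,
    private convert: (value: TIn) => TOut,
  ) {}

  async fetchNext(options: { limit: number }): Promise<TOut> {
    return this.convert(await this.source.fetchNext(options));
  }

  async hasMore(): Promise<boolean> {
    return this.source.hasMore();
  }
}
```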
