Skip to content

Commit

Permalink
refactor(terminal): requestForResponse and promisify some methods (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
zccz14 authored Dec 29, 2024
1 parent 8e4b2e3 commit 8cfa74f
Show file tree
Hide file tree
Showing 42 changed files with 393 additions and 333 deletions.
16 changes: 7 additions & 9 deletions apps/data-collector/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import { IDataRecordTypes, formatTime, getDataRecordSchema } from '@yuants/data-model';
import { PromRegistry, Terminal, copyDataRecords, queryDataRecords } from '@yuants/protocol';
import { batchGroupBy, listWatch, switchMapWithComplete } from '@yuants/utils';
import { PromRegistry, Terminal, copyDataRecords, readDataRecords } from '@yuants/protocol';
import { listWatch } from '@yuants/utils';
import Ajv from 'ajv';
import CronJob from 'cron';
import {
EMPTY,
Observable,
OperatorFunction,
Subject,
Subscription,
catchError,
defaultIfEmpty,
defer,
distinctUntilChanged,
filter,
first,
interval,
map,
mergeAll,
mergeMap,
pipe,
repeat,
retry,
tap,
Expand Down Expand Up @@ -53,12 +51,13 @@ const term = new Terminal(HOST_URL, {
});

defer(() =>
queryDataRecords<ICopyDataRelation>(term, {
readDataRecords(term, {
type: 'copy_data_relation',
}),
)
.pipe(
//
mergeAll(),
map((x) => x.origin),
toArray(),
retry({ delay: 5_000 }),
Expand Down Expand Up @@ -185,8 +184,8 @@ const runTask = (cdr: ICopyDataRelation) =>
subs.push(
taskStart$.subscribe(() => {
defer(() =>
queryDataRecords(term, {
type: cdr.type,
readDataRecords(term, {
type: cdr.type as any,
tags: {
series_id: cdr.series_id,
},
Expand All @@ -199,7 +198,6 @@ const runTask = (cdr: ICopyDataRelation) =>
)
.pipe(
// ISSUE: prevent from data leak
toArray(),
retry({ delay: 5_000 }),
mergeMap((v) => v),
)
Expand Down
9 changes: 6 additions & 3 deletions apps/general-data-source/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IDataRecordTypes, IPeriod, getDataRecordSchema, getDataRecordWrapper } from '@yuants/data-model';
import { PromRegistry, Terminal, queryDataRecords, writeDataRecords } from '@yuants/protocol';
import { PromRegistry, Terminal, readDataRecords, writeDataRecords } from '@yuants/protocol';
import Ajv from 'ajv';
import {
EMPTY,
Expand All @@ -12,6 +12,7 @@ import {
groupBy,
interval,
map,
mergeAll,
mergeMap,
of,
repeat,
Expand Down Expand Up @@ -59,7 +60,7 @@ const syncData = (
//
map((gsr) =>
defer(() =>
queryDataRecords<IPeriod>(term, {
readDataRecords(term, {
type: 'period',
tags: {
datasource_id: gsr.specific_datasource_id,
Expand All @@ -70,6 +71,7 @@ const syncData = (
}),
).pipe(
//
mergeAll(),
map((v) => v.origin),
),
),
Expand Down Expand Up @@ -142,11 +144,12 @@ const syncData = (
};

const mapProductIdToGSRList$ = defer(() =>
queryDataRecords<IGeneralSpecificRelation>(term, {
readDataRecords(term, {
type: 'general_specific_relation',
}),
).pipe(
//
mergeAll(),
map((record) => {
const config = record.origin;
if (!validate(config)) {
Expand Down
6 changes: 4 additions & 2 deletions apps/general-realtime-data-source/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IDataRecordTypes, IPeriod, encodePath, formatTime, getDataRecordSchema } from '@yuants/data-model';
import { Terminal, providePeriods, queryDataRecords } from '@yuants/protocol';
import { Terminal, providePeriods, readDataRecords } from '@yuants/protocol';
import Ajv from 'ajv';
import {
EMPTY,
Expand All @@ -10,6 +10,7 @@ import {
from,
groupBy,
map,
mergeAll,
mergeMap,
of,
repeat,
Expand All @@ -33,11 +34,12 @@ const terminal = new Terminal(HV_URL, {
});

const mapProductIdToGSRList$ = defer(() =>
queryDataRecords<IGeneralSpecificRelation>(terminal, {
readDataRecords(terminal, {
type: 'general_specific_relation',
}),
).pipe(
//
mergeAll(),
map((record) => {
const config = record.origin;
if (!validate(config)) {
Expand Down
9 changes: 5 additions & 4 deletions apps/market-data-collector/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IDataRecordTypes, IPeriod, formatTime, getDataRecordSchema } from '@yuants/data-model';
import { PromRegistry, Terminal, copyDataRecords, queryDataRecords } from '@yuants/protocol';
import { PromRegistry, Terminal, copyDataRecords, readDataRecords } from '@yuants/protocol';
import { listWatch } from '@yuants/utils';
import Ajv from 'ajv';
import CronJob from 'cron';
Expand All @@ -15,6 +15,7 @@ import {
first,
interval,
map,
mergeAll,
mergeMap,
repeat,
retry,
Expand Down Expand Up @@ -51,12 +52,13 @@ const term = new Terminal(HV_URL, {
});

defer(() =>
queryDataRecords<IPullSourceRelation>(term, {
readDataRecords(term, {
type: 'pull_source_relation',
}),
)
.pipe(
//
mergeAll(),
map((x) => x.origin),
toArray(),
retry({ delay: 5_000 }),
Expand Down Expand Up @@ -188,7 +190,7 @@ const runTask = (psr: IPullSourceRelation) =>
subs.push(
taskStart$.subscribe(() => {
defer(() =>
queryDataRecords<IPeriod>(term, {
readDataRecords(term, {
type: 'period',
tags: {
datasource_id: psr.datasource_id,
Expand All @@ -204,7 +206,6 @@ const runTask = (psr: IPullSourceRelation) =>
)
.pipe(
// ISSUE: prevent from data leak
toArray(),
retry({ delay: 5_000 }),
mergeMap((v) => v),
)
Expand Down
86 changes: 43 additions & 43 deletions apps/trade-copier/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@ import {
getDataRecordSchema,
} from '@yuants/data-model';
import { IPositionDiff, diffPosition, mergePositions } from '@yuants/kernel';
import {
PromRegistry,
Terminal,
queryDataRecords,
submitOrder,
useAccountInfo,
useProducts,
} from '@yuants/protocol';
import { PromRegistry, Terminal, readDataRecords, useAccountInfo, useProducts } from '@yuants/protocol';
import '@yuants/protocol/lib/services';
import '@yuants/protocol/lib/services/order';
import { roundToStep } from '@yuants/utils';
import Ajv from 'ajv';
import addFormats from 'ajv-formats';
Expand All @@ -39,6 +34,7 @@ import {
groupBy,
lastValueFrom,
map,
mergeAll,
mergeMap,
of,
pairwise,
Expand Down Expand Up @@ -75,11 +71,12 @@ addFormats(ajv);
const tradeConfigValidate = ajv.compile(getDataRecordSchema('trade_copier_trade_config')!);

const tradeConfig$ = defer(() =>
queryDataRecords<ITradeCopierTradeConfig>(terminal, {
readDataRecords(terminal, {
type: 'trade_copier_trade_config',
}),
).pipe(
//
mergeAll(),
map((record) => {
const config = record.origin;
if (!tradeConfigValidate(config)) {
Expand All @@ -91,40 +88,43 @@ const tradeConfig$ = defer(() =>
shareReplay(1),
);

const validateOfTradeCopyRelation = ajv.compile(getDataRecordSchema('trade_copy_relation')!);
const validateOfTradeCopyRelation = ajv.compile<ITradeCopyRelation>(
getDataRecordSchema('trade_copy_relation')!,
);

const config$ = defer(() =>
queryDataRecords<ITradeCopyRelation>(terminal, { type: 'trade_copy_relation' }),
).pipe(
//
map((msg) => msg.origin),
filter((msg) => !msg.disabled),
filter((x) => validateOfTradeCopyRelation(x)),
toArray(),
map((data): ITradeCopierConfig => ({ tasks: data })),

tap((config) => console.info(formatTime(Date.now()), 'LoadConfig', JSON.stringify(config))),
tap((config) => {
for (const task of config.tasks) {
const labels = {
source_account_id: task.source_account_id,
target_account_id: task.target_account_id,
source_product_id: task.source_product_id,
target_product_id: task.target_product_id,
};
if (task.multiple === 0) {
MetricMatrixUp.set(0, labels);
} else {
MetricMatrixUp.set(1, labels);
const config$ = defer(() => readDataRecords(terminal, { type: 'trade_copy_relation' }))
.pipe(
//
mergeAll(),
map((msg) => msg.origin),
filter((msg) => !msg.disabled),
filter((x) => validateOfTradeCopyRelation(x)),
toArray(),
map((data): ITradeCopierConfig => ({ tasks: data })),
)
.pipe(
tap((config) => console.info(formatTime(Date.now()), 'LoadConfig', JSON.stringify(config))),
tap((config) => {
for (const task of config.tasks) {
const labels = {
source_account_id: task.source_account_id,
target_account_id: task.target_account_id,
source_product_id: task.source_product_id,
target_product_id: task.target_product_id,
};
if (task.multiple === 0) {
MetricMatrixUp.set(0, labels);
} else {
MetricMatrixUp.set(1, labels);
}
}
}
}),
catchError((err) => {
terminal.terminalInfo.status = 'InvalidConfig';
throw err;
}),
shareReplay(1),
);
}),
catchError((err) => {
terminal.terminalInfo.status = 'InvalidConfig';
throw err;
}),
shareReplay(1),
);

config$
.pipe(
Expand Down Expand Up @@ -623,7 +623,7 @@ async function setup() {
from(orders).pipe(
filter((order) => order.volume > 0),
concatMap((order) =>
from(submitOrder(terminal, order)).pipe(
from(terminal.requestForResponse('SubmitOrder', order)).pipe(
tap(() => {
console.info(formatTime(Date.now()), `SucceedToSubmitOrder`, key, JSON.stringify(order));
}),
Expand Down Expand Up @@ -661,7 +661,7 @@ async function setup() {
from(orders).pipe(
filter((order) => order.volume > 0),
mergeMap((order) =>
from(submitOrder(terminal, order)).pipe(
from(terminal.requestForResponse('SubmitOrder', order)).pipe(
tap(() => {
console.info(formatTime(Date.now()), `SucceedToSubmitOrder`, key, JSON.stringify(order));
}),
Expand Down
18 changes: 8 additions & 10 deletions apps/transfer-controller/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@ import {
formatTime,
getDataRecordWrapper,
} from '@yuants/data-model';
import {
PromRegistry,
Terminal,
queryDataRecords,
readDataRecords,
writeDataRecords,
} from '@yuants/protocol';
import { PromRegistry, Terminal, readDataRecords, writeDataRecords } from '@yuants/protocol';
import '@yuants/protocol/lib/services';
import '@yuants/protocol/lib/services/transfer';
// @ts-ignore
Expand All @@ -26,6 +20,7 @@ import {
from,
groupBy,
map,
mergeAll,
mergeMap,
of,
repeat,
Expand All @@ -46,12 +41,13 @@ const terminal = new Terminal(process.env.HOST_URL!, {
});

defer(() =>
queryDataRecords<ITransferOrder>(terminal, {
readDataRecords(terminal, {
type: 'transfer_order',
}),
)
.pipe(
//
mergeAll(),
map((v) => v.origin),
filter((order) => !['ERROR', 'COMPLETE'].includes(order.status!)),
toArray(),
Expand Down Expand Up @@ -105,23 +101,25 @@ const makeRoutingPath = async (order: ITransferOrder): Promise<ITransferPair[] |
const { credit_account_id, debit_account_id } = order;
const addressInfoList = await firstValueFrom(
defer(() =>
queryDataRecords<IAccountAddressInfo>(terminal, {
readDataRecords(terminal, {
type: 'account_address_info',
}),
).pipe(
//
mergeAll(),
map((v) => v.origin),
toArray(),
),
);

const transferNetworkInfoList = await firstValueFrom(
defer(() =>
queryDataRecords<ITransferNetworkInfo>(terminal, {
readDataRecords(terminal, {
type: 'transfer_network_info',
}),
).pipe(
//
mergeAll(),
map((v) => v.origin),
toArray(),
shareReplay(1),
Expand Down
Loading

0 comments on commit 8cfa74f

Please sign in to comment.