Skip to content

Commit

Permalink
update cloud sync logic (#207)
Browse files Browse the repository at this point in the history
* update cloud sync logic

* update cloud sync logic
  • Loading branch information
domechn committed Dec 7, 2023
1 parent 7bd3945 commit 2d80e35
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 53 deletions.
5 changes: 3 additions & 2 deletions src-tauri/migrations/init/asset_prices_up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ CREATE TABLE IF NOT EXISTS asset_prices (
symbol TEXT NOT NULL,
-- if amount > 0, price is cost price, else price is sell price
price REAL NOT NULL,
-- createdAt in assets_v2
createdAt DATETIME NOT NULL
-- assetCreatedAt in assets_v2
assetCreatedAt DATETIME NOT NULL,
updatedAt DATETIME NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_uuid_asset_id ON asset_prices (uuid, assetID);
5 changes: 3 additions & 2 deletions src-tauri/migrations/v03t04/asset_prices_up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ CREATE TABLE IF NOT EXISTS asset_prices (
symbol TEXT NOT NULL,
-- if amount > 0, price is cost price, else price is sell price
price REAL NOT NULL,
-- createdAt in assets_v2
createdAt DATETIME NOT NULL
-- assetCreatedAt in assets_v2
assetCreatedAt DATETIME NOT NULL,
updatedAt DATETIME NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_uuid_asset_id ON asset_prices (uuid, assetID);
29 changes: 26 additions & 3 deletions src/middlelayers/charts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export async function loadAllAssetActionsBySymbol(symbol: string): Promise<Asset

export async function updateAssetPrice(uuid: string, assetID: number, symbol: string, price: number, createdAt: string) {
const db = await getDatabase()
await db.execute(`INSERT OR REPLACE INTO ${ASSETS_PRICE_TABLE_NAME} (uuid, assetID, symbol, price, createdAt) VALUES (?, ?, ?, ?, ?)`, [
await db.execute(`INSERT OR REPLACE INTO ${ASSETS_PRICE_TABLE_NAME} (uuid, assetID, symbol, price, assetCreatedAt) VALUES (?, ?, ?, ?, ?)`, [
uuid, assetID, symbol, price, createdAt
])
}
Expand Down Expand Up @@ -169,10 +169,27 @@ export function queryAllAssetPrices(): Promise<AssetPriceModel[]> {
return queryAssetPrices()
}

async function queryAssetPrices(symbol?: string): Promise<AssetPriceModel[]> {
export function queryAssetPricesAfterAssetCreatedAt(createdAt?: number): Promise<AssetPriceModel[]> {
const ts = createdAt ? new Date(createdAt).toISOString() : undefined
return queryAssetPrices(undefined, ts)
}

export function queryAssetPricesAfterUpdatedAt(updatedAt?: number): Promise<AssetPriceModel[]> {
const ts = updatedAt ? new Date(updatedAt).toISOString() : undefined
return queryAssetPrices(undefined, undefined, ts)
}

async function queryAssetPrices(symbol?: string, assetCreated?: string, updatedAt?: string): Promise<AssetPriceModel[]> {
const db = await getDatabase()
const params = symbol ? [symbol] : []
const prices = await db.select<AssetPriceModel[]>(`SELECT * FROM ${ASSETS_PRICE_TABLE_NAME} WHERE 1=1 ${symbol ? 'and symbol = ?' : ''}`, params)
if (assetCreated) {
params.push(assetCreated)
}
if (updatedAt) {
params.push(updatedAt)
}
const prices = await db.select<AssetPriceModel[]>(`SELECT * FROM ${ASSETS_PRICE_TABLE_NAME} WHERE 1=1 ${symbol ? 'and symbol = ?' : ''} ${assetCreated ? 'and assetCreatedAt > ?' : ''} ${updatedAt ? 'and updatedAt > ?' : ''}`, params)

return prices
}

Expand Down Expand Up @@ -215,6 +232,12 @@ export async function queryAssetsAfterCreatedAt(createdAt?: number): Promise<Ass
return assets
}

export async function queryAssetsByIDs(ids: number[]): Promise<AssetModel[]> {
const db = await getDatabase()
const assets = await db.select<AssetModel[]>(`SELECT * FROM ${ASSETS_TABLE_NAME} WHERE id in (${ids.join(",")})`)
return assets
}

async function queryAssetByUUID(id: string): Promise<AssetModel[]> {
const db = await getDatabase()
const assets = await db.select<AssetModel[]>(`SELECT * FROM ${ASSETS_TABLE_NAME} WHERE uuid = ?`, [id])
Expand Down
118 changes: 84 additions & 34 deletions src/middlelayers/cloudsync.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Polybase } from '@polybase/client'
import { getDatabase, saveModelsToDatabase } from './database'
import { v4 as uuidv4 } from 'uuid'
import { ASSETS_TABLE_NAME, queryAssetsAfterCreatedAt } from './charts'
import { AssetModel, CloudAssetModel, CloudSyncConfiguration } from './types'
import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME, queryAssetPricesAfterAssetCreatedAt, queryAssetPricesAfterUpdatedAt, queryAssetsAfterCreatedAt, queryAssetsByIDs } from './charts'
import { AssetModel, AssetPriceModel, CloudAssetModel, CloudSyncConfiguration, ExportAssetModel } from './types'
import { getCloudSyncConfiguration as getCloudSyncConfigurationModel, saveCloudSyncConfiguration as saveCloudSyncConfigurationModel } from './configuration'
import _ from 'lodash'
import Database from 'tauri-plugin-sql-api'
Expand Down Expand Up @@ -185,14 +185,14 @@ export async function getPublicKey() {
}

// list all assets from cloud
async function dumpAssetsFromCloudAfterCreatedAt(createdAt?: number): Promise<AssetModel[]> {
async function dumpAssetsFromCloudAfterCreatedAt(createdAt?: number): Promise<ExportAssetModel[]> {
const p = await getPolybaseDB()
// filter assets > createdAt from cloud, if createdAt is not provided, get all assets
const records = await p.collection<CloudAssetModel>(RECORD_COLLECTION_NAME).where("createdAt", ">=", createdAt || 0).sort("createdAt", "desc").get()

const needSyncedAssets = _(records.data).
return _(records.data).
map('data').
map(record => record.records ? JSON.parse(record.records) as AssetModel[] : []).
map(record => record.records ? JSON.parse(record.records) as ExportAssetModel[] : []).
flatten().
compact().
map((record) => ({
Expand All @@ -204,22 +204,37 @@ async function dumpAssetsFromCloudAfterCreatedAt(createdAt?: number): Promise<As
value: record.value,
price: record.price,
wallet: record.wallet,
costPrice: record.costPrice,
}))
.value()
return _(needSyncedAssets).map((asset) => ({
id: asset.id,
uuid: asset.uuid,
createdAt: asset.createdAt,
symbol: asset.symbol,
amount: asset.amount,
value: asset.value,
price: asset.price,
wallet: asset.wallet,
})).value()
}

async function dumpAssetsFromDBAfterCreatedAt(createdAt?: number): Promise<AssetModel[]> {
return queryAssetsAfterCreatedAt(createdAt)
async function dumpAssetsFromDBAfterCreatedAt(createdAt?: number): Promise<ExportAssetModel[]> {
const models = await queryAssetsAfterCreatedAt(createdAt)
const prices = await queryAssetPricesAfterAssetCreatedAt(createdAt)


const getExportAssetMode = (ms: AssetModel[], ps: AssetPriceModel[]): ExportAssetModel[] => {
const priceMap = _(ps).mapKeys('assetID').mapValues('price').value()
return _(ms).map((model) => {
const costPrice = priceMap[model.id]
return {
...model,
costPrice,
} as ExportAssetModel
}).value()
}

const afterCreatedAt = getExportAssetMode(models, prices)

const afterAssetIds = _(afterCreatedAt).map('id').value()

const afterUpdatedAt = await queryAssetPricesAfterUpdatedAt(createdAt)
const updatedAssetIds = _(afterUpdatedAt).filter(a => !afterAssetIds.includes(a.assetID)).map('assetID').value()
const updatedAssets = await queryAssetsByIDs(updatedAssetIds)
const afterUpdatedAtAssets = getExportAssetMode(updatedAssets, afterUpdatedAt)

return [...afterCreatedAt, ...afterUpdatedAtAssets]
}

export async function syncAssetsToCloudAndLocal(publicKey: string, createdAt?: number): Promise<number> {
Expand All @@ -241,12 +256,10 @@ export async function syncAssetsToCloudAndLocal(publicKey: string, createdAt?: n
const needSyncedAssetsToDB = _(cloudAssets).differenceBy(localAssets, 'uuid').value()
if (needSyncedAssetsToDB.length > 0) {
// write data to local
console.log('needSyncedAssetsToDB', needSyncedAssetsToDB)

synced += await writeAssetsToDB(d, needSyncedAssetsToDB)
}

await updateLastSyncTime(d, publicKey)
await markAsSynced(d, publicKey)
return synced
}

Expand All @@ -259,24 +272,48 @@ export async function forceSyncAssetsToCloudFromLocal(publicKey: string): Promis
const cloudAssets = await dumpAssetsFromCloudAfterCreatedAt()

let synced = 0
// data in cloud not equal to local, need to be removed
const changedDataInLocal = _(localAssets).filter(a => {
const dataInCloud = _(cloudAssets).find(c => c.uuid === a.uuid && c.symbol === a.symbol && c.wallet === a.wallet)
if (!dataInCloud) {
return false
}
if (dataInCloud.amount !== a.amount) {
return true
}
if (dataInCloud.value !== a.value) {
return true
}

// add data to cloud if not in cloud
const needSyncedAssetsToCloud = _(localAssets).differenceBy(cloudAssets, 'uuid').value()
if (needSyncedAssetsToCloud.length > 0) {
// write data to cloud
synced += await writeAssetsToCloud(publicKey, needSyncedAssetsToCloud)
}
if (dataInCloud.price !== a.price) {
return true
}
if (dataInCloud.costPrice !== a.costPrice) {
return true
}
return false
}).value()
const getModelKey = (a: ExportAssetModel) => a.uuid + a.symbol + a.wallet
// remove data in cloud if not in local
const needRemovedInCloud = _(cloudAssets).differenceBy(localAssets, 'uuid').value()
if (needRemovedInCloud.length > 0) {
synced += await removeAssetsInCloud(needRemovedInCloud)
}
// add data to cloud if not in cloud
const needSyncedAssetsToCloud = _(localAssets).differenceBy(cloudAssets, 'uuid').value()
if (needSyncedAssetsToCloud.length > 0 || changedDataInLocal.length > 0) {
// write data to cloud
synced += await writeAssetsToCloud(publicKey, _([...needSyncedAssetsToCloud, ...changedDataInLocal])
.uniqBy(a => getModelKey(a))
.value()
)
}

await updateLastSyncTime(d, publicKey)
await markAsSynced(d, publicKey)
return synced
}

async function writeAssetsToCloud(publicKey: string, assets: AssetModel[]): Promise<number> {
async function writeAssetsToCloud(publicKey: string, assets: ExportAssetModel[]): Promise<number> {
const gas = _(assets).groupBy('uuid').value()
const p = await getPolybaseDB()
const res = await bluebird.map(Object.keys(gas), async (uuid) => {
Expand All @@ -298,11 +335,9 @@ async function writeAssetsToCloud(publicKey: string, assets: AssetModel[]): Prom
return _(res).sum()
}

async function removeAssetsInCloud(assets: AssetModel[]): Promise<number> {
async function removeAssetsInCloud(assets: ExportAssetModel[]): Promise<number> {
const p = await getPolybaseDB()
const res = await bluebird.map(_(assets).map('uuid').uniq().value(), async (uid: string) => {
console.log('removeAssetsInCloud', uid);

const records = await p.collection<CloudAssetModel>(RECORD_COLLECTION_NAME).where("uuid", "==", uid).get()
if (records.data.length === 0) {
return 0
Expand All @@ -317,12 +352,27 @@ async function removeAssetsInCloud(assets: AssetModel[]): Promise<number> {
}

// return updated how many records
async function writeAssetsToDB(d: Database, assets: AssetModel[]): Promise<number> {
const res = await saveModelsToDatabase(ASSETS_TABLE_NAME, _.map(assets, (obj) => _.omit(obj, "id")))
async function writeAssetsToDB(d: Database, assets: ExportAssetModel[]): Promise<number> {
const res = await saveModelsToDatabase(ASSETS_TABLE_NAME, _(assets).map(a => _(a).omit("id").omit("costPrice").value()).value())
const f = _(assets).find(a => a.costPrice !== undefined)
if (!f) {
// not asset_price need to be saved
return res.length
}
const priceMOdels = _(assets).filter(a => a.costPrice !== undefined).map(a => ({
uuid: a.uuid,
assetID: a.id,
symbol: a.symbol,
price: a.costPrice,
assetCreatedAt: a.createdAt,
updatedAt: new Date().toISOString(),
} as AssetPriceModel)).value()

await saveModelsToDatabase(ASSETS_PRICE_TABLE_NAME, priceMOdels)
return res.length
}

async function updateLastSyncTime(d: Database, publicKey: string) {
async function markAsSynced(d: Database, publicKey: string) {
await d.execute(`INSERT INTO ${CLOUD_SYNC_TABLE_NAME} (publicKey, updatedAt) VALUES (?, ?) ON CONFLICT(publicKey) DO UPDATE SET updatedAt = ?`, [publicKey, new Date().toISOString(), new Date().toISOString()])
}

Expand Down
17 changes: 10 additions & 7 deletions src/middlelayers/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME, queryAllAssetPrices, queryH
import _ from 'lodash'
import { save, open } from "@tauri-apps/api/dialog"
import { writeTextFile, readTextFile } from "@tauri-apps/api/fs"
import { AssetModel, AssetPriceModel, HistoricalData } from './types'
import { getDatabase, saveModelsToDatabase } from './database'
import { AssetPriceModel, ExportAssetModel, HistoricalData } from './types'
import { saveModelsToDatabase } from './database'
import { exportConfigurationString, importRawConfiguration } from './configuration'

type ExportData = {
Expand Down Expand Up @@ -159,7 +159,7 @@ export async function importHistoricalData(): Promise<boolean> {
}

// import historicalData from file
async function saveHistoricalDataAssets(assets: (AssetModel & { costPrice?: number })[]) {
async function saveHistoricalDataAssets(assets: ExportAssetModel[]) {
const requiredKeys = ["uuid", "createdAt", "symbol", "amount", "value", "price"]
_(assets).forEach((asset) => {
_(requiredKeys).forEach(k => {
Expand All @@ -173,10 +173,12 @@ async function saveHistoricalDataAssets(assets: (AssetModel & { costPrice?: numb

// import asset prices
const importedAssets = _(await queryHistoricalData(-1, false)).map(d => d.assets).flatten().value()
const importAssetsMap = _(importedAssets).mapKeys(a => `${a.uuid}/${a.symbol}/${a.wallet}`).value()

const assetPriceModels = _(assets).filter(a => a.costPrice !== undefined).map(a => {
console.log(a);
const f = _(importedAssets).find(ia => ia.uuid === a.uuid && ia.symbol === a.symbol && ia.wallet === a.wallet)
const key = `${a.uuid}/${a.symbol}/${a.wallet}`

const f = importAssetsMap[key]
if (!f) {
return
}
Expand All @@ -185,7 +187,8 @@ async function saveHistoricalDataAssets(assets: (AssetModel & { costPrice?: numb
assetID: f.id,
symbol: a.symbol,
price: a.costPrice,
createdAt: a.createdAt
assetCreatedAt: a.createdAt,
updatedAt: new Date().toISOString(),
} as AssetPriceModel
}).compact().value()

Expand Down
4 changes: 2 additions & 2 deletions src/middlelayers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import _ from "lodash"
import Database from "tauri-plugin-sql-api"
import { v4 as uuidv4 } from 'uuid'
import { CoinModel } from './datafetch/types'
import { AssetPriceModel, AssetModel, WalletCoinUSD } from './types'
import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME } from './charts'
import { AssetModel, WalletCoinUSD } from './types'
import { ASSETS_TABLE_NAME } from './charts'
import md5 from 'md5'

export const databaseName = "track3.db"
Expand Down
10 changes: 7 additions & 3 deletions src/middlelayers/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ export type AssetModel = {
wallet?: string
}

// type when exporting assets to json or cloud database
export type ExportAssetModel = AssetModel & { costPrice?: number }

// asset_prices table
// to record the cost price or sell price of each coins, can be updated by users
export type AssetPriceModel = {
Expand All @@ -23,7 +26,8 @@ export type AssetPriceModel = {
price: number

// createdAt in assets_v2 table
createdAt: string
assetCreatedAt: string
updatedAt: string
}

export type CloudAssetModel = {
Expand All @@ -33,7 +37,7 @@ export type CloudAssetModel = {

uuid: string

// json stringify from AssetModel[]
// json stringify from ExportAssetModel[]
records: string

createdAt: number
Expand Down Expand Up @@ -169,7 +173,7 @@ export type HistoricalData = {
id: string
createdAt: string
// costPrice only exists when exporting historical data
assets: (AssetModel & { costPrice?: number })[]
assets: ExportAssetModel[]

total: number
}
Expand Down

0 comments on commit 2d80e35

Please sign in to comment.