Skip to content

Commit

Permalink
update cloud sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
domechn committed Dec 7, 2023
1 parent f10ecf0 commit 4a733dd
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 54 deletions.
5 changes: 3 additions & 2 deletions src-tauri/migrations/init/asset_prices_up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ CREATE TABLE IF NOT EXISTS asset_prices (
symbol TEXT NOT NULL,
-- if amount > 0, price is cost price, else price is sell price
price REAL NOT NULL,
-- createdAt in assets_v2
createdAt DATETIME NOT NULL
-- assetCreatedAt in assets_v2
assetCreatedAt DATETIME NOT NULL,
updatedAt DATETIME NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_uuid_asset_id ON asset_prices (uuid, assetID);
1 change: 0 additions & 1 deletion src-tauri/migrations/init/cloud_sync_up.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
CREATE TABLE IF NOT EXISTS "cloud_sync" (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
needsFixExisting INTEGER NOT NULL DEFAULT 0,
publicKey TEXT NOT NULL,
updatedAt DATETIME DEFAULT CURRENT_TIMESTAMP
);
Expand Down
5 changes: 3 additions & 2 deletions src-tauri/migrations/v03t04/asset_prices_up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ CREATE TABLE IF NOT EXISTS asset_prices (
symbol TEXT NOT NULL,
-- if amount > 0, price is cost price, else price is sell price
price REAL NOT NULL,
-- createdAt in assets_v2
createdAt DATETIME NOT NULL
-- assetCreatedAt in assets_v2
assetCreatedAt DATETIME NOT NULL,
updatedAt DATETIME NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_uuid_asset_id ON asset_prices (uuid, assetID);
1 change: 0 additions & 1 deletion src-tauri/migrations/v03t04/cloud_sync_migrate.sql

This file was deleted.

4 changes: 0 additions & 4 deletions src-tauri/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,12 @@ impl Migration for V3TV4 {
let asset_prices =
fs::read_to_string(resource_dir.join("migrations/v03t04/asset_prices_up.sql"))
.unwrap();
let cloud_sync_migrate =
fs::read_to_string(resource_dir.join("migrations/v03t04/cloud_sync_migrate.sql"))
.unwrap();

let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("migrate from v0.3 to v0.4 in tokio spawn");
let mut conn = SqliteConnection::connect(&sqlite_path).await.unwrap();
conn.execute(asset_prices.as_str()).await.unwrap();
conn.execute(cloud_sync_migrate.as_str()).await.unwrap();
conn.close().await.unwrap();
println!("migrate from v0.3 to v0.4 in tokio spawn done");
});
Expand Down
29 changes: 26 additions & 3 deletions src/middlelayers/charts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export async function loadAllAssetActionsBySymbol(symbol: string): Promise<Asset

export async function updateAssetPrice(uuid: string, assetID: number, symbol: string, price: number, createdAt: string) {
const db = await getDatabase()
await db.execute(`INSERT OR REPLACE INTO ${ASSETS_PRICE_TABLE_NAME} (uuid, assetID, symbol, price, createdAt) VALUES (?, ?, ?, ?, ?)`, [
await db.execute(`INSERT OR REPLACE INTO ${ASSETS_PRICE_TABLE_NAME} (uuid, assetID, symbol, price, assetCreatedAt) VALUES (?, ?, ?, ?, ?)`, [
uuid, assetID, symbol, price, createdAt
])
}
Expand Down Expand Up @@ -169,10 +169,27 @@ export function queryAllAssetPrices(): Promise<AssetPriceModel[]> {
return queryAssetPrices()
}

async function queryAssetPrices(symbol?: string): Promise<AssetPriceModel[]> {
export function queryAssetPricesAfterAssetCreatedAt(createdAt?: number): Promise<AssetPriceModel[]> {
const ts = createdAt ? new Date(createdAt).toISOString() : undefined
return queryAssetPrices(undefined, ts)
}

export function queryAssetPricesAfterUpdatedAt(updatedAt?: number): Promise<AssetPriceModel[]> {
const ts = updatedAt ? new Date(updatedAt).toISOString() : undefined
return queryAssetPrices(undefined, undefined, ts)
}

async function queryAssetPrices(symbol?: string, assetCreated?: string, updatedAt?: string): Promise<AssetPriceModel[]> {
const db = await getDatabase()
const params = symbol ? [symbol] : []
const prices = await db.select<AssetPriceModel[]>(`SELECT * FROM ${ASSETS_PRICE_TABLE_NAME} WHERE 1=1 ${symbol ? 'and symbol = ?' : ''}`, params)
if (assetCreated) {
params.push(assetCreated)
}
if (updatedAt) {
params.push(updatedAt)
}
const prices = await db.select<AssetPriceModel[]>(`SELECT * FROM ${ASSETS_PRICE_TABLE_NAME} WHERE 1=1 ${symbol ? 'and symbol = ?' : ''} ${assetCreated ? 'and assetCreatedAt > ?' : ''} ${updatedAt ? 'and updatedAt > ?' : ''}`, params)

return prices
}

Expand Down Expand Up @@ -215,6 +232,12 @@ export async function queryAssetsAfterCreatedAt(createdAt?: number): Promise<Ass
return assets
}

export async function queryAssetsByIDs(ids: number[]): Promise<AssetModel[]> {
const db = await getDatabase()
const assets = await db.select<AssetModel[]>(`SELECT * FROM ${ASSETS_TABLE_NAME} WHERE id in (${ids.join(",")})`)
return assets
}

async function queryAssetByUUID(id: string): Promise<AssetModel[]> {
const db = await getDatabase()
const assets = await db.select<AssetModel[]>(`SELECT * FROM ${ASSETS_TABLE_NAME} WHERE uuid = ?`, [id])
Expand Down
101 changes: 68 additions & 33 deletions src/middlelayers/cloudsync.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Polybase } from '@polybase/client'
import { getDatabase, saveModelsToDatabase } from './database'
import { v4 as uuidv4 } from 'uuid'
import { ASSETS_TABLE_NAME, queryAssetsAfterCreatedAt } from './charts'
import { CloudAssetModel, CloudSyncConfiguration, ExportAssetModel } from './types'
import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME, queryAssetPricesAfterAssetCreatedAt, queryAssetPricesAfterUpdatedAt, queryAssetsAfterCreatedAt, queryAssetsByIDs } from './charts'
import { AssetModel, AssetPriceModel, CloudAssetModel, CloudSyncConfiguration, ExportAssetModel } from './types'
import { getCloudSyncConfiguration as getCloudSyncConfigurationModel, saveCloudSyncConfiguration as saveCloudSyncConfigurationModel } from './configuration'
import _ from 'lodash'
import Database from 'tauri-plugin-sql-api'
Expand Down Expand Up @@ -190,7 +190,7 @@ async function dumpAssetsFromCloudAfterCreatedAt(createdAt?: number): Promise<Ex
// filter assets > createdAt from cloud, if createdAt is not provided, get all assets
const records = await p.collection<CloudAssetModel>(RECORD_COLLECTION_NAME).where("createdAt", ">=", createdAt || 0).sort("createdAt", "desc").get()

const needSyncedAssets = _(records.data).
return _(records.data).
map('data').
map(record => record.records ? JSON.parse(record.records) as ExportAssetModel[] : []).
flatten().
Expand All @@ -204,25 +204,37 @@ async function dumpAssetsFromCloudAfterCreatedAt(createdAt?: number): Promise<Ex
value: record.value,
price: record.price,
wallet: record.wallet,
costPrice: record.costPrice,
}))
.value()
return _(needSyncedAssets).map((asset) => ({
id: asset.id,
uuid: asset.uuid,
createdAt: asset.createdAt,
symbol: asset.symbol,
amount: asset.amount,
value: asset.value,
price: asset.price,
wallet: asset.wallet,
})).value()
}

async function dumpAssetsFromDBAfterCreatedAt(createdAt?: number): Promise<ExportAssetModel[]> {
const models = await queryAssetsAfterCreatedAt(createdAt)
const prices = await queryAssetPricesAfterAssetCreatedAt(createdAt)

// todo: get cost price
return models

const getExportAssetMode = (ms: AssetModel[], ps: AssetPriceModel[]): ExportAssetModel[] => {
const priceMap = _(ps).mapKeys('assetID').mapValues('price').value()
return _(ms).map((model) => {
const costPrice = priceMap[model.id]
return {
...model,
costPrice,
} as ExportAssetModel
}).value()
}

const afterCreatedAt = getExportAssetMode(models, prices)

const afterAssetIds = _(afterCreatedAt).map('id').value()

const afterUpdatedAt = await queryAssetPricesAfterUpdatedAt(createdAt)
const updatedAssetIds = _(afterUpdatedAt).filter(a => !afterAssetIds.includes(a.assetID)).map('assetID').value()
const updatedAssets = await queryAssetsByIDs(updatedAssetIds)
const afterUpdatedAtAssets = getExportAssetMode(updatedAssets, afterUpdatedAt)

return [...afterCreatedAt, ...afterUpdatedAtAssets]
}

export async function syncAssetsToCloudAndLocal(publicKey: string, createdAt?: number): Promise<number> {
Expand All @@ -244,8 +256,6 @@ export async function syncAssetsToCloudAndLocal(publicKey: string, createdAt?: n
const needSyncedAssetsToDB = _(cloudAssets).differenceBy(localAssets, 'uuid').value()
if (needSyncedAssetsToDB.length > 0) {
// write data to local
console.log('needSyncedAssetsToDB', needSyncedAssetsToDB)

synced += await writeAssetsToDB(d, needSyncedAssetsToDB)
}

Expand All @@ -262,18 +272,42 @@ export async function forceSyncAssetsToCloudFromLocal(publicKey: string): Promis
const cloudAssets = await dumpAssetsFromCloudAfterCreatedAt()

let synced = 0
// data in cloud not equal to local, need to be removed
const changedDataInLocal = _(localAssets).filter(a => {
const dataInCloud = _(cloudAssets).find(c => c.uuid === a.uuid && c.symbol === a.symbol && c.wallet === a.wallet)
if (!dataInCloud) {
return false
}
if (dataInCloud.amount !== a.amount) {
return true
}
if (dataInCloud.value !== a.value) {
return true
}

// add data to cloud if not in cloud
const needSyncedAssetsToCloud = _(localAssets).differenceBy(cloudAssets, 'uuid').value()
if (needSyncedAssetsToCloud.length > 0) {
// write data to cloud
synced += await writeAssetsToCloud(publicKey, needSyncedAssetsToCloud)
}
if (dataInCloud.price !== a.price) {
return true
}
if (dataInCloud.costPrice !== a.costPrice) {
return true
}
return false
}).value()
const getModelKey = (a: ExportAssetModel) => a.uuid + a.symbol + a.wallet
// remove data in cloud if not in local
const needRemovedInCloud = _(cloudAssets).differenceBy(localAssets, 'uuid').value()
if (needRemovedInCloud.length > 0) {
synced += await removeAssetsInCloud(needRemovedInCloud)
}
// add data to cloud if not in cloud
const needSyncedAssetsToCloud = _(localAssets).differenceBy(cloudAssets, 'uuid').value()
if (needSyncedAssetsToCloud.length > 0 || changedDataInLocal.length > 0) {
// write data to cloud
synced += await writeAssetsToCloud(publicKey, _([...needSyncedAssetsToCloud, ...changedDataInLocal])
.uniqBy(a => getModelKey(a))
.value()
)
}

await markAsSynced(d, publicKey)
return synced
Expand Down Expand Up @@ -304,8 +338,6 @@ async function writeAssetsToCloud(publicKey: string, assets: ExportAssetModel[])
async function removeAssetsInCloud(assets: ExportAssetModel[]): Promise<number> {
const p = await getPolybaseDB()
const res = await bluebird.map(_(assets).map('uuid').uniq().value(), async (uid: string) => {
console.log('removeAssetsInCloud', uid)

const records = await p.collection<CloudAssetModel>(RECORD_COLLECTION_NAME).where("uuid", "==", uid).get()
if (records.data.length === 0) {
return 0
Expand All @@ -327,18 +359,21 @@ async function writeAssetsToDB(d: Database, assets: ExportAssetModel[]): Promise
// not asset_price need to be saved
return res.length
}
// todo save asset_price
const priceMOdels = _(assets).filter(a => a.costPrice !== undefined).map(a => ({
uuid: a.uuid,
assetID: a.id,
symbol: a.symbol,
price: a.costPrice,
assetCreatedAt: a.createdAt,
updatedAt: new Date().toISOString(),
} as AssetPriceModel)).value()

await saveModelsToDatabase(ASSETS_PRICE_TABLE_NAME, priceMOdels)
return res.length
}

async function markAsSynced(d: Database, publicKey: string) {
await d.execute(`INSERT INTO ${CLOUD_SYNC_TABLE_NAME} (publicKey, updatedAt, needsFixExisting) VALUES (?, ?, ?) ON CONFLICT(publicKey) DO UPDATE SET updatedAt = ?, needsFixExisting = ?`, [publicKey, new Date().toISOString(), 0, new Date().toISOString(), 0])
}

// if nee needsFixExisting is true, means need to override data in cloud if it is differenceBy local
export async function updateNeedsFixExisting(publicKey: string) {
const d = await getDatabase()
await d.execute(`UPDATE ${CLOUD_SYNC_TABLE_NAME} SET needsFixExisting = 1 WHERE publicKey = ?`, [publicKey])
await d.execute(`INSERT INTO ${CLOUD_SYNC_TABLE_NAME} (publicKey, updatedAt) VALUES (?, ?) ON CONFLICT(publicKey) DO UPDATE SET updatedAt = ?`, [publicKey, new Date().toISOString(), new Date().toISOString()])
}

async function sendPostRequest<T>(path: string, body: object, token?: string): Promise<T> {
Expand Down
13 changes: 8 additions & 5 deletions src/middlelayers/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME, queryAllAssetPrices, queryH
import _ from 'lodash'
import { save, open } from "@tauri-apps/api/dialog"
import { writeTextFile, readTextFile } from "@tauri-apps/api/fs"
import { AssetModel, AssetPriceModel, ExportAssetModel, HistoricalData } from './types'
import { getDatabase, saveModelsToDatabase } from './database'
import { AssetPriceModel, ExportAssetModel, HistoricalData } from './types'
import { saveModelsToDatabase } from './database'
import { exportConfigurationString, importRawConfiguration } from './configuration'

type ExportData = {
Expand Down Expand Up @@ -173,10 +173,12 @@ async function saveHistoricalDataAssets(assets: ExportAssetModel[]) {

// import asset prices
const importedAssets = _(await queryHistoricalData(-1, false)).map(d => d.assets).flatten().value()
const importAssetsMap = _(importedAssets).mapKeys(a => `${a.uuid}/${a.symbol}/${a.wallet}`).value()

const assetPriceModels = _(assets).filter(a => a.costPrice !== undefined).map(a => {
console.log(a)
const key = `${a.uuid}/${a.symbol}/${a.wallet}`

const f = _(importedAssets).find(ia => ia.uuid === a.uuid && ia.symbol === a.symbol && ia.wallet === a.wallet)
const f = importAssetsMap[key]
if (!f) {
return
}
Expand All @@ -185,7 +187,8 @@ async function saveHistoricalDataAssets(assets: ExportAssetModel[]) {
assetID: f.id,
symbol: a.symbol,
price: a.costPrice,
createdAt: a.createdAt
assetCreatedAt: a.createdAt,
updatedAt: new Date().toISOString(),
} as AssetPriceModel
}).compact().value()

Expand Down
4 changes: 2 additions & 2 deletions src/middlelayers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import _ from "lodash"
import Database from "tauri-plugin-sql-api"
import { v4 as uuidv4 } from 'uuid'
import { CoinModel } from './datafetch/types'
import { AssetPriceModel, AssetModel, WalletCoinUSD } from './types'
import { ASSETS_PRICE_TABLE_NAME, ASSETS_TABLE_NAME } from './charts'
import { AssetModel, WalletCoinUSD } from './types'
import { ASSETS_TABLE_NAME } from './charts'
import md5 from 'md5'

export const databaseName = "track3.db"
Expand Down
3 changes: 2 additions & 1 deletion src/middlelayers/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export type AssetPriceModel = {
price: number

// createdAt in assets_v2 table
createdAt: string
assetCreatedAt: string
updatedAt: string
}

export type CloudAssetModel = {
Expand Down

0 comments on commit 4a733dd

Please sign in to comment.