Skip to content

Commit

Permalink
fix: change data source lifecycle on agent memory mysql saver
Browse files Browse the repository at this point in the history
  • Loading branch information
joaopaulo-m committed Nov 26, 2024
1 parent 36496b1 commit 868f267
Showing 1 changed file with 89 additions and 96 deletions.
185 changes: 89 additions & 96 deletions packages/components/nodes/memory/AgentMemory/mysqlSaver.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,46 @@
import { BaseCheckpointSaver, Checkpoint, CheckpointMetadata } from '@langchain/langgraph'
import { RunnableConfig } from '@langchain/core/runnables'
import { BaseMessage } from '@langchain/core/messages'
import { DataSource, QueryRunner } from 'typeorm'
import { DataSource } from 'typeorm'
import { CheckpointTuple, SaverOptions, SerializerProtocol } from './interface'
import { IMessage, MemoryMethods } from '../../../src/Interface'
import { mapChatMessageToBaseMessage } from '../../../src/utils'

export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
protected isSetup: boolean

datasource: DataSource

queryRunner: QueryRunner

config: SaverOptions

threadId: string

tableName = 'checkpoints'

constructor(config: SaverOptions, serde?: SerializerProtocol<Checkpoint>) {
super(serde)
this.config = config
const { datasourceOptions, threadId } = config
const { threadId } = config
this.threadId = threadId
this.datasource = new DataSource(datasourceOptions)
}

private async setup(): Promise<void> {
if (this.isSetup) {
return
}
private async getDataSource(): Promise<DataSource> {
const { datasourceOptions } = this.config
const dataSource = new DataSource(datasourceOptions)
await dataSource.initialize()
return dataSource
}

private async setup(dataSource: DataSource): Promise<void> {
if (this.isSetup) return

try {
const appDataSource = await this.datasource.initialize()

this.queryRunner = appDataSource.createQueryRunner()
await this.queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS ${this.tableName} (
thread_id VARCHAR(255) NOT NULL,
checkpoint_id VARCHAR(255) NOT NULL,
parent_id VARCHAR(255),
checkpoint BLOB,
metadata BLOB,
PRIMARY KEY (thread_id, checkpoint_id)
);`)
const queryRunner = dataSource.createQueryRunner()
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS ${this.tableName} (
thread_id VARCHAR(255) NOT NULL,
checkpoint_id VARCHAR(255) NOT NULL,
parent_id VARCHAR(255),
checkpoint BLOB,
metadata BLOB,
PRIMARY KEY (thread_id, checkpoint_id)
);`)
await queryRunner.release()
} catch (error) {
console.error(`Error creating ${this.tableName} table`, error)
throw new Error(`Error creating ${this.tableName} table`)
Expand All @@ -54,80 +50,64 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}

async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
await this.setup()
const dataSource = await this.getDataSource()
await this.setup(dataSource)

const thread_id = config.configurable?.thread_id || this.threadId
const checkpoint_id = config.configurable?.checkpoint_id

if (checkpoint_id) {
try {
const keys = [thread_id, checkpoint_id]
const sql = `SELECT checkpoint, parent_id, metadata FROM ${this.tableName} WHERE thread_id = ? AND checkpoint_id = ?`

const rows = await this.queryRunner.manager.query(sql, keys)

if (rows && rows.length > 0) {
return {
config,
checkpoint: (await this.serde.parse(rows[0].checkpoint.toString())) as Checkpoint,
metadata: (await this.serde.parse(rows[0].metadata.toString())) as CheckpointMetadata,
parentConfig: rows[0].parent_id
? {
configurable: {
thread_id,
checkpoint_id: rows[0].parent_id
}
}
: undefined
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
}
} else {
const keys = [thread_id]
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`
try {
const queryRunner = dataSource.createQueryRunner()
const sql = checkpoint_id
? `SELECT checkpoint, parent_id, metadata FROM ${this.tableName} WHERE thread_id = ? AND checkpoint_id = ?`
: `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`

const rows = await this.queryRunner.manager.query(sql, keys)
const rows = await queryRunner.manager.query(sql, checkpoint_id ? [thread_id, checkpoint_id] : [thread_id])
await queryRunner.release()

if (rows && rows.length > 0) {
const row = rows[0]
return {
config: {
configurable: {
thread_id: rows[0].thread_id,
checkpoint_id: rows[0].checkpoint_id
thread_id: row.thread_id || thread_id,
checkpoint_id: row.checkpoint_id || checkpoint_id
}
},
checkpoint: (await this.serde.parse(rows[0].checkpoint.toString())) as Checkpoint,
metadata: (await this.serde.parse(rows[0].metadata.toString())) as CheckpointMetadata,
parentConfig: rows[0].parent_id
checkpoint: (await this.serde.parse(row.checkpoint.toString())) as Checkpoint,
metadata: (await this.serde.parse(row.metadata.toString())) as CheckpointMetadata,
parentConfig: row.parent_id
? {
configurable: {
thread_id: rows[0].thread_id,
checkpoint_id: rows[0].parent_id
thread_id,
checkpoint_id: row.parent_id
}
}
: undefined
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
}
return undefined
}

async *list(config: RunnableConfig, limit?: number, before?: RunnableConfig): AsyncGenerator<CheckpointTuple> {
await this.setup()
const thread_id = config.configurable?.thread_id || this.threadId
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ${
before ? 'AND checkpoint_id < ?' : ''
} ORDER BY checkpoint_id DESC`
if (limit) {
sql += ` LIMIT ${limit}`
}
const args = [thread_id, before?.configurable?.checkpoint_id].filter(Boolean)

async *list(config: RunnableConfig, limit?: number, before?: RunnableConfig): AsyncGenerator<CheckpointTuple, void, unknown> {
const dataSource = await this.getDataSource()
try {
const rows = await this.queryRunner.manager.query(sql, args)
const threadId = config.configurable?.thread_id || this.threadId
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ${
before ? 'AND checkpoint_id < ?' : ''
} ORDER BY checkpoint_id DESC`
if (limit) {
sql += ` LIMIT ${limit}`
}
const args = [threadId, before?.configurable?.checkpoint_id].filter(Boolean)

const rows = await dataSource.manager.query(sql, args)
if (rows && rows.length > 0) {
for (const row of rows) {
yield {
Expand All @@ -151,15 +131,19 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error listing ${this.tableName}`, error)
throw new Error(`Error listing ${this.tableName}`)
console.error(`Error listing checkpoints`, error)
throw new Error(`Error listing checkpoints`)
} finally {
await dataSource.destroy()
}
}

async put(config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata): Promise<RunnableConfig> {
await this.setup()
if (!config.configurable?.checkpoint_id) return {}
const dataSource = await this.getDataSource()
await this.setup(dataSource)

try {
const queryRunner = dataSource.createQueryRunner()
const row = [
config.configurable?.thread_id || this.threadId,
checkpoint.id,
Expand All @@ -172,10 +156,13 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE checkpoint = VALUES(checkpoint), metadata = VALUES(metadata)`

await this.queryRunner.manager.query(query, row)
await queryRunner.manager.query(query, row)
await queryRunner.release()
} catch (error) {
console.error('Error saving checkpoint', error)
throw new Error('Error saving checkpoint')
} finally {
await dataSource.destroy()
}

return {
Expand All @@ -186,20 +173,6 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}

async delete(threadId: string): Promise<void> {
if (!threadId) {
return
}
await this.setup()
const query = `DELETE FROM ${this.tableName} WHERE thread_id = ?;`

try {
await this.queryRunner.manager.query(query, [threadId])
} catch (error) {
console.error(`Error deleting thread_id ${threadId}`, error)
}
}

async getChatMessages(
overrideSessionId = '',
returnBaseMessages = false,
Expand Down Expand Up @@ -232,14 +205,34 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
type: m.role
})
}

return returnIMessages
}

async delete(threadId: string): Promise<void> {
if (!threadId) return

const dataSource = await this.getDataSource()
await this.setup(dataSource)

try {
const queryRunner = dataSource.createQueryRunner()
const query = `DELETE FROM ${this.tableName} WHERE thread_id = ?;`
await queryRunner.manager.query(query, [threadId])
await queryRunner.release()
} catch (error) {
console.error(`Error deleting thread_id ${threadId}`, error)
} finally {
await dataSource.destroy()
}
}

async addChatMessages(): Promise<void> {
// Empty as it's not being used
// Implementação vazia porque o método não está sendo usado
}

async clearChatMessages(overrideSessionId = ''): Promise<void> {
if (!overrideSessionId) return
await this.delete(overrideSessionId)
}
}

0 comments on commit 868f267

Please sign in to comment.