Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MLOB-1858] feat(llmobs): langchain submits llmobs span events #4923

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
19 changes: 19 additions & 0 deletions .github/workflows/llmobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,22 @@ jobs:
- uses: codecov/codecov-action@v3
- if: always()
uses: ./.github/actions/testagent/logs

langchain:
runs-on: ubuntu-latest
env:
PLUGINS: langchain
steps:
- uses: actions/checkout@v4
sabrenner marked this conversation as resolved.
Show resolved Hide resolved
- uses: ./.github/actions/testagent/start
- uses: ./.github/actions/node/setup
- uses: ./.github/actions/install
- uses: ./.github/actions/node/oldest
- run: yarn test:llmobs:plugins:ci
shell: bash
- uses: ./.github/actions/node/latest
- run: yarn test:llmobs:plugins:ci
shell: bash
- uses: codecov/codecov-action@v3
sabrenner marked this conversation as resolved.
Show resolved Hide resolved
- if: always()
uses: ./.github/actions/testagent/logs
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"test:lambda:ci": "nyc --no-clean --include \"packages/dd-trace/src/lambda/**/*.js\" -- npm run test:lambda",
"test:llmobs:sdk": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" --exclude \"packages/dd-trace/test/llmobs/plugins/**/*.spec.js\" \"packages/dd-trace/test/llmobs/**/*.spec.js\" ",
"test:llmobs:sdk:ci": "nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs:sdk",
"test:llmobs:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/llmobs/plugins/**/*.spec.js\"",
"test:llmobs:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/llmobs/plugins/@($(echo $PLUGINS))/*.spec.js\"",
"test:llmobs:plugins:ci": "yarn services && nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs:plugins",
"test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-instrumentations/test/@($(echo $PLUGINS)).spec.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/*.spec.js\"",
"test:plugins:ci": "yarn services && nyc --no-clean --include \"packages/datadog-instrumentations/src/@($(echo $PLUGINS)).js\" --include \"packages/datadog-instrumentations/src/@($(echo $PLUGINS))/**/*.js\" --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins",
Expand Down
88 changes: 8 additions & 80 deletions packages/datadog-plugin-langchain/src/index.js
Original file line number Diff line number Diff line change
@@ -1,89 +1,17 @@
'use strict'

const { MEASURED } = require('../../../ext/tags')
const { storage } = require('../../datadog-core')
const TracingPlugin = require('../../dd-trace/src/plugins/tracing')
const LangChainTracingPlugin = require('./tracing')
const LangChainLLMObsPlugin = require('../../dd-trace/src/llmobs/plugins/langchain')
const CompositePlugin = require('../../dd-trace/src/plugins/composite')

const API_KEY = 'langchain.request.api_key'
const MODEL = 'langchain.request.model'
const PROVIDER = 'langchain.request.provider'
const TYPE = 'langchain.request.type'

const LangChainHandler = require('./handlers/default')
const LangChainChatModelHandler = require('./handlers/language_models/chat_model')
const LangChainLLMHandler = require('./handlers/language_models/llm')
const LangChainChainHandler = require('./handlers/chain')
const LangChainEmbeddingHandler = require('./handlers/embedding')

class LangChainPlugin extends TracingPlugin {
class LangChainPlugin extends CompositePlugin {
static get id () { return 'langchain' }
static get operation () { return 'invoke' }
static get system () { return 'langchain' }
static get prefix () {
return 'tracing:apm:langchain:invoke'
}

constructor () {
super(...arguments)

const langchainConfig = this._tracerConfig.langchain || {}
this.handlers = {
chain: new LangChainChainHandler(langchainConfig),
chat_model: new LangChainChatModelHandler(langchainConfig),
llm: new LangChainLLMHandler(langchainConfig),
embedding: new LangChainEmbeddingHandler(langchainConfig),
default: new LangChainHandler(langchainConfig)
static get plugins () {
return {
llmobs: LangChainLLMObsPlugin,
tracing: LangChainTracingPlugin
}
}

bindStart (ctx) {
const { resource, type } = ctx
const handler = this.handlers[type]

const instance = ctx.instance
const apiKey = handler.extractApiKey(instance)
const provider = handler.extractProvider(instance)
const model = handler.extractModel(instance)

const tags = handler.getSpanStartTags(ctx, provider) || []

if (apiKey) tags[API_KEY] = apiKey
if (provider) tags[PROVIDER] = provider
if (model) tags[MODEL] = model
if (type) tags[TYPE] = type

const span = this.startSpan('langchain.request', {
service: this.config.service,
resource,
kind: 'client',
meta: {
[MEASURED]: 1,
...tags
}
}, false)

const store = storage.getStore() || {}
ctx.currentStore = { ...store, span }

return ctx.currentStore
}

asyncEnd (ctx) {
const span = ctx.currentStore.span

const { type } = ctx

const handler = this.handlers[type]
const tags = handler.getSpanEndTags(ctx) || {}

span.addTags(tags)

span.finish()
}

getHandler (type) {
return this.handlers[type] || this.handlers.default
}
}

module.exports = LangChainPlugin
89 changes: 89 additions & 0 deletions packages/datadog-plugin-langchain/src/tracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const { MEASURED } = require('../../../ext/tags')
const { storage } = require('../../datadog-core')
const TracingPlugin = require('../../dd-trace/src/plugins/tracing')

const API_KEY = 'langchain.request.api_key'
const MODEL = 'langchain.request.model'
const PROVIDER = 'langchain.request.provider'
const TYPE = 'langchain.request.type'

const LangChainHandler = require('./handlers/default')
const LangChainChatModelHandler = require('./handlers/language_models/chat_model')
const LangChainLLMHandler = require('./handlers/language_models/llm')
const LangChainChainHandler = require('./handlers/chain')
const LangChainEmbeddingHandler = require('./handlers/embedding')

class LangChainTracingPlugin extends TracingPlugin {
static get id () { return 'langchain' }
static get operation () { return 'invoke' }
static get system () { return 'langchain' }
static get prefix () {
return 'tracing:apm:langchain:invoke'
}

constructor () {
super(...arguments)

const langchainConfig = this._tracerConfig.langchain || {}
this.handlers = {
chain: new LangChainChainHandler(langchainConfig),
chat_model: new LangChainChatModelHandler(langchainConfig),
llm: new LangChainLLMHandler(langchainConfig),
embedding: new LangChainEmbeddingHandler(langchainConfig),
default: new LangChainHandler(langchainConfig)
}
}

bindStart (ctx) {
const { resource, type } = ctx
const handler = this.handlers[type]

const instance = ctx.instance
const apiKey = handler.extractApiKey(instance)
const provider = handler.extractProvider(instance)
const model = handler.extractModel(instance)

const tags = handler.getSpanStartTags(ctx, provider) || []

if (apiKey) tags[API_KEY] = apiKey
if (provider) tags[PROVIDER] = provider
if (model) tags[MODEL] = model
if (type) tags[TYPE] = type

const span = this.startSpan('langchain.request', {
service: this.config.service,
resource,
kind: 'client',
meta: {
[MEASURED]: 1,
...tags
}
}, false)

const store = storage.getStore() || {}
ctx.currentStore = { ...store, span }

return ctx.currentStore
}

asyncEnd (ctx) {
const span = ctx.currentStore.span

const { type } = ctx

const handler = this.handlers[type]
const tags = handler.getSpanEndTags(ctx) || {}

span.addTags(tags)

span.finish()
}

getHandler (type) {
return this.handlers[type] || this.handlers.default
}
}

module.exports = LangChainTracingPlugin
39 changes: 31 additions & 8 deletions packages/dd-trace/src/llmobs/plugins/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,40 @@

const log = require('../../log')
const { storage } = require('../storage')
const { storage: apmStorage } = require('../../../../datadog-core')

const TracingPlugin = require('../../plugins/tracing')
const LLMObsTagger = require('../tagger')

// we make this a `Plugin` so we don't have to worry about `finish` being called
class LLMObsPlugin extends TracingPlugin {
constructor (...args) {
super(...args)

this._tagger = new LLMObsTagger(this._tracerConfig, true)
}

getName () {}

setLLMObsTags (ctx) {
throw new Error('setLLMObsTags must be implemented by the subclass')
}

getLLMObsSPanRegisterOptions (ctx) {
getLLMObsSpanRegisterOptions (ctx) {
throw new Error('getLLMObsSPanRegisterOptions must be implemented by the subclass')
}

start (ctx) {
const oldStore = storage.getStore()
const parent = oldStore?.span
// even though llmobs span events won't be enqueued if llmobs is disabled
// we should avoid doing any computations here (these listeners aren't disabled)
const enabled = this._tracerConfig.llmobs.enabled
if (!enabled) return

const parent = this.getLLMObsParent(ctx)
const span = ctx.currentStore?.span

const registerOptions = this.getLLMObsSPanRegisterOptions(ctx)
const registerOptions = this.getLLMObsSpanRegisterOptions(ctx)

this._tagger.registerLLMObsSpan(span, { parent, ...registerOptions })
if (registerOptions) {
this._tagger.registerLLMObsSpan(span, { parent, ...registerOptions })
}
}

asyncEnd (ctx) {
Expand Down Expand Up @@ -60,6 +64,25 @@ class LLMObsPlugin extends TracingPlugin {
}
super.configure(config)
}

getLLMObsParent () {
// we need to look one level up the APM span stack to find the parent
// the current span is the current langchain span (it was activated in the tracing `bindStart`)
const parentApmSpan = apmStorage.getStore()?.span?._store?.span
const parentLLMObsSpan = storage.getStore()?.span

let parent
if (
parentApmSpan === parentLLMObsSpan || // they are the same
LLMObsTagger.tagMap.has(parentApmSpan) // they are not the same, but the APM span is a parent
) {
parent = parentApmSpan
} else {
parent = parentLLMObsSpan
}

return parent
}
}

module.exports = LLMObsPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const LangChainLLMObsHandler = require('.')
const { spanHasError } = require('../../../util')

class LangChainLLMObsChainHandler extends LangChainLLMObsHandler {
setMetaTags ({ span, inputs, results }) {
let input, output
if (inputs) {
input = this.formatIO(inputs)
}

if (!results || spanHasError(span)) {
output = ''
} else {
output = this.formatIO(results)
}

// chain spans will always be workflows
this._tagger.tagTextIO(span, input, output)
}
}

module.exports = LangChainLLMObsChainHandler
Loading
Loading