Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8298f34
Integrating AI Agent Capabilities into dubbo-admin
stringl1l1l1l Sep 1, 2025
b721d5a
Move function to agent package
stringl1l1l1l Sep 1, 2025
3cb05a0
Delete useless prompt
stringl1l1l1l Sep 1, 2025
dca9683
move config package to internal
stringl1l1l1l Sep 1, 2025
f0614f2
remove unnecessary nil explicit initialization
stringl1l1l1l Sep 1, 2025
e3ce659
Enable support for the Alibaba DashScope OpenAI-Compatible API
stringl1l1l1l Sep 1, 2025
542249b
Abstract models of different providers into Model class
stringl1l1l1l Sep 4, 2025
6e2b520
abstract Agent, Flow, and Tool input/output
stringl1l1l1l Sep 10, 2025
fb88f1b
Add simple chat history memory
stringl1l1l1l Sep 12, 2025
795e894
Add support of stream output
stringl1l1l1l Sep 14, 2025
090b1df
Convert Chinese comments to English
stringl1l1l1l Sep 14, 2025
4f2e6e1
replace genkit with fixed custom package
stringl1l1l1l Sep 15, 2025
baf8f2c
replace panic with error in String()
stringl1l1l1l Sep 17, 2025
2d31ab1
remove useless message package
stringl1l1l1l Sep 17, 2025
b703279
use channel to get stream chunk and output
stringl1l1l1l Sep 17, 2025
e70407c
move memory context initialization into agent
stringl1l1l1l Sep 18, 2025
18cc36e
fix bad channel usage in `react.Interact()`
stringl1l1l1l Sep 18, 2025
24d4efe
optimize agent initialization, use genkit-v1.0.4
stringl1l1l1l Sep 19, 2025
33df50f
implement gin web server with OpenAPI doc
stringl1l1l1l Sep 19, 2025
79825d0
support with Anthropic SSE format
stringl1l1l1l Sep 19, 2025
6c19e75
add user-friendly feedback and summary of react
stringl1l1l1l Sep 19, 2025
05e16eb
merge branch ai with origin/ai
stringl1l1l1l Sep 19, 2025
59e3c12
optimize prompts and react flows
stringl1l1l1l Sep 20, 2025
e72cbb7
decoupling chat and session & replace panic with error return
stringl1l1l1l Sep 22, 2025
536a027
sort and format imports
stringl1l1l1l Sep 22, 2025
144d272
introduce Pinecone RAG & Cohere Rerank utils
stringl1l1l1l Sep 23, 2025
b5ce578
optimize errors handle logic
stringl1l1l1l Sep 23, 2025
7bd0d06
add mock session create function
stringl1l1l1l Sep 23, 2025
f09acfc
optimize tool use, refactor propmts
stringl1l1l1l Sep 24, 2025
17d1826
fix bugs in ReAct flows, optimize propmts
stringl1l1l1l Sep 24, 2025
851fa88
optimize the startup logic, add README.md
stringl1l1l1l Sep 25, 2025
ebf55b9
add content block support, optimize prompts
stringl1l1l1l Sep 25, 2025
3749180
implement session granularity chat history, optimize prompts
stringl1l1l1l Sep 25, 2025
138a1e1
optimize errors sending
stringl1l1l1l Sep 25, 2025
30bc713
refactor memory, add sliding window and turn memory
stringl1l1l1l Sep 25, 2025
a64888d
add memory query tools to allow self memory retrieval
stringl1l1l1l Sep 25, 2025
4c7eeed
add markdown data cleaner and RAG support, optimize propmts for RAG
stringl1l1l1l Sep 26, 2025
f0bb27c
add support of usage statistics, optimize prompts, fix genkit bug
stringl1l1l1l Sep 26, 2025
d21ce51
optimize sse events sending and memory tool call
stringl1l1l1l Sep 27, 2025
90d0e28
optimize prompts and empty tool response handling
stringl1l1l1l Sep 27, 2025
7fac8d0
[chore]: update openapi doc
stringl1l1l1l Sep 29, 2025
306fe4f
[chore]: format code, add missing ASF headers
stringl1l1l1l Sep 29, 2025
8537d51
Merge branch 'ospp-2025' into ospp-2025-feat-ai
stringl1l1l1l Oct 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ai/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.idea
.vscode
.genkit
.DS_Store

.env

reference
.qoder
30 changes: 30 additions & 0 deletions ai/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# dubbo-admin-ai
## Introduction

This project is an intelligent agent's server for dubbo-admin.


## Startup
1. Set your API Keys in the `.env` file or set them as environment variables
```shell
# .env_example
PINECONE_API_KEY=your_pinecone_api_key
DASHSCOPE_API_KEY=your_dashscope_api_key
COHERE_API_KEY=your_cohere_api_key
SILICONFLOW_API_KEY=your_siliconflow_api_key
GEMINI_API_KEY=your_gemini_api_key
```

2. Run the server
```shell
go run main.go --mode dev --env your_env_file_path --port 8888
```

## Build and Run
```shell
mkdir build
cd build
go build -o dubbo-admin-ai ../

./dubbo-admin-ai --mode prod --env your_env_file_path --port 8888
```
298 changes: 298 additions & 0 deletions ai/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package agent

import (
"context"
"errors"
"fmt"

"dubbo-admin-ai/config"
"dubbo-admin-ai/memory"
"dubbo-admin-ai/schema"

"github.com/firebase/genkit/go/core"
)

type NoStream = struct{}
type StreamType interface {
NoStream | schema.StreamChunk
}

type Flow = *core.Flow[schema.Schema, schema.Schema, any]
type NormalFlow = *core.Flow[schema.Schema, schema.Schema, NoStream]
type StreamFlow = *core.Flow[schema.Schema, schema.Schema, schema.StreamChunk]

type StreamHandler = func(*core.StreamingFlowValue[schema.Schema, schema.StreamChunk], error) bool
type StreamFunc = func(*Channels) StreamHandler

const (
IntentFlowName string = "intent"
ThinkFlowName string = "think"
ActFlowName string = "act"
ObserveFlowName string = "observe"
ReActFlowName string = "reAct"
)

type Agent interface {
Interact(*schema.UserInput, string) *Channels
GetMemory() *memory.History
}

type Channels struct {
closed bool

UserRespChan chan *schema.StreamFeedback
FlowChan chan schema.Schema
ErrorChan chan error
}

func NewChannels(bufferSize int) *Channels {
return &Channels{
closed: false,
UserRespChan: make(chan *schema.StreamFeedback, bufferSize),
FlowChan: make(chan schema.Schema, bufferSize),
ErrorChan: make(chan error, bufferSize),
}
}

func (chans *Channels) Reset() {
chans.closed = false
}

func (chans *Channels) Close() {
chans.closed = true
}

func (chans *Channels) Closed() bool {
return chans.closed
}

func (chans *Channels) Destroy() {
close(chans.UserRespChan)
close(chans.FlowChan)
close(chans.ErrorChan)
chans = nil
}

type StageType int

const (
BeforeLoop StageType = iota
InLoop
AfterLoop
)

type Stage struct {
flow any
streamFunc StreamFunc
Type StageType
}

func NewStage(flow any, t StageType) *Stage {
return &Stage{
flow: flow,
streamFunc: nil,
Type: t,
}
}

func NewStreamStage(flow any, t StageType, onStreaming func(*Channels, schema.StreamChunk) error, onDone func(*Channels, schema.Schema) error) (stage *Stage) {
if onStreaming == nil || onDone == nil {
panic("onStreaming, onDone and streamChan callbacks cannot be nil for streaming stage")
}

stage = NewStage(flow, t)
stage.streamFunc = func(channels *Channels) StreamHandler {
return func(val *core.StreamingFlowValue[schema.Schema, schema.StreamChunk], err error) bool {
if err != nil {
channels.ErrorChan <- err
return false
}
if !val.Done {
if err := onStreaming(channels, val.Stream); err != nil {
channels.ErrorChan <- err
return false
}
} else if val.Output != nil {
if err := onDone(channels, val.Output); err != nil {
channels.ErrorChan <- err
return false
}
}
return true
}
}
return stage
}

// Execute will receive an input and produce an output
func (s *Stage) Execute(ctx context.Context, chans *Channels, input schema.Schema) error {
if value, ok := s.flow.(NormalFlow); ok {
output, err := value.Run(ctx, input)
if err != nil {
return fmt.Errorf("error when running normal flow: %w", err)
}
chans.FlowChan <- output
} else if value, ok := s.flow.(StreamFlow); ok {
if chans == nil || s.streamFunc == nil {
return fmt.Errorf("stream handler and channels cannot be nil for stream flow")
}
// the stream function will produce the output
value.Stream(ctx, input)(s.streamFunc(chans))
}

return nil
}

type Orchestrator interface {
Run(context.Context, schema.Schema, *Channels) error
RunStage(context.Context, string, schema.Schema, *Channels) error
}

type OrderOrchestrator struct {
stages map[string]*Stage
beforeLoop []string
loop []string
afterLoop []string
}

// The order of stages is the order in which they are executed
func NewOrderOrchestrator(stages ...*Stage) *OrderOrchestrator {
stagesMap := make(map[string]*Stage, len(stages))
loop := make([]string, 0, len(stages))
beforeLoop := make([]string, 0, len(stages))
afterLoop := make([]string, 0, len(stages))

for _, stage := range stages {
// Get the flow name through interface method
var flowName string
if nFlow, ok := stage.flow.(NormalFlow); ok {
flowName = nFlow.Name()
} else if sFlow, ok := stage.flow.(StreamFlow); ok {
flowName = sFlow.Name()
}
stagesMap[flowName] = stage
switch stage.Type {
case BeforeLoop:
beforeLoop = append(beforeLoop, flowName)
case InLoop:
loop = append(loop, flowName)
case AfterLoop:
afterLoop = append(afterLoop, flowName)
}
}

return &OrderOrchestrator{
stages: stagesMap,
loop: loop,
beforeLoop: beforeLoop,
afterLoop: afterLoop,
}
}

func (orchestrator *OrderOrchestrator) Run(ctx context.Context, input schema.Schema, chans *Channels) (err error) {
defer func() {
if err != nil {
chans.ErrorChan <- err
}
}()
// Use user initial input for the first round
if input == nil {
return errors.New("userInput cannot be nil")
}

for _, key := range orchestrator.beforeLoop {
curStage, ok := orchestrator.stages[key]
if !ok {
return fmt.Errorf("stage %s not found", key)
}

if err := curStage.Execute(ctx, chans, input); err != nil {
return fmt.Errorf("failed to execute stage %s: %w", key, err)
}
output := <-chans.FlowChan

input = output
}

// Iterate until reaching maximum iterations or status is Finished
var finalOutput schema.Observation
Outer:
for range config.MAX_REACT_ITERATIONS {
for _, order := range orchestrator.loop {
// Execute current stage
curStage, ok := orchestrator.stages[order]
if !ok {
return fmt.Errorf("stage %s not found", order)
}

if err := curStage.Execute(ctx, chans, input); err != nil {
return fmt.Errorf("failed to execute stage %s: %w", order, err)
}
output := <-chans.FlowChan

// Check if LLM returned final answer
if out, ok := output.(schema.Observation); ok {
if !out.Heartbeat && out.FinalAnswer != "" {
finalOutput = out
break Outer
}
}
// The output of current stage will be the input of the next stage
if val, ok := output.(schema.Observation); ok {
finalOutput = val
}
input = output
}
}
chans.UserRespChan <- schema.StreamFinal(&finalOutput)

for _, key := range orchestrator.afterLoop {
curStage, ok := orchestrator.stages[key]
if !ok {
return fmt.Errorf("stage %s not found", key)
}

if err := curStage.Execute(ctx, chans, input); err != nil {
return fmt.Errorf("failed to execute stage %s: %w", key, err)
}
output := <-chans.FlowChan

input = output
}

return nil
}

func (orchestrator *OrderOrchestrator) RunStage(ctx context.Context, key string, input schema.Schema, chans *Channels) (err error) {
defer func() {
chans.Close()
if err != nil {
chans.ErrorChan <- err
}
}()
stage, ok := orchestrator.stages[key]
if !ok {
return fmt.Errorf("stage %s not found", key)
}
if err := stage.Execute(ctx, chans, input); err != nil {
return fmt.Errorf("failed to execute stage %s: %w", key, err)
}
return nil
}
Loading