Skip to content

Commit

Permalink
AI pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Dec 13, 2024
1 parent 956c9f6 commit 0c8a703
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 26 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/Masterminds/sprig/v3 v3.3.0
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.47.2
github.com/conduitio-labs/conduit-connector-weaviate v0.1.2-0.20241125183412-ae12206633d7
github.com/conduitio/conduit-commons v0.5.0
github.com/conduitio/conduit-connector-file v0.9.0
github.com/conduitio/conduit-connector-generator v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ github.com/ckaznocha/intrange v0.2.1/go.mod h1:7NEhVyf8fzZO5Ds7CRaqPEm52Ut83hsTi
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio-labs/conduit-connector-weaviate v0.1.2-0.20241125183412-ae12206633d7 h1:DnugxVjYVSCX2pvLlQAlTK2AEopieV0LjUKtnAedStE=
github.com/conduitio-labs/conduit-connector-weaviate v0.1.2-0.20241125183412-ae12206633d7/go.mod h1:rIJkVcS9cfFcy5rrimYu3wzaQJypkmXDNMmWqJKbGeg=
github.com/conduitio/conduit-commons v0.5.0 h1:28UIuOIo+6WvBZ4EU54KfPhSf44I1/Y65zQ9dC0Ps1E=
github.com/conduitio/conduit-commons v0.5.0/go.mod h1:xyT6XpGvj79gdtsn3qaD2KxadhsAYS+mmBOdln08Wio=
github.com/conduitio/conduit-connector-file v0.9.0 h1:bp6mU5hMbWS5QK9hh3Vt+AigXSPFfc62lzHZKJZkscg=
Expand Down
14 changes: 8 additions & 6 deletions pkg/plugin/connector/builtin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"runtime/debug"

weaviate "github.com/conduitio-labs/conduit-connector-weaviate"
file "github.com/conduitio/conduit-connector-file"
generator "github.com/conduitio/conduit-connector-generator"
kafka "github.com/conduitio/conduit-connector-kafka"
Expand All @@ -38,12 +39,13 @@ import (
// The key of the map is the import path of the module
// containing the connector implementation.
var DefaultBuiltinConnectors = map[string]sdk.Connector{
"github.com/conduitio/conduit-connector-file": file.Connector,
"github.com/conduitio/conduit-connector-generator": generator.Connector,
"github.com/conduitio/conduit-connector-kafka": kafka.Connector,
"github.com/conduitio/conduit-connector-log": connLog.Connector,
"github.com/conduitio/conduit-connector-postgres": postgres.Connector,
"github.com/conduitio/conduit-connector-s3": s3.Connector,
"github.com/conduitio/conduit-connector-file": file.Connector,
"github.com/conduitio/conduit-connector-generator": generator.Connector,
"github.com/conduitio/conduit-connector-kafka": kafka.Connector,
"github.com/conduitio/conduit-connector-log": connLog.Connector,
"github.com/conduitio/conduit-connector-postgres": postgres.Connector,
"github.com/conduitio/conduit-connector-s3": s3.Connector,
"github.com/conduitio-labs/conduit-connector-weaviate": weaviate.Connector,
}

type blueprint struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/plugin/processor/builtin/impl/ai/openai/embedding.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type embeddingProcessor struct {

func NewEmbeddingProcessor(log log.CtxLogger) sdk.Processor {
return &embeddingProcessor{
logger: log.WithComponent("openai_embedding"),
logger: log.WithComponent("openai.embedding"),
}
}

Expand Down Expand Up @@ -138,6 +138,7 @@ func (p *embeddingProcessor) Process(ctx context.Context, records []opencdc.Reco
Msg("got embeddings")

for i, record := range records {
// TODO: the base64-encoded embedding stored in the record metadata is huge
record.Metadata[EmbeddingMetadataBase64] = embeddings.Data[i].EmbeddingBase64
// todo add more metadata related to the embeddings
out = append(out, sdk.SingleRecord(record))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/plugin/processor/builtin/impl/ai/weaviate/get_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
)

// getContextConfig holds the user-supplied settings of the Weaviate
// get-context processor.
type getContextConfig struct {
// Endpoint is the Weaviate host the client connects to; it is passed to the
// client configuration as Host. The default tag is "localhost:18080".
Endpoint string `json:"endpoint" default:"localhost:18080"`
// Scheme is the URL scheme passed to the Weaviate client configuration.
// The default tag is "http".
Scheme string `json:"scheme" default:"http"`
// ClassName is required.
// NOTE(review): presumably the Weaviate class that is queried — confirm
// against its use in Process.
ClassName string `json:"className" validate:"required"`
// ContextField is required.
// NOTE(review): its exact role (source or target of the context) is not
// visible in this hunk — confirm against its use in Process.
ContextField string `json:"contextField" validate:"required"`
}
Expand Down Expand Up @@ -76,8 +78,8 @@ func (p *getContextProcessor) Configure(ctx context.Context, c config.Config) er
func (p *getContextProcessor) Open(ctx context.Context) error {
// Configure Weaviate client
cfg := weaviate.Config{
Host: "localhost:8080", // Replace with your Weaviate instance URL
Scheme: "http",
Host: p.cfg.Endpoint, // Weaviate host, taken from the processor configuration
Scheme: p.cfg.Scheme,
}
client, err := weaviate.NewClient(cfg)
if err != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 25 additions & 16 deletions pkg/plugin/processor/builtin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,34 @@ import (
)

var DefaultBuiltinProcessors = map[string]ProcessorPluginConstructor{
"avro.decode": avro.NewDecodeProcessor,
"avro.encode": avro.NewEncodeProcessor,
"base64.decode": base64.NewDecodeProcessor,
"base64.encode": base64.NewEncodeProcessor,
"custom.javascript": custom.NewJavascriptProcessor,
"error": impl.NewErrorProcessor,
"filter": impl.NewFilterProcessor,
"field.convert": field.NewConvertProcessor,
"field.exclude": field.NewExcludeProcessor,
"field.rename": field.NewRenameProcessor,
"field.set": field.NewSetProcessor,
"json.decode": json.NewDecodeProcessor,
"json.encode": json.NewEncodeProcessor,
"avro.decode": avro.NewDecodeProcessor,
"avro.encode": avro.NewEncodeProcessor,

"base64.decode": base64.NewDecodeProcessor,
"base64.encode": base64.NewEncodeProcessor,

"custom.javascript": custom.NewJavascriptProcessor,
"error": impl.NewErrorProcessor,

"filter": impl.NewFilterProcessor,

"field.convert": field.NewConvertProcessor,
"field.exclude": field.NewExcludeProcessor,
"field.rename": field.NewRenameProcessor,
"field.set": field.NewSetProcessor,

"json.decode": json.NewDecodeProcessor,
"json.encode": json.NewEncodeProcessor,

"unwrap.debezium": unwrap.NewDebeziumProcessor,
"unwrap.kafkaconnect": unwrap.NewKafkaConnectProcessor,
"unwrap.opencdc": unwrap.NewOpenCDCProcessor,
"webhook.http": webhook.NewHTTPProcessor,
"openai.embedding": openai.NewEmbeddingProcessor,
"openai.prompt": openai.NewPromptProcessor,

"webhook.http": webhook.NewHTTPProcessor,

"openai.embedding": openai.NewEmbeddingProcessor,
"openai.prompt": openai.NewPromptProcessor,

"weaviate.getContext": weaviate.NewGetContextProcessor,
}

Expand Down

0 comments on commit 0c8a703

Please sign in to comment.