diff --git a/cmds/protoc-gen-cloud-job/internal/gen.go b/cmds/protoc-gen-cloud-job/internal/gen.go new file mode 100644 index 00000000..e07417b7 --- /dev/null +++ b/cmds/protoc-gen-cloud-job/internal/gen.go @@ -0,0 +1,135 @@ +package internal + +import ( + "fmt" + "strings" + + "github.com/dave/jennifer/jen" + "github.com/pubgo/lava/pkg/proto/cloudjobpb" + "google.golang.org/protobuf/compiler/protogen" + "google.golang.org/protobuf/proto" +) + +const jobPkg = "github.com/pubgo/lava/component/cloudjobs" + +type eventInfo struct { + srv *protogen.Service + mth *protogen.Method +} + +// GenerateFile generates a .errors.pb.go file containing service definitions. +func GenerateFile(gen *protogen.Plugin, file *protogen.File) *protogen.GeneratedFile { + filename := file.GeneratedFilenamePrefix + ".pb.cloud_job.go" + genFile := jen.NewFile(string(file.GoPackageName)) + genFile.HeaderComment("Code generated by protoc-gen-cloud-job. DO NOT EDIT.") + genFile.HeaderComment("versions:") + genFile.HeaderComment(fmt.Sprintf("- protoc-gen-cloud-job %s", version)) + genFile.HeaderComment(fmt.Sprintf("- protoc %s", protocVersion(gen))) + if file.Proto.GetOptions().GetDeprecated() { + genFile.HeaderComment(fmt.Sprintf("%s is a deprecated file.", file.Desc.Path())) + } else { + genFile.HeaderComment(fmt.Sprintf("source: %s", file.Desc.Path())) + } + + g := gen.NewGeneratedFile(filename, file.GoImportPath) + g.Skip() + + if len(file.Services) == 0 { + return g + } + + var events = make(map[string]map[string]*eventInfo) + for _, srv := range file.Services { + name, ok := proto.GetExtension(srv.Desc.Options(), cloudjobpb.E_JobName).(string) + if !ok || name == "" { + continue + } + + if events[name] == nil { + events[name] = map[string]*eventInfo{} + } + + for _, m := range srv.Methods { + jobSubjectName, ok := proto.GetExtension(m.Desc.Options(), cloudjobpb.E_SubjectName).(string) + if !ok || jobSubjectName == "" { + continue + } + + if events[name][jobSubjectName] != nil { + gen.Error(fmt.Errorf("cloud job:%s subject:%s exists", name, jobSubjectName)) + return g + } + + events[name][jobSubjectName] = &eventInfo{srv: srv, mth: m} + } + } + + if len(events) == 0 { + return g + } + + g.Unskip() + for name, subjects := range events { + for subName, info := range subjects { + code := strings.ReplaceAll(info.srv.GoName, "InnerService", "") + code = strings.ReplaceAll(code, "Service", "") + var keyName = fmt.Sprintf("%s%sKey", code, info.mth.GoName) + genFile.Commentf("%s %s/%s", keyName, info.srv.GoName, info.mth.GoName) + genFile.Const(). + Id(keyName). + Op("="). + Lit(subName) + genFile.Var().Id("_").Op("=").Qual(jobPkg, "RegisterSubject"). + Call(jen.Id(keyName), jen.New(jen.Id(info.mth.Input.GoIdent.GoName))).Line() + + genFile. + Func(). + Id(fmt.Sprintf("Register%s%sAsyncJob", code, info.mth.GoName)). + Params( + jen.Id("jobCli").Op("*").Qual(jobPkg, "Client"), + jen.Id("handler").Func().Params( + jen.Id("ctx").Op("*").Qual(jobPkg, "Context"), + jen.Id("req").Op("*").Id(info.mth.Input.GoIdent.GoName), + ).Error(), + ). + Block(jen.Qual(jobPkg, "RegisterJobHandler").Call(jen.Id("jobCli"), jen.Lit(name), jen.Id(keyName), jen.Id("handler"))) + genFile.Line() + + var prefix = fmt.Sprintf("Push%s", code) + var mthName = fmt.Sprintf("%sAsyncJob", info.mth.GoName) + mthName = handlerPushEventName(mthName, prefix) + genFile. + Func(). + Id(mthName). + Params( + jen.Id("jobCli").Op("*").Qual(jobPkg, "Client"), + jen.Id("ctx").Qual("context", "Context"), + jen.Id("req").Op("*").Id(info.mth.Input.GoIdent.GoName), + ). + Error(). + Block(jen.Return().Id("jobCli").Dot("Publish").Call(jen.Id("ctx"), jen.Id(keyName), jen.Id("req"))) + } + } + + g.P(genFile.GoString()) + return g +} + +func protocVersion(gen *protogen.Plugin) string { + v := gen.Request.GetCompilerVersion() + if v == nil { + return "(unknown)" + } + var suffix string + if s := v.GetSuffix(); s != "" { + suffix = "-" + s + } + return fmt.Sprintf("v%d.%d.%d%s", v.GetMajor(), v.GetMinor(), v.GetPatch(), suffix) +} + +func handlerPushEventName(name string, prefix string) string { + if strings.HasPrefix(name, prefix) { + return name + } + return fmt.Sprintf("%s%s", prefix, name) +} diff --git a/cmds/protoc-gen-cloud-job/internal/version.go b/cmds/protoc-gen-cloud-job/internal/version.go new file mode 100644 index 00000000..9be1654c --- /dev/null +++ b/cmds/protoc-gen-cloud-job/internal/version.go @@ -0,0 +1,3 @@ +package internal + +const version = "v0.0.1" diff --git a/cmds/protoc-gen-cloud-job/main.go b/cmds/protoc-gen-cloud-job/main.go new file mode 100644 index 00000000..ef94670d --- /dev/null +++ b/cmds/protoc-gen-cloud-job/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "flag" + + "github.com/pubgo/lava/cmds/protoc-gen-cloud-job/internal" + "google.golang.org/protobuf/compiler/protogen" + "google.golang.org/protobuf/types/pluginpb" +) + +func main() { + flag.Parse() + + protogen.Options{ParamFunc: flag.CommandLine.Set}. + Run(func(gp *protogen.Plugin) error { + gp.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL) + + for _, name := range gp.Request.FileToGenerate { + internal.GenerateFile(gp, gp.FilesByPath[name]) + } + + return nil + }) +} diff --git a/component/cloudjobs/aaa.go b/component/cloudjobs/aaa.go new file mode 100644 index 00000000..3d810b3d --- /dev/null +++ b/component/cloudjobs/aaa.go @@ -0,0 +1,14 @@ +package cloudjobs + +import ( + "github.com/pubgo/funk/log" + "google.golang.org/protobuf/proto" +) + +var logger = log.GetLogger("cloud_jobs") + +type Register interface { + RegisterCloudJobs(jobCli *Client) +} + +type JobHandler[T proto.Message] func(ctx *Context, args T) error diff --git a/component/cloudjobs/client.go b/component/cloudjobs/client.go new file mode 100644 index 00000000..16606831 --- /dev/null +++ b/component/cloudjobs/client.go @@ -0,0 +1,472 @@ +package cloudjobs + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/pubgo/funk/assert" + "github.com/pubgo/funk/errors" + "github.com/pubgo/funk/log" + "github.com/pubgo/funk/result" + "github.com/pubgo/funk/running" + "github.com/pubgo/funk/stack" + "github.com/pubgo/funk/try" + "github.com/pubgo/funk/version" + "github.com/pubgo/lava/component/natsclient" + "github.com/pubgo/lava/core/lifecycle" + "github.com/pubgo/lava/internal/ctxutil" + "github.com/pubgo/lava/pkg/typex" + "github.com/rs/xid" + "github.com/rs/zerolog" + "github.com/samber/lo" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +type Params struct { + Nc *natsclient.Client + Cfg *Config + Lc lifecycle.Lifecycle +} + +func New(p Params) *Client { + js := assert.Must1(jetstream.New(p.Nc.Conn)) + return &Client{ + p: p, + js: js, + prefix: DefaultPrefix, + handlers: make(map[string]map[string]JobHandler[proto.Message]), + streams: make(map[string]jetstream.Stream), + consumers: make(map[string]map[string]jetstream.Consumer), + jobs: make(map[string]map[string]map[string]*jobHandler), + } +} + +type Client struct { + p Params + js jetstream.JetStream + + // stream manager + streams map[string]jetstream.Stream + + // jobs: stream->consumer->Consumer + consumers map[string]map[string]jetstream.Consumer + + // handlers: job name -> subject -> job handler + handlers map[string]map[string]JobHandler[proto.Message] + + // jobs: stream->consumer->subject->jobHandler + jobs map[string]map[string]map[string]*jobHandler + + // stream, consumer, subject prefix, default: DefaultPrefix + prefix string +} + +func (c *Client) initStream() error { + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + for streamName, cfg := range c.p.Cfg.Streams { + streamName = c.streamName(streamName) + + assert.If(c.streams[streamName] != nil, "stream %s already exists", streamName) + + // add subject prefix + streamSubjects := lo.Map(cfg.Subjects, func(item string, index int) string { return c.subjectName(item) }) + metadata := map[string]string{"creator": fmt.Sprintf("%s/%s/%s", version.Project(), version.Version(), running.InstanceID)} + storageType := getStorageType(cfg.Storage) + streamCfg := jetstream.StreamConfig{ + Name: streamName, + Subjects: streamSubjects, + Metadata: metadata, + Storage: storageType, + } + + stream, err := c.js.CreateOrUpdateStream(ctx, streamCfg) + if err != nil && !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) { + return errors.Wrapf(err, "failed to create stream:%s", streamName) + } + c.streams[streamName] = stream + } + return nil +} + +func (c *Client) initConsumer() error { + allEventKeysSet := mapset.NewSet(lo.MapToSlice(subjects, func(key string, value proto.Message) string { return c.subjectName(key) })...) + + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + for jobOrConsumerName, consumers := range c.p.Cfg.Consumers { + jobName := jobOrConsumerName + assert.If(c.handlers[jobName] == nil, "failed to find job handler: %s, please impl RegisterAsyncJob", jobName) + + consumerName := jobOrConsumerName + for _, cfg := range consumers { + // check subject exists + for _, sub := range cfg.Subjects { + name := c.subjectName(lo.FromPtr(sub.Name)) + assert.If(!allEventKeysSet.Contains(name), "subject:%s not found, please check protobuf define and service", name) + } + + consumerName = c.consumerName(typex.DoFunc1(func() string { + if cfg.Consumer != nil { + return lo.FromPtr(cfg.Consumer) + } + return consumerName + })) + streamName := c.streamName(cfg.Stream) + + // consumer init + typex.DoFunc(func() { + if c.consumers[streamName] == nil { + c.consumers[streamName] = make(map[string]jetstream.Consumer) + } + // A streaming consumer can only have one corresponding job handler + assert.If(c.consumers[streamName][consumerName] != nil, "consumer %s already exists", consumerName) + + metadata := map[string]string{"version": fmt.Sprintf("%s/%s", version.Project(), version.Version())} + consumerCfg := jetstream.ConsumerConfig{ + Name: consumerName, + Durable: consumerName, + Metadata: metadata, + } + + consumer, err := c.js.CreateOrUpdateConsumer(ctx, streamName, consumerCfg) + assert.Fn(err != nil && !errors.Is(err, jetstream.ErrConsumerExists), func() error { + return errors.Wrapf(err, "stream=%s consumer=%s", streamName, consumerName) + }) + logger.Info().Func(func(e *zerolog.Event) { + e.Str("stream", streamName) + e.Str("consumer", consumerName) + e.Msg("register consumer success") + }) + + c.consumers[streamName][consumerName] = consumer + }) + + typex.DoFunc(func() { + if c.jobs[streamName] == nil { + c.jobs[streamName] = make(map[string]map[string]*jobHandler) + } + + if c.jobs[streamName][consumerName] == nil { + c.jobs[streamName][consumerName] = map[string]*jobHandler{} + } + + baseJobConfig := handleDefaultJobConfig(cfg.Job) + subjectMap := lo.SliceToMap(cfg.Subjects, func(item1 *strOrJobConfig) (string, *JobConfig) { + item := lo.ToPtr(JobConfig(lo.FromPtr(item1))) + return c.subjectName(*item.Name), mergeJobConfig(item, baseJobConfig) + }) + + for subName, subCfg := range subjectMap { + assert.If(c.handlers[jobName][subName] == nil, "job handler not found, job_name=%s sub_name=%s", jobName, subName) + + job := &jobHandler{ + name: jobName, + handler: c.handlers[jobName][subName], + cfg: subCfg, + } + + logger.Info().Func(func(e *zerolog.Event) { + e.Str("job_name", job.name) + e.Str("job_handler", stack.CallerWithFunc(job.handler).String()) + e.Any("job_config", subCfg) + e.Any("stream_name", streamName) + e.Any("consumer_name", consumerName) + e.Any("job_subject", subName) + e.Msg("register async job handler executor") + }) + c.jobs[streamName][consumerName][subName] = job + } + }) + } + } + return nil +} + +func (c *Client) doHandler(meta *jetstream.MsgMetadata, msg jetstream.Msg, job *jobHandler, cfg *JobConfig) (gErr error) { + var timeout = lo.FromPtr(cfg.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + ctx = log.UpdateEventCtx(ctx, log.Map{ + "sub_subject": msg.Subject(), + "sub_stream": meta.Stream, + "sub_consumer": meta.Consumer, + "sub_msg_id": msg.Headers().Get(jetstream.MsgIDHeader), + "sub_msg_" + senderKey: msg.Headers().Get(senderKey), + }) + + msgCtx := &Context{ + Context: ctx, + Header: http.Header(msg.Headers()), + NumDelivered: meta.NumDelivered, + NumPending: meta.NumPending, + Timestamp: meta.Timestamp, + Stream: meta.Stream, + Consumer: meta.Consumer, + Subject: msg.Subject(), + Config: cfg, + } + + var now = time.Now() + var args any + defer func() { + if gErr == nil { + return + } + + logger.Err(gErr).Func(func(e *zerolog.Event) { + e.Any("context", msgCtx) + e.Any("args", args) + e.Str("timeout", timeout.String()) + e.Str("start_time", now.String()) + e.Str("job_cost", time.Since(now).String()) + e.Msg("failed to do async job handler") + }) + }() + + var pb anypb.Any + if err := proto.Unmarshal(msg.Data(), &pb); err != nil { + return errors.WrapTag(err, + errors.T("msg", "failed to unmarshal stream msg data to any proto"), + errors.T("args", string(msg.Data())), + ) + } + args = &pb + + dst, err := anypb.UnmarshalNew(&pb, proto.UnmarshalOptions{}) + if err != nil { + return errors.WrapTag(err, + errors.T("msg", "failed to unmarshal any proto to proto msg"), + errors.T("args", pb.String()), + ) + } + args = &dst + + return errors.WrapFn(job.handler(msgCtx, dst), func() errors.Tags { + return errors.Tags{ + errors.T("msg", "failed to do async job handler"), + errors.T("args", dst), + errors.T("any_pb", pb.String()), + } + }) +} + +func (c *Client) doConsume() error { + for streamName, consumers := range c.consumers { + for consumerName, consumer := range consumers { + assert.If(c.jobs[streamName] == nil, "stream not found, stream=%s", streamName) + assert.If(c.jobs[streamName][consumerName] == nil, "consumer not found, consumer=%s", consumerName) + + jobSubjects := c.jobs[streamName][consumerName] + + logger.Info().Func(func(e *zerolog.Event) { + e.Str("stream", streamName) + e.Str("consumer", consumerName) + e.Any("subjects", lo.MapKeys(jobSubjects, func(_ *jobHandler, key string) string { return key })) + e.Msg("async job do consumer") + }) + + var doConsumeHandler = func(msg jetstream.Msg) { + var now = time.Now() + var addMsgInfo = func(e *zerolog.Event) { + e.Str("stream", streamName) + e.Str("consumer", consumerName) + e.Any("header", msg.Headers()) + e.Any("msg_id", msg.Headers().Get(jetstream.MsgIDHeader)) + e.Str("subject", msg.Subject()) + e.Str("msg_received_time", now.String()) + e.Str("job_cost", time.Since(now).String()) + } + + logger.Debug().Func(func(e *zerolog.Event) { + addMsgInfo(e) + e.Msg("received async job event") + }) + + handler := jobSubjects[msg.Subject()] + if handler == nil { + logger.Error().Func(addMsgInfo).Msg("failed to find subject job handler") + return + } + + meta, err := msg.Metadata() + if err != nil { + // no ack, retry always, unless it can recognize special error information + logger.Err(err).Func(addMsgInfo).Msg("failed to parse nats stream msg metadata") + return + } + + var cfg = handler.cfg + var checkErrAndLog = func(err error, msg string) { + if err == nil { + return + } + + logger.Err(err). + Str("fn_caller", stack.Caller(1).String()). + Func(addMsgInfo). + Any("metadata", meta). + Any("config", cfg). + Any("msg_received_time", now.String()). + Str("job_cost", time.Since(now).String()). + Msg(msg) + } + + err = try.Try(func() error { return c.doHandler(meta, msg, handler, cfg) }) + if err == nil { + checkErrAndLog(msg.Ack(), "failed to do msg ack with handler ok") + return + } + + // reject job msg + if isRejectErr(err) { + checkErrAndLog(msg.TermWithReason("reject by caller"), "failed to do msg ack with reject err") + return + } + + var backoff = lo.FromPtr(cfg.RetryBackoff) + var maxRetries = lo.FromPtr(cfg.MaxRetry) + + // Proactively retry and did not reach the maximum retry count + if meta.NumDelivered < uint64(maxRetries) { + logger.Warn(). + Err(err). + Func(addMsgInfo). + Any("metadata", meta). + Msg("retry nats stream async job event") + checkErrAndLog(msg.NakWithDelay(backoff), "failed to retry msg with delay nak") + return + } + + checkErrAndLog(err, "failed to do handler async job") + checkErrAndLog(msg.Ack(), "failed to do msg ack with handler error") + } + + con := result.Of(consumer.Consume(doConsumeHandler)) + if con.IsErr() { + return errors.WrapCaller(con.Err()) + } + c.p.Lc.BeforeStop(func() { con.Unwrap().Stop() }) + } + } + return nil +} + +func (c *Client) Start() error { + if err := c.initStream(); err != nil { + return errors.WrapCaller(err) + } + + if err := c.initConsumer(); err != nil { + return errors.WrapCaller(err) + } + + return errors.WrapCaller(c.doConsume()) +} + +func (c *Client) Publish(ctx context.Context, key string, args proto.Message, opts ...jetstream.PublishOpt) error { + return c.publish(ctx, key, args, opts...) +} + +func (c *Client) publish(ctx context.Context, key string, args proto.Message, opts ...jetstream.PublishOpt) (gErr error) { + var timeout = ctxutil.GetTimeout(ctx) + var now = time.Now() + var msgId = xid.New().String() + defer func() { + var msgFn = func(e *zerolog.Event) { + e.Str("pub_topic", key) + e.Str("pub_start", now.String()) + e.Any("pub_args", args) + e.Str("pub_cost", time.Since(now).String()) + e.Str("pub_msg_id", msgId) + if timeout != nil { + e.Str("timeout", timeout.String()) + } + } + if gErr == nil { + logger.Info(ctx).Func(msgFn).Msg("succeed to publish async job event to stream") + } else { + logger.Err(gErr, ctx).Func(msgFn).Msg("failed to publish async job event to stream") + } + }() + + pb, err := anypb.New(args) + if err != nil { + return errors.Wrap(err, "failed to marshal args to any proto") + } + + // TODO get info from ctx + + data, err := proto.Marshal(pb) + if err != nil { + return errors.Wrap(err, "failed to marshal any proto to bytes") + } + + // subject|topic name + key = c.subjectName(key) + + msg := &nats.Msg{ + Subject: key, + Data: data, + Header: nats.Header{ + senderKey: []string{fmt.Sprintf("%s/%s", running.Project, running.Version)}, + }, + } + + opts = append(opts, jetstream.WithMsgID(msgId)) + _, err = c.js.PublishMsg(ctx, msg, opts...) + if err != nil { + return errors.Wrapf(err, "failed to publish msg to stream, topic=%s msg_id=%s", key, msgId) + } + + return nil +} + +func (c *Client) streamName(name string) string { + prefix := fmt.Sprintf("%s:", c.prefix) + if strings.HasPrefix(name, prefix) { + return name + } + + return fmt.Sprintf("%s%s", prefix, name) +} + +func (c *Client) consumerName(name string) string { + prefix := fmt.Sprintf("%s:", c.prefix) + if strings.HasPrefix(name, prefix) { + return name + } + + return fmt.Sprintf("%s%s", prefix, name) +} + +func (c *Client) subjectName(name string) string { + return handleSubjectName(name, c.prefix) +} + +func (c *Client) registerJobHandler(jobName string, topic string, handler JobHandler[proto.Message]) { + assert.If(handler == nil, "job handler is nil") + assert.If(subjects[topic] == nil, "topic:%s not found", topic) + + if c.handlers[jobName] == nil { + c.handlers[jobName] = map[string]JobHandler[proto.Message]{} + } + + topic = c.subjectName(topic) + c.handlers[jobName][topic] = handler + + logger.Info().Func(func(e *zerolog.Event) { + e.Str("job_name", jobName) + e.Str("topic", topic) + e.Str("job_handler", stack.CallerWithFunc(handler).String()) + e.Msg("register async job handler") + }) +} diff --git a/component/cloudjobs/config.go b/component/cloudjobs/config.go new file mode 100644 index 00000000..40a4bc9b --- /dev/null +++ b/component/cloudjobs/config.go @@ -0,0 +1,103 @@ +package cloudjobs + +import ( + "time" + + "github.com/pubgo/funk/assert" + "github.com/pubgo/funk/errors" + "github.com/pubgo/lava/pkg/typex" + "google.golang.org/protobuf/proto" + yaml "gopkg.in/yaml.v3" +) + +const DefaultPrefix = "acj" +const DefaultTimeout = 15 * time.Second +const DefaultMaxRetry = 3 +const DefaultRetryBackoff = time.Second +const senderKey = "sender" + +type Config struct { + // Streams: nats stream config + Streams map[string]*StreamConfig `yaml:"streams"` + + // Consumers: nats consumer config + Consumers map[string]typex.YamlListType[*ConsumerConfig] `yaml:"consumers"` +} + +type StreamConfig struct { + // Storage jetstream.StorageType + Storage string `yaml:"storage"` + + // Subjects stream subscribe subject, e.g. nvr.speaker.* without prefix + Subjects typex.YamlListType[string] `yaml:"subjects"` +} + +type ConsumerConfig struct { + // Consumer name without prefix + Consumer *string `yaml:"consumer"` + + // Stream name without prefix + Stream string `yaml:"stream"` + + // Subjects config + Subjects typex.YamlListType[*strOrJobConfig] `yaml:"subjects"` + + // Job config + Job *JobConfig `yaml:"job"` +} + +type JobConfig struct { + // Name subject name + Name *string `yaml:"name"` + + // Timeout job executor timeout, default: DefaultTimeout + Timeout *time.Duration `yaml:"timeout"` + + // MaxRetry max retries, default: DefaultMaxRetry + MaxRetry *int `yaml:"max_retries"` + + // RetryBackoff retry backoff, default: DefaultRetryBackoff + RetryBackoff *time.Duration `yaml:"retry_backoff"` +} + +type jobHandler struct { + // job name + name string + + // job handler + handler func(ctx *Context, args proto.Message) error + + // job config + cfg *JobConfig +} + +type strOrJobConfig JobConfig + +func (p *strOrJobConfig) UnmarshalYAML(value *yaml.Node) error { + if value.IsZero() { + return nil + } + + switch value.Kind { + case yaml.ScalarNode: + var data string + if err := value.Decode(&data); err != nil { + return errors.WrapCaller(err) + } + + *p = strOrJobConfig(JobConfig{Name: &data}) + return nil + case yaml.MappingNode: + var data JobConfig + if err := value.Decode(&data); err != nil { + return errors.WrapCaller(err) + } + + *p = strOrJobConfig(data) + return nil + default: + var val any + assert.Exit(value.Decode(&val)) + return errors.Format("yaml kind type error,kind=%v data=%v", value.Kind, val) + } +} diff --git a/component/cloudjobs/config.yaml b/component/cloudjobs/config.yaml new file mode 100644 index 00000000..1ddb45a3 --- /dev/null +++ b/component/cloudjobs/config.yaml @@ -0,0 +1,14 @@ +jobs: + # stream name: stream config + streams: + gid: + storage: "file" + subjects: ["gid.>"] + consumers: + gid: + - consumer: "test:gid" + stream: "gid" + subjects: "gid.proxy.exec" + job: + timeout: "1m" + max_retries: 10 diff --git a/component/cloudjobs/context.go b/component/cloudjobs/context.go new file mode 100644 index 00000000..23185fa0 --- /dev/null +++ b/component/cloudjobs/context.go @@ -0,0 +1,35 @@ +package cloudjobs + +import ( + "context" + "net/http" + "time" +) + +type Context struct { + context.Context + + // Header jetstream.Headers(). + Header http.Header + + // NumDelivered jetstream.MsgMetadata{}.NumDelivered + NumDelivered uint64 + + // NumPending jetstream.MsgMetadata{}.NumPending + NumPending uint64 + + // Timestamp jetstream.MsgMetadata{}.Timestamp + Timestamp time.Time + + // Stream jetstream.MsgMetadata{}.Stream + Stream string + + // Consumer jetstream.MsgMetadata{}.Consumer + Consumer string + + // Subject|Topic name jetstream.Msg().Subject() + Subject string + + // Config job config from config file or default + Config *JobConfig +} diff --git a/component/cloudjobs/errors.go b/component/cloudjobs/errors.go new file mode 100644 index 00000000..80862f5e --- /dev/null +++ b/component/cloudjobs/errors.go @@ -0,0 +1,21 @@ +package cloudjobs + +import "github.com/pubgo/funk/errors" + +var errReject = errors.New("asyncjob: reject retry and discard msg") + +func Reject(errs ...error) error { + var reason = "reject" + if len(errs) > 0 { + reason = errs[0].Error() + } + return errors.Wrap(errReject, reason) +} + +func isRejectErr(err error) bool { + if err == nil { + return false + } + + return errors.Is(err, errReject) +} diff --git a/component/cloudjobs/registry.go b/component/cloudjobs/registry.go new file mode 100644 index 00000000..84478962 --- /dev/null +++ b/component/cloudjobs/registry.go @@ -0,0 +1,10 @@ +package cloudjobs + +import "google.golang.org/protobuf/proto" + +var subjects = make(map[string]proto.Message) + +func RegisterSubject(subject string, subType proto.Message) any { + subjects[subject] = subType + return nil +} diff --git a/component/cloudjobs/util.go b/component/cloudjobs/util.go new file mode 100644 index 00000000..b635c4d3 --- /dev/null +++ b/component/cloudjobs/util.go @@ -0,0 +1,143 @@ +package cloudjobs + +import ( + "context" + "fmt" + "reflect" + "strings" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/pubgo/funk/assert" + "github.com/pubgo/funk/log" + "github.com/pubgo/funk/stack" + "github.com/pubgo/funk/try" + "github.com/pubgo/lava/internal/ctxutil" + "github.com/rs/zerolog" + "github.com/samber/lo" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" +) + +func RegisterJobHandler[T proto.Message](jobCli *Client, jobName string, topic string, handler JobHandler[T]) { + assert.Fn(reflect.TypeOf(subjects[topic]) != reflect.TypeOf(lo.Empty[T]()), func() error { + return fmt.Errorf("type not match, topic-type=%s handler-input-type=%s", reflect.TypeOf(subjects[topic]).String(), reflect.TypeOf(lo.Empty[T]()).String()) + }) + + jobCli.registerJobHandler(jobName, topic, func(ctx *Context, args proto.Message) error { return handler(ctx, args.(T)) }) +} + +// PushEventSync push event sync +func PushEventSync[T any](handler func(context.Context, T) (*emptypb.Empty, error), ctx context.Context, t T) error { + return pushEventBasic(handler, ctx, t, false) +} + +// PushEvent push event async +func PushEvent[T any](handler func(context.Context, T) (*emptypb.Empty, error), ctx context.Context, t T, errs ...chan error) { + // clone ctx and recalculate timeout + ctx, cancel := ctxutil.Clone(ctx, DefaultTimeout) + defer cancel() + + err := pushEventBasic(handler, ctx, t, true) + if err == nil || len(errs) == 0 { + return + } + + select { + case errs[0] <- err: + return + default: + log.Warn(ctx).Func(func(e *zerolog.Event) { + e.Str("fn_caller", stack.CallerWithFunc(handler).String()) + e.Msg("failed to receive error message with push event") + }) + } +} + +func pushEventBasic[T any](handler func(context.Context, T) (*emptypb.Empty, error), ctx context.Context, t T, async bool) error { + timeout := ctxutil.GetTimeout(ctx) + now := time.Now() + err := try.Try(func() error { return lo.T2(handler(ctx, t)).B }) + if err == nil { + return nil + } + + logger.Err(err, ctx).Func(func(e *zerolog.Event) { + if timeout != nil { + e.Str("timeout", timeout.String()) + } + + e.Str("fn_caller", stack.Caller(1).String()) + e.Str("deal", stack.Caller(1).String()) + e.Bool("async", async) + e.Any("input_params", t) + e.Str("stack", stack.CallerWithFunc(handler).String()) + e.Str("cost", time.Since(now).String()) + e.Msg("failed to push event msg to nats stream") + }) + return err +} + +func getStorageType(name string) jetstream.StorageType { + switch name { + case "memory": + return jetstream.MemoryStorage + case "file": + return jetstream.FileStorage + default: + panic("unknown storage type") + } +} + +func mergeJobConfig(dst *JobConfig, src *JobConfig) *JobConfig { + if src == nil { + src = handleDefaultJobConfig(nil) + } + + if dst == nil { + dst = handleDefaultJobConfig(nil) + } + + if dst.MaxRetry == nil { + dst.MaxRetry = src.MaxRetry + } + + if dst.Timeout == nil { + dst.Timeout = src.Timeout + } + + if dst.RetryBackoff == nil { + dst.RetryBackoff = src.RetryBackoff + } + + return dst +} + +func handleDefaultJobConfig(cfg *JobConfig) *JobConfig { + if cfg == nil { + cfg = new(JobConfig) + } + + if cfg.Timeout == nil { + cfg.Timeout = lo.ToPtr(DefaultTimeout) + } + + if cfg.MaxRetry == nil { + cfg.MaxRetry = lo.ToPtr(DefaultMaxRetry) + } + + if cfg.RetryBackoff == nil { + cfg.RetryBackoff = lo.ToPtr(DefaultRetryBackoff) + } + + return cfg +} + +func handleSubjectName(name string, prefix string) string { + prefix = fmt.Sprintf("%s.", prefix) + if strings.HasPrefix(name, prefix) { + return name + } + + return fmt.Sprintf("%s%s", prefix, name) +} diff --git a/component/cloudjobs/z_util_test.go b/component/cloudjobs/z_util_test.go new file mode 100644 index 00000000..92882aaa --- /dev/null +++ b/component/cloudjobs/z_util_test.go @@ -0,0 +1,20 @@ +package cloudjobs + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCalcTimeout(t *testing.T) { + var ctx = context.Background() + deadline, ok := ctx.Deadline() + assert.Equal(t, ok, false) + + cc, _ := context.WithTimeout(ctx, time.Second*5) + deadline, ok = cc.Deadline() + assert.Equal(t, ok, true) + assert.Equal(t, time.Until(deadline) > time.Second*4, true) +} diff --git a/component/natsclient/client.go b/component/natsclient/client.go new file mode 100644 index 00000000..8d2d2a0b --- /dev/null +++ b/component/natsclient/client.go @@ -0,0 +1,43 @@ +package natsclient + +import ( + "fmt" + + "github.com/nats-io/nats.go" + "github.com/pubgo/funk/assert" + "github.com/pubgo/funk/log" + "github.com/pubgo/funk/running" + "github.com/pubgo/lava/core/lifecycle" +) + +type Param struct { + Cfg *Config + Logger log.Logger + Lc lifecycle.Lifecycle +} + +type Client struct { + Param + + *nats.Conn +} + +func New(p Param) *Client { + c := &Client{Param: p} + c.Logger = c.Logger.WithName("nats") + + nc := assert.Must1(nats.Connect(c.Cfg.Url, func(o *nats.Options) error { + o.AllowReconnect = true + o.Name = fmt.Sprintf("%s/%s/%s", running.Hostname, running.Project, running.InstanceID) + return nil + })) + log.Info().Bool("status", nc.IsConnected()).Msg("nats connection ...") + + c.Lc.BeforeStop(func() { + nc.Close() + }) + + c.Conn = nc + + return c +} diff --git a/component/natsclient/config.go b/component/natsclient/config.go new file mode 100644 index 00000000..94611abf --- /dev/null +++ b/component/natsclient/config.go @@ -0,0 +1,5 @@ +package natsclient + +type Config struct { + Url string `yaml:"url"` +} diff --git a/docs/data/nats/nats.conf b/docs/data/nats/nats.conf new file mode 100644 index 00000000..7a6a9068 --- /dev/null +++ b/docs/data/nats/nats.conf @@ -0,0 +1,12 @@ +listen: 0.0.0.0:4222 +http_port: 8222 +authorization: { + user: "admin" + password: "admin_123456" +} + +jetstream { + store_dir: /data/jetstream + max_mem: 1G + max_file: 10G +} diff --git a/docs/docker-compose.yml b/docs/docker-compose.yml new file mode 100644 index 00000000..2d147f96 --- /dev/null +++ b/docs/docker-compose.yml @@ -0,0 +1,16 @@ +services: + nats: + container_name: nats + image: nats:latest + restart: unless-stopped + env_file: ${PWD}/.env + volumes: + - $PWD/data/nats/nats.conf:/etc/nats/nats.conf + - $PWD/data/nats/jetstream:/data/jetstream + command: "-c /etc/nats/nats.conf" + ports: + - 4222:4222 + - 8222:8222 + - 6222:6222 + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/go.mod b/go.mod index f8863762..0272140c 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/logdyhq/logdy-core v0.13.0 github.com/maragudk/gomponents v0.20.0 github.com/mattheath/kala v0.0.0-20171219141654-d6276794bf0e + github.com/nats-io/nats.go v1.37.0 github.com/prometheus/common v0.48.0 github.com/pubgo/dix v0.3.17 github.com/pubgo/funk v0.5.48 @@ -174,6 +175,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.11 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect diff --git a/go.sum b/go.sum index 8c145090..fdea240b 100644 --- a/go.sum +++ b/go.sum @@ -442,6 +442,12 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= diff --git a/internal/ctxutil/util.go b/internal/ctxutil/util.go new file mode 100644 index 00000000..218fb890 --- /dev/null +++ b/internal/ctxutil/util.go @@ -0,0 +1,68 @@ +package ctxutil + +import ( + "context" + "time" + + "github.com/samber/lo" +) + +const defaultTimeout = time.Second * 10 + +var _ context.Context = (*clonedCtx)(nil) + +type clonedCtx struct { + oldCtx context.Context + ctx context.Context +} + +// Deadline implements context.Context. +func (c *clonedCtx) Deadline() (deadline time.Time, ok bool) { + return c.ctx.Deadline() +} + +// Done implements context.Context. +func (c *clonedCtx) Done() <-chan struct{} { + return c.ctx.Done() +} + +// Err implements context.Context. +func (c *clonedCtx) Err() error { + return c.ctx.Err() +} + +// Value implements context.Context. +func (c *clonedCtx) Value(key any) any { + return c.oldCtx.Value(key) +} + +func Clone(ctx context.Context, timeouts ...time.Duration) (context.Context, context.CancelFunc) { + if ctx == nil { + return context.WithCancel(context.Background()) + } + + timeout := defaultTimeout + if len(timeouts) > 0 { + timeout = timeouts[0] + } + + deadline, ok := ctx.Deadline() + if ok { + timeout = time.Until(deadline) + } + + ctxNew, cancel := context.WithTimeout(context.Background(), timeout) + return &clonedCtx{oldCtx: ctx, ctx: ctxNew}, cancel +} + +func GetTimeout(ctx context.Context) *time.Duration { + if ctx == nil { + return nil + } + + deadline, ok := ctx.Deadline() + if ok { + return lo.ToPtr(time.Until(deadline)) + } + return nil +} diff --git a/internal/ctxutil/util_test.go b/internal/ctxutil/util_test.go new file mode 100644 index 00000000..405afa1c --- /dev/null +++ b/internal/ctxutil/util_test.go @@ -0,0 +1,21 @@ +package ctxutil_test + +import ( + "context" + "testing" + + "github.com/pubgo/lava/internal/ctxutil" + "github.com/stretchr/testify/assert" +) + +func TestClone(t *testing.T) { + cc, cancel := context.WithCancel(context.TODO()) + oldCtx := context.WithValue(cc, "hello", "hello") + + newCtx, _ := ctxutil.Clone(oldCtx) + cancel() + + assert.Equal(t, oldCtx.Err(), context.Canceled) + assert.Equal(t, newCtx.Value("hello"), "hello") + assert.Equal(t, newCtx.Err(), nil) +} diff --git a/internal/example/grpc/pkg/proto/gidpb/id.pb.cloud_job.go b/internal/example/grpc/pkg/proto/gidpb/id.pb.cloud_job.go new file mode 100644 index 00000000..fd97127e --- /dev/null +++ b/internal/example/grpc/pkg/proto/gidpb/id.pb.cloud_job.go @@ -0,0 +1,25 @@ +// Code generated by protoc-gen-cloud-job. DO NOT EDIT. +// versions: +// - protoc-gen-cloud-job v0.0.1 +// - protoc v5.27.1 +// source: gid/id.proto + +package gidpb + +import ( + "context" + cloudjobs "github.com/pubgo/lava/component/cloudjobs" +) + +// IdProxyExecEventKey Id/ProxyExecEvent +const IdProxyExecEventKey = "gid.proxy.exec" + +var _ = cloudjobs.RegisterSubject(IdProxyExecEventKey, new(DoProxyEventReq)) + +func RegisterIdProxyExecEventAsyncJob(jobCli *cloudjobs.Client, handler func(ctx *cloudjobs.Context, req *DoProxyEventReq) error) { + cloudjobs.RegisterJobHandler(jobCli, "gid", IdProxyExecEventKey, handler) +} + +func PushIdProxyExecEventAsyncJob(jobCli *cloudjobs.Client, ctx context.Context, req *DoProxyEventReq) error { + return jobCli.Publish(ctx, IdProxyExecEventKey, req) +} diff --git a/internal/example/grpc/pkg/proto/gidpb/id.pb.go b/internal/example/grpc/pkg/proto/gidpb/id.pb.go index a8d1c984..c7c3ebb1 100644 --- a/internal/example/grpc/pkg/proto/gidpb/id.pb.go +++ b/internal/example/grpc/pkg/proto/gidpb/id.pb.go @@ -9,6 +9,7 @@ package gidpb import ( _ "github.com/google/gnostic-models/openapiv3" _ "github.com/pubgo/funk/proto/errorpb" + _ "github.com/pubgo/lava/pkg/proto/cloudjobpb" _ "github.com/pubgo/lava/pkg/proto/lavapbv1" _ "google.golang.org/genproto/googleapis/api/annotations" httpbody "google.golang.org/genproto/googleapis/api/httpbody" @@ -184,6 +185,44 @@ func (x *GenerateResponse) GetType() string { return "" } +type DoProxyEventReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *DoProxyEventReq) Reset() { + *x = DoProxyEventReq{} + if protoimpl.UnsafeEnabled { + mi := &file_gid_id_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DoProxyEventReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DoProxyEventReq) ProtoMessage() {} + +func (x *DoProxyEventReq) ProtoReflect() protoreflect.Message { + mi := &file_gid_id_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DoProxyEventReq.ProtoReflect.Descriptor instead. +func (*DoProxyEventReq) Descriptor() ([]byte, []int) { + return file_gid_id_proto_rawDescGZIP(), []int{1} +} + type Empty struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -193,7 +232,7 @@ type Empty struct { func (x *Empty) Reset() { *x = Empty{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[1] + mi := &file_gid_id_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -206,7 +245,7 @@ func (x *Empty) String() string { func (*Empty) ProtoMessage() {} func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[1] + mi := &file_gid_id_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -219,7 +258,7 @@ func (x *Empty) ProtoReflect() protoreflect.Message { // Deprecated: Use Empty.ProtoReflect.Descriptor instead. func (*Empty) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{1} + return file_gid_id_proto_rawDescGZIP(), []int{2} } type UploadFileRequest struct { @@ -234,7 +273,7 @@ type UploadFileRequest struct { func (x *UploadFileRequest) Reset() { *x = UploadFileRequest{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[2] + mi := &file_gid_id_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -247,7 +286,7 @@ func (x *UploadFileRequest) String() string { func (*UploadFileRequest) ProtoMessage() {} func (x *UploadFileRequest) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[2] + mi := &file_gid_id_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -260,7 +299,7 @@ func (x *UploadFileRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UploadFileRequest.ProtoReflect.Descriptor instead. func (*UploadFileRequest) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{2} + return file_gid_id_proto_rawDescGZIP(), []int{3} } func (x *UploadFileRequest) GetFilename() string { @@ -290,7 +329,7 @@ type ChatMessage struct { func (x *ChatMessage) Reset() { *x = ChatMessage{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[3] + mi := &file_gid_id_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -303,7 +342,7 @@ func (x *ChatMessage) String() string { func (*ChatMessage) ProtoMessage() {} func (x *ChatMessage) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[3] + mi := &file_gid_id_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -316,7 +355,7 @@ func (x *ChatMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ChatMessage.ProtoReflect.Descriptor instead. func (*ChatMessage) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{3} + return file_gid_id_proto_rawDescGZIP(), []int{4} } func (x *ChatMessage) GetName() string { @@ -352,7 +391,7 @@ type Message struct { func (x *Message) Reset() { *x = Message{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[4] + mi := &file_gid_id_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -365,7 +404,7 @@ func (x *Message) String() string { func (*Message) ProtoMessage() {} func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[4] + mi := &file_gid_id_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -378,7 +417,7 @@ func (x *Message) ProtoReflect() protoreflect.Message { // Deprecated: Use Message.ProtoReflect.Descriptor instead. func (*Message) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{4} + return file_gid_id_proto_rawDescGZIP(), []int{5} } func (x *Message) GetName() string { @@ -408,7 +447,7 @@ type GenerateRequest struct { func (x *GenerateRequest) Reset() { *x = GenerateRequest{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[5] + mi := &file_gid_id_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -421,7 +460,7 @@ func (x *GenerateRequest) String() string { func (*GenerateRequest) ProtoMessage() {} func (x *GenerateRequest) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[5] + mi := &file_gid_id_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -434,7 +473,7 @@ func (x *GenerateRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GenerateRequest.ProtoReflect.Descriptor instead. func (*GenerateRequest) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{5} + return file_gid_id_proto_rawDescGZIP(), []int{6} } func (x *GenerateRequest) GetType() GenType { @@ -458,7 +497,7 @@ type TypesRequest struct { func (x *TypesRequest) Reset() { *x = TypesRequest{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[6] + mi := &file_gid_id_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -471,7 +510,7 @@ func (x *TypesRequest) String() string { func (*TypesRequest) ProtoMessage() {} func (x *TypesRequest) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[6] + mi := &file_gid_id_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -484,7 +523,7 @@ func (x *TypesRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TypesRequest.ProtoReflect.Descriptor instead. func (*TypesRequest) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{6} + return file_gid_id_proto_rawDescGZIP(), []int{7} } func (x *TypesRequest) GetName() string { @@ -520,7 +559,7 @@ type TypesResponse struct { func (x *TypesResponse) Reset() { *x = TypesResponse{} if protoimpl.UnsafeEnabled { - mi := &file_gid_id_proto_msgTypes[7] + mi := &file_gid_id_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -533,7 +572,7 @@ func (x *TypesResponse) String() string { func (*TypesResponse) ProtoMessage() {} func (x *TypesResponse) ProtoReflect() protoreflect.Message { - mi := &file_gid_id_proto_msgTypes[7] + mi := &file_gid_id_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -546,7 +585,7 @@ func (x *TypesResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use TypesResponse.ProtoReflect.Descriptor instead. func (*TypesResponse) Descriptor() ([]byte, []int) { - return file_gid_id_proto_rawDescGZIP(), []int{7} + return file_gid_id_proto_rawDescGZIP(), []int{8} } func (x *TypesResponse) GetTypes() []string { @@ -566,14 +605,17 @@ var file_gid_id_proto_rawDesc = []byte{ 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x68, 0x74, - 0x74, 0x70, 0x62, 0x6f, 0x64, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0e, 0x6c, 0x61, - 0x76, 0x61, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x6f, 0x70, - 0x65, 0x6e, 0x61, 0x70, 0x69, 0x76, 0x33, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x36, 0x0a, 0x10, 0x47, 0x65, 0x6e, - 0x65, 0x72, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, - 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, - 0x65, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x59, 0x0a, 0x11, 0x55, 0x70, + 0x74, 0x70, 0x62, 0x6f, 0x64, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x6c, 0x61, + 0x76, 0x61, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x6a, 0x6f, 0x62, 0x2f, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0e, 0x6c, 0x61, 0x76, 0x61, 0x2f, + 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x6f, 0x70, 0x65, 0x6e, 0x61, + 0x70, 0x69, 0x76, 0x33, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x36, 0x0a, 0x10, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, + 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x11, + 0x0a, 0x0f, 0x44, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x71, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x59, 0x0a, 0x11, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x66, @@ -607,8 +649,8 @@ var file_gid_id_proto_rawDesc = []byte{ 0x07, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x73, 0x6e, 0x6f, 0x77, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x62, 0x69, 0x67, 0x66, 0x6c, 0x61, 0x6b, 0x65, 0x10, - 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x73, 0x68, 0x6f, 0x72, 0x74, 0x69, 0x64, 0x10, 0x04, 0x32, 0xda, - 0x05, 0x0a, 0x02, 0x49, 0x64, 0x12, 0x81, 0x01, 0x0a, 0x08, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, + 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x73, 0x68, 0x6f, 0x72, 0x74, 0x69, 0x64, 0x10, 0x04, 0x32, 0xa9, + 0x06, 0x0a, 0x02, 0x49, 0x64, 0x12, 0x81, 0x01, 0x0a, 0x08, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x67, 0x69, 0x64, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x67, 0x69, 0x64, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, @@ -652,10 +694,15 @@ var file_gid_id_proto_rawDesc = []byte{ 0x12, 0x36, 0x0a, 0x07, 0x44, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x0a, 0x2e, 0x67, 0x69, 0x64, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0a, 0x2e, 0x67, 0x69, 0x64, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x13, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x70, 0x72, - 0x6f, 0x78, 0x79, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x1a, 0x11, 0xca, 0x41, 0x0e, 0x6c, 0x6f, 0x63, - 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x38, 0x30, 0x38, 0x30, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, - 0x2f, 0x67, 0x69, 0x64, 0x70, 0x62, 0x3b, 0x67, 0x69, 0x64, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x78, 0x79, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x12, 0x46, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x78, + 0x79, 0x45, 0x78, 0x65, 0x63, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x14, 0x2e, 0x67, 0x69, 0x64, + 0x2e, 0x44, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, + 0x1a, 0x0a, 0x2e, 0x67, 0x69, 0x64, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x12, 0xda, 0xf1, + 0x04, 0x0e, 0x67, 0x69, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x65, 0x78, 0x65, 0x63, + 0x1a, 0x18, 0xca, 0x41, 0x0e, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x38, + 0x30, 0x38, 0x30, 0xd2, 0xf1, 0x04, 0x03, 0x67, 0x69, 0x64, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, + 0x67, 0x69, 0x64, 0x70, 0x62, 0x3b, 0x67, 0x69, 0x64, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -671,42 +718,45 @@ func file_gid_id_proto_rawDescGZIP() []byte { } var file_gid_id_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_gid_id_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_gid_id_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_gid_id_proto_goTypes = []any{ (SrvCode)(0), // 0: gid.SrvCode (GenType)(0), // 1: gid.GenType (*GenerateResponse)(nil), // 2: gid.GenerateResponse - (*Empty)(nil), // 3: gid.Empty - (*UploadFileRequest)(nil), // 4: gid.UploadFileRequest - (*ChatMessage)(nil), // 5: gid.ChatMessage - (*Message)(nil), // 6: gid.Message - (*GenerateRequest)(nil), // 7: gid.GenerateRequest - (*TypesRequest)(nil), // 8: gid.TypesRequest - (*TypesResponse)(nil), // 9: gid.TypesResponse - (*httpbody.HttpBody)(nil), // 10: google.api.HttpBody + (*DoProxyEventReq)(nil), // 3: gid.DoProxyEventReq + (*Empty)(nil), // 4: gid.Empty + (*UploadFileRequest)(nil), // 5: gid.UploadFileRequest + (*ChatMessage)(nil), // 6: gid.ChatMessage + (*Message)(nil), // 7: gid.Message + (*GenerateRequest)(nil), // 8: gid.GenerateRequest + (*TypesRequest)(nil), // 9: gid.TypesRequest + (*TypesResponse)(nil), // 10: gid.TypesResponse + (*httpbody.HttpBody)(nil), // 11: google.api.HttpBody } var file_gid_id_proto_depIdxs = []int32{ - 10, // 0: gid.UploadFileRequest.file:type_name -> google.api.HttpBody - 6, // 1: gid.ChatMessage.msg:type_name -> gid.Message + 11, // 0: gid.UploadFileRequest.file:type_name -> google.api.HttpBody + 7, // 1: gid.ChatMessage.msg:type_name -> gid.Message 1, // 2: gid.GenerateRequest.type:type_name -> gid.GenType - 7, // 3: gid.Id.Generate:input_type -> gid.GenerateRequest - 8, // 4: gid.Id.TypeStream:input_type -> gid.TypesRequest - 8, // 5: gid.Id.Types:input_type -> gid.TypesRequest - 8, // 6: gid.Id.PutTypes:input_type -> gid.TypesRequest - 5, // 7: gid.Id.Chat:input_type -> gid.ChatMessage - 5, // 8: gid.Id.Chat1:input_type -> gid.ChatMessage - 4, // 9: gid.Id.UploadDownload:input_type -> gid.UploadFileRequest - 3, // 10: gid.Id.DoProxy:input_type -> gid.Empty - 2, // 11: gid.Id.Generate:output_type -> gid.GenerateResponse - 9, // 12: gid.Id.TypeStream:output_type -> gid.TypesResponse - 9, // 13: gid.Id.Types:output_type -> gid.TypesResponse - 9, // 14: gid.Id.PutTypes:output_type -> gid.TypesResponse - 5, // 15: gid.Id.Chat:output_type -> gid.ChatMessage - 5, // 16: gid.Id.Chat1:output_type -> gid.ChatMessage - 10, // 17: gid.Id.UploadDownload:output_type -> google.api.HttpBody - 3, // 18: gid.Id.DoProxy:output_type -> gid.Empty - 11, // [11:19] is the sub-list for method output_type - 3, // [3:11] is the sub-list for method input_type + 8, // 3: gid.Id.Generate:input_type -> gid.GenerateRequest + 9, // 4: gid.Id.TypeStream:input_type -> gid.TypesRequest + 9, // 5: gid.Id.Types:input_type -> gid.TypesRequest + 9, // 6: gid.Id.PutTypes:input_type -> gid.TypesRequest + 6, // 7: gid.Id.Chat:input_type -> gid.ChatMessage + 6, // 8: gid.Id.Chat1:input_type -> gid.ChatMessage + 5, // 9: gid.Id.UploadDownload:input_type -> gid.UploadFileRequest + 4, // 10: gid.Id.DoProxy:input_type -> gid.Empty + 3, // 11: gid.Id.ProxyExecEvent:input_type -> gid.DoProxyEventReq + 2, // 12: gid.Id.Generate:output_type -> gid.GenerateResponse + 10, // 13: gid.Id.TypeStream:output_type -> gid.TypesResponse + 10, // 14: gid.Id.Types:output_type -> gid.TypesResponse + 10, // 15: gid.Id.PutTypes:output_type -> gid.TypesResponse + 6, // 16: gid.Id.Chat:output_type -> gid.ChatMessage + 6, // 17: gid.Id.Chat1:output_type -> gid.ChatMessage + 11, // 18: gid.Id.UploadDownload:output_type -> google.api.HttpBody + 4, // 19: gid.Id.DoProxy:output_type -> gid.Empty + 4, // 20: gid.Id.ProxyExecEvent:output_type -> gid.Empty + 12, // [12:21] is the sub-list for method output_type + 3, // [3:12] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name @@ -731,7 +781,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*Empty); i { + switch v := v.(*DoProxyEventReq); i { case 0: return &v.state case 1: @@ -743,7 +793,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*UploadFileRequest); i { + switch v := v.(*Empty); i { case 0: return &v.state case 1: @@ -755,7 +805,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*ChatMessage); i { + switch v := v.(*UploadFileRequest); i { case 0: return &v.state case 1: @@ -767,7 +817,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*Message); i { + switch v := v.(*ChatMessage); i { case 0: return &v.state case 1: @@ -779,7 +829,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*GenerateRequest); i { + switch v := v.(*Message); i { case 0: return &v.state case 1: @@ -791,7 +841,7 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*TypesRequest); i { + switch v := v.(*GenerateRequest); i { case 0: return &v.state case 1: @@ -803,6 +853,18 @@ func file_gid_id_proto_init() { } } file_gid_id_proto_msgTypes[7].Exporter = func(v any, i int) any { + switch v := v.(*TypesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_gid_id_proto_msgTypes[8].Exporter = func(v any, i int) any { switch v := v.(*TypesResponse); i { case 0: return &v.state @@ -821,7 +883,7 @@ func file_gid_id_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_gid_id_proto_rawDesc, NumEnums: 2, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/example/grpc/pkg/proto/gidpb/id_grpc.pb.go b/internal/example/grpc/pkg/proto/gidpb/id_grpc.pb.go index ebcc361a..1213b3cc 100644 --- a/internal/example/grpc/pkg/proto/gidpb/id_grpc.pb.go +++ b/internal/example/grpc/pkg/proto/gidpb/id_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v5.27.1 // source: gid/id.proto @@ -16,8 +16,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Id_Generate_FullMethodName = "/gid.Id/Generate" @@ -28,25 +28,29 @@ const ( Id_Chat1_FullMethodName = "/gid.Id/Chat1" Id_UploadDownload_FullMethodName = "/gid.Id/UploadDownload" Id_DoProxy_FullMethodName = "/gid.Id/DoProxy" + Id_ProxyExecEvent_FullMethodName = "/gid.Id/ProxyExecEvent" ) // IdClient is the client API for Id service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// Id 生成随机ID服务 type IdClient interface { // Generate 生成ID Generate(ctx context.Context, in *GenerateRequest, opts ...grpc.CallOption) (*GenerateResponse, error) // 返回流 - TypeStream(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (Id_TypeStreamClient, error) + TypeStream(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TypesResponse], error) // Types id类型 Types(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (*TypesResponse, error) PutTypes(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (*TypesResponse, error) // 聊天 - Chat(ctx context.Context, opts ...grpc.CallOption) (Id_ChatClient, error) + Chat(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatMessage, ChatMessage], error) // ws: chat1 - Chat1(ctx context.Context, opts ...grpc.CallOption) (Id_Chat1Client, error) + Chat1(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatMessage, ChatMessage], error) UploadDownload(ctx context.Context, in *UploadFileRequest, opts ...grpc.CallOption) (*httpbody.HttpBody, error) DoProxy(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) + ProxyExecEvent(ctx context.Context, in *DoProxyEventReq, opts ...grpc.CallOption) (*Empty, error) } type idClient struct { @@ -58,20 +62,22 @@ func NewIdClient(cc grpc.ClientConnInterface) IdClient { } func (c *idClient) Generate(ctx context.Context, in *GenerateRequest, opts ...grpc.CallOption) (*GenerateResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GenerateResponse) - err := c.cc.Invoke(ctx, Id_Generate_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Id_Generate_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } return out, nil } -func (c *idClient) TypeStream(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (Id_TypeStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[0], Id_TypeStream_FullMethodName, opts...) +func (c *idClient) TypeStream(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TypesResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[0], Id_TypeStream_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &idTypeStreamClient{stream} + x := &grpc.GenericClientStream[TypesRequest, TypesResponse]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -81,26 +87,13 @@ func (c *idClient) TypeStream(ctx context.Context, in *TypesRequest, opts ...grp return x, nil } -type Id_TypeStreamClient interface { - Recv() (*TypesResponse, error) - grpc.ClientStream -} - -type idTypeStreamClient struct { - grpc.ClientStream -} - -func (x *idTypeStreamClient) Recv() (*TypesResponse, error) { - m := new(TypesResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_TypeStreamClient = grpc.ServerStreamingClient[TypesResponse] func (c *idClient) Types(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (*TypesResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(TypesResponse) - err := c.cc.Invoke(ctx, Id_Types_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Id_Types_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -108,88 +101,65 @@ func (c *idClient) Types(ctx context.Context, in *TypesRequest, opts ...grpc.Cal } func (c *idClient) PutTypes(ctx context.Context, in *TypesRequest, opts ...grpc.CallOption) (*TypesResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(TypesResponse) - err := c.cc.Invoke(ctx, Id_PutTypes_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Id_PutTypes_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } return out, nil } -func (c *idClient) Chat(ctx context.Context, opts ...grpc.CallOption) (Id_ChatClient, error) { - stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[1], Id_Chat_FullMethodName, opts...) +func (c *idClient) Chat(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatMessage, ChatMessage], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[1], Id_Chat_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &idChatClient{stream} + x := &grpc.GenericClientStream[ChatMessage, ChatMessage]{ClientStream: stream} return x, nil } -type Id_ChatClient interface { - Send(*ChatMessage) error - Recv() (*ChatMessage, error) - grpc.ClientStream -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_ChatClient = grpc.BidiStreamingClient[ChatMessage, ChatMessage] -type idChatClient struct { - grpc.ClientStream -} - -func (x *idChatClient) Send(m *ChatMessage) error { - return x.ClientStream.SendMsg(m) -} - -func (x *idChatClient) Recv() (*ChatMessage, error) { - m := new(ChatMessage) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *idClient) Chat1(ctx context.Context, opts ...grpc.CallOption) (Id_Chat1Client, error) { - stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[2], Id_Chat1_FullMethodName, opts...) +func (c *idClient) Chat1(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatMessage, ChatMessage], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Id_ServiceDesc.Streams[2], Id_Chat1_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &idChat1Client{stream} + x := &grpc.GenericClientStream[ChatMessage, ChatMessage]{ClientStream: stream} return x, nil } -type Id_Chat1Client interface { - Send(*ChatMessage) error - Recv() (*ChatMessage, error) - grpc.ClientStream -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_Chat1Client = grpc.BidiStreamingClient[ChatMessage, ChatMessage] -type idChat1Client struct { - grpc.ClientStream -} - -func (x *idChat1Client) Send(m *ChatMessage) error { - return x.ClientStream.SendMsg(m) -} - -func (x *idChat1Client) Recv() (*ChatMessage, error) { - m := new(ChatMessage) - if err := x.ClientStream.RecvMsg(m); err != nil { +func (c *idClient) UploadDownload(ctx context.Context, in *UploadFileRequest, opts ...grpc.CallOption) (*httpbody.HttpBody, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(httpbody.HttpBody) + err := c.cc.Invoke(ctx, Id_UploadDownload_FullMethodName, in, out, cOpts...) + if err != nil { return nil, err } - return m, nil + return out, nil } -func (c *idClient) UploadDownload(ctx context.Context, in *UploadFileRequest, opts ...grpc.CallOption) (*httpbody.HttpBody, error) { - out := new(httpbody.HttpBody) - err := c.cc.Invoke(ctx, Id_UploadDownload_FullMethodName, in, out, opts...) +func (c *idClient) DoProxy(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(Empty) + err := c.cc.Invoke(ctx, Id_DoProxy_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } return out, nil } -func (c *idClient) DoProxy(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { +func (c *idClient) ProxyExecEvent(ctx context.Context, in *DoProxyEventReq, opts ...grpc.CallOption) (*Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(Empty) - err := c.cc.Invoke(ctx, Id_DoProxy_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Id_ProxyExecEvent_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -198,31 +168,37 @@ func (c *idClient) DoProxy(ctx context.Context, in *Empty, opts ...grpc.CallOpti // IdServer is the server API for Id service. // All implementations should embed UnimplementedIdServer -// for forward compatibility +// for forward compatibility. +// +// Id 生成随机ID服务 type IdServer interface { // Generate 生成ID Generate(context.Context, *GenerateRequest) (*GenerateResponse, error) // 返回流 - TypeStream(*TypesRequest, Id_TypeStreamServer) error + TypeStream(*TypesRequest, grpc.ServerStreamingServer[TypesResponse]) error // Types id类型 Types(context.Context, *TypesRequest) (*TypesResponse, error) PutTypes(context.Context, *TypesRequest) (*TypesResponse, error) // 聊天 - Chat(Id_ChatServer) error + Chat(grpc.BidiStreamingServer[ChatMessage, ChatMessage]) error // ws: chat1 - Chat1(Id_Chat1Server) error + Chat1(grpc.BidiStreamingServer[ChatMessage, ChatMessage]) error UploadDownload(context.Context, *UploadFileRequest) (*httpbody.HttpBody, error) DoProxy(context.Context, *Empty) (*Empty, error) + ProxyExecEvent(context.Context, *DoProxyEventReq) (*Empty, error) } -// UnimplementedIdServer should be embedded to have forward compatible implementations. -type UnimplementedIdServer struct { -} +// UnimplementedIdServer should be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedIdServer struct{} func (UnimplementedIdServer) Generate(context.Context, *GenerateRequest) (*GenerateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Generate not implemented") } -func (UnimplementedIdServer) TypeStream(*TypesRequest, Id_TypeStreamServer) error { +func (UnimplementedIdServer) TypeStream(*TypesRequest, grpc.ServerStreamingServer[TypesResponse]) error { return status.Errorf(codes.Unimplemented, "method TypeStream not implemented") } func (UnimplementedIdServer) Types(context.Context, *TypesRequest) (*TypesResponse, error) { @@ -231,10 +207,10 @@ func (UnimplementedIdServer) Types(context.Context, *TypesRequest) (*TypesRespon func (UnimplementedIdServer) PutTypes(context.Context, *TypesRequest) (*TypesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PutTypes not implemented") } -func (UnimplementedIdServer) Chat(Id_ChatServer) error { +func (UnimplementedIdServer) Chat(grpc.BidiStreamingServer[ChatMessage, ChatMessage]) error { return status.Errorf(codes.Unimplemented, "method Chat not implemented") } -func (UnimplementedIdServer) Chat1(Id_Chat1Server) error { +func (UnimplementedIdServer) Chat1(grpc.BidiStreamingServer[ChatMessage, ChatMessage]) error { return status.Errorf(codes.Unimplemented, "method Chat1 not implemented") } func (UnimplementedIdServer) UploadDownload(context.Context, *UploadFileRequest) (*httpbody.HttpBody, error) { @@ -243,6 +219,10 @@ func (UnimplementedIdServer) UploadDownload(context.Context, *UploadFileRequest) func (UnimplementedIdServer) DoProxy(context.Context, *Empty) (*Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method DoProxy not implemented") } +func (UnimplementedIdServer) ProxyExecEvent(context.Context, *DoProxyEventReq) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method ProxyExecEvent not implemented") +} +func (UnimplementedIdServer) testEmbeddedByValue() {} // UnsafeIdServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to IdServer will @@ -252,6 +232,13 @@ type UnsafeIdServer interface { } func RegisterIdServer(s grpc.ServiceRegistrar, srv IdServer) { + // If the following call pancis, it indicates UnimplementedIdServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Id_ServiceDesc, srv) } @@ -278,21 +265,11 @@ func _Id_TypeStream_Handler(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(m); err != nil { return err } - return srv.(IdServer).TypeStream(m, &idTypeStreamServer{stream}) -} - -type Id_TypeStreamServer interface { - Send(*TypesResponse) error - grpc.ServerStream + return srv.(IdServer).TypeStream(m, &grpc.GenericServerStream[TypesRequest, TypesResponse]{ServerStream: stream}) } -type idTypeStreamServer struct { - grpc.ServerStream -} - -func (x *idTypeStreamServer) Send(m *TypesResponse) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_TypeStreamServer = grpc.ServerStreamingServer[TypesResponse] func _Id_Types_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(TypesRequest) @@ -331,56 +308,18 @@ func _Id_PutTypes_Handler(srv interface{}, ctx context.Context, dec func(interfa } func _Id_Chat_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(IdServer).Chat(&idChatServer{stream}) -} - -type Id_ChatServer interface { - Send(*ChatMessage) error - Recv() (*ChatMessage, error) - grpc.ServerStream + return srv.(IdServer).Chat(&grpc.GenericServerStream[ChatMessage, ChatMessage]{ServerStream: stream}) } -type idChatServer struct { - grpc.ServerStream -} - -func (x *idChatServer) Send(m *ChatMessage) error { - return x.ServerStream.SendMsg(m) -} - -func (x *idChatServer) Recv() (*ChatMessage, error) { - m := new(ChatMessage) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_ChatServer = grpc.BidiStreamingServer[ChatMessage, ChatMessage] func _Id_Chat1_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(IdServer).Chat1(&idChat1Server{stream}) + return srv.(IdServer).Chat1(&grpc.GenericServerStream[ChatMessage, ChatMessage]{ServerStream: stream}) } -type Id_Chat1Server interface { - Send(*ChatMessage) error - Recv() (*ChatMessage, error) - grpc.ServerStream -} - -type idChat1Server struct { - grpc.ServerStream -} - -func (x *idChat1Server) Send(m *ChatMessage) error { - return x.ServerStream.SendMsg(m) -} - -func (x *idChat1Server) Recv() (*ChatMessage, error) { - m := new(ChatMessage) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Id_Chat1Server = grpc.BidiStreamingServer[ChatMessage, ChatMessage] func _Id_UploadDownload_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UploadFileRequest) @@ -418,6 +357,24 @@ func _Id_DoProxy_Handler(srv interface{}, ctx context.Context, dec func(interfac return interceptor(ctx, in, info, handler) } +func _Id_ProxyExecEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DoProxyEventReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IdServer).ProxyExecEvent(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Id_ProxyExecEvent_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IdServer).ProxyExecEvent(ctx, req.(*DoProxyEventReq)) + } + return interceptor(ctx, in, info, handler) +} + // Id_ServiceDesc is the grpc.ServiceDesc for Id service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -445,6 +402,10 @@ var Id_ServiceDesc = grpc.ServiceDesc{ MethodName: "DoProxy", Handler: _Id_DoProxy_Handler, }, + { + MethodName: "ProxyExecEvent", + Handler: _Id_ProxyExecEvent_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/internal/example/grpc/pkg/proto/gidpb/proxy_grpc.pb.go b/internal/example/grpc/pkg/proto/gidpb/proxy_grpc.pb.go index a52d15bc..4792e523 100644 --- a/internal/example/grpc/pkg/proto/gidpb/proxy_grpc.pb.go +++ b/internal/example/grpc/pkg/proto/gidpb/proxy_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v5.27.1 // source: gid/proxy.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( IdProxy_Echo_FullMethodName = "/gid.IdProxy/Echo" @@ -38,8 +38,9 @@ func NewIdProxyClient(cc grpc.ClientConnInterface) IdProxyClient { } func (c *idProxyClient) Echo(ctx context.Context, in *EchoReq, opts ...grpc.CallOption) (*EchoRsp, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(EchoRsp) - err := c.cc.Invoke(ctx, IdProxy_Echo_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, IdProxy_Echo_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -48,18 +49,22 @@ func (c *idProxyClient) Echo(ctx context.Context, in *EchoReq, opts ...grpc.Call // IdProxyServer is the server API for IdProxy service. // All implementations should embed UnimplementedIdProxyServer -// for forward compatibility +// for forward compatibility. type IdProxyServer interface { Echo(context.Context, *EchoReq) (*EchoRsp, error) } -// UnimplementedIdProxyServer should be embedded to have forward compatible implementations. -type UnimplementedIdProxyServer struct { -} +// UnimplementedIdProxyServer should be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedIdProxyServer struct{} func (UnimplementedIdProxyServer) Echo(context.Context, *EchoReq) (*EchoRsp, error) { return nil, status.Errorf(codes.Unimplemented, "method Echo not implemented") } +func (UnimplementedIdProxyServer) testEmbeddedByValue() {} // UnsafeIdProxyServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to IdProxyServer will @@ -69,6 +74,13 @@ type UnsafeIdProxyServer interface { } func RegisterIdProxyServer(s grpc.ServiceRegistrar, srv IdProxyServer) { + // If the following call pancis, it indicates UnimplementedIdProxyServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&IdProxy_ServiceDesc, srv) } diff --git a/internal/example/grpc/proto/gid/id.proto b/internal/example/grpc/proto/gid/id.proto index 53b3d9fe..624dffe7 100644 --- a/internal/example/grpc/proto/gid/id.proto +++ b/internal/example/grpc/proto/gid/id.proto @@ -6,6 +6,7 @@ import "errorpb/options.proto"; import "google/api/annotations.proto"; import "google/api/client.proto"; import "google/api/httpbody.proto"; +import "lava/cloudjob/options.proto"; import "lava/rpc.proto"; import "openapiv3/annotations.proto"; @@ -21,6 +22,7 @@ message GenerateResponse { // Id 生成随机ID服务 service Id { option (google.api.default_host) = "localhost:8080"; + option (lava.cloudjob.job_name)="gid"; // Generate 生成ID rpc Generate (GenerateRequest) returns (GenerateResponse) { @@ -76,6 +78,14 @@ service Id { get: "/proxy/test" }; } + + rpc ProxyExecEvent (DoProxyEventReq) returns (Empty) { + option (lava.cloudjob.subject_name) = "gid.proxy.exec"; + } +} + +message DoProxyEventReq { + } message Empty { diff --git a/internal/example/grpc/protobuf.yaml b/internal/example/grpc/protobuf.yaml index 8c729ffe..5df5c288 100644 --- a/internal/example/grpc/protobuf.yaml +++ b/internal/example/grpc/protobuf.yaml @@ -1,4 +1,4 @@ -checksum: 1aee5ba9edd17a44e0dc80045c51f3b1dbbc99a6 +checksum: 772965786b68015a1d29bfc89ebd5cdcf8a30a19 vendor: ../../../.proto root: - proto @@ -9,7 +9,7 @@ deps: - name: google url: github.com/googleapis/googleapis path: /google - version: v0.0.0-20220601021915-4e0282f92e4e + version: v0.0.0-20220224004616-3c171936039b - name: protoc-gen-openapiv2 url: github.com/grpc-ecosystem/grpc-gateway/v2 path: /protoc-gen-openapiv2 @@ -32,6 +32,10 @@ deps: path: /proto/errorpb version: v0.5.48 plugins: + - name: cloud-job + out: pkg/proto + opt: + - paths=import - name: go out: pkg/proto opt: diff --git a/pkg/proto/asyncjobpb/options.pb.go b/pkg/proto/asyncjobpb/options.pb.go new file mode 100644 index 00000000..13a4360d --- /dev/null +++ b/pkg/proto/asyncjobpb/options.pb.go @@ -0,0 +1,116 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.1 +// source: proto/lava/asyncjob/options.proto + +package asyncjobpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + descriptorpb "google.golang.org/protobuf/types/descriptorpb" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var file_proto_lava_asyncjob_options_proto_extTypes = []protoimpl.ExtensionInfo{ + { + ExtendedType: (*descriptorpb.ServiceOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10010, + Name: "lava.asyncjob.job_name", + Tag: "bytes,10010,opt,name=job_name", + Filename: "proto/lava/asyncjob/options.proto", + }, + { + ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10011, + Name: "lava.asyncjob.subject_name", + Tag: "bytes,10011,opt,name=subject_name", + Filename: "proto/lava/asyncjob/options.proto", + }, +} + +// Extension fields to descriptorpb.ServiceOptions. +var ( + // job name, same with config jobs consumers job name + // + // optional string job_name = 10010; + E_JobName = &file_proto_lava_asyncjob_options_proto_extTypes[0] +) + +// Extension fields to descriptorpb.MethodOptions. +var ( + // optional string subject_name = 10011; + E_SubjectName = &file_proto_lava_asyncjob_options_proto_extTypes[1] +) + +var File_proto_lava_asyncjob_options_proto protoreflect.FileDescriptor + +var file_proto_lava_asyncjob_options_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x61, 0x76, 0x61, 0x2f, 0x61, 0x73, 0x79, + 0x6e, 0x63, 0x6a, 0x6f, 0x62, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x6c, 0x61, 0x76, 0x61, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x63, 0x6a, + 0x6f, 0x62, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x3a, 0x3b, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x9a, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, + 0x65, 0x3a, 0x42, 0x0a, 0x0c, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x12, 0x1e, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x9b, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, + 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x75, 0x62, 0x67, 0x6f, 0x2f, 0x6c, 0x61, 0x76, 0x61, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x63, 0x6a, 0x6f, + 0x62, 0x70, 0x62, 0x3b, 0x61, 0x73, 0x79, 0x6e, 0x63, 0x6a, 0x6f, 0x62, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_proto_lava_asyncjob_options_proto_goTypes = []any{ + (*descriptorpb.ServiceOptions)(nil), // 0: google.protobuf.ServiceOptions + (*descriptorpb.MethodOptions)(nil), // 1: google.protobuf.MethodOptions +} +var file_proto_lava_asyncjob_options_proto_depIdxs = []int32{ + 0, // 0: lava.asyncjob.job_name:extendee -> google.protobuf.ServiceOptions + 1, // 1: lava.asyncjob.subject_name:extendee -> google.protobuf.MethodOptions + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 0, // [0:2] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_lava_asyncjob_options_proto_init() } +func file_proto_lava_asyncjob_options_proto_init() { + if File_proto_lava_asyncjob_options_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_lava_asyncjob_options_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 2, + NumServices: 0, + }, + GoTypes: file_proto_lava_asyncjob_options_proto_goTypes, + DependencyIndexes: file_proto_lava_asyncjob_options_proto_depIdxs, + ExtensionInfos: file_proto_lava_asyncjob_options_proto_extTypes, + }.Build() + File_proto_lava_asyncjob_options_proto = out.File + file_proto_lava_asyncjob_options_proto_rawDesc = nil + file_proto_lava_asyncjob_options_proto_goTypes = nil + file_proto_lava_asyncjob_options_proto_depIdxs = nil +} diff --git a/pkg/proto/cloudjobpb/options.pb.go b/pkg/proto/cloudjobpb/options.pb.go new file mode 100644 index 00000000..4e216ae1 --- /dev/null +++ b/pkg/proto/cloudjobpb/options.pb.go @@ -0,0 +1,116 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.1 +// source: proto/lava/cloudjob/options.proto + +package cloudjobpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + descriptorpb "google.golang.org/protobuf/types/descriptorpb" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var file_proto_lava_cloudjob_options_proto_extTypes = []protoimpl.ExtensionInfo{ + { + ExtendedType: (*descriptorpb.ServiceOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10010, + Name: "lava.cloudjob.job_name", + Tag: "bytes,10010,opt,name=job_name", + Filename: "proto/lava/cloudjob/options.proto", + }, + { + ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtensionType: (*string)(nil), + Field: 10011, + Name: "lava.cloudjob.subject_name", + Tag: "bytes,10011,opt,name=subject_name", + Filename: "proto/lava/cloudjob/options.proto", + }, +} + +// Extension fields to descriptorpb.ServiceOptions. +var ( + // job name, same with config jobs consumers job name + // + // optional string job_name = 10010; + E_JobName = &file_proto_lava_cloudjob_options_proto_extTypes[0] +) + +// Extension fields to descriptorpb.MethodOptions. +var ( + // optional string subject_name = 10011; + E_SubjectName = &file_proto_lava_cloudjob_options_proto_extTypes[1] +) + +var File_proto_lava_cloudjob_options_proto protoreflect.FileDescriptor + +var file_proto_lava_cloudjob_options_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x61, 0x76, 0x61, 0x2f, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x6a, 0x6f, 0x62, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x6c, 0x61, 0x76, 0x61, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x6a, + 0x6f, 0x62, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x3a, 0x3b, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x9a, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, + 0x65, 0x3a, 0x42, 0x0a, 0x0c, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x12, 0x1e, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x9b, 0x4e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, + 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x75, 0x62, 0x67, 0x6f, 0x2f, 0x6c, 0x61, 0x76, 0x61, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x6a, 0x6f, + 0x62, 0x70, 0x62, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x6a, 0x6f, 0x62, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_proto_lava_cloudjob_options_proto_goTypes = []any{ + (*descriptorpb.ServiceOptions)(nil), // 0: google.protobuf.ServiceOptions + (*descriptorpb.MethodOptions)(nil), // 1: google.protobuf.MethodOptions +} +var file_proto_lava_cloudjob_options_proto_depIdxs = []int32{ + 0, // 0: lava.cloudjob.job_name:extendee -> google.protobuf.ServiceOptions + 1, // 1: lava.cloudjob.subject_name:extendee -> google.protobuf.MethodOptions + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 0, // [0:2] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_lava_cloudjob_options_proto_init() } +func file_proto_lava_cloudjob_options_proto_init() { + if File_proto_lava_cloudjob_options_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_lava_cloudjob_options_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 2, + NumServices: 0, + }, + GoTypes: file_proto_lava_cloudjob_options_proto_goTypes, + DependencyIndexes: file_proto_lava_cloudjob_options_proto_depIdxs, + ExtensionInfos: file_proto_lava_cloudjob_options_proto_extTypes, + }.Build() + File_proto_lava_cloudjob_options_proto = out.File + file_proto_lava_cloudjob_options_proto_rawDesc = nil + file_proto_lava_cloudjob_options_proto_goTypes = nil + file_proto_lava_cloudjob_options_proto_depIdxs = nil +} diff --git a/pkg/proto/errcodepb/api_grpc.pb.go b/pkg/proto/errcodepb/api_grpc.pb.go index 3d9adbb6..c4181c0f 100644 --- a/pkg/proto/errcodepb/api_grpc.pb.go +++ b/pkg/proto/errcodepb/api_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v5.27.1 // source: proto/lava/services/errcode/api.proto @@ -16,8 +16,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( ErrorService_Codes_FullMethodName = "/lava.service.ErrorService/Codes" @@ -39,8 +39,9 @@ func NewErrorServiceClient(cc grpc.ClientConnInterface) ErrorServiceClient { } func (c *errorServiceClient) Codes(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ErrCodes, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(ErrCodes) - err := c.cc.Invoke(ctx, ErrorService_Codes_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, ErrorService_Codes_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -49,18 +50,22 @@ func (c *errorServiceClient) Codes(ctx context.Context, in *emptypb.Empty, opts // ErrorServiceServer is the server API for ErrorService service. // All implementations should embed UnimplementedErrorServiceServer -// for forward compatibility +// for forward compatibility. type ErrorServiceServer interface { Codes(context.Context, *emptypb.Empty) (*ErrCodes, error) } -// UnimplementedErrorServiceServer should be embedded to have forward compatible implementations. -type UnimplementedErrorServiceServer struct { -} +// UnimplementedErrorServiceServer should be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedErrorServiceServer struct{} func (UnimplementedErrorServiceServer) Codes(context.Context, *emptypb.Empty) (*ErrCodes, error) { return nil, status.Errorf(codes.Unimplemented, "method Codes not implemented") } +func (UnimplementedErrorServiceServer) testEmbeddedByValue() {} // UnsafeErrorServiceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to ErrorServiceServer will @@ -70,6 +75,13 @@ type UnsafeErrorServiceServer interface { } func RegisterErrorServiceServer(s grpc.ServiceRegistrar, srv ErrorServiceServer) { + // If the following call pancis, it indicates UnimplementedErrorServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&ErrorService_ServiceDesc, srv) } diff --git a/pkg/proto/metadatapb/metadata_grpc.pb.go b/pkg/proto/metadatapb/metadata_grpc.pb.go index 70975e6a..cc987f07 100644 --- a/pkg/proto/metadatapb/metadata_grpc.pb.go +++ b/pkg/proto/metadatapb/metadata_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v5.27.1 // source: proto/lava/services/metadata/metadata.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Metadata_ListServices_FullMethodName = "/lava.service.Metadata/ListServices" @@ -26,6 +26,8 @@ const ( // MetadataClient is the client API for Metadata service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// Metadata is api definition metadata service. type MetadataClient interface { // ListServices list the full name of all services. ListServices(ctx context.Context, in *ListServicesRequest, opts ...grpc.CallOption) (*ListServicesReply, error) @@ -42,8 +44,9 @@ func NewMetadataClient(cc grpc.ClientConnInterface) MetadataClient { } func (c *metadataClient) ListServices(ctx context.Context, in *ListServicesRequest, opts ...grpc.CallOption) (*ListServicesReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(ListServicesReply) - err := c.cc.Invoke(ctx, Metadata_ListServices_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Metadata_ListServices_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -51,8 +54,9 @@ func (c *metadataClient) ListServices(ctx context.Context, in *ListServicesReque } func (c *metadataClient) GetServiceDesc(ctx context.Context, in *GetServiceDescRequest, opts ...grpc.CallOption) (*GetServiceDescReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetServiceDescReply) - err := c.cc.Invoke(ctx, Metadata_GetServiceDesc_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Metadata_GetServiceDesc_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -61,7 +65,9 @@ func (c *metadataClient) GetServiceDesc(ctx context.Context, in *GetServiceDescR // MetadataServer is the server API for Metadata service. // All implementations should embed UnimplementedMetadataServer -// for forward compatibility +// for forward compatibility. +// +// Metadata is api definition metadata service. type MetadataServer interface { // ListServices list the full name of all services. ListServices(context.Context, *ListServicesRequest) (*ListServicesReply, error) @@ -69,9 +75,12 @@ type MetadataServer interface { GetServiceDesc(context.Context, *GetServiceDescRequest) (*GetServiceDescReply, error) } -// UnimplementedMetadataServer should be embedded to have forward compatible implementations. -type UnimplementedMetadataServer struct { -} +// UnimplementedMetadataServer should be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedMetadataServer struct{} func (UnimplementedMetadataServer) ListServices(context.Context, *ListServicesRequest) (*ListServicesReply, error) { return nil, status.Errorf(codes.Unimplemented, "method ListServices not implemented") @@ -79,6 +88,7 @@ func (UnimplementedMetadataServer) ListServices(context.Context, *ListServicesRe func (UnimplementedMetadataServer) GetServiceDesc(context.Context, *GetServiceDescRequest) (*GetServiceDescReply, error) { return nil, status.Errorf(codes.Unimplemented, "method GetServiceDesc not implemented") } +func (UnimplementedMetadataServer) testEmbeddedByValue() {} // UnsafeMetadataServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to MetadataServer will @@ -88,6 +98,13 @@ type UnsafeMetadataServer interface { } func RegisterMetadataServer(s grpc.ServiceRegistrar, srv MetadataServer) { + // If the following call pancis, it indicates UnimplementedMetadataServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Metadata_ServiceDesc, srv) } diff --git a/pkg/typex/func.go b/pkg/typex/func.go new file mode 100644 index 00000000..ff3efe54 --- /dev/null +++ b/pkg/typex/func.go @@ -0,0 +1,9 @@ +package typex + +func DoFunc(fn func()) { + fn() +} + +func DoFunc1[T any](fn func() T) T { + return fn() +} diff --git a/pkg/typex/yaml.go b/pkg/typex/yaml.go new file mode 100644 index 00000000..0ca917be --- /dev/null +++ b/pkg/typex/yaml.go @@ -0,0 +1,36 @@ +package typex + +import ( + "github.com/pubgo/funk/assert" + "github.com/pubgo/funk/errors" + yaml "gopkg.in/yaml.v3" +) + +type YamlListType[T any] []T + +func (p *YamlListType[T]) UnmarshalYAML(value *yaml.Node) error { + if value.IsZero() { + return nil + } + + switch value.Kind { + case yaml.ScalarNode, yaml.MappingNode: + var data T + if err := value.Decode(&data); err != nil { + return errors.WrapCaller(err) + } + *p = []T{data} + return nil + case yaml.SequenceNode: + var data []T + if err := value.Decode(&data); err != nil { + return errors.WrapCaller(err) + } + *p = data + return nil + default: + var val any + assert.Exit(value.Decode(&val)) + return errors.Format("yaml kind type error, kind=%v data=%v", value.Kind, val) + } +} diff --git a/proto/lava/cloudjob/options.proto b/proto/lava/cloudjob/options.proto new file mode 100644 index 00000000..27969aac --- /dev/null +++ b/proto/lava/cloudjob/options.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package lava.cloudjob; + +import "google/protobuf/descriptor.proto"; + +option go_package = "github.com/pubgo/lava/pkg/proto/cloudjobpb;cloudjobpb"; + +extend google.protobuf.ServiceOptions { + // job name, same with config jobs consumers job name + string job_name = 10010; +} + +extend google.protobuf.MethodOptions { + string subject_name = 10011; +}