Skip to content

Commit

Permalink
feat: stream event metrics (#1575)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Oct 14, 2024
1 parent 451a48d commit 5279a09
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 7 deletions.
5 changes: 5 additions & 0 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (kc *kClient) invokeStreamingEndpoint() (endpoint.Endpoint, error) {

// streamx version streaming mw
kc.sxStreamMW = streamx.StreamMiddlewareChain(kc.opt.StreamXOptions.StreamMWs...)
eventHandler := kc.opt.TracerCtl.GetStreamEventHandler()
if eventHandler != nil {
kc.opt.StreamXOptions.StreamRecvMWs = append(kc.opt.StreamXOptions.StreamRecvMWs, streamx.NewStreamRecvStatMiddleware(eventHandler))
kc.opt.StreamXOptions.StreamSendMWs = append(kc.opt.StreamXOptions.StreamSendMWs, streamx.NewStreamSendStatMiddleware(eventHandler))
}
kc.sxStreamRecvMW = streamx.StreamRecvMiddlewareChain(kc.opt.StreamXOptions.StreamRecvMWs...)
kc.sxStreamSendMW = streamx.StreamSendMiddlewareChain(kc.opt.StreamXOptions.StreamSendMWs...)

Expand Down
5 changes: 0 additions & 5 deletions internal/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/pkg/utils"
"github.com/cloudwego/localsession/backup"
Expand Down Expand Up @@ -80,10 +79,6 @@ type Options struct {
Limit Limit

MWBs []endpoint.MiddlewareBuilder
// streamx
SMWs []streamx.StreamMiddleware
SRecvMWs []streamx.StreamRecvMiddleware
SSendMWs []streamx.StreamSendMiddleware

Bus event.Bus
Events event.Queue
Expand Down
2 changes: 1 addition & 1 deletion internal/stream/stream_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// StreamEventHandler is used to handle stream events
type StreamEventHandler func(ctx context.Context, evt stats.Event, err error)
type StreamEventHandler = func(ctx context.Context, evt stats.Event, err error)

type StreamingConfig struct {
RecvMiddlewareBuilders []endpoint.RecvMiddlewareBuilder
Expand Down
9 changes: 8 additions & 1 deletion pkg/streamx/client_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package streamx

import "time"
import (
"context"
"time"

"github.com/cloudwego/kitex/pkg/stats"
)

type EventHandler func(ctx context.Context, evt stats.Event, err error)

type ClientOptions struct {
RecvTimeout time.Duration
Expand Down
27 changes: 27 additions & 0 deletions pkg/streamx/stream_middleware_internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package streamx

import (
"context"

"github.com/cloudwego/kitex/pkg/stats"
)

func NewStreamRecvStatMiddleware(ehandler EventHandler) StreamRecvMiddleware {
return func(next StreamRecvEndpoint) StreamRecvEndpoint {
return func(ctx context.Context, stream Stream, res any) (err error) {
err = next(ctx, stream, res)
ehandler(ctx, stats.StreamRecv, err)
return err
}
}
}

func NewStreamSendStatMiddleware(ehandler EventHandler) StreamSendMiddleware {
return func(next StreamSendEndpoint) StreamSendEndpoint {
return func(ctx context.Context, stream Stream, res any) (err error) {
err = next(ctx, stream, res)
ehandler(ctx, stats.StreamSend, err)
return err
}
}
}
12 changes: 12 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ func (s *server) RegisterService(svcInfo *serviceinfo.ServiceInfo, handler inter
}

registerOpts := internal_server.NewRegisterOptions(opts)
// add trace middlewares
ehandler := s.opt.TracerCtl.GetStreamEventHandler()
if ehandler != nil {
registerOpts.StreamRecvMiddlewares = append(
registerOpts.StreamRecvMiddlewares, streamx.NewStreamRecvStatMiddleware(ehandler),
)
registerOpts.StreamSendMiddlewares = append(
registerOpts.StreamSendMiddlewares, streamx.NewStreamSendStatMiddleware(ehandler),
)
}

// register service
if err := s.svcs.addService(svcInfo, handler, registerOpts); err != nil {
panic(err.Error())
}
Expand Down

0 comments on commit 5279a09

Please sign in to comment.