Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyteadmin] Refactor panic recovery into middleware #5546

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions flyteadmin/pkg/rpc/adminservice/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesUpdateRequest) (
*admin.WorkflowAttributesUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -30,7 +29,6 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad

func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesGetRequest) (
*admin.WorkflowAttributesGetResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -48,7 +46,6 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin

func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesDeleteRequest) (
*admin.WorkflowAttributesDeleteResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -66,7 +63,6 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad

func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesUpdateRequest) (
*admin.ProjectDomainAttributesUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -84,7 +80,6 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques

func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesGetRequest) (
*admin.ProjectDomainAttributesGetResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -102,7 +97,6 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *

func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesDeleteRequest) (
*admin.ProjectDomainAttributesDeleteResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -121,7 +115,6 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques
func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *admin.ProjectAttributesUpdateRequest) (
*admin.ProjectAttributesUpdateResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -140,7 +133,6 @@ func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *adm
func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.ProjectAttributesGetRequest) (
*admin.ProjectAttributesGetResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -159,7 +151,6 @@ func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.
func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *admin.ProjectAttributesDeleteRequest) (
*admin.ProjectAttributesDeleteResponse, error) {

defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -177,7 +168,6 @@ func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *adm

func (m *AdminService) ListMatchableAttributes(ctx context.Context, request *admin.ListMatchableAttributesRequest) (
*admin.ListMatchableAttributesResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
13 changes: 0 additions & 13 deletions flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"runtime/debug"

"github.com/golang/protobuf/proto"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent"
eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications"
Expand Down Expand Up @@ -44,17 +42,6 @@ type AdminService struct {
Metrics AdminMetrics
}

// Intercepts all admin requests to handle panics during execution.
func (m *AdminService) interceptPanic(ctx context.Context, request proto.Message) {
err := recover()
if err == nil {
return
}

m.Metrics.PanicCounter.Inc()
logger.Fatalf(ctx, "panic-ed for request: [%+v] with err: %v with Stack: %v", request, err, string(debug.Stack()))
}

const defaultRetries = 3

func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, configuration runtimeIfaces.Configuration,
Expand Down
40 changes: 0 additions & 40 deletions flyteadmin/pkg/rpc/adminservice/base_test.go

This file was deleted.

2 changes: 0 additions & 2 deletions flyteadmin/pkg/rpc/adminservice/description_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
)

func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin.ObjectGetRequest) (*admin.DescriptionEntity, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -36,7 +35,6 @@ func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin.
}

func (m *AdminService) ListDescriptionEntities(ctx context.Context, request *admin.DescriptionEntityListRequest) (*admin.DescriptionEntityList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
10 changes: 0 additions & 10 deletions flyteadmin/pkg/rpc/adminservice/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

func (m *AdminService) CreateExecution(
ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -32,7 +31,6 @@ func (m *AdminService) CreateExecution(

func (m *AdminService) RelaunchExecution(
ctx context.Context, request *admin.ExecutionRelaunchRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -51,7 +49,6 @@ func (m *AdminService) RelaunchExecution(

func (m *AdminService) RecoverExecution(
ctx context.Context, request *admin.ExecutionRecoverRequest) (*admin.ExecutionCreateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -70,7 +67,6 @@ func (m *AdminService) RecoverExecution(

func (m *AdminService) CreateWorkflowEvent(
ctx context.Context, request *admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -89,7 +85,6 @@ func (m *AdminService) CreateWorkflowEvent(

func (m *AdminService) GetExecution(
ctx context.Context, request *admin.WorkflowExecutionGetRequest) (*admin.Execution, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -107,7 +102,6 @@ func (m *AdminService) GetExecution(

func (m *AdminService) UpdateExecution(
ctx context.Context, request *admin.ExecutionUpdateRequest) (*admin.ExecutionUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
requestedAt := time.Now()
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
Expand All @@ -126,7 +120,6 @@ func (m *AdminService) UpdateExecution(

func (m *AdminService) GetExecutionData(
ctx context.Context, request *admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -144,7 +137,6 @@ func (m *AdminService) GetExecutionData(

func (m *AdminService) GetExecutionMetrics(
ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -162,7 +154,6 @@ func (m *AdminService) GetExecutionMetrics(

func (m *AdminService) ListExecutions(
ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -180,7 +171,6 @@ func (m *AdminService) ListExecutions(

func (m *AdminService) TerminateExecution(
ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand Down
7 changes: 0 additions & 7 deletions flyteadmin/pkg/rpc/adminservice/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

func (m *AdminService) CreateLaunchPlan(
ctx context.Context, request *admin.LaunchPlanCreateRequest) (*admin.LaunchPlanCreateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -31,7 +30,6 @@ func (m *AdminService) CreateLaunchPlan(
}

func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectGetRequest) (*admin.LaunchPlan, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -55,7 +53,6 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG
}

func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.ActiveLaunchPlanRequest) (*admin.LaunchPlan, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -73,7 +70,6 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A

func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.LaunchPlanUpdateRequest) (
*admin.LaunchPlanUpdateResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}
Expand All @@ -97,7 +93,6 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun

func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.ResourceListRequest) (
*admin.LaunchPlanList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand All @@ -116,7 +111,6 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou

func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin.ActiveLaunchPlanListRequest) (
*admin.LaunchPlanList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand All @@ -135,7 +129,6 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin

func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.")
}
Expand Down
7 changes: 1 addition & 6 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
package adminservice

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flyte/flyteadmin/pkg/rpc/adminservice/util"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)
Expand Down Expand Up @@ -115,8 +113,7 @@ type descriptionEntityEndpointMetrics struct {
}

type AdminMetrics struct {
Scope promutils.Scope
PanicCounter prometheus.Counter
Scope promutils.Scope

executionEndpointMetrics executionEndpointMetrics
launchPlanEndpointMetrics launchPlanEndpointMetrics
Expand All @@ -137,8 +134,6 @@ type AdminMetrics struct {
func InitMetrics(adminScope promutils.Scope) AdminMetrics {
return AdminMetrics{
Scope: adminScope,
PanicCounter: adminScope.MustNewCounter("handler_panic",
"panics encountered while handling requests to the admin service"),

executionEndpointMetrics: executionEndpointMetrics{
scope: adminScope,
Expand Down
61 changes: 61 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package middleware

import (
"context"
"runtime/debug"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

// RecoveryInterceptor is a struct for creating gRPC interceptors that handle panics in go
type RecoveryInterceptor struct {
panicCounter prometheus.Counter
}

// NewRecoveryInterceptor creates a new RecoveryInterceptor with metrics under the provided scope
func NewRecoveryInterceptor(adminScope promutils.Scope) *RecoveryInterceptor {
panicCounter := adminScope.MustNewCounter("handler_panic", "panics encountered while handling gRPC requests")
return &RecoveryInterceptor{
panicCounter: panicCounter,
}
}

// UnaryServerInterceptor returns a new unary server interceptor for panic recovery.
func (ri *RecoveryInterceptor) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) {

defer func() {
if r := recover(); r != nil {
ri.panicCounter.Inc()
logger.Errorf(ctx, "panic-ed for request: [%+v] to %s with err: %v with Stack: %v", req, info.FullMethod, r, string(debug.Stack()))
// Return INTERNAL to client with no info as to not leak implementation details
err = status.Errorf(codes.Internal, "")
}
}()

return handler(ctx, req)
}
}

// StreamServerInterceptor returns a new streaming server interceptor for panic recovery.
func (ri *RecoveryInterceptor) StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {

defer func() {
if r := recover(); r != nil {
ri.panicCounter.Inc()
logger.Errorf(stream.Context(), "panic-ed for stream to %s with err: %v with Stack: %v", info.FullMethod, r, string(debug.Stack()))
// Return INTERNAL to client with no info as to not leak implementation details
err = status.Errorf(codes.Internal, "")
}
}()

return handler(srv, stream)
}
}
Loading
Loading