Skip to content

Commit

Permalink
fix bug in execution_manager with lpExpectedInputs, remove the standa…
Browse files Browse the repository at this point in the history
…lone artifact client and move into idl clientset, update local config file, remove a deprecated function (#4473)

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 23, 2023
1 parent 8a895b9 commit 9d7e55e
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 76 deletions.
13 changes: 7 additions & 6 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
# gatepr: revert the local dir to reflect home.
# paths were changed to personal to ensure settings didn't get lost.
admin:
endpoint: localhost:8089
# This endpoint is used by flytepropeller to talk to admin
# and artifacts to talk to admin,
# and _also_, admin to talk to artifacts
endpoint: localhost:30080
insecure: true
flyteadmin:
featureGates:
enableArtifacts: true

catalog-cache:
endpoint: localhost:8081
Expand Down Expand Up @@ -84,11 +90,6 @@ cloudEvents:
enable: true
cloudEventVersion: v2
type: sandbox
# For admin to find artifacts service
artifacts:
host: localhost
port: 30080
insecure: true
# For artifact service itself
artifactsServer:
artifactBlobStoreConfig:
Expand Down
44 changes: 0 additions & 44 deletions flyteadmin/pkg/artifacts/artifact_client.go

This file was deleted.

12 changes: 0 additions & 12 deletions flyteadmin/pkg/artifacts/artifact_client_test.go

This file was deleted.

14 changes: 12 additions & 2 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package artifacts
import (
"context"
"fmt"
admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -76,8 +78,16 @@ func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient {
return a.client
}

func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) *ArtifactRegistry {
func NewArtifactRegistry(ctx context.Context, _ *Config, _ ...grpc.DialOption) *ArtifactRegistry {
cfg := admin2.GetConfig(ctx)
clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to create Artifact client")
// too many calls to this function to update, just panic for now.
panic(err)
}

return &ArtifactRegistry{
client: InitializeArtifactClient(ctx, config, opts...),
client: clients.ArtifactServiceClient(),
}
}
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,13 +1011,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to
// and so we can fill in template args.
// ArtifactIDs are also returned for lineage purposes.
resolvedExpectedInputs, usedArtifactIDs, err := m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating)
lpExpectedInputs, usedArtifactIDs, err = m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating)
if err != nil {
logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err)
return nil, nil, err
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs)
logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

Expand Down
17 changes: 7 additions & 10 deletions flyteidl/clients/go/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/x509"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -30,6 +31,7 @@ type Clientset struct {
identityServiceClient service.IdentityServiceClient
dataProxyServiceClient service.DataProxyServiceClient
signalServiceClient service.SignalServiceClient
artifactServiceClient artifact.ArtifactRegistryClient
}

// AdminClient retrieves the AdminServiceClient
Expand Down Expand Up @@ -59,6 +61,10 @@ func (c Clientset) SignalServiceClient() service.SignalServiceClient {
return c.signalServiceClient
}

func (c Clientset) ArtifactServiceClient() artifact.ArtifactRegistryClient {
return c.artifactServiceClient

Check warning on line 65 in flyteidl/clients/go/admin/client.go

View check run for this annotation

Codecov / codecov/patch

flyteidl/clients/go/admin/client.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

func NewAdminClient(ctx context.Context, conn *grpc.ClientConn) service.AdminServiceClient {
logger.Infof(ctx, "Initialized Admin client")
return service.NewAdminServiceClient(conn)
Expand Down Expand Up @@ -199,20 +205,11 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC
cs.healthServiceClient = grpc_health_v1.NewHealthClient(adminConnection)
cs.dataProxyServiceClient = service.NewDataProxyServiceClient(adminConnection)
cs.signalServiceClient = service.NewSignalServiceClient(adminConnection)
cs.artifactServiceClient = artifact.NewArtifactRegistryClient(adminConnection)

return &cs, nil
}

// Deprecated: Please use NewClientsetBuilder() instead.
func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) {
clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...)
if err != nil {
return nil, err
}

return clientSet.AdminClient(), nil
}

func InitializeMockAdminClient() service.AdminServiceClient {
logger.Infof(context.TODO(), "Initialized Mock Admin client")
return &mocks.AdminServiceClient{}
Expand Down

0 comments on commit 9d7e55e

Please sign in to comment.