Skip to content

Commit

Permalink
Artf/lints (#4429)
Browse files Browse the repository at this point in the history
* lint fixes
* rename sandbox_utils without underscore

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 15, 2023
1 parent 484e144 commit 2e256d3
Show file tree
Hide file tree
Showing 20 changed files with 103 additions and 77 deletions.
6 changes: 4 additions & 2 deletions flyteadmin/pkg/artifacts/artifact_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flytestdlib/logger"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package artifacts

import (
"context"
"fmt"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"google.golang.org/grpc"

"context"
)

// ArtifactRegistry contains a client to talk to an Artifact service and has helper methods
Expand Down
16 changes: 8 additions & 8 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ package cloudevent

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/sandbox_utils"
"time"

dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/NYTimes/gizmo/pubsub"
gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws"
gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp"
Expand All @@ -21,9 +16,13 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/sandboxutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Repository, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, cloudEventsConfig runtimeInterfaces.CloudEventsConfig, remoteDataConfig runtimeInterfaces.RemoteDataConfig, scope promutils.Scope) interfaces.Publisher {
Expand Down Expand Up @@ -93,7 +92,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case common.Sandbox:
var publisher pubsub.Publisher
publisher = sandbox_utils.NewCloudEventsPublisher()
publisher = sandboxutils.NewCloudEventsPublisher()
sender = &cloudEventImplementations.PubSubSender{
Pub: publisher,
}

Check warning on line 98 in flyteadmin/pkg/async/cloudevent/factory.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/factory.go#L93-L98

Added lines #L93 - L98 were not covered by tests
Expand All @@ -108,7 +107,8 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

if cloudEventsConfig.CloudEventVersion == runtimeInterfaces.CloudEventVersionv2 {
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
} else {
return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)
}

Check warning on line 110 in flyteadmin/pkg/async/cloudevent/factory.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/factory.go#L108-L110

Added lines #L108 - L110 were not covered by tests

return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)

Check warning on line 112 in flyteadmin/pkg/async/cloudevent/factory.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/factory.go#L112

Added line #L112 was not covered by tests

}
10 changes: 5 additions & 5 deletions flyteadmin/pkg/async/cloudevent/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package cloudevent

import (
"context"
dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyte/flytestdlib/storage"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
"github.com/stretchr/testify/mock"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
)

func getMockStore() *storage.DataStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,30 @@ import (
"bytes"
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/util"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/golang/protobuf/jsonpb"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const (
Expand Down Expand Up @@ -153,7 +148,15 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
Domain: rawEvent.ExecutionId.Domain,
Name: rawEvent.ExecutionId.Name,
})
if err != nil {
logger.Warningf(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.ExecutionId)
return nil, err
}
ex, err := transformers.FromExecutionModel(ctx, executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(ctx, "couldn't transform execution [%+v] for cloud event processing", rawEvent.ExecutionId)
return nil, err
}
if ex.Closure.WorkflowId == nil {
logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex)
return nil, fmt.Errorf("workflow id is nil for execution [%+v]", ex)
Expand All @@ -164,6 +167,10 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
Name: ex.Closure.WorkflowId.Name,
Version: ex.Closure.WorkflowId.Version,
})
if err != nil {
logger.Warningf(ctx, "couldn't find workflow [%+v] for cloud event processing", ex.Closure.WorkflowId)
return nil, err
}
var workflowInterface core.TypedInterface
if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 {
err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface)
Expand Down Expand Up @@ -349,6 +356,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
if err != nil {
logger.Debugf(ctx, "Failed to transform task model with err %v", err)
return nil, err
}
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id

Check warning on line 364 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L337-L364

Added lines #L337 - L364 were not covered by tests
}
Expand Down Expand Up @@ -425,6 +436,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
eventSource = common.FlyteURLKeyFromNodeExecutionIDRetry(*e.ParentNodeExecutionId,
int(e.RetryAttempt))
finalMsg, err = c.TransformTaskExecutionEvent(ctx, e)
if err != nil {
logger.Errorf(ctx, "Failed to transform task execution event with error: %v", err)
return err
}
case *admin.NodeExecutionEventRequest:
topic = "cloudevents.NodeExecution"
e := msgType.Event
Expand All @@ -434,6 +449,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
if err != nil {
logger.Errorf(ctx, "Failed to transform node execution event with error: %v", err)
return err
}
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
executionID = msgType.ExecutionId.String()
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/async/cloudevent/implementations/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package implementations
import (
"context"
"fmt"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"

"github.com/NYTimes/gizmo/pubsub"
"github.com/Shopify/sarama"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"

Expand Down
30 changes: 15 additions & 15 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,23 @@ package impl
import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"strconv"
"time"

"github.com/benbjohnson/clock"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/auth"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
cloudeventInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications"
notificationInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/executions"
Expand All @@ -44,6 +34,16 @@ import (
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteadmin/plugins"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const childContainerQueueKey = "child_queue"
Expand Down Expand Up @@ -739,16 +739,16 @@ func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding
if inputVal.GetScalar() == nil || inputVal.GetScalar().GetPrimitive() == nil {
return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input value [%+v]", inputVal)
}
var strVal = ""
var strVal string
p := inputVal.GetScalar().GetPrimitive()
switch p.GetValue().(type) {
case *core.Primitive_Integer:
strVal = fmt.Sprintf("%s", p.GetStringValue())
strVal = p.GetStringValue()
case *core.Primitive_Datetime:
t := time.Unix(p.GetDatetime().Seconds, int64(p.GetDatetime().Nanos))
strVal = t.Format("2006-01-02")
case *core.Primitive_StringValue:
strVal = fmt.Sprintf("%s", p.GetStringValue())
strVal = p.GetStringValue()
case *core.Primitive_FloatValue:
strVal = fmt.Sprintf("%.2f", p.GetFloatValue())
case *core.Primitive_Boolean:
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"strings"
"testing"
"time"
Expand All @@ -23,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/auth"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks"
notificationMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package impl
import (
"bytes"
"context"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"strconv"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/launch_plan_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"testing"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand Down
16 changes: 8 additions & 8 deletions flyteadmin/pkg/manager/impl/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@ package impl
import (
"bytes"
"context"
"github.com/golang/protobuf/proto"
"strconv"
"time"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources"
Expand All @@ -29,6 +23,12 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengine "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
)

type taskMetrics struct {
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
adminErrors "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
Expand Down
Loading

0 comments on commit 2e256d3

Please sign in to comment.