diff --git a/cmd/single/start.go b/cmd/single/start.go index 0fbcbf9f14..d415f82111 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -4,9 +4,19 @@ import ( "context" "net/http" "os" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" + _ "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + _ "gorm.io/driver/postgres" // Required to import database driver. + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics" + datacatalogConfig "github.com/flyteorg/flyte/datacatalog/pkg/config" datacatalogRepo "github.com/flyteorg/flyte/datacatalog/pkg/repositories" datacatalog "github.com/flyteorg/flyte/datacatalog/pkg/rpc/datacatalogservice" @@ -29,14 +39,6 @@ import ( "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" - _ "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" - _ "gorm.io/driver/postgres" // Required to import database driver. - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/metrics" ) const defaultNamespace = "all" @@ -122,7 +124,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error { DefaultNamespaces: namespaceConfigs, }, NewCache: executors.NewCache, - NewClient: executors.NewClient, + NewClient: executors.BuildNewClientFunc(propellerScope), Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/datacatalog/pull_request_template.md b/datacatalog/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/datacatalog/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/docs/deployment/agents/bigquery.rst b/docs/deployment/agents/bigquery.rst index ac6ec896bb..9835c3d47a 100644 --- a/docs/deployment/agents/bigquery.rst +++ b/docs/deployment/agents/bigquery.rst @@ -102,3 +102,5 @@ Upgrade the Flyte Helm release Replace ```` with the name of your release (e.g., ``flyte``) and ```` with the name of your namespace (e.g., ``flyte``). + +For BigQuery plugin on the Flyte cluster, please refer to `BigQuery Plugin Example `_ diff --git a/docs/deployment/agents/databricks.rst b/docs/deployment/agents/databricks.rst new file mode 100644 index 0000000000..00a5e97a47 --- /dev/null +++ b/docs/deployment/agents/databricks.rst @@ -0,0 +1,294 @@ +.. _deployment-agent-setup-databricks: + +Databricks Agent +================= + +This guide provides an overview of how to set up Databricks agent in your Flyte deployment. + +Spin up a cluster +----------------- + +.. tabs:: + + .. group-tab:: Flyte binary + + You can spin up a demo cluster using the following command: + + .. code-block:: bash + + flytectl demo start + + Or install Flyte using the :ref:`flyte-binary helm chart `. + + .. group-tab:: Flyte core + + If you've installed Flyte using the + `flyte-core helm chart `__, please ensure: + + * You have the correct kubeconfig and have selected the correct Kubernetes context. + * You have configured the correct flytectl settings in ``~/.flyte/config.yaml``. + +.. note:: + + Add the Flyte chart repo to Helm if you're installing via the Helm charts. + + .. code-block:: bash + + helm repo add flyteorg https://flyteorg.github.io/flyte + +Databricks workspace +-------------------- + +To set up your Databricks account, follow these steps: + +1. Create a `Databricks account `__. + +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/deployment/plugins/databricks/databricks_workspace.png + :alt: A screenshot of Databricks workspace creation. + +2. Ensure that you have a Databricks workspace up and running. + +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/deployment/plugins/databricks/open_workspace.png + :alt: A screenshot of Databricks workspace. + +3. Generate a `personal access token + `__ to be used in the Flyte configuration. + You can find the personal access token in the user settings within the workspace. ``User settings`` -> ``Developer`` -> ``Access tokens`` + +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/deployment/plugins/databricks/databricks_access_token.png + :alt: A screenshot of access token. + +4. Enable custom containers on your Databricks cluster before you trigger the workflow. + +.. code-block:: bash + + curl -X PATCH -n -H "Authorization: Bearer " \ + https:///api/2.0/workspace-conf \ + -d '{"enableDcs": "true"}' + +For more detail, check `custom containers `__. + +5. Create an `instance profile +`__ +for the Spark cluster. This profile enables the Spark job to access your data in the S3 bucket. + +Create an instance profile using the AWS console (For AWS Users) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +1. In the AWS console, go to the IAM service. +2. Click the Roles tab in the sidebar. +3. Click Create role. + + a. Under Trusted entity type, select AWS service. + b. Under Use case, select **EC2**. + c. Click Next. + d. At the bottom of the page, click Next. + e. In the Role name field, type a role name. + f. Click Create role. + +4. In the role list, click the **AmazonS3FullAccess** role. +5. Click Create role button. + +In the role summary, copy the Role ARN. + +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/deployment/plugins/databricks/s3_arn.png + :alt: A screenshot of s3 arn. + +Locate the IAM role that created the Databricks deployment +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +If you don’t know which IAM role created the Databricks deployment, do the following: + +1. As an account admin, log in to the account console. +2. Go to ``Workspaces`` and click your workspace name. +3. In the Credentials box, note the role name at the end of the Role ARN + +For example, in the Role ARN ``arn:aws:iam::123456789123:role/finance-prod``, the role name is finance-prod + +Edit the IAM role that created the Databricks deployment +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +1. In the AWS console, go to the IAM service. +2. Click the Roles tab in the sidebar. +3. Click the role that created the Databricks deployment. +4. On the Permissions tab, click the policy. +5. Click Edit Policy. +6. Append the following block to the end of the Statement array. Ensure that you don’t overwrite any of the existing policy. Replace with the role you created in Configure S3 access with instance profiles. + +.. code-block:: bash + + { + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": "arn:aws:iam:::role/" + } + +Specify agent configuration +---------------------------- + +.. tabs:: + + .. group-tab:: Flyte binary + + .. tabs:: + + .. group-tab:: Demo cluster + + Enable the Databricks agent on the demo cluster by updating the ConfigMap: + + .. code-block:: bash + + kubectl edit configmap flyte-sandbox-config -n flyte + + .. code-block:: yaml + :emphasize-lines: 7,12,16 + + tasks: + task-plugins: + default-for-task-types: + container: container + container_array: k8s-array + sidecar: sidecar + spark: agent-service + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + plugins: + agent-service: + supportedTaskTypes: + - spark + + .. group-tab:: Helm chart + + Edit the relevant YAML file to specify the plugin. + + .. code-block:: yaml + :emphasize-lines: 7,11,15 + + tasks: + task-plugins: + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + default-for-task-types: + - container: container + - container_array: k8s-array + - spark: agent-service + plugins: + agent-service: + supportedTaskTypes: + - spark + + .. group-tab:: Flyte core + + Create a file named ``values-override.yaml`` and add the following config to it: + + .. code-block:: yaml + :emphasize-lines: 9,14-17 + + enabled_plugins: + tasks: + task-plugins: + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + default-for-task-types: + container: container + sidecar: sidecar + container_array: k8s-array + spark: agent-service + plugins: + agent-service: + supportedTaskTypes: + - spark + +Add the Databricks access token +------------------------------- + +You have to set the Databricks token to the Flyte configuration. + +1. Install flyteagent pod using helm + +.. code-block:: + + helm repo add flyteorg https://flyteorg.github.io/flyte + helm install flyteagent flyteorg/flyteagent --namespace flyte + +2. Get the base64 value of your Databricks token. + +.. code-block:: + + echo -n "" | base64 + +3. Edit the flyteagent secret + + .. code-block:: bash + + kubectl edit secret flyteagent -n flyte + + .. code-block:: yaml + :emphasize-lines: 3 + + apiVersion: v1 + data: + flyte_databricks_access_token: + kind: Secret + metadata: + annotations: + meta.helm.sh/release-name: flyteagent + meta.helm.sh/release-namespace: flyte + creationTimestamp: "2023-10-04T04:09:03Z" + labels: + app.kubernetes.io/managed-by: Helm + name: flyteagent + namespace: flyte + resourceVersion: "753" + uid: 5ac1e1b6-2a4c-4e26-9001-d4ba72c39e54 + type: Opaque + + +Upgrade the deployment +---------------------- + +.. tabs:: + + .. group-tab:: Flyte binary + + .. tabs:: + + .. group-tab:: Demo cluster + + .. code-block:: + + kubectl rollout restart deployment flyte-sandbox -n flyte + + .. group-tab:: Helm chart + + .. code-block:: + + helm upgrade flyteorg/flyte-binary -n --values + + Replace ```` with the name of your release (e.g., ``flyte-backend``), + ```` with the name of your namespace (e.g., ``flyte``), + and ```` with the name of your YAML file. + + .. group-tab:: Flyte core + + .. code-block:: + + helm upgrade flyte/flyte-core -n --values values-override.yaml + + Replace ```` with the name of your release (e.g., ``flyte``) + and ```` with the name of your namespace (e.g., ``flyte``). + +Wait for the upgrade to complete. You can check the status of the deployment pods by running the following command: + +.. code-block:: + + kubectl get pods -n flyte + +For databricks plugin on the Flyte cluster, please refer to `Databricks Plugin Example `_ diff --git a/docs/deployment/agents/index.md b/docs/deployment/agents/index.md index 10a80236b9..e27644570a 100644 --- a/docs/deployment/agents/index.md +++ b/docs/deployment/agents/index.md @@ -16,6 +16,8 @@ Discover the process of setting up Agents for Flyte. - Guide to setting up the MMCloud agent. * - {ref}`Sensor Agent ` - Guide to setting up the Sensor agent. +* - {ref}`Databricks Agent ` + - Guide to setting up the Databricks agent. ``` ```{toctree} @@ -25,5 +27,6 @@ Discover the process of setting up Agents for Flyte. bigquery mmcloud +databricks sensor ``` diff --git a/docs/deployment/agents/mmcloud.rst b/docs/deployment/agents/mmcloud.rst index 1e36ff7cfc..217beab8ed 100644 --- a/docs/deployment/agents/mmcloud.rst +++ b/docs/deployment/agents/mmcloud.rst @@ -57,15 +57,15 @@ Enable the MMCloud agent by adding the following config to the relevant YAML fil .. code-block:: yaml tasks: - task-agents: - enabled-agents: + task-plugins: + enabled-plugins: - agent-service default-for-task-types: - mmcloud_task: agent-service .. code-block:: yaml - agents: + plugins: agent-service: agents: mmcloud-agent: @@ -117,3 +117,5 @@ Wait for the upgrade to complete. You can check the status of the deployment pod .. code-block:: kubectl get pods -n flyte + +For MMCloud plugin on the Flyte cluster, please refer to `Memory Machine Cloud Plugin Example `_ diff --git a/docs/deployment/plugins/webapi/snowflake.rst b/docs/deployment/plugins/webapi/snowflake.rst index a4ac2d35cf..80ef2305d0 100644 --- a/docs/deployment/plugins/webapi/snowflake.rst +++ b/docs/deployment/plugins/webapi/snowflake.rst @@ -239,4 +239,6 @@ Wait for the upgrade to complete. You can check the status of the deployment pod .. code-block:: - kubectl get pods -n flyte \ No newline at end of file + kubectl get pods -n flyte + + For Snowflake plugin on the Flyte cluster, please refer to `Snowflake Plugin Example `_ diff --git a/flyteadmin/auth/handler_utils.go b/flyteadmin/auth/handler_utils.go index e6fd1a7236..a6b4031ca8 100644 --- a/flyteadmin/auth/handler_utils.go +++ b/flyteadmin/auth/handler_utils.go @@ -8,6 +8,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils" "github.com/flyteorg/flyte/flyteadmin/auth/config" + "github.com/flyteorg/flyte/flytestdlib/logger" ) const ( @@ -146,3 +147,32 @@ func GetPublicURL(ctx context.Context, req *http.Request, cfg *config.Config) *u return u } + +func isAuthorizedRedirectURL(url *url.URL, authorizedURL *url.URL) bool { + return url.Hostname() == authorizedURL.Hostname() && url.Port() == authorizedURL.Port() && url.Scheme == authorizedURL.Scheme +} + +func GetRedirectURLAllowed(ctx context.Context, urlRedirectParam string, cfg *config.Config) bool { + if len(urlRedirectParam) == 0 { + logger.Debugf(ctx, "not validating whether empty redirect url is authorized") + return true + } + redirectURL, err := url.Parse(urlRedirectParam) + if err != nil { + logger.Debugf(ctx, "failed to parse user-supplied redirect url: %s with err: %v", urlRedirectParam, err) + return false + } + if redirectURL.Host == "" { + logger.Debugf(ctx, "not validating whether relative redirect url is authorized") + return true + } + logger.Debugf(ctx, "validating whether redirect url: %s is authorized", redirectURL) + for _, authorizedURI := range cfg.AuthorizedURIs { + if isAuthorizedRedirectURL(redirectURL, &authorizedURI.URL) { + logger.Debugf(ctx, "authorizing redirect url: %s against authorized uri: %s", redirectURL.String(), authorizedURI.String()) + return true + } + } + logger.Debugf(ctx, "not authorizing redirect url: %s", redirectURL.String()) + return false +} diff --git a/flyteadmin/auth/handler_utils_test.go b/flyteadmin/auth/handler_utils_test.go index 441f83dbbb..c44a57b934 100644 --- a/flyteadmin/auth/handler_utils_test.go +++ b/flyteadmin/auth/handler_utils_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteadmin/auth/config" - config2 "github.com/flyteorg/flyte/flytestdlib/config" + flytestdconfig "github.com/flyteorg/flyte/flytestdlib/config" ) func TestGetPublicURL(t *testing.T) { @@ -16,7 +16,7 @@ func TestGetPublicURL(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "https://abc", nil) assert.NoError(t, err) u := GetPublicURL(context.Background(), req, &config.Config{ - AuthorizedURIs: []config2.URL{ + AuthorizedURIs: []flytestdconfig.URL{ {URL: *config.MustParseURL("https://xyz")}, {URL: *config.MustParseURL("https://abc")}, }, @@ -28,7 +28,7 @@ func TestGetPublicURL(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "https://abc", nil) assert.NoError(t, err) u := GetPublicURL(context.Background(), req, &config.Config{ - AuthorizedURIs: []config2.URL{ + AuthorizedURIs: []flytestdconfig.URL{ {URL: *config.MustParseURL("https://xyz")}, {URL: *config.MustParseURL("http://abc")}, }, @@ -40,7 +40,7 @@ func TestGetPublicURL(t *testing.T) { req, err := http.NewRequest(http.MethodPost, "https://abc", nil) assert.NoError(t, err) u := GetPublicURL(context.Background(), req, &config.Config{ - AuthorizedURIs: []config2.URL{ + AuthorizedURIs: []flytestdconfig.URL{ {URL: *config.MustParseURL("https://xyz")}, {URL: *config.MustParseURL("http://xyz")}, }, @@ -61,7 +61,7 @@ func TestGetPublicURL(t *testing.T) { assert.NoError(t, err) u := GetPublicURL(context.Background(), req, &config.Config{ - AuthorizedURIs: []config2.URL{ + AuthorizedURIs: []flytestdconfig.URL{ {URL: *config.MustParseURL("http://flyteadmin:80")}, {URL: *config.MustParseURL("http://localhost:30081")}, {URL: *config.MustParseURL("http://localhost:8089")}, @@ -72,3 +72,28 @@ func TestGetPublicURL(t *testing.T) { assert.Equal(t, "http://localhost:30081", u.String()) }) } + +func TestGetRedirectURLAllowed(t *testing.T) { + ctx := context.TODO() + t.Run("relative url", func(t *testing.T) { + assert.True(t, GetRedirectURLAllowed(ctx, "/console", &config.Config{})) + }) + t.Run("no redirect url", func(t *testing.T) { + assert.True(t, GetRedirectURLAllowed(ctx, "", &config.Config{})) + }) + cfg := &config.Config{ + AuthorizedURIs: []flytestdconfig.URL{ + {URL: *config.MustParseURL("https://example.com")}, + {URL: *config.MustParseURL("http://localhost:3008")}, + }, + } + t.Run("authorized url", func(t *testing.T) { + assert.True(t, GetRedirectURLAllowed(ctx, "https://example.com", cfg)) + }) + t.Run("authorized localhost url", func(t *testing.T) { + assert.True(t, GetRedirectURLAllowed(ctx, "http://localhost:3008", cfg)) + }) + t.Run("unauthorized url", func(t *testing.T) { + assert.False(t, GetRedirectURLAllowed(ctx, "https://flyte.com", cfg)) + }) +} diff --git a/flyteadmin/auth/handlers.go b/flyteadmin/auth/handlers.go index 26e6428df3..a6c2e3b122 100644 --- a/flyteadmin/auth/handlers.go +++ b/flyteadmin/auth/handlers.go @@ -141,6 +141,11 @@ func GetLoginHandler(ctx context.Context, authCtx interfaces.AuthenticationConte logger.Debugf(ctx, "Setting CSRF state cookie to %s and state to %s\n", csrfToken, state) url := authCtx.OAuth2ClientConfig(GetPublicURL(ctx, request, authCtx.Options())).AuthCodeURL(state) queryParams := request.URL.Query() + if !GetRedirectURLAllowed(ctx, queryParams.Get(RedirectURLParameter), authCtx.Options()) { + logger.Infof(ctx, "unauthorized redirect URI") + writer.WriteHeader(http.StatusForbidden) + return + } if flowEndRedirectURL := queryParams.Get(RedirectURLParameter); flowEndRedirectURL != "" { redirectCookie := NewRedirectCookie(ctx, flowEndRedirectURL) if redirectCookie != nil { diff --git a/flyteadmin/flyteadmin_config.yaml b/flyteadmin/flyteadmin_config.yaml index e3d19f7326..443814572b 100644 --- a/flyteadmin/flyteadmin_config.yaml +++ b/flyteadmin/flyteadmin_config.yaml @@ -212,4 +212,4 @@ qualityOfService: staging: MEDIUM # by default production has an UNDEFINED tier when it is omitted from the configuration namespace_mapping: - template: "{{ project }}-{{ domain }}" # Default namespace mapping template. \ No newline at end of file + template: "{{ project }}-{{ domain }}" # Default namespace mapping template. diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go index 912cb241da..bbbe6c90e9 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go @@ -7,13 +7,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "google.golang.org/grpc/status" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks" "github.com/flyteorg/flyte/flyteadmin/pkg/common" diff --git a/flyteadmin/pull_request_template.md b/flyteadmin/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flyteadmin/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/flyteadmin/tests/node_execution_test.go b/flyteadmin/tests/node_execution_test.go index 3250865b78..60787e430b 100644 --- a/flyteadmin/tests/node_execution_test.go +++ b/flyteadmin/tests/node_execution_test.go @@ -11,12 +11,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" diff --git a/flytearchives/LICENSE b/flytearchives/LICENSE deleted file mode 100644 index bed437514f..0000000000 --- a/flytearchives/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2019 Lyft, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/flytearchives/README.md b/flytearchives/README.md deleted file mode 100644 index 9c80ac52f3..0000000000 --- a/flytearchives/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# datacatalog -Service that catalogs data to allow for data discovery, lineage and tagging diff --git a/flytearchives/datacatalog.tar.gz b/flytearchives/datacatalog.tar.gz deleted file mode 100644 index 1cc1e56b46..0000000000 Binary files a/flytearchives/datacatalog.tar.gz and /dev/null differ diff --git a/flytecopilot/pull_request_template.md b/flytecopilot/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flytecopilot/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/flyteidl/pull_request_template.md b/flyteidl/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flyteidl/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/client.go b/flyteplugins/go/tasks/pluginmachinery/k8s/client.go index f14ae2c8a0..0ab46081e9 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/client.go @@ -69,7 +69,7 @@ func NewKubeClient(config *rest.Config, options Options) (core.KubeClient, error if options.ClientOptions == nil { options.ClientOptions = &client.Options{ HTTPClient: httpClient, - Mapper: mapper, + Mapper: mapper, } } diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index 1c33594997..bf7d537416 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "os" "testing" "time" @@ -18,6 +19,13 @@ import ( "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks" ) +func TestMain(m *testing.M) { + // All tests should run assuming UTC timezone. + time.Local = time.UTC + code := m.Run() + os.Exit(code) +} + func TestExtractCurrentCondition(t *testing.T) { jobCreated := commonOp.JobCondition{ Type: commonOp.JobCreated, diff --git a/flyteplugins/pull_request_template.md b/flyteplugins/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flyteplugins/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index 8696f3993a..e1069650ad 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -144,7 +144,7 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { DefaultNamespaces: namespaceConfigs, }, NewCache: executors.NewCache, - NewClient: executors.NewClient, + NewClient: executors.BuildNewClientFunc(propellerScope), Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index e3c29ae3d9..ae538385fb 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -109,7 +109,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w DefaultNamespaces: namespaceConfigs, }, NewCache: executors.NewCache, - NewClient: executors.NewClient, + NewClient: executors.BuildNewClientFunc(webhookScope), Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index bdab0d91be..d6d89e1711 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -33,107 +33,95 @@ var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, er return otelutils.WrapK8sCache(k8sCache), nil } -var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) { - var reader *fallbackClientReader - if options.Cache != nil && options.Cache.Reader != nil { - // if caching is enabled we create a fallback reader so we can attempt the client if the cache - // reader does not have the object - reader = &fallbackClientReader{ - orderedClients: []client.Reader{options.Cache.Reader}, +func BuildNewClientFunc(scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) { + return func(config *rest.Config, options client.Options) (client.Client, error) { + var cacheReader client.Reader + cachelessOptions := options + if options.Cache != nil && options.Cache.Reader != nil { + cacheReader = options.Cache.Reader + cachelessOptions.Cache = nil } - options.Cache.Reader = reader - } - - // create the k8s client - k8sClient, err := client.New(config, options) - if err != nil { - return k8sClient, err - } + kubeClient, err := client.New(config, cachelessOptions) + if err != nil { + return nil, err + } - k8sOtelClient := otelutils.WrapK8sClient(k8sClient) - if reader != nil { - // once the k8s client is created we set the fallback reader's client to the k8s client - reader.orderedClients = append(reader.orderedClients, k8sOtelClient) + return newFlyteK8sClient(kubeClient, cacheReader, scope) } - - return k8sOtelClient, nil } -// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which -// directly reads from the API -type fallbackClientReader struct { - orderedClients []client.Reader +type flyteK8sClient struct { + client.Client + cacheReader client.Reader + writeFilter fastcheck.Filter } -func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) { - for _, k8sClient := range c.orderedClients { - if err = k8sClient.Get(ctx, key, out, opts...); err == nil { +func (f flyteK8sClient) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) { + if f.cacheReader != nil { + if err = f.cacheReader.Get(ctx, key, out, opts...); err == nil { return nil } } - return + return f.Client.Get(ctx, key, out, opts...) } -func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { - for _, k8sClient := range c.orderedClients { - if err = k8sClient.List(ctx, list, opts...); err == nil { +func (f flyteK8sClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + if f.cacheReader != nil { + if err = f.cacheReader.List(ctx, list, opts...); err == nil { return nil } } - return -} - -type writeThroughCachingWriter struct { - client.Client - filter fastcheck.Filter -} - -func IDFromObject(obj client.Object, op string) []byte { - return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op)) + return f.Client.List(ctx, list, opts...) } // Create first checks the local cache if the object with id was previously successfully saved, if not then // saves the object obj in the Kubernetes cluster -func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { +func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { // "c" represents create - id := IDFromObject(obj, "c") - if w.filter.Contains(ctx, id) { + id := idFromObject(obj, "c") + if f.writeFilter.Contains(ctx, id) { return nil } - err := w.Client.Create(ctx, obj, opts...) + err := f.Client.Create(ctx, obj, opts...) if err != nil { return err } - w.filter.Add(ctx, id) + f.writeFilter.Add(ctx, id) return nil } // Delete first checks the local cache if the object with id was previously successfully deleted, if not then // deletes the given obj from Kubernetes cluster. -func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { +func (f flyteK8sClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { // "d" represents delete - id := IDFromObject(obj, "d") - if w.filter.Contains(ctx, id) { + id := idFromObject(obj, "d") + if f.writeFilter.Contains(ctx, id) { return nil } - err := w.Client.Delete(ctx, obj, opts...) + err := f.Client.Delete(ctx, obj, opts...) if err != nil { return err } - w.filter.Add(ctx, id) + f.writeFilter.Add(ctx, id) return nil } -func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) { - filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter")) +func idFromObject(obj client.Object, op string) []byte { + return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op)) +} + +func newFlyteK8sClient(kubeClient client.Client, cacheReader client.Reader, scope promutils.Scope) (flyteK8sClient, error) { + writeFilter, err := fastcheck.NewOppoBloomFilter(50000, scope.NewSubScope("kube_filter")) if err != nil { - return writeThroughCachingWriter{}, err + return flyteK8sClient{}, err } - return writeThroughCachingWriter{ - Client: c, - filter: filter, + + return flyteK8sClient{ + Client: kubeClient, + cacheReader: cacheReader, + writeFilter: writeFilter, }, nil } diff --git a/flytepropeller/pkg/controller/executors/kube_test.go b/flytepropeller/pkg/controller/executors/kube_test.go index bcaa64ff6f..4d84d3fb08 100644 --- a/flytepropeller/pkg/controller/executors/kube_test.go +++ b/flytepropeller/pkg/controller/executors/kube_test.go @@ -2,13 +2,14 @@ package executors import ( "context" - "fmt" "reflect" "testing" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/flytestdlib/contextutils" @@ -45,42 +46,46 @@ func TestIdFromObject(t *testing.T) { APIVersion: "v1", }, } - if got := IDFromObject(p, tt.args.op); !reflect.DeepEqual(got, []byte(tt.want)) { - t.Errorf("IDFromObject() = %s, want %s", string(got), tt.want) + if got := idFromObject(p, tt.args.op); !reflect.DeepEqual(got, []byte(tt.want)) { + t.Errorf("idFromObject() = %s, want %s", string(got), tt.want) } }) } } -type singleInvokeClient struct { +type mockKubeClient struct { client.Client - createCalled bool - deleteCalled bool + createCalledCount int + deleteCalledCount int + getCalledCount int + getMissCount int } -func (f *singleInvokeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - if f.createCalled { - return fmt.Errorf("create called more than once") - } - f.createCalled = true +func (m *mockKubeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + m.createCalledCount++ + return nil +} + +func (m *mockKubeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + m.deleteCalledCount++ return nil } -func (f *singleInvokeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - if f.deleteCalled { - return fmt.Errorf("delete called more than once") +func (m *mockKubeClient) Get(ctx context.Context, objectKey types.NamespacedName, obj client.Object, opts ...client.GetOption) error { + if m.getCalledCount < m.getMissCount { + m.getMissCount-- + return k8serrors.NewNotFound(v1.Resource("pod"), "name") } - f.deleteCalled = true + + m.getCalledCount++ return nil } -func TestWriteThroughCachingWriter_Create(t *testing.T) { +func TestFlyteK8sClient(t *testing.T) { ctx := context.TODO() - c := &singleInvokeClient{} - w, err := newWriteThroughCachingWriter(c, 1000, promutils.NewTestScope()) - assert.NoError(t, err) + scope := promutils.NewTestScope() - p := &v1.Pod{ + pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", Name: "name", @@ -91,39 +96,73 @@ func TestWriteThroughCachingWriter_Create(t *testing.T) { }, } - err = w.Create(ctx, p) - assert.NoError(t, err) + objectKey := types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + } - assert.True(t, c.createCalled) + // test cache reader + tests := []struct { + name string + initCacheReader bool + cacheMissCount int + expectedClientGetCount int + }{ + {"no-cache", false, 0, 2}, + {"with-cache-one-miss", true, 1, 1}, + {"with-cache-no-misses", true, 0, 0}, + } - err = w.Create(ctx, p) - assert.NoError(t, err) -} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var cacheReader client.Reader + if tt.initCacheReader { + cacheReader = &mockKubeClient{ + getMissCount: tt.cacheMissCount, + } + } -func TestWriteThroughCachingWriter_Delete(t *testing.T) { - ctx := context.TODO() - c := &singleInvokeClient{} - w, err := newWriteThroughCachingWriter(c, 1000, promutils.NewTestScope()) - assert.NoError(t, err) + kubeClient := &mockKubeClient{} - p := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "name", - }, - TypeMeta: metav1.TypeMeta{ - Kind: "pod", - APIVersion: "v1", - }, - } + flyteK8sClient, err := newFlyteK8sClient(kubeClient, cacheReader, scope.NewSubScope(tt.name)) + assert.NoError(t, err) - err = w.Delete(ctx, p) - assert.NoError(t, err) + for i := 0; i < 2; i++ { + err := flyteK8sClient.Get(ctx, objectKey, pod) + assert.NoError(t, err) + } - assert.True(t, c.deleteCalled) + assert.Equal(t, tt.expectedClientGetCount, kubeClient.getCalledCount) + }) + } - err = w.Delete(ctx, p) - assert.NoError(t, err) + // test create + t.Run("create", func(t *testing.T) { + kubeClient := &mockKubeClient{} + flyteK8sClient, err := newFlyteK8sClient(kubeClient, nil, scope.NewSubScope("create")) + assert.NoError(t, err) + + for i := 0; i < 5; i++ { + err = flyteK8sClient.Create(ctx, pod) + assert.NoError(t, err) + } + + assert.Equal(t, 1, kubeClient.createCalledCount) + }) + + // test delete + t.Run("delete", func(t *testing.T) { + kubeClient := &mockKubeClient{} + flyteK8sClient, err := newFlyteK8sClient(kubeClient, nil, scope.NewSubScope("delete")) + assert.NoError(t, err) + + for i := 0; i < 5; i++ { + err = flyteK8sClient.Delete(ctx, pod) + assert.NoError(t, err) + } + + assert.Equal(t, 1, kubeClient.deleteCalledCount) + }) } func init() { diff --git a/flytepropeller/pull_request_template.md b/flytepropeller/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flytepropeller/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_ diff --git a/flytestdlib/pull_request_template.md b/flytestdlib/pull_request_template.md deleted file mode 100644 index 9cdab99b46..0000000000 --- a/flytestdlib/pull_request_template.md +++ /dev/null @@ -1,35 +0,0 @@ -## _Read then delete this section_ - -_- Make sure to use a concise title for the pull-request._ - -_- Use #patch, #minor or #major in the pull-request title to bump the corresponding version. Otherwise, the patch version -will be bumped. [More details](https://github.com/marketplace/actions/github-tag-bump)_ - -# TL;DR -_Please replace this text with a description of what this PR accomplishes._ - -## Type - - [ ] Bug Fix - - [ ] Feature - - [ ] Plugin - -## Are all requirements met? - - - [ ] Code completed - - [ ] Smoke tested - - [ ] Unit tests added - - [ ] Code documentation added - - [ ] Any pending items have an associated Issue - -## Complete description - _How did you fix the bug, make the feature etc. Link to any design docs etc_ - -## Tracking Issue -_Remove the '*fixes*' keyword if there will be multiple PRs to fix the linked issue_ - -fixes https://github.com/flyteorg/flyte/issues/ - -## Follow-up issue -_NA_ -OR -_https://github.com/flyteorg/flyte/issues/_