Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core feature] Support multipart blob upload in Container Task #5924

Open
2 tasks done
wayner0628 opened this issue Oct 28, 2024 · 7 comments
Open
2 tasks done

[Core feature] Support multipart blob upload in Container Task #5924

wayner0628 opened this issue Oct 28, 2024 · 7 comments
Assignees
Labels
enhancement New feature or request

Comments

@wayner0628
Copy link
Contributor

wayner0628 commented Oct 28, 2024

Motivation: Why do you think this is important?

Currently sidecar (container task uploader) doesn't support nested directories in multipart blob uploads

{},"level":"warning","msg":"Currently nested directories are not supported in multipart blob uploads, for directory @ /var/outputs/out/data/g1/apftmdfsmkbgbqzfc9kx-n0-0","ts":"2024-10-28T00:23:18Z"}
380543961-0f18dfca-ee4b-4774-adb2-e72db37804b8

Since we support multipart blob downloads now, we should also support corresponding uploads to provide a complete workflow for the container task.

Goal: What should the final outcome look like, ideally?

Successful implementation should be able to run this python file

import logging
from typing import Tuple, List
import datetime
from flytekit import ContainerTask, kwtypes, workflow, task
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory


logger = logging.getLogger(__file__)


flyte_file_io = ContainerTask(
    name="flyte_file_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytefile.py",
        # "{{.inputs.inputs}}",
        "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

flyte_dir_io = ContainerTask(
    name="flyte_dir_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytedir.py",
        # "{{.inputs.inputs}}",
        "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

@task
def flyte_file_task() -> FlyteFile:
    with open("./a.txt", "w") as file:
        file.write("This is a.txt file.")
    return FlyteFile(path="./a.txt")

@workflow
def flyte_file_io_wf() -> FlyteFile:
    ff = flyte_file_task()
    return flyte_file_io(inputs=ff)

@task
def flyte_dir_write_task() -> FlyteDirectory:
    from pathlib import Path
    import flytekit
    import os

    working_dir = flytekit.current_context().working_directory
    local_dir = Path(os.path.join(working_dir, "csv_files"))
    local_dir.mkdir(exist_ok=True)
    write_file = local_dir / "a.txt"
    with open(write_file, "w") as file:
        file.write("This is for flyte dir.")

    return FlyteDirectory(path=str(local_dir))

# Not supported by sidecar currently
@task
def flyte_dir_read_task(path: FlyteDirectory) -> bool:
    from pathlib import Path
    path = Path(path)

    if not path.exists() or not path.is_dir():
        print(f"Error: {path} does not exist or is not a directory.")
        return False

    for file in path.rglob("*"):
        if file.is_file():
            print(file)
    return True

@workflow
def flyte_dir_io_wf() -> bool:
    fd = flyte_dir_write_task()
    return flyte_dir_read_task(flyte_dir_io(inputs=fd))

if __name__ == "__main__":
    print(flyte_dir_io_wf())
    print(flyte_file_io_wf())

Describe alternatives you've considered

NA

Propose: Link/Inline OR Additional context

#5715

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@wayner0628 wayner0628 added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Oct 28, 2024
Copy link

welcome bot commented Oct 28, 2024

Thank you for opening your first issue here! 🛠

@eapolinario eapolinario removed the untriaged This issues has not yet been looked at by the Maintainers label Oct 31, 2024
@wayner0628
Copy link
Contributor Author

Hi! I'm currently occupied with other tasks and likely won't be able to return to this issue soon. Please feel free to contribute.

@wayner0628 wayner0628 removed their assignment Nov 6, 2024
@peterxcli
Copy link
Contributor

#take

@doLei-2001
Copy link

Hello, I see that you have a function that I am currently investigating.

How do I pass a large number of parameters to a containertask? For example, I have 20 parameters.

I see your code shows that you are passing

"-var-input-inputs"
"-var-output-out"

Do you also use read parameters one by one in the image?

with open

@doLei-2001
Copy link

Are there any other methods?

@davidmirror-ops davidmirror-ops moved this from Backlog to Assigned in Flyte Issues/PRs maintenance Nov 21, 2024
@peterxcli
Copy link
Contributor

Hi @wayner0628, after rebasing this PR #5715, and follow your setup instruction in that PR, I failed at task 2 in flyte_dir_io_wf

image

and the logs in the pod(flyte-copilot-dowloader):

time="2024-11-27T11:42:10Z" level=error msg="[0] Couldn't find a config file []. Relying on env vars and pflags."      │
│ Error: Config File "config" Not Found in "[]"                                                                          │
│ Usage:                                                                                                                 │
│   flytedata download <opts> [flags]                                                                                    │
│                                                                                                                        │
│ Flags:                                                                                                                 │
│   -d, --download-mode string          Download mode to use. Options [[DOWNLOAD_STREAM DO_NOT_DOWNLOAD DOWNLOAD_EAGER]] │
│   -m, --format string                 What should be the output format for the primitive and structured types. Options │
│   -f, --from-remote string            The remote path/key for inputs in stow store.                                    │
│   -h, --help                          help for download                                                                │
│   -i, --input-interface bytesBase64   Input interface proto message - core.VariableMap, base64 encoced string (default │
│   -t, --timeout duration              Max time to allow for downloads to complete, default is 1H (default 1h0m0s)      │
│   -o, --to-local-dir string           The local directory on disk where data should be downloaded.                     │
│       --to-output-prefix string       The remote path/key prefix for outputs in stow store. this is mostly used to wri │
│                                                                                                                        │
│ Global Flags:                                                                                                          │
│       --add_dir_header                             If true, adds the file directory to the header                      │
│       --alsologtostderr                            log to standard error as well as files                              │
│       --as string                                  Username to impersonate for the operation                           │
│       --as-group stringArray                       Group to impersonate for the operation, this flag can be repeated t │
│       --as-uid string                              UID to impersonate for the operation                                │
│       --certificate-authority string               Path to a cert file for the certificate authority                   │
│       --client-certificate string                  Path to a client certificate file for TLS                           │
│       --client-key string                          Path to a client key file for TLS                                   │
│       --cluster string                             The name of the kubeconfig cluster to use                           │
│       --config string                              config file (default is $HOME/config.yaml)                          │
│       --context string                             The name of the kubeconfig context to use                           │
│       --disable-compression                        If true, opt-out of response compression for all requests to the se │
│       --err-output-name string                     Actual key name under the prefix where the error protobuf should be
...

Did I miss something? If you have time, please take a look. I’d appreciate your insight! Thanks!"

@wayner0628
Copy link
Contributor Author

Hi, I didn't encounter this error before. Did you edit your config file like this?

plugins:
  logs:
    dynamic-log-links:
      - comet-ml-execution-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }}"
      - comet-ml-custom-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }}"

    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
          image: "localhost:30000/copilot-flytefile:0603" # THIS LINE

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Status: Assigned
Development

No branches or pull requests

4 participants