Add explode and / or dynamic model / schema #481

Open
2 tasks
shcheklein opened this issue Sep 27, 2024 · 2 comments

@shcheklein
Member

Follow-up to https://github.com/iterative/dvcx/pull/1368
Also based on this discussion / feedback by @tibor-mach: https://iterativeai.slack.com/archives/C04A9RWEZBN/p1727194987119179
Also based on the iteration on DCLM: https://github.com/iterative/studio/issues/10596

Summary

When we have a single file (JSONL, or CSV/Parquet with a column containing JSON), we need a way to "explode" those JSONs/dicts into a Pythonic model and store them in DataChain not as a single column but as multiple columns, one per path in the JSON/dict.

E.g. this is what JSONL looks like after a naive parse:

[image: Screenshot 2024-09-26 at 6 10 18 PM]

Or from the CSV file (mind the meta column):

[image: image (11)]

There is an obvious way to mitigate this: create a Model class and populate it in the UDF. But that seems very annoying and redundant: the model description becomes 2x/3x the code of the parser.

Suggestions

  • DataChain.explode(C("meta")). This one is more or less obvious and requires creating an extra table.
  • Make functions like map and gen dynamically figure out the schema and create a Pydantic model as they parse files. This requires a more complicated implementation, but can be faster since it can work in streaming mode:

Imagine something like this:

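import io
import json
from collections.abc import Iterator

import zstandard as zstd
from datachain import DataChain, File


def extract(file: File) -> Iterator[tuple[File, dict]]:
    # Stream-decompress the zstd file and parse it line by line as JSONL.
    with file.open() as f:
        dctx = zstd.ZstdDecompressor()
        stream_reader = dctx.stream_reader(f)
        text_stream = io.TextIOWrapper(stream_reader, encoding="utf-8")
        for line in text_stream:
            yield file, json.loads(line)


DataChain.from_dataset("index").settings(cache=True).limit(1).gen(extract).save("raw_text")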
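For comparison, the first suggestion (explode) can be sketched in plain Python, outside of DataChain. This is only an illustration of the intended transformation, not an existing DataChain API: the helper names flatten and explode, the dotted path naming ("meta.stats.tokens"), and the choice to keep lists as single leaf values are all assumptions.

```python
import json
from typing import Any, Iterator


def flatten(value: Any, prefix: str) -> Iterator[tuple[str, Any]]:
    """Yield (dotted_path, leaf_value) pairs for a nested dict."""
    if isinstance(value, dict) and value:
        for key, child in value.items():
            yield from flatten(child, f"{prefix}.{key}")
    else:
        # Scalars, lists and empty dicts are kept as a single leaf value.
        yield prefix, value


def explode(rows: list[dict], column: str) -> list[dict]:
    """Replace `column` in each row with one column per path inside it."""
    out = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        if column in row:
            value = row[column]
            if isinstance(value, str):
                # CSV-style case: the column holds a JSON string.
                value = json.loads(value)
            new_row.update(flatten(value, column))
        out.append(new_row)
    return out


rows = [
    {"id": 1, "meta": {"lang": "en", "stats": {"tokens": 12}}},
    {"id": 2, "meta": '{"lang": "de"}'},
]
exploded = explode(rows, "meta")
```

Rows with different nesting end up with different column sets here, which is exactly where the schema question comes in.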
@shcheklein shcheklein self-assigned this Sep 27, 2024
@skshetry
Member

skshetry commented Sep 27, 2024

This may already be possible with a combination of read_meta and map, depending on how we want to solve this. (read_meta requires either a storage_path with example data to create a schema, or a Pydantic model to be passed.)

How are we going to determine schema? Is it based on a sample of rows (which is what read_meta does, it reads a single row) or by reading all the rows?

@shcheklein
Member Author

> How are we going to determine schema? Is it based on a sample of rows (which is what read_meta does, it reads a single row) or by reading all the rows?

Yes, based on a sample (as we already do in from_parquet and friends).
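To make the sampling trade-off concrete, here is a minimal sketch of sample-based inference in plain Python. It is not how read_meta or from_parquet are implemented; infer_schema and its sample_size parameter are hypothetical names, and falling back to object on conflicting types is just one possible policy.

```python
from itertools import islice
from typing import Iterable


def infer_schema(rows: Iterable[dict], sample_size: int = 1) -> dict[str, type]:
    """Map each top-level key to the Python type seen in the sampled rows."""
    schema: dict[str, type] = {}
    none_type = type(None)
    for row in islice(rows, sample_size):
        for key, value in row.items():
            seen = schema.get(key)
            new = type(value)
            if seen is None or seen is none_type:
                # First sighting, or only nulls seen so far.
                schema[key] = new
            elif new is not seen and new is not none_type:
                # Conflicting types in the sample: fall back to a catch-all.
                schema[key] = object
    return schema


rows = [
    {"url": "a", "score": 0.5},
    {"url": "b", "score": 1, "lang": "en"},
]
one_row = infer_schema(rows, sample_size=1)
two_rows = infer_schema(rows, sample_size=2)
```

With a single sampled row the "lang" key is never seen, so a small sample is cheap but can miss fields or type conflicts. A mapping like this could then be turned into a model, for example with Pydantic's create_model.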

> This may already be possible with a combination of read_meta and map, depending on how we want to solve this. (read_meta requires either a storage_path with example data to create a schema, or a Pydantic model to be passed.)

Yes, the idea is the same, but we need to wrap it into a user-friendly function and maybe generalize it a bit?
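One possible shape for such a wrapper, sketched in plain Python under stated assumptions: stream_with_columns is a hypothetical name, the column set is fixed from the first sample_size rows, and keys that only appear later are silently dropped. The point is that the sample is peeked and then replayed, so the input is still consumed once, in streaming mode.

```python
from itertools import chain, islice
from typing import Iterable, Iterator


def stream_with_columns(
    rows: Iterable[dict], sample_size: int = 1
) -> tuple[list[str], Iterator[tuple]]:
    """Fix the column list from a sample, then stream all rows as tuples."""
    it = iter(rows)
    sample = list(islice(it, sample_size))

    # Collect column names in first-seen order from the sampled rows only.
    columns: list[str] = []
    for row in sample:
        for key in row:
            if key not in columns:
                columns.append(key)

    def records() -> Iterator[tuple]:
        # Replay the sampled rows first, then continue with the rest.
        for row in chain(sample, it):
            yield tuple(row.get(col) for col in columns)

    return columns, records()


def parsed_lines() -> Iterator[dict]:
    yield {"a": 1, "b": 2}
    yield {"a": 3, "c": 4}


columns, records = stream_with_columns(parsed_lines(), sample_size=1)
result = list(records)
```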
