Rust Engine write_deltalake Schema #2200

Closed
tastyfrankfurt opened this issue Feb 21, 2024 · 4 comments
Labels: bug (Something isn't working)


tastyfrankfurt commented Feb 21, 2024

Environment

Ubuntu 22.04, Python 3, reading events from Azure Event Hubs and writing them out to Azure Blob Storage.
Delta-rs version: 0.15.3 (the deltalake version reported by pip list)

Binding: Python

Environment:

  • Cloud provider: Azure
  • OS: Ubuntu 22.04
  • Other: Python

Bug

What happened:
When passing the schema option to write_deltalake with the Rust engine (engine="rust"), the write to the Delta table on Azure storage fails.

If I switch to the pyarrow engine, it works as expected.

What you expected to happen:
When passing the schema option to write_deltalake with the Rust engine, the data is written to the Delta table on Azure storage.

How to reproduce it:
Use the code sample below.

Apologies if there are any errors in my snippet.

import pyarrow as pa
from deltalake import Schema, WriterProperties, write_deltalake

events = [{"appname":"AnthraX","geoip":{"long": 123,"lat":456},"facility":"ftp","hostname":"we.eus","message":"You\'re not gonna believe what just happened","msgid":"ID675","procid":5599,"severity":"notice","timestamp":"2024-02-19T12:01:40.777Z","version":1},
{"appname":"AnthraX","geoip":{"long": 123,"lat":456},"facility":"ntp","hostname":"random.rent","message":"You\'re not gonna believe what just happened","msgid":"ID898","procid":1335,"severity":"notice","timestamp":"2024-02-19T12:01:41.777Z","version":2},
{"appname":"shaneIxD","geoip":{"long": 123,"lat":456},"facility":"authpriv","hostname":"we.frogans","message":"There\'s a breach in the warp core, captain","msgid":"ID356","procid":3449,"severity":"warning","timestamp":"2024-02-19T12:01:42.778Z","version":2},
{"appname":"jesseddy","geoip":{"long": 123,"lat":456},"facility":"user","hostname":"for.is","message":"We\'re gonna need a bigger boat","msgid":"ID516","procid":445,"severity":"notice","timestamp":"2024-02-19T12:01:43.777Z","version":2},
{"appname":"shaneIxD","geoip":{"long": 123,"lat":456},"facility":"audit","hostname":"make.dad","message":"Great Scott! We\'re never gonna reach 88 mph with the flux capacitor in its current state!","msgid":"ID304","procid":2692,"severity":"err","timestamp":"2024-02-19T12:01:44.777Z","version":2},
{"appname":"b0rnc0nfused","geoip":{"long": 123,"lat":456},"facility":"local1","hostname":"up.realtor","message":"Maybe we just shouldn\'t use computers","msgid":"ID952","procid":8756,"severity":"emerg","timestamp":"2024-02-19T12:01:45.777Z","version":2},
{"appname":"KarimMove","geoip":{"long": 123,"lat":456},"facility":"local4","hostname":"some.store","message":"Maybe we just shouldn\'t use computers","msgid":"ID415","procid":9398,"severity":"crit","timestamp":"2024-02-19T12:01:46.778Z","version":2},
{"appname":"ahmadajmi","geoip":{"long": 123,"lat":456},"facility":"local1","hostname":"random.tushu","message":"Pretty pretty pretty good","msgid":"ID127","procid":8555,"severity":"info","timestamp":"2024-02-19T12:01:47.777Z","version":1},
{"appname":"b0rnc0nfused","geoip":{"long": 123,"lat":456},"facility":"alert","hostname":"names.tips","message":"There\'s a breach in the warp core, captain","msgid":"ID129","procid":5473,"severity":"debug","timestamp":"2024-02-19T12:01:48.777Z","version":2}
]
schema_json = '''{
    "type": "struct",
    "fields": [
            {"name":"timestamp","type":"timestamp","nullable":false,"metadata":{}},
            {"name":"appname","type":"string","nullable":true,"metadata": {}},
            {"name":"facility","type":"string","nullable":true,"metadata": {}},
            {"name":"hostname","type":"string","nullable":true,"metadata": {}},
            {"name":"message","type":"string","nullable":true,"metadata": {}},
            {"name":"procid","type":"string","nullable":true,"metadata": {}},
            {"name":"msgid","type":"string","nullable":true,"metadata": {}},
            {"name":"severity","type":"string","nullable":true,"metadata": {}}
        ]
    }'''
schema = Schema.from_json(schema_json)
data = pa.Table.from_pylist(events)
delta_path = 'abfss://container@account_name.dfs.core.windows.net/tablepath'
delta_partition = ["severity"]
deltalake_writeprops = WriterProperties(compression='LZ4')
delta_storage_options = {
            "AZURE_STORAGE_ACCOUNT_NAME": "<storage-account-name>",  # placeholder
            "AZURE_STORAGE_ACCOUNT_KEY": "<storage-account-key>"  # placeholder
        }
write_deltalake(
            delta_path,
            data,
            partition_by=delta_partition,
            storage_options=delta_storage_options,
            writer_properties=deltalake_writeprops,
            schema=schema,
            overwrite_schema=True,
            engine="rust",
            mode="append")

More details:
BACKTRACEFULL.txt

tastyfrankfurt added the bug (Something isn't working) label Feb 21, 2024
tastyfrankfurt (Author)

Okay, I don't think this is a bug now that I have unpicked it further. The timestamp field type is written to Delta as an epoch timestamp, so fields of type timestamp expect the value to be an integer, in microseconds by default. I have updated my code to handle this by setting any PyArrow timestamp fields to seconds on initialisation, using the logic below:

for x, i in enumerate(pa_schema):
    # delta_timestamp (e.g. 's') and delta_tz are settings defined elsewhere in my code
    if pa.timestamp('us').equals(i.type):
        # Replace the microsecond field in place, keeping its name and nullability
        pa_schema = pa_schema.set(x, pa.field(i.name, pa.timestamp(delta_timestamp, tz=delta_tz), nullable=i.nullable))

Then, during event processing, I convert each timestamp field to POSIX time with datetime.timestamp(), using the logic below:

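# Inside the per-event loop; parse() returns a datetime (e.g. dateutil.parser.parse)
for i in pa_schema:
    # Match on unit only, so fields created with tz=delta_tz are also converted
    if pa.types.is_timestamp(i.type) and i.type.unit == delta_timestamp:
        e[i.name] = parse(e[i.name]).timestamp()
eventlist.append(e)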

ion-elgreco (Collaborator)

@tastyfrankfurt I believe we are missing a case here: you passed a schema, but we don't use it when writing with the Rust engine. I'll try to address this.

tastyfrankfurt (Author)

No worries. I guess the issue for us now is that I have moved some data processing back into the Python code base, which I didn't want to do for performance reasons. Anyway, I look forward to seeing what you find.

ion-elgreco self-assigned this Feb 24, 2024
ion-elgreco (Collaborator)

Closing this, since passing a schema is not really needed: you can always cast the data beforehand.

ion-elgreco closed this as not planned (won't fix, can't repro, duplicate, stale) Dec 7, 2024
ion-elgreco removed their assignment Dec 7, 2024