
[C++] get different total rows answer for one dataset #37840

Closed
litao3rd opened this issue Sep 23, 2023 · 15 comments

@litao3rd commented Sep 23, 2023

Describe the bug, including details regarding any error messages, version, and platform.

I'm learning to use Arrow with C++. This issue may not be a bug but rather the result of incorrect usage on my part; I'm not certain.

The code below uses the "tlc-trip-record-data" dataset, which consists of 264 Parquet files that I've downloaded. My objective is to calculate the total number of rows in the dataset. However, as demonstrated below, the two counting approaches return different results. Your assistance would be greatly appreciated.

#include <iostream>

#include <arrow/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

int main(int argc, char **argv)
{
    auto filesystem = std::make_shared<fs::LocalFileSystem>();
    auto format = std::make_shared<ds::ParquetFileFormat>();
    const std::string base_dir = "/path/to/dataset/tlc-trip-record-data";

    arrow::Status status;
    fs::FileSelector selector;
    selector.base_dir = base_dir;
    selector.recursive = true;
    auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())
                    .ValueOrDie();
    auto dataset = factory->Finish().ValueOrDie();
    auto sb = dataset->NewScan().ValueOrDie();
    sb->UseThreads(false);
    auto scanner = sb->Finish().ValueOrDie();
    {
        // In this block I got total 1526807659 rows
        std::cout << "total count rows() = " << scanner->CountRows().ValueOrDie() << "\n";
    }
    {
        // In this block I got total 57410540 rows
        int64_t total_rows = 0;
        status = scanner->Scan([&](ds::TaggedRecordBatch batch) -> arrow::Status {
            total_rows += batch.record_batch->num_rows();
            return arrow::Status::OK();
        });
        std::cout << "total count rows in visitor mode = " << total_rows << "\n";
    }
    return 0;
}

Output:

total count rows() = 1526807659
total count rows in visitor mode = 57410540

Thanks for any reply.

Component(s)

C++

@mapleFU (Member) commented Sep 23, 2023

@litao3rd Can you give a minimal case for this (like a file or a few files that reproduce it), and provide the version of Arrow you're using?

@litao3rd (Author) commented Sep 23, 2023

Sorry for my mistake.

I am currently using Arrow version 12.0.1. (Maybe I should update to the latest version, 13.0.0?)

The original data is downloaded from this website. To simplify the process, I have created a small script that downloads a few files that reproduce this issue. The script creates a directory called "tlc-trip-record-data" in the current directory and downloads the necessary data into it. Please note that we are behind the Great Firewall, so you may need to execute the script with an optional proxy: sh ./download-tcl-trip-record-data.sh --proxy protocol://host:port.

I have encountered a perplexing issue. When I use 6 months of data, the first block returns a total of 142,516,648 rows, while the other block returns 0 rows. However, when I use 5 months of data, excluding January, both blocks yield the same result. Unfortunately, I am unable to identify the bug in Arrow due to its complexity.

Please note that you need to modify the path to the tlc-trip-record-data directory in the C++ code.

#!/bin/bash

set -e

dataset="tlc-trip-record-data"

test -d "$dataset" || mkdir "$dataset"

pushd "$dataset"

for n in $(seq 1 6); do
    d="2023-0$n"
    echo "[*] downloading data for $d"
    curl "$@" -LO "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_$d.parquet"
    curl "$@" -LO "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_$d.parquet"
    curl "$@" -LO "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_$d.parquet"
    curl "$@" -LO "https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_$d.parquet"
    echo "[*] finished downloading data for $d"
done

popd

@mapleFU (Member) commented Sep 25, 2023

Oops, let me have a try with master. I've checked some of the CountRows and Scan code on master, and at first glance it should work well.

I've tested it: the result Status of Scan is indeed a failure. The error originates in CastFunction::DispatchExact:

Result<const Kernel*> CastFunction::DispatchExact(
    const std::vector<TypeHolder>& types) const {
  RETURN_NOT_OK(CheckArity(types.size()));

  std::vector<const ScalarKernel*> candidate_kernels;
  for (const auto& kernel : kernels_) {
    if (kernel.signature->MatchesInputs(types)) {
      candidate_kernels.push_back(&kernel);
    }
  }

  if (candidate_kernels.size() == 0) {
    return Status::NotImplemented("Unsupported cast from ", types[0].type->ToString(),
                                  " to ", ToTypeName(out_type_id_), " using function ",
                                  this->name());
  }

The NotImplemented branch is hit because field_type is null while the column is int64.
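
For context, here is a standalone sketch (my addition, assuming the arrow::compute cast API) that reproduces the same NotImplemented status in isolation:

#include <iostream>

#include <arrow/api.h>
#include <arrow/compute/api.h>

int main()
{
    // Build a small int64 array, then try to cast it to the null type.
    // No cast kernel targets null, so DispatchExact finds no candidate
    // kernel and returns NotImplemented -- the status Scan surfaces here.
    auto arr = arrow::MakeArrayFromScalar(arrow::Int64Scalar(1), 3).ValueOrDie();
    auto result = arrow::compute::Cast(*arr, arrow::null());
    std::cout << result.status().ToString() << "\n";
    // Expected: NotImplemented: Unsupported cast from int64 to null
    return 0;
}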

@mapleFU (Member) commented Sep 25, 2023

The month-1 (January) schema is:

{
  "Version": "1.0",
  "CreatedBy": "parquet-cpp-arrow version 8.0.0",
  "TotalRows": "1114320",
  "NumberOfRowGroups": "1",
  "NumberOfRealColumns": "7",
  "NumberOfColumns": "7",
  "Columns": [
     { "Id": "0", "Name": "dispatching_base_num", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "UTF8", "LogicalType": {"Type": "String"} },
     { "Id": "1", "Name": "pickup_datetime", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "is_from_converted_type": false, "force_set_converted_type": false} },
     { "Id": "2", "Name": "dropOff_datetime", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "is_from_converted_type": false, "force_set_converted_type": false} },
     { "Id": "3", "Name": "PUlocationID", "PhysicalType": "DOUBLE", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} },
     { "Id": "4", "Name": "DOlocationID", "PhysicalType": "DOUBLE", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} },
     { "Id": "5", "Name": "SR_Flag", "PhysicalType": "INT32", "ConvertedType": "NONE", "LogicalType": {"Type": "Null"} },
     { "Id": "6", "Name": "Affiliated_base_number", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "UTF8", "LogicalType": {"Type": "String"} }
  ],

The month-6 (June) schema is:

{
  "Version": "2.6",
  "CreatedBy": "parquet-cpp-arrow version 12.0.0",
  "TotalRows": "1219445",
  "NumberOfRowGroups": "1",
  "NumberOfRealColumns": "7",
  "NumberOfColumns": "7",
  "Columns": [
     { "Id": "0", "Name": "dispatching_base_num", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "UTF8", "LogicalType": {"Type": "String"} },
     { "Id": "1", "Name": "pickup_datetime", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "is_from_converted_type": false, "force_set_converted_type": false} },
     { "Id": "2", "Name": "dropOff_datetime", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "is_from_converted_type": false, "force_set_converted_type": false} },
     { "Id": "3", "Name": "PUlocationID", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} },
     { "Id": "4", "Name": "DOlocationID", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} },
     { "Id": "5", "Name": "SR_Flag", "PhysicalType": "INT64", "ConvertedType": "NONE", "LogicalType": {"Type": "None"} },
     { "Id": "6", "Name": "Affiliated_base_number", "PhysicalType": "BYTE_ARRAY", "ConvertedType": "UTF8", "LogicalType": {"Type": "String"} }
  ],

I think the problem arises from the schema mismatch between the files.

@mapleFU (Member) commented Sep 25, 2023

  ds::FinishOptions finishOptions;
  finishOptions.inspect_options.fragments = 30;
  auto dataset = factory->Finish(finishOptions).ValueOrDie();

I think DatasetFactory::Finish runs an Inspect. By default inspect_options.fragments == 1, so it only inspects one file and regards that file's schema as the final schema. We can enlarge inspect_options.fragments to collect more schemas.
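
For example, a minimal sketch (my addition; InspectOptions::kInspectAllFragments is the dataset API's constant, equal to -1, for inspecting every fragment):

  ds::FinishOptions finishOptions;
  // Read every file's schema before unifying, instead of trusting
  // only the first fragment the factory happens to inspect.
  finishOptions.inspect_options.fragments = ds::InspectOptions::kInspectAllFragments;
  auto dataset = factory->Finish(finishOptions).ValueOrDie();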

However, the most suitable way is to set the schema explicitly in FinishOptions:

  ::arrow::SchemaBuilder builder;
  builder.AddField(::arrow::field("dispatching_base_num", ::arrow::large_utf8()));
  builder.AddField(::arrow::field("pickup_datetime", ::arrow::timestamp(::arrow::TimeUnit::SECOND)));
  builder.AddField(::arrow::field("dropoff_datetime", ::arrow::timestamp(::arrow::TimeUnit::SECOND)));
  builder.AddField(::arrow::field("PULocationID", ::arrow::int32()));
  builder.AddField(::arrow::field("DOLocationID", ::arrow::int32()));
  builder.AddField(::arrow::field("SR_Flag", ::arrow::int32()));
  builder.AddField(::arrow::field("dispatching_base_number", ::arrow::large_utf8()));
  finishOptions.schema = builder.Finish().ValueOrDie();
  auto dataset = factory->Finish(finishOptions).ValueOrDie();

Explicitly setting a schema is better in this case.

@litao3rd (Author) commented:

> However, the most suitable way is to set the schema explicitly in FinishOptions … Explicitly setting a schema is better in this case.

It's truly remarkable! The method you demonstrate is certainly helpful for someone like me who is not very familiar with Arrow. I sincerely appreciate all your kind replies.

@litao3rd (Author) commented:

> I've tested it: the result Status of Scan is indeed a failure. The error originates in CastFunction::DispatchExact … because field_type is null while the column is int64.

I am not very familiar with C++, so I could not understand this code snippet well. Could you kindly provide a complete snippet illustrating how it works? That would greatly help me grasp what's going on.

@mapleFU (Member) commented Sep 26, 2023

  arrow::Status status;
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  ds::FinishOptions finishOptions;
  ::arrow::SchemaBuilder builder;
  auto s = builder.AddField(::arrow::field("dispatching_base_num", ::arrow::large_utf8()));
  if (!s.ok()) {
    s.Abort();
  }
  s = builder.AddField(::arrow::field("pickup_datetime", ::arrow::timestamp(::arrow::TimeUnit::SECOND)));
  s = builder.AddField(::arrow::field("dropoff_datetime", ::arrow::timestamp(::arrow::TimeUnit::SECOND)));
  s = builder.AddField(::arrow::field("PULocationID", ::arrow::int32()));
  s = builder.AddField(::arrow::field("DOLocationID", ::arrow::int32()));
  s = builder.AddField(::arrow::field("SR_Flag", ::arrow::int32()));
  s = builder.AddField(::arrow::field("dispatching_base_number", ::arrow::large_utf8()));
  if (!s.ok()) {
    s.Abort();
  }
  finishOptions.schema = builder.Finish().ValueOrDie();
  auto dataset = factory->Finish(finishOptions).ValueOrDie();
  auto sb = dataset->NewScan().ValueOrDie();
  s = sb->UseThreads(false);
  if (!s.ok()) {
    s.Abort();
  }
  auto scanner = sb->Finish().ValueOrDie();
  {
    // In this block I got total 57410540 rows
    int64_t total_rows = 0;
    if (scanner->options()->dataset_schema != nullptr) {
      std::cout << "Schema: " << scanner->options()->dataset_schema->ToString() << std::endl;
    } else {
      std::cout << "Schema: " << "NONE" << std::endl;
    }
    status = scanner->Scan([&](ds::TaggedRecordBatch batch) -> arrow::Status {
      total_rows += batch.record_batch->num_rows();
      return arrow::Status::OK();
    });
    std::cout << "total count rows in visitor mode = " << total_rows << ", result:" << status.ToString() << "\n";
  }
  {
    // In this block I got total 1526807659 rows
    std::cout << "total count rows() = " << scanner->CountRows().ValueOrDie() << "\n";
  }
  return 0;

Remember to check the returned Status after computing.
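
A related pattern worth spelling out (my sketch, not from the original comment): check the Result from CountRows() too, rather than calling ValueOrDie(), so a failure is reported instead of aborting the process or silently undercounting:

  // Inspect the Result instead of ValueOrDie(), so an error such as an
  // unsupported cast during the scan is printed rather than crashing.
  auto maybe_count = scanner->CountRows();
  if (!maybe_count.ok()) {
    std::cerr << "CountRows failed: " << maybe_count.status().ToString() << "\n";
  } else {
    std::cout << "rows = " << *maybe_count << "\n";
  }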

@mapleFU (Member) commented Sep 26, 2023

@bkietz I found a problem: when UnifySchemas is called, it calls SchemaBuilder::AddField. When two schemas contain a field with the same name, there are conflict-resolution options; with CONFLICT_MERGE it calls Field::MergeWith:

Result<std::shared_ptr<Field>> Field::MergeWith(const Field& other,
                                                MergeOptions options) const {
  if (name() != other.name()) {
    return Status::Invalid("Field ", name(), " doesn't have the same name as ",
                           other.name());
  }

  if (Equals(other, /*check_metadata=*/false)) {
    return Copy();
  }

  if (options.promote_nullability) {
    if (type()->Equals(other.type())) {
      return Copy()->WithNullable(nullable() || other.nullable());
    }
    std::shared_ptr<Field> promoted = MaybePromoteNullTypes(*this, other);
    if (promoted) return promoted;
  }

  return Status::Invalid("Unable to merge: Field ", name(),
                         " has incompatible types: ", type()->ToString(), " vs ",
                         other.type()->ToString());
}

In this case, if we don't set an explicit schema and instead deduce one from the Parquet files, it says string and large_string are not compatible and fails. Would it be better to have a common type here? Or is this case expected?
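
To make the merge rules concrete, a small sketch (my addition, using arrow::UnifySchemas, which applies these MergeOptions):

#include <iostream>

#include <arrow/api.h>

int main()
{
    // null -> int64 promotion succeeds under the default MergeOptions
    // (promote_nullability is true, so MaybePromoteNullTypes applies)...
    auto ok = arrow::UnifySchemas(
        {arrow::schema({arrow::field("SR_Flag", arrow::null())}),
         arrow::schema({arrow::field("SR_Flag", arrow::int64())})});
    std::cout << ok.ValueOrDie()->ToString() << "\n";  // SR_Flag: int64

    // ...but utf8 vs large_utf8 falls through to Status::Invalid.
    auto bad = arrow::UnifySchemas(
        {arrow::schema({arrow::field("s", arrow::utf8())}),
         arrow::schema({arrow::field("s", arrow::large_utf8())})});
    std::cout << bad.status().ToString() << "\n";
    return 0;
}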

@litao3rd (Author) commented:

> … Remember to check the returned Status after computing.

Great! Thanks for your reply. It's really helpful.

@litao3rd (Author) commented:

> In this case, if we don't set an explicit schema and instead deduce one from the Parquet files, it says string and large_string are not compatible and fails. Would it be better to have a common type here? Or is this case expected?

I think the current approach is reasonable. However, having a common type as an option would also be good, but I don't want it to become the default. This case is expected for me.

@mapleFU (Member) commented Sep 26, 2023

> I think the current approach is reasonable. However, having a common type as an option would also be good, but I don't want it to become the default. This case is expected for me.

Yeah, I was just raising a problem I found for the community; you'd better use self-defined metadata or a self-defined schema in this case.

However, I think the Dataset should be able to unify the schemas.

@litao3rd (Author) commented Sep 26, 2023

> Yeah, I was just raising a problem I found for the community; you'd better use self-defined metadata or a self-defined schema in this case.
>
> However, I think the Dataset should be able to unify the schemas.

Yep, it's a good problem for the community. I hope you find a great way to solve it. I really appreciate your friendly responses; this has been an awesome experience for me.

@bkietz (Member) commented Sep 27, 2023

> In this case, if we don't set an explicit schema and instead deduce one from the Parquet files, it says string and large_string are not compatible and fails. Would it be better to have a common type here? Or is this case expected?

This case is expected. UnifySchemas is designed to only merge fields which are compatible with zero conversion overhead across any reader. This includes (for example) promotion from a required field to a nullable field, since we can always read a column guaranteed to have no nulls as a column which might have nulls. It doesn't include promotion from utf8 to large_utf8 because although a parquet column can be read into a large_utf8 column with no more steps than it could be read into a utf8 column, when reading arrow IPC we'd need to introduce a conversion step.

Supporting extended promotions such as utf8->large_utf8 and integer widening would need to operate at the level of MakeScanNode(), so that project nodes could be inserted as necessary when an underlying reader doesn't support pushing down the altered field.
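
For what it's worth, a projection-level cast is already expressible today (a sketch of mine, reusing the cp alias from the reporter's program; it assumes the dataset was built with an explicit schema as above, and the SR_Flag column is just an illustration):

  // Cast SR_Flag to int64 in a projection step, so files storing it as
  // null or int32 are converted while scanning instead of tripping
  // schema unification.
  auto sb = dataset->NewScan().ValueOrDie();
  auto st = sb->Project(
      {cp::call("cast", {cp::field_ref("SR_Flag")},
                cp::CastOptions::Safe(arrow::int64()))},
      {"SR_Flag"});
  if (!st.ok()) {
    st.Abort();
  }
  auto scanner = sb->Finish().ValueOrDie();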

@litao3rd (Author) commented Oct 7, 2023

The original problem has been resolved. Closing this issue.

litao3rd closed this as completed Oct 7, 2023