diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index a3a6cf5..2abbefd 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -41,6 +41,7 @@ set(ICEBERG_SOURCES
     statistics_file.cc
     table.cc
     table_metadata.cc
+    table_scan.cc
     transform.cc
     transform_function.cc
     type.cc
diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h
index 0b1355a..ba037bb 100644
--- a/src/iceberg/manifest_entry.h
+++ b/src/iceberg/manifest_entry.h
@@ -290,7 +290,7 @@ struct ICEBERG_EXPORT ManifestEntry {
   std::optional file_sequence_number;
   /// Field id: 2
   /// File path, partition tuple, metrics, ...
-  DataFile data_file;
+  std::shared_ptr data_file;

   inline static const SchemaField kStatus =
       SchemaField::MakeRequired(0, "status", std::make_shared());
diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h
index 6b81eb9..4f098d1 100644
--- a/src/iceberg/manifest_reader.h
+++ b/src/iceberg/manifest_reader.h
@@ -34,6 +34,7 @@ namespace iceberg {
 /// \brief Read manifest entries from a manifest file.
 class ICEBERG_EXPORT ManifestReader {
  public:
+  virtual ~ManifestReader() = default;
   virtual Result>> Entries() const = 0;

  private:
@@ -43,10 +44,27 @@ class ICEBERG_EXPORT ManifestReader {
 /// \brief Read manifest files from a manifest list file.
 class ICEBERG_EXPORT ManifestListReader {
  public:
+  virtual ~ManifestListReader() = default;
   virtual Result>> Files() const = 0;

  private:
   std::unique_ptr reader_;
 };

+/// \brief Creates a reader for the manifest list (defined in this header, hence inline).
+/// \param file_path Path to the manifest list file.
+/// \return A Result with the reader; currently always a NotImplemented error.
+inline Result> CreateManifestListReader(
+    const std::string_view& file_path) {
+  return NotImplemented("CreateManifestListReader is not implemented yet.");
+}
+
+/// \brief Creates a reader for a manifest file (defined in this header, hence inline).
+/// \param file_path Path to the manifest file.
+/// \return A Result with the reader; currently always a NotImplemented error.
+inline Result> CreateManifestReader(
+    const std::string_view& file_path) {
+  return NotImplemented("CreateManifestReader is not implemented yet.");
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index c79f378..a71e218 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -23,6 +23,7 @@
 #include "iceberg/schema.h"
 #include "iceberg/sort_order.h"
 #include "iceberg/table_metadata.h"
+#include "iceberg/table_scan.h"

 namespace iceberg {

@@ -107,4 +108,8 @@ const std::vector& Table::history() const {

 const std::shared_ptr& Table::io() const { return io_; }

+std::unique_ptr Table::NewScan() const {
+  return std::make_unique(metadata_, io_);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 9db02b4..9a89057 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -104,6 +104,12 @@ class ICEBERG_EXPORT Table {
   /// \return a vector of history entries
   const std::vector& history() const;

+  /// \brief Create a new table scan builder for this table
+  ///
+  /// Once a table scan builder is created, it can be refined to project columns and
+  /// filter data.
+  virtual std::unique_ptr NewScan() const;
+
   /// \brief Returns a FileIO to read and write table data and metadata files
   const std::shared_ptr& io() const;

diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
new file mode 100644
index 0000000..23d6d40
--- /dev/null
+++ b/src/iceberg/table_scan.cc
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/table_scan.h"
+
+#include
+#include
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+/// \brief Indexes delete-file manifest entries by sequence number for range lookups.
+class DeleteFileIndex {
+ public:
+  /// \brief Index entries by sequence number (kInitialSequenceNumber when unset).
+  explicit DeleteFileIndex(const std::vector>& entries) {
+    for (const auto& entry : entries) {
+      const int64_t seq_num =
+          entry->sequence_number.value_or(TableMetadata::kInitialSequenceNumber);
+      sequence_index.emplace(seq_num, entry.get());  // stores the pointer from get()
+    }
+  }
+
+  /// \brief Return indexed entries whose sequence number is >= the data entry's.
+  std::vector FindRelevantEntries(const ManifestEntry& data_entry) const {
+    std::vector relevant_deletes;
+
+    // lower_bound yields the first indexed key >= data_sequence_number
+    auto data_sequence_number =
+        data_entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber);
+    for (auto it = sequence_index.lower_bound(data_sequence_number);
+         it != sequence_index.end(); ++it) {
+      // Placeholder: no further filtering yet, every entry in the range is returned
+      relevant_deletes.push_back(it->second);
+    }
+
+    return relevant_deletes;
+  }
+
+ private:
+  /// \brief Entry pointers (taken via get()) keyed by sequence number.
+  std::multimap sequence_index;
+};
+
+/// \brief Collect the data_file of every index entry relevant to a given data entry.
+std::vector> GetMatchedDeletes(
+    const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) {
+  const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry);
+  std::vector> matched_deletes;
+  if (relevant_entries.empty()) {
+    return matched_deletes;
+  }
+
+  matched_deletes.reserve(relevant_entries.size());
+  for (const auto& delete_entry : relevant_entries) {
+    // TODO(gty404): check if the delete entry contains the data entry's file path
+    matched_deletes.emplace_back(delete_entry->data_file);
+  }
+  return matched_deletes;
+}
+}  // namespace
+
+// FileScanTask implementation
+FileScanTask::FileScanTask(std::shared_ptr file,
+                           std::vector> delete_files,
+                           std::shared_ptr residual)
+    : data_file_(std::move(file)),
+      delete_files_(std::move(delete_files)),
+      residual_(std::move(residual)) {}
+
+const std::shared_ptr& FileScanTask::data_file() const { return data_file_; }
+
+const std::vector>& FileScanTask::delete_files() const {
+  return delete_files_;
+}
+
+int64_t FileScanTask::SizeBytes() const {
+  int64_t sizeInBytes = data_file_->file_size_in_bytes;  // start with the data file
+  std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) {
+    sizeInBytes += delete_file->file_size_in_bytes;  // add each delete file
+  });
+  return sizeInBytes;
+}
+
+int32_t FileScanTask::FilesCount() const {
+  return static_cast(delete_files_.size() + 1);  // delete files plus the data file
+}
+
+int64_t FileScanTask::EstimatedRowCount() const {
+  if (data_file_->file_size_in_bytes == 0) {
+    return 0;  // guards the division below against a zero file size
+  }
+  const auto sizeInBytes = data_file_->file_size_in_bytes;  // same value as the divisor
+  const double scannedFileFraction =
+      static_cast(sizeInBytes) / data_file_->file_size_in_bytes;
+  return static_cast(scannedFileFraction * data_file_->record_count);
+}
+
+const std::shared_ptr& FileScanTask::residual() const { return residual_; }
+
+TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata,
+                                   std::shared_ptr file_io)
+    : file_io_(std::move(file_io)) {
+  context_.table_metadata = std::move(table_metadata);  // kept inside the scan context
+}
+
+TableScanBuilder&
TableScanBuilder::WithColumnNames(
+    std::vector column_names) {
+  // Move-assignment adopts the argument's buffer, so no reserve() is needed first.
+  column_names_ = std::move(column_names);
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr schema) {
+  context_.projected_schema = std::move(schema);
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
+  snapshot_id_ = snapshot_id;
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr filter) {
+  context_.filter = std::move(filter);
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
+  context_.case_sensitive = case_sensitive;
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) {
+  context_.options[std::move(property)] = std::move(value);
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) {
+  context_.limit = limit;
+  return *this;
+}
+
+Result> TableScanBuilder::Build() {
+  const auto& table_metadata = context_.table_metadata;
+  auto snapshot_id = snapshot_id_ ? snapshot_id_ : table_metadata->current_snapshot_id;
+  if (!snapshot_id) {
+    return InvalidArgument("No snapshot ID specified for table {}",
+                           table_metadata->table_uuid);
+  }
+  auto iter = std::ranges::find_if(
+      table_metadata->snapshots, [&snapshot_id](const auto& snapshot) {
+        return snapshot != nullptr && snapshot->snapshot_id == *snapshot_id;
+      });
+  if (iter == table_metadata->snapshots.end() || *iter == nullptr) {
+    return NotFound("Snapshot with ID {} is not found", *snapshot_id);
+  }
+  context_.snapshot = *iter;
+
+  if (!context_.projected_schema) {  // derive a projection only when none was supplied
+    const auto& snapshot = context_.snapshot;
+    auto schema_id =
+        snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
+    if (!schema_id) {
+      return InvalidArgument("No schema ID found in snapshot {} for table {}",
+                             snapshot->snapshot_id, table_metadata->table_uuid);
+    }
+
+    const auto& schemas = table_metadata->schemas;
+    const auto it = std::ranges::find_if(schemas, [&schema_id](const auto& schema) {
+      return schema->schema_id() == *schema_id;
+    });
+    if (it == schemas.end()) {  // report the resolved id; snapshot->schema_id may be unset
+      return InvalidArgument("Schema {} in snapshot {} is not found",
+                             *schema_id, snapshot->snapshot_id);
+    }
+    auto schema = *it;
+
+    if (column_names_.empty()) {
+      context_.projected_schema = schema;  // no column selection: use the full schema
+    } else {
+      // TODO(gty404): collect touched columns from filter expression
+      std::vector projected_fields;
+      projected_fields.reserve(column_names_.size());
+      for (const auto& column_name : column_names_) {
+        // TODO(gty404): support case-insensitive column names
+        auto field_opt = schema->GetFieldByName(column_name);
+        if (!field_opt) {
+          return InvalidArgument("Column {} not found in schema", column_name);
+        }
+        projected_fields.emplace_back(field_opt.value().get());
+      }
+      context_.projected_schema =
+          std::make_shared(std::move(projected_fields), schema->schema_id());
+    }
+  }
+
+  return std::make_unique(std::move(context_), file_io_);  // context_ is moved out
+}
+
+TableScan::TableScan(TableScanContext context, std::shared_ptr file_io)
+    : context_(std::move(context)), file_io_(std::move(file_io)) {}
+
+const std::shared_ptr& TableScan::snapshot() const { return context_.snapshot; }
+
+const std::shared_ptr& TableScan::projection() const {
+  return context_.projected_schema;
+}
+
+const TableScanContext& TableScan::context() const { return context_; }
+
+const std::shared_ptr& TableScan::io() const { return file_io_; }
+
+DataScan::DataScan(TableScanContext context, std::shared_ptr file_io)
+    : TableScan(std::move(context), std::move(file_io)) {}
+
+Result>> DataScan::PlanFiles() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader,
+
CreateManifestListReader(context_.snapshot->manifest_list));
+  ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
+
+  std::vector> data_entries;
+  std::vector> positional_delete_entries;
+  for (const auto& manifest_file : manifest_files) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
+                            CreateManifestReader(manifest_file->manifest_path));
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
+
+    // TODO(gty404): filter manifests using partition spec and filter expression
+
+    for (auto& manifest_entry : manifests) {
+      const auto& data_file = manifest_entry->data_file;
+      switch (data_file->content) {  // bucket each entry by the kind of file it describes
+        case DataFile::Content::kData:
+          data_entries.push_back(std::move(manifest_entry));
+          break;
+        case DataFile::Content::kPositionDeletes:
+          // TODO(gty404): check if the sequence number is greater than or equal to the
+          // minimum sequence number of all manifest entries
+          positional_delete_entries.push_back(std::move(manifest_entry));
+          break;
+        case DataFile::Content::kEqualityDeletes:
+          return NotSupported("Equality deletes are not supported in data scan");
+      }
+    }
+  }
+
+  // TODO(gty404): build residual expression from filter
+  std::shared_ptr residual;  // shared by every task, so it is copied below, not moved
+  std::vector> tasks;
+  DeleteFileIndex delete_file_index(positional_delete_entries);
+  for (const auto& data_entry : data_entries) {
+    auto matched_deletes = GetMatchedDeletes(*data_entry, delete_file_index);
+    const auto& data_file = data_entry->data_file;
+    tasks.emplace_back(std::make_shared(
+        data_file, std::move(matched_deletes), residual));
+  }
+  return tasks;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
new file mode 100644
index 0000000..e4a38d9
--- /dev/null
+++ b/src/iceberg/table_scan.h
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief An abstract scan task exposing byte-size, file-count and row-count figures.
+class ICEBERG_EXPORT ScanTask {
+ public:
+  virtual ~ScanTask() = default;
+
+  /// \brief The number of bytes that should be read by this scan task.
+  virtual int64_t SizeBytes() const = 0;
+
+  /// \brief The number of files that should be read by this scan task.
+  virtual int32_t FilesCount() const = 0;
+
+  /// \brief The estimated number of rows that should be read by this scan task.
+  virtual int64_t EstimatedRowCount() const = 0;
+};
+
+/// \brief Task representing a data file and its corresponding delete files.
+class ICEBERG_EXPORT FileScanTask : public ScanTask {
+ public:
+  FileScanTask(std::shared_ptr file,
+               std::vector> delete_files,
+               std::shared_ptr residual);
+
+  /// \brief The data file that should be read by this scan task.
+  const std::shared_ptr& data_file() const;
+
+  /// \brief The delete files that should be read by this scan task.
+  const std::vector>& delete_files() const;
+
+  /// \brief The residual expression to apply after scanning the data file.
+  const std::shared_ptr& residual() const;
+
+  int64_t SizeBytes() const override;          // data file size plus all delete file sizes
+  int32_t FilesCount() const override;         // number of delete files plus one
+  int64_t EstimatedRowCount() const override;  // 0 when the data file size is 0
+
+ private:
+  /// \brief Data file metadata.
+  std::shared_ptr data_file_;
+  /// \brief Delete files metadata.
+  std::vector> delete_files_;
+  /// \brief Residual expression to apply.
+  std::shared_ptr residual_;
+};
+
+/// \brief Scan context holding snapshot and scan-specific metadata.
+struct TableScanContext {
+  /// \brief Table metadata.
+  std::shared_ptr table_metadata;
+  /// \brief Snapshot to scan.
+  std::shared_ptr snapshot;
+  /// \brief Projected schema.
+  std::shared_ptr projected_schema;
+  /// \brief Filter expression to apply.
+  std::shared_ptr filter;
+  /// \brief Whether the scan is case-sensitive (defaults to false).
+  bool case_sensitive = false;
+  /// \brief Additional options for the scan.
+  std::unordered_map options;
+  /// \brief Optional limit on the number of rows to scan.
+  std::optional limit;
+};
+
+/// \brief Builder class for creating TableScan instances.
+class ICEBERG_EXPORT TableScanBuilder {
+ public:
+  /// \brief Constructs a TableScanBuilder for the given table.
+  /// \param table_metadata The metadata of the table to scan.
+  /// \param file_io The FileIO instance for reading manifests and data files.
+  explicit TableScanBuilder(std::shared_ptr table_metadata,
+                            std::shared_ptr file_io);
+
+  /// \brief Sets the snapshot ID to scan.
+  /// \param snapshot_id The ID of the snapshot.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithSnapshotId(int64_t snapshot_id);
+
+  /// \brief Selects columns to include in the scan.
+  /// \param column_names A list of column names. If empty, all columns will be selected.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithColumnNames(std::vector column_names);
+
+  /// \brief Sets the schema to use for the scan.
+  /// \param schema The schema to use.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithProjectedSchema(std::shared_ptr schema);
+
+  /// \brief Applies a filter expression to the scan.
+  /// \param filter Filter expression to use.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithFilter(std::shared_ptr filter);
+
+  /// \brief Sets whether the scan should be case-sensitive.
+  /// \param case_sensitive Whether the scan is case-sensitive.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithCaseSensitive(bool case_sensitive);
+
+  /// \brief Sets an option for the scan.
+  /// \param property The name of the option.
+  /// \param value The value of the option.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithOption(std::string property, std::string value);
+
+  /// \brief Sets an optional limit on the number of rows to scan.
+  /// \param limit Optional limit on the number of rows.
+  /// \return Reference to the builder.
+  TableScanBuilder& WithLimit(std::optional limit);
+
+  /// \brief Builds and returns a TableScan instance.
+  /// \return A Result containing the TableScan or an error.
+  Result> Build();
+
+ private:
+  /// \brief The file I/O instance for reading manifests and data files.
+  std::shared_ptr file_io_;
+  /// \brief Column names to project in the scan.
+  std::vector column_names_;
+  /// \brief Snapshot ID to scan, if specified.
+  std::optional snapshot_id_;
+  /// \brief Context for the scan, including snapshot, schema, and filter.
+  TableScanContext context_;
+};
+
+/// \brief Represents a configured scan operation on a table.
+class ICEBERG_EXPORT TableScan {
+ public:
+  virtual ~TableScan() = default;
+
+  /// \brief Constructs a TableScan with the given context and file I/O.
+  /// \param context Scan context including snapshot, schema, and filter.
+  /// \param file_io File I/O instance for reading manifests and data files.
+  TableScan(TableScanContext context, std::shared_ptr file_io);
+
+  /// \brief Returns the snapshot being scanned.
+  /// \return A shared pointer to the snapshot.
+  const std::shared_ptr& snapshot() const;
+
+  /// \brief Returns the projected schema for the scan.
+  /// \return A shared pointer to the projected schema.
+  const std::shared_ptr& projection() const;
+
+  /// \brief Returns the scan context.
+  /// \return A reference to the TableScanContext.
+  const TableScanContext& context() const;
+
+  /// \brief Returns the file I/O instance used for reading manifests and data files.
+  /// \return A shared pointer to the FileIO instance.
+  const std::shared_ptr& io() const;
+
+  /// \brief Plans the scan tasks by resolving manifests and data files.
+  /// \return A Result containing scan tasks or an error.
+  virtual Result>> PlanFiles() const = 0;
+
+ protected:
+  /// \brief Context for the scan (snapshot, schema, filter); const after construction.
+  const TableScanContext context_;
+  /// \brief File I/O instance for reading manifests and data files.
+  std::shared_ptr file_io_;
+};
+
+/// \brief A scan that reads data files and applies delete files to filter rows.
+class ICEBERG_EXPORT DataScan : public TableScan {
+ public:
+  /// \brief Constructs a DataScan with the given context and file I/O.
+  DataScan(TableScanContext context, std::shared_ptr file_io);
+
+  /// \brief Plans one task per data file, each paired with its matched delete files.
+  /// \return A Result containing scan tasks or an error.
+  Result>> PlanFiles() const override;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index cc5f0a7..83574ee 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -92,6 +92,7 @@ class LocationProvider;
 class SortField;
 class SortOrder;
 class Table;
+class FileIO;
 class Transaction;
 class Transform;
 class TransformFunction;
@@ -113,6 +114,12 @@ class NameMapping;
 enum class SnapshotRefType;
 enum class TransformType;

+class Expression;
+
+class FileScanTask;
+class TableScan;
+class TableScanBuilder;
+
 /// ----------------------------------------------------------------------------
 /// TODO: Forward declarations below are not added yet.
 /// ----------------------------------------------------------------------------
@@ -123,11 +130,13 @@ class MetadataUpdate;
 class UpdateRequirement;

 class AppendFiles;
-class TableScan;
 struct DataFile;
 struct ManifestEntry;
 struct ManifestFile;
 struct ManifestList;

+class ManifestReader;
+class ManifestListReader;
+
 }  // namespace iceberg