diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 2f4165b8e7112..0446a008189c1 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -45,6 +45,7 @@ cargo run --example csv_sql
 - [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
 - [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
 - [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
+- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
 - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
new file mode 100644
index 0000000000000..9bf71e52c3de8
--- /dev/null
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -0,0 +1,664 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::parquet::{
+    ParquetAccessPlan, ParquetExecBuilder,
+};
+use datafusion::datasource::physical_plan::{
+    parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet::arrow::arrow_reader::{
+    ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
+};
+use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::metadata::ParquetMetaData;
+use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
+use datafusion::parquet::schema::types::ColumnPath;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion_common::{
+    internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::{TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tempfile::TempDir;
+use url::Url;
+
+/// This example demonstrates using low level DataFusion APIs to read only
+/// certain row groups and ranges from parquet files, based on external
+/// information.
+///
+/// Using these APIs, you can instruct DataFusion's parquet reader to skip
+/// ("prune") portions of files that do not contain relevant data. These APIs
+/// can be useful for doing low latency queries over a large number of Parquet
+/// files on remote storage (e.g. S3) where the cost of reading the metadata for
+/// each file is high (e.g. because it requires a network round trip to the
+/// storage service).
+///
+/// Depending on the information from the index, DataFusion can make a request
+/// to the storage service (e.g. S3)  to read only the necessary data.
+///
+/// Note that this example uses a hard coded index implementation. For a more
+/// realistic example of creating an index to prune files, see the
+/// `parquet_index.rs` example.
+///
+/// Specifically, this example illustrates how to:
+/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query
+/// 2. Use [`PruningPredicate`] for predicate analysis
+/// 3. Pass a row group selection to [`ParuetExec`]
+/// 4. Pass a row selection (within a row group) to [`ParquetExec`]
+///
+/// Note this is a *VERY* low level example for people who want to build their
+/// own custom indexes (e.g. for low latency queries). Most users should use
+/// higher level APIs for reading parquet files:
+/// [`SessionContext::read_parquet`] or [`ListingTable`], which also do file
+/// pruning based on parquet statistics (using the same underlying APIs)
+///
+/// # Diagram
+///
+/// This diagram shows how the `ParquetExec` is configured to do only a single
+/// (range) read from a parquet file, for the data that is needed. It does
+/// not read the file footer or any of the row groups that are not needed.
+///
+/// ```text
+///         ┌───────────────────────┐ The TableProvider configures the
+///         │ ┌───────────────────┐ │ ParquetExec:
+///         │ │                   │ │
+///         │ └───────────────────┘ │
+///         │ ┌───────────────────┐ │
+/// Row     │ │                   │ │  1. To read only specific Row
+/// Groups  │ └───────────────────┘ │  Groups (the ParquetExec tries
+///         │ ┌───────────────────┐ │  to reduce this further based
+///         │ │                   │ │  on metadata)
+///         │ └───────────────────┘ │              ┌────────────────────┐
+///         │ ┌───────────────────┐ │              │                    │
+///         │ │                   │◀┼ ─ ─ ┐        │    ParquetExec     │
+///         │ └───────────────────┘ │              │  (Parquet Reader)  │
+///         │          ...          │     └ ─ ─ ─ ─│                    │
+///         │ ┌───────────────────┐ │              │ ╔═══════════════╗  │
+///         │ │                   │ │              │ ║ParquetMetadata║  │
+///         │ └───────────────────┘ │              │ ╚═══════════════╝  │
+///         │ ╔═══════════════════╗ │              └────────────────────┘
+///         │ ║  Thrift metadata  ║ │
+///         │ ╚═══════════════════╝ │      1. With cached ParquetMetadata, so
+///         └───────────────────────┘      the ParquetExec does not re-read /
+///          Parquet File                  decode the thrift footer
+///
+/// ```
+///
+/// Within a Row Group, Column Chunks store data in DataPages. This example also
+/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges)
+/// which will skip unneeded data pages. This requires that the Parquet file has
+/// a [Page Index].
+///
+/// ```text
+///         ┌───────────────────────┐   If the RowSelection does not include any
+///         │          ...          │   rows from a particular Data Page, that
+///         │                       │   Data Page is not fetched or decoded.
+///         │ ┌───────────────────┐ │   Note this requires a PageIndex
+///         │ │     ┌──────────┐  │ │
+/// Row     │ │     │DataPage 0│  │ │                 ┌────────────────────┐
+/// Groups  │ │     └──────────┘  │ │                 │                    │
+///         │ │     ┌──────────┐  │ │                 │    ParquetExec     │
+///         │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─           │  (Parquet Reader)  │
+///         │ │     └──────────┘  │ │      └ ─ ─ ─ ─ ─│                    │
+///         │ │     ┌──────────┐  │ │                 │ ╔═══════════════╗  │
+///         │ │     │DataPage 2│  │ │ If only rows    │ ║ParquetMetadata║  │
+///         │ │     └──────────┘  │ │ from DataPage 1 │ ╚═══════════════╝  │
+///         │ └───────────────────┘ │ are selected,   └────────────────────┘
+///         │                       │ only DataPage 1
+///         │          ...          │ is fetched and
+///         │                       │ decoded
+///         │ ╔═══════════════════╗ │
+///         │ ║  Thrift metadata  ║ │
+///         │ ╚═══════════════════╝ │
+///         └───────────────────────┘
+///          Parquet File
+/// ```
+///
+/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
+/// [Page Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md)
+#[tokio::main]
+async fn main() -> Result<()> {
+    // the object store is used to read the parquet files (in this case, it is
+    // a local file system, but in a real system it could be S3, GCS, etc)
+    let object_store: Arc<dyn ObjectStore> =
+        Arc::new(object_store::local::LocalFileSystem::new());
+
+    // Create a custom table provider with our special index.
+    let provider = Arc::new(IndexTableProvider::try_new(Arc::clone(&object_store))?);
+
+    // SessionContext for running queries that has the table provider
+    // registered as "index_table"
+    let ctx = SessionContext::new();
+    ctx.register_table("index_table", Arc::clone(&provider) as _)?;
+
+    // register object store provider for urls like `file://` work
+    let url = Url::try_from("file://").unwrap();
+    ctx.register_object_store(&url, object_store);
+
+    // Select data from the table without any predicates (and thus no pruning)
+    println!("** Select data, no predicates:");
+    ctx.sql("SELECT avg(id), max(text) FROM index_table")
+        .await?
+        .show()
+        .await?;
+    // the underlying parquet reader makes 10 IO requests, one for each row group
+
+    // Now, run a query that has a predicate that our index can handle
+    //
+    // For this query, the access plan specifies skipping 8 row groups
+    // and scanning 2 of them. The skipped row groups are not read at all:
+    //
+    // [Skip, Skip, Scan, Skip, Skip, Skip, Skip, Scan, Skip, Skip]
+    //
+    // Note that the parquet reader makes 2 IO requests - one for the data from
+    // each row group.
+    println!("** Select data, predicate `id IN (250, 750)`");
+    ctx.sql("SELECT text FROM index_table WHERE id IN (250, 750)")
+        .await?
+        .show()
+        .await?;
+
+    // Finally, demonstrate scanning sub ranges within the row groups.
+    // Parquet's minimum decode unit is a page, so specifying ranges
+    // within a row group can be used to skip pages within a row group.
+    //
+    // For this query, the access plan specifies skipping all but the last row
+    // group and within the last row group, reading only the row with id 950
+    //
+    // [Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Skip, Selection(skip 49, select 1, skip 50)]
+    //
+    // Note that the parquet reader makes a single IO request - for the data
+    // pages that must be decoded
+    //
+    // Note: in order to prune pages, the Page Index must be loaded and the
+    // ParquetExec will load it on demand if not present. To avoid a second IO
+    // during query, this example loaded the Page Index pre-emptively by setting
+    // `ArrowReader::with_page_index` in `IndexedFile::try_new`
+    provider.set_use_row_selection(true);
+    println!("** Select data, predicate `id = 950`");
+    ctx.sql("SELECT text  FROM index_table WHERE id = 950")
+        .await?
+        .show()
+        .await?;
+
+    Ok(())
+}
+
+/// DataFusion `TableProvider` that uses knowledge of how data is distributed in
+/// a file to prune row groups and rows from the file.
+///
+/// `file1.parquet` contains values `0..1000`
+#[derive(Debug)]
+pub struct IndexTableProvider {
+    /// Where the file is stored (cleanup on drop)
+    #[allow(dead_code)]
+    tmpdir: TempDir,
+    /// The file that is being read.
+    indexed_file: IndexedFile,
+    /// The underlying object store
+    object_store: Arc<dyn ObjectStore>,
+    /// if true, use row selections in addition to row group selections
+    use_row_selections: AtomicBool,
+}
+impl IndexTableProvider {
+    /// Create a new IndexTableProvider
+    /// * `object_store` - the object store implementation to use for reading files
+    pub fn try_new(object_store: Arc<dyn ObjectStore>) -> Result<Self> {
+        let tmpdir = TempDir::new().expect("Can't make temporary directory");
+
+        let indexed_file =
+            IndexedFile::try_new(tmpdir.path().join("indexed_file.parquet"), 0..1000)?;
+
+        Ok(Self {
+            indexed_file,
+            tmpdir,
+            object_store,
+            use_row_selections: AtomicBool::new(false),
+        })
+    }
+
+    /// set the value of use row selections
+    pub fn set_use_row_selection(&self, use_row_selections: bool) {
+        self.use_row_selections
+            .store(use_row_selections, Ordering::SeqCst);
+    }
+
+    /// return the value of use row selections
+    pub fn use_row_selections(&self) -> bool {
+        self.use_row_selections.load(Ordering::SeqCst)
+    }
+
+    /// convert filters like `a = 1`, `b = 2`
+    /// to a single predicate like `a = 1 AND b = 2` suitable for execution
+    fn filters_to_predicate(
+        &self,
+        state: &SessionState,
+        filters: &[Expr],
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let df_schema = DFSchema::try_from(self.schema())?;
+
+        let predicate = conjunction(filters.to_vec());
+        let predicate = predicate
+            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
+            .transpose()?
+            // if there are no filters, use a literal true to have a predicate
+            // that always evaluates to true we can pass to the index
+            .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
+
+        Ok(predicate)
+    }
+
+    /// Returns a [`ParquetAccessPlan`] that specifies how to scan the
+    /// parquet file.
+    ///
+    /// A `ParquetAccessPlan` specifies which row groups  and which rows within
+    /// those row groups to scan.
+    fn create_plan(
+        &self,
+        predicate: &Arc<dyn PhysicalExpr>,
+    ) -> Result<ParquetAccessPlan> {
+        // In this example, we use the PruningPredicate's literal guarantees to
+        // analyze the predicate. In a real system, using
+        // `PruningPredicate::prune` would likely be easier to do.
+        let pruning_predicate =
+            PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
+
+        // The PruningPredicate's guarantees must all be satisfied in order for
+        // the predicate to possibly evaluate to true.
+        let guarantees = pruning_predicate.literal_guarantees();
+        let Some(constants) = self.value_constants(guarantees) else {
+            return Ok(self.indexed_file.scan_all_plan());
+        };
+
+        // Begin with a plan that skips all row groups.
+        let mut plan = self.indexed_file.scan_none_plan();
+
+        // determine which row groups have the values in the guarantees
+        for value in constants {
+            let ScalarValue::Int32(Some(val)) = value else {
+                // if we have unexpected type of constant, no pruning is possible
+                return Ok(self.indexed_file.scan_all_plan());
+            };
+
+            // Since we know the values in the files are between 0..1000 and
+            // evenly distributed between  in row groups, calculate in what row
+            // group this value appears and tell the parquet reader to read it
+            let val = *val as usize;
+            let num_rows_in_row_group = 1000 / plan.len();
+            let row_group_index = val / num_rows_in_row_group;
+            plan.scan(row_group_index);
+
+            // If we want to use row selections, which the parquet reader can
+            // use to skip data pages when the parquet file has a "page index"
+            // and the reader is configured to read it, add a row selection
+            if self.use_row_selections() {
+                let offset_in_row_group = val - row_group_index * num_rows_in_row_group;
+                let selection = RowSelection::from(vec![
+                    // skip rows before the desired row
+                    RowSelector::skip(offset_in_row_group.saturating_sub(1)),
+                    // select the actual row
+                    RowSelector::select(1),
+                    // skip any remaining rows in the group
+                    RowSelector::skip(num_rows_in_row_group - offset_in_row_group),
+                ]);
+
+                plan.scan_selection(row_group_index, selection);
+            }
+        }
+
+        Ok(plan)
+    }
+
+    /// Returns the set of constants that the `"id"` column must take in order
+    /// for the predicate to be true.
+    ///
+    /// If `None` is returned, we can't extract the necessary information from
+    /// the guarantees.
+    fn value_constants<'a>(
+        &self,
+        guarantees: &'a [LiteralGuarantee],
+    ) -> Option<&'a HashSet<ScalarValue>> {
+        // only handle a single guarantee for column in this example
+        if guarantees.len() != 1 {
+            return None;
+        }
+        let guarantee = guarantees.first()?;
+
+        // Only handle IN guarantees for the "in" column
+        if guarantee.guarantee != Guarantee::In || guarantee.column.name() != "id" {
+            return None;
+        }
+        Some(&guarantee.literals)
+    }
+}
+
+/// Stores information needed to scan a file
+#[derive(Debug)]
+struct IndexedFile {
+    /// File name
+    file_name: String,
+    /// The path of the file
+    path: PathBuf,
+    /// The size of the file
+    file_size: u64,
+    /// The pre-parsed parquet metadata for the file
+    metadata: Arc<ParquetMetaData>,
+    /// The arrow schema of the file
+    schema: SchemaRef,
+}
+
+impl IndexedFile {
+    fn try_new(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<Self> {
+        let path = path.as_ref();
+        // write the actual file
+        make_demo_file(path, value_range)?;
+
+        // Now, open the file and read its size and metadata
+        let file_name = path
+            .file_name()
+            .ok_or_else(|| internal_datafusion_err!("Invalid path"))?
+            .to_str()
+            .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?
+            .to_string();
+        let file_size = path.metadata()?.len();
+
+        let file = File::open(path).map_err(|e| {
+            DataFusionError::from(e).context(format!("Error opening file {path:?}"))
+        })?;
+
+        let options = ArrowReaderOptions::new()
+            // Load the page index when reading metadata to cache
+            // so it is available to interpret row selections
+            .with_page_index(true);
+        let reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
+        let metadata = reader.metadata().clone();
+        let schema = reader.schema().clone();
+
+        // canonicalize after writing the file
+        let path = std::fs::canonicalize(path)?;
+
+        Ok(Self {
+            file_name,
+            path,
+            file_size,
+            metadata,
+            schema,
+        })
+    }
+
+    /// Return a `PartitionedFile` to scan the underlying file
+    ///
+    /// The returned value does not have any  `ParquetAccessPlan` specified in
+    /// its extensions.
+    fn partitioned_file(&self) -> PartitionedFile {
+        PartitionedFile::new(self.path.display().to_string(), self.file_size)
+    }
+
+    /// Return a `ParquetAccessPlan` that scans all row groups in the file
+    fn scan_all_plan(&self) -> ParquetAccessPlan {
+        ParquetAccessPlan::new_all(self.metadata.num_row_groups())
+    }
+
+    /// Return a `ParquetAccessPlan` that scans no row groups in the file
+    fn scan_none_plan(&self) -> ParquetAccessPlan {
+        ParquetAccessPlan::new_none(self.metadata.num_row_groups())
+    }
+}
+
+/// Implement the TableProvider trait for IndexTableProvider
+/// so that we can query it as a table.
+#[async_trait]
+impl TableProvider for IndexTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.indexed_file.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let indexed_file = &self.indexed_file;
+        let predicate = self.filters_to_predicate(state, filters)?;
+
+        // Figure out which row groups to scan based on the predicate
+        let access_plan = self.create_plan(&predicate)?;
+        println!("{access_plan:?}");
+
+        let partitioned_file = indexed_file
+            .partitioned_file()
+            // provide the starting access plan to the ParquetExec by
+            // storing it as  "extensions" on PartitionedFile
+            .with_extensions(Arc::new(access_plan) as _);
+
+        // Prepare for scanning
+        let schema = self.schema();
+        let object_store_url = ObjectStoreUrl::parse("file://")?;
+        let file_scan_config = FileScanConfig::new(object_store_url, schema)
+            .with_limit(limit)
+            .with_projection(projection.cloned())
+            .with_file(partitioned_file);
+
+        // Configure a factory interface to avoid re-reading the metadata for each file
+        let reader_factory =
+            CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
+                .with_file(indexed_file);
+
+        // Finally, put it all together into a ParquetExec
+        Ok(ParquetExecBuilder::new(file_scan_config)
+            // provide the predicate so the ParquetExec can try and prune
+            // row groups internally
+            .with_predicate(predicate)
+            // provide the factory to create parquet reader without re-reading metadata
+            .with_parquet_file_reader_factory(Arc::new(reader_factory))
+            .build_arc())
+    }
+
+    /// Tell DataFusion to push filters down to the scan method
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // Inexact because the pruning can't handle all expressions and pruning
+        // is not done at the row level -- there may be rows in returned files
+        // that do not pass the filter
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+}
+
+/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files
+/// from object storage, and uses pre-loaded metadata.
+
+#[derive(Debug)]
+struct CachedParquetFileReaderFactory {
+    /// The underlying object store implementation for reading file data
+    object_store: Arc<dyn ObjectStore>,
+    /// The parquet metadata for each file in the index, keyed by the file name
+    /// (e.g. `file1.parquet`)
+    metadata: HashMap<String, Arc<ParquetMetaData>>,
+}
+
+impl CachedParquetFileReaderFactory {
+    fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_store,
+            metadata: HashMap::new(),
+        }
+    }
+    /// Add the pre-parsed information about the file to the factor
+    fn with_file(mut self, indexed_file: &IndexedFile) -> Self {
+        self.metadata.insert(
+            indexed_file.file_name.clone(),
+            Arc::clone(&indexed_file.metadata),
+        );
+        self
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        _partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        _metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        // for this example we ignore the partition index and metrics
+        // but in a real system you would likely use them to report details on
+        // the performance of the reader.
+        let filename = file_meta
+            .location()
+            .parts()
+            .last()
+            .expect("No path in location")
+            .as_ref()
+            .to_string();
+
+        let object_store = Arc::clone(&self.object_store);
+        let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        let metadata = self
+            .metadata
+            .get(&filename)
+            .expect("metadata for file not found: {filename}");
+        Ok(Box::new(ParquetReaderWithCache {
+            filename,
+            metadata: Arc::clone(metadata),
+            inner,
+        }))
+    }
+}
+
+/// wrapper around a ParquetObjectReader that caches metadata
+struct ParquetReaderWithCache {
+    filename: String,
+    metadata: Arc<ParquetMetaData>,
+    inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for ParquetReaderWithCache {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
+        println!("get_bytes: {} Reading range {:?}", self.filename, range);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
+        println!(
+            "get_byte_ranges: {} Reading ranges {:?}",
+            self.filename, ranges
+        );
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
+        println!("get_metadata: {} returning cached metadata", self.filename);
+
+        // return the cached metadata so the parquet reader does not read it
+        let metadata = self.metadata.clone();
+        async move { Ok(metadata) }.boxed()
+    }
+}
+
+/// Creates a new parquet file at the specified path.
+///
+/// * id: Int32
+/// * text: Utf8
+///
+/// The `id` column  increases sequentially from `min_value` to `max_value`
+/// The `text` column is a repeating sequence of `TheTextValue{i}`
+///
+/// Each row group has 100 rows
+fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> {
+    let path = path.as_ref();
+    let file = File::create(path)?;
+
+    let id = Int32Array::from_iter_values(value_range.clone());
+    let text =
+        StringArray::from_iter_values(value_range.map(|i| format!("TheTextValue{i}")));
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("id", Arc::new(id) as ArrayRef),
+        ("text", Arc::new(text) as ArrayRef),
+    ])?;
+
+    let schema = batch.schema();
+
+    // enable page statistics for the tag column,
+    // for everything else.
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(100)
+        // compute column chunk (per row group) statistics by default
+        .set_statistics_enabled(EnabledStatistics::Chunk)
+        // compute column page statistics for the tag column
+        .set_column_statistics_enabled(ColumnPath::from("tag"), EnabledStatistics::Page)
+        .build();
+
+    // write the actual values to the file
+    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
+    writer.write(&batch)?;
+    writer.close()?;
+
+    Ok(())
+}
diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index 3e2bc0ad7c3ac..e36a4f8906446 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -127,6 +127,13 @@ impl Column {
             })
     }
 
+    /// return the column's name.
+    ///
+    /// Note: This ignores the relation and returns the column name only.
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
     /// Serialize column into a flat name string
     pub fn flat_name(&self) -> String {
         match &self.relation {
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 1c431d04cd35e..c59cdba7c8292 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1393,6 +1393,13 @@ pub struct TableParquetOptions {
     pub key_value_metadata: HashMap<String, Option<String>>,
 }
 
+impl TableParquetOptions {
+    /// Return new default TableParquetOptions
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
 impl ConfigField for TableParquetOptions {
     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
         self.global.visit(v, key_prefix, description);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
index e15e907cd9b80..ea3030664b7b3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -139,6 +139,11 @@ impl ParquetAccessPlan {
         self.set(idx, RowGroupAccess::Skip);
     }
 
+    /// scan the i-th row group
+    pub fn scan(&mut self, idx: usize) {
+        self.set(idx, RowGroupAccess::Scan);
+    }
+
     /// Return true if the i-th row group should be scanned
     pub fn should_scan(&self, idx: usize) -> bool {
         self.row_groups[idx].should_scan()
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ec21c5504c694..959e50fac81d1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -186,9 +186,9 @@ pub use writer::plan_to_parquet;
 /// let exec = ParquetExec::builder(file_scan_config).build();
 /// ```
 ///
-/// For a complete example, see the [`parquet_index_advanced` example]).
+/// For a complete example, see the [`advanced_parquet_index` example]).
 ///
-/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
+/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs
 ///
 /// # Execution Overview
 ///
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index e590f372253c1..da8b793a5c29d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -54,7 +54,7 @@ impl RowGroupAccessPlanFilter {
         Self { access_plan }
     }
 
-    /// Return true if there are no row groups to scan
+    /// Return true if there are no row groups
     pub fn is_empty(&self) -> bool {
         self.access_plan.is_empty()
     }
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 4dd62a8945184..98aff0d65898a 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -471,8 +471,10 @@ pub struct PruningPredicate {
     /// Original physical predicate from which this predicate expr is derived
     /// (required for serialization)
     orig_expr: Arc<dyn PhysicalExpr>,
-    /// [`LiteralGuarantee`]s that are used to try and prove a predicate can not
-    /// possibly evaluate to `true`.
+    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
+    /// evaluate to `true`.
+    ///
+    /// See [`PruningPredicate::literal_guarantees`] for more details.
     literal_guarantees: Vec<LiteralGuarantee>,
 }
 
@@ -595,6 +597,10 @@ impl PruningPredicate {
     }
 
     /// Returns a reference to the literal guarantees
+    ///
+    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
+    /// expression to possibly be `true`. If any is not satisfied, the
+    /// expression is guaranteed to be `null` or `false`.
     pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
         &self.literal_guarantees
     }