Skip to content

Commit

Permalink
Add Iceberg metadata table $metadata_log_entries
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawalreetika committed Jan 4, 2025
1 parent 83cb0f6 commit b4d52b7
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 1 deletion.
15 changes: 15 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,21 @@ example uses the earliest snapshot ID: ``2423571386296047175``
testBranch | BRANCH | 3374797416068698476 | NULL | NULL | NULL
testTag | TAG | 4686954189838128572 | 10 | NULL | NULL

``$metadata_log_entries`` Table
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``$metadata_log_entries`` : Provide metadata log entries for the table

.. code-block:: sql

SELECT * FROM "region$metadata_log_entries";

.. code-block:: text

timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number
-------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------------------------
2024-12-28 23:41:30.451 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region1/metadata/00000-395385ba-3b69-47a7-9c5b-61d056de55c6.metadata.json | 5983271822201743253 | 0 | 1
2024-12-28 23:42:42.207 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region1/metadata/00001-61151efc-0e01-4a47-a5e6-7b72749cc4a8.metadata.json | 5841566266546816471 | 0 | 2
2024-12-28 23:42:47.591 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region1/metadata/00002-d4a9c326-5053-4a26-9082-d9fbf1d6cd14.metadata.json | 6894018661156805064 | 0 | 3

Procedures
----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName,
return Optional.of(new PropertiesTable(systemTableName, table));
case REFS:
return Optional.of(new RefsTable(systemTableName, table));
case METADATA_LOG_ENTRIES:
return Optional.of(new MetadataLogTable(systemTableName, table));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.facebook.presto.iceberg.IcebergTableType.FILES;
import static com.facebook.presto.iceberg.IcebergTableType.HISTORY;
import static com.facebook.presto.iceberg.IcebergTableType.MANIFESTS;
import static com.facebook.presto.iceberg.IcebergTableType.METADATA_LOG_ENTRIES;
import static com.facebook.presto.iceberg.IcebergTableType.PARTITIONS;
import static com.facebook.presto.iceberg.IcebergTableType.PROPERTIES;
import static com.facebook.presto.iceberg.IcebergTableType.REFS;
Expand All @@ -51,7 +52,7 @@ public class IcebergTableName

private final Optional<Long> changelogEndSnapshot;

private static final Set<IcebergTableType> SYSTEM_TABLES = Sets.immutableEnumSet(FILES, MANIFESTS, PARTITIONS, HISTORY, SNAPSHOTS, PROPERTIES, REFS);
private static final Set<IcebergTableType> SYSTEM_TABLES = Sets.immutableEnumSet(FILES, MANIFESTS, PARTITIONS, HISTORY, SNAPSHOTS, PROPERTIES, REFS, METADATA_LOG_ENTRIES);

@JsonCreator
public IcebergTableName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public enum IcebergTableType
PARTITIONS(true),
FILES(true),
REFS(true),
METADATA_LOG_ENTRIES(true),
PROPERTIES(true),
CHANGELOG(true),
EQUALITY_DELETES(true),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.InMemoryRecordSet;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.util.SnapshotUtil.snapshotIdAsOfTime;

public class MetadataLogTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;

private static final List<ColumnMetadata> COLUMN_DEFINITIONS = ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("timestamp", TIMESTAMP_WITH_TIME_ZONE))
.add(new ColumnMetadata("file", VARCHAR))
.add(new ColumnMetadata("latest_snapshot_id", BIGINT))
.add(new ColumnMetadata("latest_schema_id", INTEGER))
.add(new ColumnMetadata("latest_sequence_number", BIGINT))
.build();

private static final List<Type> COLUMN_TYPES = COLUMN_DEFINITIONS.stream()
.map(ColumnMetadata::getType)
.collect(Collectors.toList());

public MetadataLogTable(SchemaTableName tableName, Table icebergTable)
{
tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMN_DEFINITIONS);
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
Iterable<List<?>> rowIterable = () -> new Iterator<List<?>>()
{
private final Iterator<MetadataLogEntry> metadataLogEntriesIterator = ((BaseTable) icebergTable).operations().current().previousFiles().iterator();
private boolean addedLatestEntry;

@Override
public boolean hasNext()
{
return metadataLogEntriesIterator.hasNext() || !addedLatestEntry;
}

@Override
public List<?> next()
{
if (metadataLogEntriesIterator.hasNext()) {
return processMetadataLogEntries(session, metadataLogEntriesIterator.next());
}
if (!addedLatestEntry) {
addedLatestEntry = true;
TableMetadata currentMetadata = ((BaseTable) icebergTable).operations().current();
return buildLatestMetadataRow(session, currentMetadata);
}
return null;
}
};
return new InMemoryRecordSet(COLUMN_TYPES, rowIterable).cursor();
}

private List<?> processMetadataLogEntries(ConnectorSession session, MetadataLogEntry metadataLogEntry)
{
Long snapshotId = null;
Snapshot snapshot = null;
try {
snapshotId = snapshotIdAsOfTime(icebergTable, metadataLogEntry.timestampMillis());
snapshot = icebergTable.snapshot(snapshotId);
}
catch (IllegalArgumentException ignored) {
// Implies this metadata file was created during table creation
}
return addRow(session, metadataLogEntry.timestampMillis(), metadataLogEntry.file(), snapshotId, snapshot);
}

private List<?> buildLatestMetadataRow(ConnectorSession session, TableMetadata metadata)
{
Snapshot latestSnapshot = icebergTable.currentSnapshot();
Long latestSnapshotId = (latestSnapshot != null) ? latestSnapshot.snapshotId() : null;

return addRow(session, metadata.lastUpdatedMillis(), metadata.metadataFileLocation(), latestSnapshotId, latestSnapshot);
}

private List<?> addRow(ConnectorSession session, long timestampMillis, String fileLocation, Long snapshotId, Snapshot snapshot)
{
return Arrays.asList(
packDateTimeWithZone(timestampMillis, session.getSqlFunctionProperties().getTimeZoneKey()),
fileLocation,
snapshotId,
snapshot != null ? snapshot.schemaId() : null,
snapshot != null ? snapshot.sequenceNumber() : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,26 @@ public void testRefsTable()
assertQuery("SELECT * FROM test_table_references FOR SYSTEM_VERSION AS OF 'testTag' where id1=1", "VALUES(1, NULL)");
}

@Test
public void testMetadataLogTable()
{
try {
assertUpdate("CREATE TABLE test_table_metadatalog (id1 BIGINT, id2 BIGINT)");
assertQuery("SELECT count(*) FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES 1");
//metadata file created at table creation
assertQuery("SELECT latest_snapshot_id FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES NULL");

assertUpdate("INSERT INTO test_table_metadatalog VALUES (0, 00), (1, 10), (2, 20)", 3);
Table icebergTable = loadTable("test_table_metadatalog");
Snapshot latestSnapshot = icebergTable.currentSnapshot();
assertQuery("SELECT count(*) FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES 2");
assertQuery("SELECT latest_snapshot_id FROM \"test_table_metadatalog$metadata_log_entries\" order by timestamp DESC limit 1", "values " + latestSnapshot.snapshotId());
}
finally {
assertUpdate("DROP TABLE IF EXISTS test_table_metadatalog");
}
}

@Test
public void testAllIcebergType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,20 @@ public void testRefsTable()
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table_multilevel_partitions$refs\"");
}

@Test
public void testMetadataLogTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$metadata_log_entries\"",
"VALUES ('timestamp', 'timestamp with time zone', '', '')," +
"('file', 'varchar', '', '')," +
"('latest_snapshot_id', 'bigint', '', '')," +
"('latest_schema_id', 'integer', '', '')," +
"('latest_sequence_number', 'bigint', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$metadata_log_entries\"");

assertQuerySucceeds("SELECT * FROM test_schema.\"test_table_multilevel_partitions$metadata_log_entries\"");
}

@Test
public void testSessionPropertiesInManuallyStartedTransaction()
{
Expand Down

0 comments on commit b4d52b7

Please sign in to comment.