Skip to content

Commit

Permalink
[KYUUBI #5407][AUTHZ] Tests for Iceberg system procedures of snapshot…
Browse files Browse the repository at this point in the history
… management

### _Why are the changes needed?_
To close #5407 .
Follow up for #5248 .
Add some UT for snapshot management procedures. These procedures require alter permissions.

1.  rollback_to_snapshot  (https://iceberg.apache.org/docs/latest/spark-procedures/#rollback_to_snapshot):
Usage: `CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)
`
Meaning: rollback a table to a specific snapshot ID.

2. rollback_to_timestamp (https://iceberg.apache.org/docs/latest/spark-procedures/#rollback_to_timestamp)
Usage: `CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00')`
Meaning:  rollback the table to the latest snapshot less than time.

3.  set_current_snapshot (https://iceberg.apache.org/docs/latest/spark-procedures/#set_current_snapshot)
Usage: `CALL catalog_name.system.set_current_snapshot('db.sample', 1)`
Meaning:  Set a table to a specific snapshot ID.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5394 from yabola/add_call.

Closes #5407

98b7492 [Bowen Liang] split into 3 ut for snapshot management
23fe49a [Bowen Liang] refactor ut
8ba97c6 [chenliang.lu] Add UT for checking previleges in iceberg call snapshot management

Lead-authored-by: chenliang.lu <[email protected]>
Co-authored-by: Bowen Liang <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
  • Loading branch information
yabola and bowenliang123 committed Oct 12, 2023
1 parent 98b74d2 commit fd69c6e
Showing 1 changed file with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.kyuubi.plugin.spark.authz.ranger

// scalastyle:off
import java.sql.Timestamp
import java.text.SimpleDateFormat

import scala.util.Try

import org.apache.spark.sql.Row
import org.scalatest.Outcome

// scalastyle:off
import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
Expand Down Expand Up @@ -281,4 +285,64 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
})
}
}

private def prepareExampleIcebergTable(table: String, initSnapshots: Int): Unit = {
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $table (id int, name string) USING iceberg"))
(0 until initSnapshots).foreach(i =>
doAs(admin, sql(s"INSERT INTO $table VALUES ($i, 'user_$i')")))
}

private def getFirstSnapshot(table: String): Row = {
val existedSnapshots =
sql(s"SELECT * FROM $table.snapshots ORDER BY committed_at ASC LIMIT 1").collect()
existedSnapshots(0)
}

test("CALL rollback_to_snapshot") {
val tableName = "table_rollback_to_snapshot"
val table = s"$catalogV2.$namespace1.$tableName"
withCleanTmpResources(Seq((table, "table"))) {
prepareExampleIcebergTable(table, 2)
val targetSnapshotId = getFirstSnapshot(table).getAs[Long]("snapshot_id")
val callRollbackToSnapshot =
s"CALL $catalogV2.system.rollback_to_snapshot (table => '$table', snapshot_id => $targetSnapshotId)"

interceptContains[AccessControlException](doAs(someone, sql(callRollbackToSnapshot)))(
s"does not have [alter] privilege on [$namespace1/$tableName]")
doAs(admin, sql(callRollbackToSnapshot))
}
}

test("CALL rollback_to_timestamp") {
val tableName = "table_rollback_to_timestamp"
val table = s"$catalogV2.$namespace1.$tableName"
withCleanTmpResources(Seq((table, "table"))) {
prepareExampleIcebergTable(table, 2)
val callRollbackToTimestamp = {
val targetSnapshotCommittedAt = getFirstSnapshot(table).getAs[Timestamp]("committed_at")
val targetTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
.format(targetSnapshotCommittedAt.getTime + 1)
s"CALL $catalogV2.system.rollback_to_timestamp (table => '$table', timestamp => TIMESTAMP '$targetTimestamp')"
}

interceptContains[AccessControlException](doAs(someone, sql(callRollbackToTimestamp)))(
s"does not have [alter] privilege on [$namespace1/$tableName]")
doAs(admin, sql(callRollbackToTimestamp))
}
}

test("CALL set_current_snapshot") {
val tableName = "table_set_current_snapshot"
val table = s"$catalogV2.$namespace1.$tableName"
withCleanTmpResources(Seq((table, "table"))) {
prepareExampleIcebergTable(table, 2)
val targetSnapshotId = getFirstSnapshot(table).getAs[Long]("snapshot_id")
val callSetCurrentSnapshot =
s"CALL $catalogV2.system.set_current_snapshot (table => '$table', snapshot_id => $targetSnapshotId)"

interceptContains[AccessControlException](doAs(someone, sql(callSetCurrentSnapshot)))(
s"does not have [alter] privilege on [$namespace1/$tableName]")
doAs(admin, sql(callSetCurrentSnapshot))
}
}
}

0 comments on commit fd69c6e

Please sign in to comment.