Skip to content

Commit

Permalink
Remove useless wait
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Davit committed Jan 23, 2025
1 parent 263ee77 commit aadb8fb
Showing 1 changed file with 0 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,12 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
Try(bq.tables.delete(avroTable.ref))
}

def waitForTable(table: Table.Spec): Unit = {
var retries = 0
while (!bq.tables.exists(table.ref) && retries < 3) {
Thread.sleep(500)
retries += 1
}
if (retries >= 3) throw new RuntimeException(s"Table $table not found")
}

"TypedBigQuery" should "handle records as TableRow" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
}.waitUntilFinish()

waitForTable(typedTable)

runWithRealContext(options) { sc =>
val data = sc.typedBigQuery[Record](typedTable)
data should containInAnyOrder(records)
Expand All @@ -160,8 +149,6 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
)
}.waitUntilFinish()

waitForTable(tableRowTable)

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
data should containInAnyOrder(records)
Expand All @@ -181,8 +168,6 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
)
}.waitUntilFinish()

waitForTable(avroTable)

runWithRealContext(options) { sc =>
val data =
sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
Expand Down

0 comments on commit aadb8fb

Please sign in to comment.