Skip to content

Commit

Permalink
Add one more test.
Browse files Browse the repository at this point in the history
  • Loading branch information
tinybit committed Aug 29, 2024
1 parent 8e55989 commit d6e9e26
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions tests/context_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,122 @@ func TestContextCancellationType2(t *testing.T) {

assert.Less(t, queryTime-cancelBackoff, time.Second)
}

func TestContextCancellationType3(t *testing.T) {
var (
q1 = "CREATE DATABASE IF NOT EXISTS test_query_cancellation"
q2 = "DROP TABLE IF EXISTS test_query_cancellation.trips"
q3 = `CREATE TABLE test_query_cancellation.trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
PRIMARY KEY (pickup_datetime, dropoff_datetime);`
q4 = `INSERT INTO test_query_cancellation.trips
SELECT
trip_id,
pickup_datetime,
dropoff_datetime,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
passenger_count,
trip_distance,
fare_amount,
extra,
tip_amount,
tolls_amount,
total_amount,
payment_type,
pickup_ntaname,
dropoff_ntaname
FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..2}.gz',
'TabSeparatedWithNames'
);`
)

prepareQueries := []string{q1, q2, q3}

conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})

assert.Nil(t, err)
assert.NotNil(t, conn)

if err = conn.Ping(context.Background()); err != nil {
return
}

t.Log("Connected.")

// prepare table
for _, query := range prepareQueries {
err = conn.Exec(context.Background(), query)
if err != nil {
log.Printf("Finished with error: %v\n", err)
conn.Close()
return
}
}

// prepare context
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

doneCh := make(chan bool, 1)
queryTimeCh := make(chan time.Duration, 1)

// run query in background
go func() {
log.Println("Running heavy query...")

start := time.Now()

defer func() {
log.Printf("Query took: %v\n", time.Since(start))
queryTimeCh <- time.Since(start)
doneCh <- true
}()

err = conn.Exec(ctx, q4)
if err != nil {
log.Printf("Finished with error: %v\n", err)
return
}
}()

cancelBackoff := 3 * time.Second

// let workers run for awhile and stop
go func() {
time.Sleep(cancelBackoff)
cancelCtx()
log.Printf("Context cancelled after %v.", cancelBackoff)
}()

<-doneCh
conn.Close()
log.Println("Done.")

queryTime := <-queryTimeCh

assert.Less(t, queryTime-cancelBackoff, time.Second)
}

0 comments on commit d6e9e26

Please sign in to comment.