@@ -16,7 +16,7 @@ package query
1616
1717import (
1818 "context"
19- "fmt "
19+ "sync "
2020 "testing"
2121 "time"
2222
@@ -58,15 +58,26 @@ func TestIntegration_QueryCancelWait(t *testing.T) {
5858 if len (testQueryHelpers ) == 0 {
5959 t .Skip ("integration tests skipped" )
6060 }
61+ // Run a script with a known min duration, as this test relies on timing of requests.
62+ sql := `
63+ DECLARE end_target TIMESTAMP;
64+ DECLARE poll_check_count INT64;
65+ SET end_target = TIMESTAMP_ADD(CURRENT_TIMESTAMP, INTERVAL 1 SECOND);
66+ SET poll_check_count = 0;
67+ WHILE CURRENT_TIMESTAMP < end_target DO
68+ SET poll_check_count = poll_check_count + 1;
69+ END WHILE;
70+ SELECT CURRENT_TIMESTAMP as ts, poll_check_count AS foo
71+ `
6172 for k , helper := range testQueryHelpers {
6273 t .Run (k , func (t * testing.T ) {
6374 ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
6475 defer cancel ()
6576
66- numGenRows := uint64 ( 1000000 )
67- req := helper . FromSQL ( fmt . Sprintf ( "SELECT num FROM UNNEST(GENERATE_ARRAY(1,%d)) as num" , numGenRows ))
68- req . QueryRequest . JobCreationMode = bigquerypb . QueryRequest_JOB_CREATION_OPTIONAL
69- req .QueryRequest .TimeoutMs = wrapperspb .UInt32 (500 )
77+ req := helper . FromSQL ( sql )
78+ // We want the first RPC to return before cancellation, so we set a low timeout
79+ // so that the query has populated a job reference.
80+ req .QueryRequest .TimeoutMs = wrapperspb .UInt32 (250 )
7081 req .QueryRequest .UseQueryCache = wrapperspb .Bool (false )
7182
7283 wctx , wcancel := context .WithCancel (ctx )
@@ -75,38 +86,49 @@ func TestIntegration_QueryCancelWait(t *testing.T) {
7586 t .Fatalf ("StartQuery() error: %v" , err )
7687 }
7788
89+ // Use a wg to synchronize progress of a goroutine.
90+ var wg sync.WaitGroup
91+ var waitErr error
92+
93+ wg .Add (1 )
7894 go func (t * testing.T ) {
79- err := q .Wait (ctx )
80- if err == nil {
81- t .Errorf ("Wait() should throw an error: %v" , err )
82- }
95+ waitErr = q .Wait (ctx )
96+ wg .Done ()
8397 }(t )
8498
85- for q . JobReference () == nil && q . Err () == nil {
86- time . Sleep ( 100 * time . Millisecond )
87- }
99+ // sleep on main thread, then cancel.
100+ // Time is later than the poll duration.
101+ time . Sleep ( 500 * time . Millisecond )
88102 wcancel ()
89103
104+ // wait for the cancellation to return and evaluate expectations.
105+ wg .Wait ()
106+ if waitErr != context .Canceled {
107+ t .Errorf ("Wait() should return context.Canceled, returned: %v" , waitErr )
108+ }
90109 if q .Complete () {
91110 t .Fatalf ("Complete() should be false" )
92111 }
93112
94- // Re-attach and wait again
95- nq , err := helper .AttachJob (ctx , q .JobReference ())
96- if err != nil {
97- t .Fatalf ("AttachJob() error: %v" , err )
98- }
113+ // It's still possible that we could have canceled before a JobReference is captured, so only attempt
114+ // the reattach if the reference is present.
115+ if q .JobReference () != nil {
99116
100- err = nq .Wait (ctx )
101- if err != nil {
102- t .Fatalf ("Wait() error: %v" , err )
103- }
117+ // Re-attach and wait again.
118+ nq , err := helper .AttachJob (ctx , q .JobReference ())
119+ if err != nil {
120+ t .Fatalf ("AttachJob() error: %v" , err )
121+ }
104122
105- if ! nq .Complete () {
106- t .Fatalf ("Complete() should be true after Wait()" )
107- }
123+ err = nq .Wait (ctx )
124+ if err != nil {
125+ t .Fatalf ("Wait() error: %v" , err )
126+ }
108127
109- // TODO: read data and assert row count
128+ if ! nq .Complete () {
129+ t .Fatalf ("Complete() should be true after Wait()" )
130+ }
131+ }
110132 })
111133 }
112134}
0 commit comments