@@ -126,16 +126,21 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
126126 }
127127}
128128
129- // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
130- func (rx * ResumableUpload ) transferChunk (ctx context.Context ) (* http.Response , error ) {
131- chunk , off , size , err := rx .Media .Chunk ()
132-
133- done := err == io .EOF
134- if ! done && err != nil {
135- return nil , err
129+ // transferChunk performs a single HTTP request to upload a single chunk.
130+ func (rx * ResumableUpload ) transferChunk (ctx context.Context , chunk io.Reader , off , size int64 , done bool ) (* http.Response , error ) {
131+ // rCtx is derived from a context with a defined ChunkTransferTimeout with non-zero value.
132+ // If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
133+ // triggering a retry of the request.
134+ var rCtx context.Context
135+ var cancel context.CancelFunc
136+
137+ rCtx = ctx
138+ if rx .ChunkTransferTimeout != 0 {
139+ rCtx , cancel = context .WithTimeout (ctx , rx .ChunkTransferTimeout )
140+ defer cancel ()
136141 }
137142
138- res , err := rx .doUploadRequest (ctx , chunk , off , int64 ( size ) , done )
143+ res , err := rx .doUploadRequest (rCtx , chunk , off , size , done )
139144 if err != nil {
140145 return res , err
141146 }
@@ -149,161 +154,146 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, e
149154 if res .StatusCode == http .StatusOK {
150155 rx .reportProgress (off , off + int64 (size ))
151156 }
157+ return res , nil
158+ }
152159
153- if statusResumeIncomplete (res ) {
154- rx .Media .Next ()
160+ // uploadChunkWithRetries attempts to upload a single chunk, with retries
161+ // within ChunkRetryDeadline if ChunkTransferTimeout is non-zero.
162+ func (rx * ResumableUpload ) uploadChunkWithRetries (ctx context.Context , chunk io.Reader , off , size int64 , done bool ) (* http.Response , error ) {
163+ // Configure error retryable criteria.
164+ shouldRetry := rx .Retry .errorFunc ()
165+
166+ // Configure single chunk retry deadline.
167+ retryDeadline := defaultRetryDeadline
168+ if rx .ChunkRetryDeadline != 0 {
169+ retryDeadline = rx .ChunkRetryDeadline
170+ }
171+
172+ // Each chunk gets its own initialized-at-zero backoff and invocation ID.
173+ bo := rx .Retry .backoff ()
174+ quitAfterTimer := time .NewTimer (retryDeadline )
175+ defer quitAfterTimer .Stop ()
176+ rx .attempts = 1
177+ rx .invocationID = uuid .New ().String ()
178+
179+ var pause time.Duration
180+ var resp * http.Response
181+ var err error
182+
183+ // Retry loop for a single chunk.
184+ for {
185+ // Wait for the backoff period, unless the context is canceled or the
186+ // retry deadline is hit.
187+ pauseTimer := time .NewTimer (pause )
188+ select {
189+ case <- ctx .Done ():
190+ pauseTimer .Stop ()
191+ if err == nil {
192+ err = ctx .Err ()
193+ }
194+ return resp , err
195+ case <- pauseTimer .C :
196+ case <- quitAfterTimer .C :
197+ pauseTimer .Stop ()
198+ return resp , err
199+ }
200+ pauseTimer .Stop ()
201+
202+ // Check for context cancellation or timeout once more. If more than one
203+ // case in the select statement above was satisfied at the same time, Go
204+ // will choose one arbitrarily.
205+ // That can cause an operation to go through even if the context was
206+ // canceled before or the timeout was reached.
207+ select {
208+ case <- ctx .Done ():
209+ if err == nil {
210+ err = ctx .Err ()
211+ }
212+ return resp , err
213+ case <- quitAfterTimer .C :
214+ return resp , err
215+ default :
216+ }
217+
218+ // We close the response's body here, since we definitely will not
219+ // return `resp` now. If we close it before the select case above, a
220+ // timer may fire and cause us to return a response with a closed body
221+ // (in which case, the caller will not get the error message in the body).
222+ if resp != nil && resp .Body != nil {
223+ // Read the body to EOF - if the Body is not both read to EOF and closed,
224+ // the Client's underlying RoundTripper may not be able to re-use the
225+ // persistent TCP connection to the server for a subsequent "keep-alive" request.
226+ // See https://pkg.go.dev/net/http#Client.Do
227+ io .Copy (io .Discard , resp .Body )
228+ resp .Body .Close ()
229+ }
230+
231+ resp , err = rx .transferChunk (ctx , chunk , off , size , done )
232+ status := 0
233+ if resp != nil {
234+ status = resp .StatusCode
235+ }
236+ // Chunk upload should be retried if the ChunkTransferTimeout is non-zero and err is context deadline exceeded
237+ // or we encounter a retryable error.
238+ if (rx .ChunkTransferTimeout != 0 && errors .Is (err , context .DeadlineExceeded )) || shouldRetry (status , err ) {
239+ rx .attempts ++
240+ pause = bo .Pause ()
241+ chunk , _ , _ , _ = rx .Media .Chunk ()
242+ continue
243+ }
244+ return resp , err
155245 }
156- return res , nil
157246}
158247
159248// Upload starts the process of a resumable upload with a cancellable context.
160- // It retries using the provided back off strategy until cancelled or the
161- // strategy indicates to stop retrying.
162249// It is called from the auto-generated API code and is not visible to the user.
163250// Before sending an HTTP request, Upload calls any registered hook functions,
164251// and calls the returned functions after the request returns (see send.go).
165252// rx is private to the auto-generated API code.
166253// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
167254// Upload does not parse the response into the error on a non 200 response;
168255// it is the caller's responsibility to call resp.Body.Close.
169- func (rx * ResumableUpload ) Upload (ctx context.Context ) (resp * http.Response , err error ) {
256+ func (rx * ResumableUpload ) Upload (ctx context.Context ) (* http.Response , error ) {
257+ for {
258+ chunk , off , size , err := rx .Media .Chunk ()
259+ done := err == io .EOF
260+ if ! done && err != nil {
261+ return nil , err
262+ }
170263
171- // There are a couple of cases where it's possible for err and resp to both
172- // be non-nil. However, we expose a simpler contract to our callers: exactly
173- // one of resp and err will be non-nil. This means that any response body
174- // must be closed here before returning a non-nil error.
175- var prepareReturn = func ( resp * http. Response , err error ) ( * http. Response , error ) {
264+ resp , err := rx . uploadChunkWithRetries ( ctx , chunk , off , int64 ( size ), done )
265+ // There are a couple of cases where it's possible for err and resp to both
266+ // be non-nil. However, we expose a simpler contract to our callers: exactly
267+ // one of resp and err will be non-nil. This means that any response body
268+ // must be closed here before returning a non-nil error.
176269 if err != nil {
177270 if resp != nil && resp .Body != nil {
178271 resp .Body .Close ()
179272 }
180273 // If there were retries, indicate this in the error message and wrap the final error.
181274 if rx .attempts > 1 {
182- return nil , fmt .Errorf ("chunk upload failed after %d attempts; , final error: %w" , rx .attempts , err )
275+ return nil , fmt .Errorf ("chunk upload failed after %d attempts, final error: %w" , rx .attempts , err )
183276 }
184277 return nil , err
185278 }
279+
186280 // This case is very unlikely but possible only if rx.ChunkRetryDeadline is
187281 // set to a very small value, in which case no requests will be sent before
188282 // the deadline. Return an error to avoid causing a panic.
189283 if resp == nil {
190- return nil , fmt .Errorf ("upload request to %v not sent, choose larger value for ChunkRetryDealine " , rx .URI )
284+ return nil , fmt .Errorf ("upload request to %v not sent, choose larger value for ChunkRetryDeadline " , rx .URI )
191285 }
192- return resp , nil
193- }
194- // Configure retryable error criteria.
195- errorFunc := rx .Retry .errorFunc ()
196-
197- // Configure per-chunk retry deadline.
198- var retryDeadline time.Duration
199- if rx .ChunkRetryDeadline != 0 {
200- retryDeadline = rx .ChunkRetryDeadline
201- } else {
202- retryDeadline = defaultRetryDeadline
203- }
204-
205- // Send all chunks.
206- for {
207- var pause time.Duration
208-
209- // Each chunk gets its own initialized-at-zero backoff and invocation ID.
210- bo := rx .Retry .backoff ()
211- quitAfterTimer := time .NewTimer (retryDeadline )
212- rx .attempts = 1
213- rx .invocationID = uuid .New ().String ()
214-
215- // Retry loop for a single chunk.
216- for {
217- pauseTimer := time .NewTimer (pause )
218- select {
219- case <- ctx .Done ():
220- quitAfterTimer .Stop ()
221- pauseTimer .Stop ()
222- if err == nil {
223- err = ctx .Err ()
224- }
225- return prepareReturn (resp , err )
226- case <- pauseTimer .C :
227- case <- quitAfterTimer .C :
228- pauseTimer .Stop ()
229- return prepareReturn (resp , err )
230- }
231- pauseTimer .Stop ()
232-
233- // Check for context cancellation or timeout once more. If more than one
234- // case in the select statement above was satisfied at the same time, Go
235- // will choose one arbitrarily.
236- // That can cause an operation to go through even if the context was
237- // canceled before or the timeout was reached.
238- select {
239- case <- ctx .Done ():
240- quitAfterTimer .Stop ()
241- if err == nil {
242- err = ctx .Err ()
243- }
244- return prepareReturn (resp , err )
245- case <- quitAfterTimer .C :
246- return prepareReturn (resp , err )
247- default :
248- }
249286
250- // rCtx is derived from a context with a defined transferTimeout with non-zero value.
251- // If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
252- // triggering a retry of the request.
253- var rCtx context.Context
254- var cancel context.CancelFunc
255-
256- rCtx = ctx
257- if rx .ChunkTransferTimeout != 0 {
258- rCtx , cancel = context .WithTimeout (ctx , rx .ChunkTransferTimeout )
259- }
260-
261- // We close the response's body here, since we definitely will not
262- // return `resp` now. If we close it before the select case above, a
263- // timer may fire and cause us to return a response with a closed body
264- // (in which case, the caller will not get the error message in the body).
265- if resp != nil && resp .Body != nil {
266- // Read the body to EOF - if the Body is not both read to EOF and closed,
267- // the Client's underlying RoundTripper may not be able to re-use the
268- // persistent TCP connection to the server for a subsequent "keep-alive" request.
269- // See https://pkg.go.dev/net/http#Client.Do
287+ if statusResumeIncomplete (resp ) {
288+ // The upload is not yet complete, but the server has acknowledged this chunk.
289+ // We don't have anything to do with the response body.
290+ if resp .Body != nil {
270291 io .Copy (io .Discard , resp .Body )
271292 resp .Body .Close ()
272293 }
273- resp , err = rx .transferChunk (rCtx )
274-
275- var status int
276- if resp != nil {
277- status = resp .StatusCode
278- }
279-
280- // The upload should be retried if the rCtx is canceled due to a timeout.
281- select {
282- case <- rCtx .Done ():
283- if rx .ChunkTransferTimeout != 0 && errors .Is (rCtx .Err (), context .DeadlineExceeded ) {
284- // Cancel the context for rCtx
285- cancel ()
286- continue
287- }
288- default :
289- }
290-
291- // Check if we should retry the request.
292- if ! errorFunc (status , err ) {
293- quitAfterTimer .Stop ()
294- break
295- }
296-
297- rx .attempts ++
298- pause = bo .Pause ()
299- }
300-
301- // If the chunk was uploaded successfully, but there's still
302- // more to go, upload the next chunk without any delay.
303- if statusResumeIncomplete (resp ) {
294+ rx .Media .Next ()
304295 continue
305296 }
306-
307- return prepareReturn (resp , err )
297+ return resp , nil
308298 }
309299}
0 commit comments