Commit e8b8e55

fix: logging deadlock (#2346)
* fix: logging deadlock

  Fix the logging deadlock that was causing many test timeouts. This refactors how logging shutdown is handled: it eliminates unnecessary captures, uses an idiomatic wait group to signal processor completion, and removes unnecessary nil initialisation. It also fixes a race condition in the log tests, which were reading Msg while the processor was still running, and switches GitHub detection to the GITHUB_RUN_ID environment variable, as XDG_RUNTIME_DIR can be present in other situations.

* fix: Docker container Terminate returning early

  Fix Docker container Terminate returning early when the context is cancelled, which was leaving orphaned running containers. This ensures that all lifecycle hooks are run even if one errors, returning a multi error if needed.
1 parent 55ca42a commit e8b8e55
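The heart of the deadlock fix, visible in the docker.go diff below, is how log production shuts down: the producer goroutine is stopped by closing a chan struct{}, and the stopper waits on a sync.WaitGroup, instead of sending on a channel and having the goroutine re-take the mutex to reset shared state. A minimal, self-contained sketch of that pattern follows; the producer, Start and Stop names are illustrative only, not the testcontainers-go API.

// Sketch of the stop-channel + WaitGroup shutdown pattern the fix adopts.
// Illustrative names only; this is not the testcontainers-go API.
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

type producer struct {
    mu    sync.Mutex
    stop  chan struct{}
    wg    sync.WaitGroup
    errCh chan error
}

func (p *producer) Start() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.stop != nil {
        return errors.New("already started")
    }
    p.stop = make(chan struct{})
    p.errCh = make(chan error, 1) // buffered so the goroutine never blocks on send
    p.wg.Add(1)

    go func() {
        defer func() {
            close(p.errCh)
            p.wg.Done() // signal completion without re-taking the mutex
        }()
        for {
            select {
            case <-p.stop:
                return
            default:
                time.Sleep(10 * time.Millisecond) // stand-in for reading log frames
            }
        }
    }()
    return nil
}

func (p *producer) Stop() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.stop == nil {
        return nil
    }
    close(p.stop) // closing broadcasts and never blocks, unlike a channel send
    p.wg.Wait()   // wait for the goroutine to exit before allowing a restart
    p.stop = nil
    return <-p.errCh
}

func main() {
    var p producer
    fmt.Println(p.Start(), p.Stop()) // can be started and stopped repeatedly
}

Because the goroutine no longer touches the mutex-protected state itself, Stop can safely reset the channel to nil after the wait, which is what removes the deadlock.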

File tree

3 files changed: +172 −193 lines changed


docker.go

Lines changed: 66 additions & 75 deletions
@@ -63,16 +63,26 @@ type DockerContainer struct {
     isRunning     bool
     imageWasBuilt bool
     // keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
-    keepBuiltImage       bool
-    provider             *DockerProvider
-    sessionID            string
-    terminationSignal    chan bool
-    consumers            []LogConsumer
-    raw                  *types.ContainerJSON
-    stopLogProductionCh  chan bool
-    logProductionDone    chan bool
-    logProductionError   chan error
-    logProductionMutex   sync.Mutex
+    keepBuiltImage     bool
+    provider           *DockerProvider
+    sessionID          string
+    terminationSignal  chan bool
+    consumers          []LogConsumer
+    raw                *types.ContainerJSON
+    logProductionError chan error
+
+    // TODO: Remove locking and wait group once the deprecated StartLogProducer and
+    // StopLogProducer have been removed and hence logging can only be started and
+    // stopped once.
+
+    // logProductionWaitGroup is used to signal when the log production has stopped.
+    // This allows stopLogProduction to safely set logProductionStop to nil.
+    logProductionWaitGroup sync.WaitGroup
+
+    // logProductionMutex protects logProductionStop channel so it can be started again.
+    logProductionMutex sync.Mutex
+    logProductionStop  chan struct{}
+
     logProductionTimeout *time.Duration
     logger               Logging
     lifecycleHooks       []ContainerLifecycleHooks
@@ -264,37 +274,26 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
 
     defer c.provider.client.Close()
 
-    err := c.terminatingHook(ctx)
-    if err != nil {
-        return err
-    }
-
-    err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
-        RemoveVolumes: true,
-        Force:         true,
-    })
-    if err != nil {
-        return err
-    }
-
-    err = c.terminatedHook(ctx)
-    if err != nil {
-        return err
+    errs := []error{
+        c.terminatingHook(ctx),
+        c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
+            RemoveVolumes: true,
+            Force:         true,
+        }),
+        c.terminatedHook(ctx),
     }
 
     if c.imageWasBuilt && !c.keepBuiltImage {
         _, err := c.provider.client.ImageRemove(ctx, c.Image, types.ImageRemoveOptions{
             Force:         true,
             PruneChildren: true,
         })
-        if err != nil {
-            return err
-        }
+        errs = append(errs, err)
     }
 
     c.sessionID = ""
     c.isRunning = false
-    return nil
+    return errors.Join(errs...)
 }
 
 // update container raw info
@@ -675,9 +674,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
         c.logProductionMutex.Lock()
         defer c.logProductionMutex.Unlock()
 
-        if c.stopLogProductionCh != nil {
+        if c.logProductionStop != nil {
             return errors.New("log production already started")
         }
+
+        c.logProductionStop = make(chan struct{})
+        c.logProductionWaitGroup.Add(1)
     }
 
     for _, opt := range opts {
@@ -699,21 +701,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
         c.logProductionTimeout = &maxLogProductionTimeout
     }
 
-    c.stopLogProductionCh = make(chan bool)
-    c.logProductionDone = make(chan bool)
     c.logProductionError = make(chan error, 1)
 
-    go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
-        // signal the log production is done once go routine exits, this prevents race conditions around start/stop
-        // set c.stopLogProductionCh to nil so that it can be started again
+    go func() {
         defer func() {
-            defer c.logProductionMutex.Unlock()
-            close(done)
-            close(errorCh)
-            {
-                c.logProductionMutex.Lock()
-                c.stopLogProductionCh = nil
-            }
+            close(c.logProductionError)
+            c.logProductionWaitGroup.Done()
         }()
 
         since := ""
@@ -731,15 +724,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
 
         r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
         if err != nil {
-            errorCh <- err
+            c.logProductionError <- err
             return
         }
         defer c.provider.Close()
 
         for {
            select {
-            case <-stop:
-                errorCh <- r.Close()
+            case <-c.logProductionStop:
+                c.logProductionError <- r.Close()
                 return
             default:
                 h := make([]byte, 8)
@@ -795,7 +788,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
             }
         }
     }
-    }(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)
+    }()
 
     return nil
 }
@@ -805,17 +798,18 @@ func (c *DockerContainer) StopLogProducer() error {
     return c.stopLogProduction()
 }
 
-// StopLogProducer will stop the concurrent process that is reading logs
+// stopLogProduction will stop the concurrent process that is reading logs
 // and sending them to each added LogConsumer
 func (c *DockerContainer) stopLogProduction() error {
+    // TODO: Remove locking and wait group once StartLogProducer and StopLogProducer
+    // have been removed and hence logging can only be started / stopped once.
     c.logProductionMutex.Lock()
     defer c.logProductionMutex.Unlock()
-    if c.stopLogProductionCh != nil {
-        c.stopLogProductionCh <- true
-        // block until the log production is actually done in order to avoid strange races
-        <-c.logProductionDone
-        c.stopLogProductionCh = nil
-        c.logProductionDone = nil
+    if c.logProductionStop != nil {
+        close(c.logProductionStop)
+        c.logProductionWaitGroup.Wait()
+        // Set c.logProductionStop to nil so that it can be started again.
+        c.logProductionStop = nil
         return <-c.logProductionError
     }
     return nil
@@ -1122,17 +1116,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
     }
 
     c := &DockerContainer{
-        ID:                  resp.ID,
-        WaitingFor:          req.WaitingFor,
-        Image:               imageName,
-        imageWasBuilt:       req.ShouldBuildImage(),
-        keepBuiltImage:      req.ShouldKeepBuiltImage(),
-        sessionID:           core.SessionID(),
-        provider:            p,
-        terminationSignal:   termSignal,
-        stopLogProductionCh: nil,
-        logger:              p.Logger,
-        lifecycleHooks:      req.LifecycleHooks,
+        ID:                resp.ID,
+        WaitingFor:        req.WaitingFor,
+        Image:             imageName,
+        imageWasBuilt:     req.ShouldBuildImage(),
+        keepBuiltImage:    req.ShouldKeepBuiltImage(),
+        sessionID:         core.SessionID(),
+        provider:          p,
+        terminationSignal: termSignal,
+        logger:            p.Logger,
+        lifecycleHooks:    req.LifecycleHooks,
     }
 
     err = c.createdHook(ctx)
@@ -1225,15 +1218,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
     }
 
     dc := &DockerContainer{
-        ID:                  c.ID,
-        WaitingFor:          req.WaitingFor,
-        Image:               c.Image,
-        sessionID:           sessionID,
-        provider:            p,
-        terminationSignal:   termSignal,
-        stopLogProductionCh: nil,
-        logger:              p.Logger,
-        lifecycleHooks:      []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
+        ID:                c.ID,
+        WaitingFor:        req.WaitingFor,
+        Image:             c.Image,
+        sessionID:         sessionID,
+        provider:          p,
+        terminationSignal: termSignal,
+        logger:            p.Logger,
+        lifecycleHooks:    []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
     }
 
     err = dc.startedHook(ctx)
@@ -1545,7 +1537,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)
 
     container.sessionID = core.SessionID()
     container.consumers = []LogConsumer{}
-    container.stopLogProductionCh = nil
     container.isRunning = response.State == "running"
 
     // the termination signal should be obtained from the reaper
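The Terminate change above collects the result of every lifecycle step into a slice and joins them, so later hooks still run when an earlier one fails. A standalone sketch of that shape, assuming Go 1.20+ for errors.Join; the step functions here are made up for illustration:

// Sketch of running every cleanup step and joining their errors,
// rather than returning on the first failure. Step names are illustrative.
package main

import (
    "errors"
    "fmt"
)

func terminate() error {
    errs := []error{
        stepA(), // each step always runs, even if an earlier one failed
        stepB(),
        stepC(),
    }
    // errors.Join drops nil entries and returns nil if every step succeeded.
    return errors.Join(errs...)
}

func stepA() error { return nil }
func stepB() error { return errors.New("remove failed") }
func stepC() error { return nil }

func main() {
    fmt.Println(terminate()) // prints: remove failed
}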
