Skip to content

Commit 931a89c

Browse files
[release-22.0] connpool: fix connection leak during idle connection reopen (#18967) (#18970)
Signed-off-by: Arthur Schreiber <[email protected]> Co-authored-by: Arthur Schreiber <[email protected]>
1 parent 51d20de commit 931a89c

File tree

2 files changed

+105
-4
lines changed

2 files changed

+105
-4
lines changed

go/pools/smartconnpool/pool.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,9 +460,13 @@ func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
460460
// to expire this connection (even if it's still visible to them), so it's
461461
// safe to return it
462462
for conn, ok := stack.Pop(); ok; conn, ok = stack.Pop() {
463-
if conn.timeUsed.borrow() {
464-
return conn
463+
if !conn.timeUsed.borrow() {
464+
// Ignore the connection that couldn't be borrowed;
465+
// it's being closed by the idle worker and replaced by a new connection.
466+
continue
465467
}
468+
469+
return conn
466470
}
467471
return nil
468472
}
@@ -787,11 +791,23 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
787791
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
788792
if conn.timeUsed.expired(mono, timeout) {
789793
pool.Metrics.idleClosed.Add(1)
794+
790795
conn.Close()
796+
pool.closedConn()
797+
791798
// Using context.Background() is fine since MySQL connection already enforces
792799
// a connect timeout via the `db_connect_timeout_ms` config param.
793-
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
794-
pool.closedConn()
800+
c, err := pool.getNew(context.Background())
801+
if err != nil {
802+
// If we couldn't open a new connection, just continue
803+
continue
804+
}
805+
806+
// opening a new connection might have raced with other goroutines,
807+
// so it's possible that we got back `nil` here
808+
if c != nil {
809+
// Return the new connection to the pool
810+
pool.tryReturnConn(c)
795811
}
796812
}
797813
}

go/pools/smartconnpool/pool_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,3 +1322,88 @@ func TestCloseDuringWaitForConn(t *testing.T) {
13221322
require.EqualValues(t, 0, state.open.Load())
13231323
}
13241324
}
1325+
1326+
// TestIdleTimeoutConnectionLeak checks for leaked connections after idle timeout
1327+
func TestIdleTimeoutConnectionLeak(t *testing.T) {
1328+
var state TestState
1329+
1330+
// Slow connection creation to ensure idle timeout happens during reopening
1331+
state.chaos.delayConnect = 300 * time.Millisecond
1332+
1333+
p := NewPool(&Config[*TestConn]{
1334+
Capacity: 2,
1335+
IdleTimeout: 50 * time.Millisecond,
1336+
LogWait: state.LogWait,
1337+
}).Open(newConnector(&state), nil)
1338+
1339+
getCtx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond)
1340+
defer cancel()
1341+
1342+
// Get and return two connections
1343+
conn1, err := p.Get(getCtx, nil)
1344+
require.NoError(t, err)
1345+
1346+
conn2, err := p.Get(getCtx, nil)
1347+
require.NoError(t, err)
1348+
1349+
p.put(conn1)
1350+
p.put(conn2)
1351+
1352+
// At this point: Active=2, InUse=0, Available=2
1353+
require.EqualValues(t, 2, p.Active())
1354+
require.EqualValues(t, 0, p.InUse())
1355+
require.EqualValues(t, 2, p.Available())
1356+
1357+
// Wait for idle timeout to kick in and start expiring connections
1358+
require.EventuallyWithT(t, func(c *assert.CollectT) {
1359+
// Check the actual number of currently open connections
1360+
assert.Equal(c, int64(2), state.open.Load())
1361+
// Check the total number of closed connections
1362+
assert.Equal(c, int64(1), state.close.Load())
1363+
}, 100*time.Millisecond, 10*time.Millisecond)
1364+
1365+
// At this point, the idle timeout worker has expired the connections
1366+
// and is trying to reopen them (which takes 300ms due to delayConnect)
1367+
1368+
// Try to get connections while they're being reopened
1369+
// This should trigger the bug where connections get discarded
1370+
for i := 0; i < 2; i++ {
1371+
getCtx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
1372+
defer cancel()
1373+
1374+
conn, err := p.Get(getCtx, nil)
1375+
require.NoError(t, err)
1376+
1377+
p.put(conn)
1378+
}
1379+
1380+
// Wait a moment for all reopening to complete
1381+
require.EventuallyWithT(t, func(c *assert.CollectT) {
1382+
// Check the actual number of currently open connections
1383+
require.Equal(c, int64(2), state.open.Load())
1384+
// Check the total number of closed connections
1385+
require.Equal(c, int64(2), state.close.Load())
1386+
}, 400*time.Millisecond, 10*time.Millisecond)
1387+
1388+
// Check the pool state
1389+
assert.Equal(t, int64(2), p.Active())
1390+
assert.Equal(t, int64(0), p.InUse())
1391+
assert.Equal(t, int64(2), p.Available())
1392+
assert.Equal(t, int64(2), p.Metrics.IdleClosed())
1393+
1394+
// Try to close the pool - if there are leaked connections, this will timeout
1395+
closeCtx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond)
1396+
defer cancel()
1397+
1398+
err = p.CloseWithContext(closeCtx)
1399+
require.NoError(t, err)
1400+
1401+
// Pool should be completely closed now
1402+
assert.Equal(t, int64(0), p.Active())
1403+
assert.Equal(t, int64(0), p.InUse())
1404+
assert.Equal(t, int64(0), p.Available())
1405+
assert.Equal(t, int64(2), p.Metrics.IdleClosed())
1406+
1407+
assert.Equal(t, int64(0), state.open.Load())
1408+
assert.Equal(t, int64(4), state.close.Load())
1409+
}

0 commit comments

Comments
 (0)