Skip to content

Commit 818dad6

Browse files
paulmelnikowbenjie
andauthored
feat(middleware): add a release() function (#1396)
Co-authored-by: Benjie Gillam <[email protected]>
1 parent 73fe801 commit 818dad6

File tree

7 files changed

+228
-27
lines changed

7 files changed

+228
-27
lines changed

src/__tests__/utils/pgPool.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@ import { parse as parsePgConnectionString } from 'pg-connection-string';
33

44
const pgUrl = process.env.TEST_PG_URL || 'postgres:///postgraphile_test';
55

6-
const pgPool = new Pool({
6+
export const poolConfig = {
77
...parsePgConnectionString(pgUrl),
88
max: 15,
99
idleTimeoutMillis: 500,
10+
};
11+
12+
const pgPool = new Pool(poolConfig);
13+
pgPool.on('error', () => {
14+
/* swallow error */
1015
});
1116

1217
export default pgPool;

src/interfaces.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Plugin, PostGraphileCoreOptions } from 'postgraphile-core';
77
import jwt = require('jsonwebtoken');
88
import { EventEmitter } from 'events';
99
import { PostGraphileResponse } from './postgraphile/http/frameworks';
10+
import { ShutdownActions } from './postgraphile/shutdownActions';
1011

1112
type PromiseOrDirect<T> = T | Promise<T>;
1213
type DirectOrCallback<Request, T> = T | ((req: Request) => PromiseOrDirect<T>);
@@ -325,6 +326,7 @@ export interface CreateRequestHandlerOptions extends PostGraphileOptions {
325326
// A Postgres client pool we use to connect Postgres clients.
326327
pgPool: Pool;
327328
_emitter: EventEmitter;
329+
shutdownActions: ShutdownActions;
328330
}
329331

330332
export interface GraphQLFormattedErrorExtended {
@@ -377,13 +379,16 @@ export interface HttpRequestHandler<
377379
faviconRouteHandler: ((res: PostGraphileResponse) => Promise<void>) | null;
378380
eventStreamRoute: string;
379381
eventStreamRouteHandler: ((res: PostGraphileResponse) => Promise<void>) | null;
382+
/** Experimental! */
383+
release: () => Promise<void>;
380384
}
381385

382386
/**
383387
* Options passed to the `withPostGraphileContext` function
384388
*/
385389
export interface WithPostGraphileContextOptions {
386390
pgPool: Pool;
391+
shutdownActions: ShutdownActions;
387392
jwtToken?: string;
388393
jwtSecret?: jwt.Secret;
389394
jwtPublicKey?: jwt.Secret | jwt.GetPublicKeyOrSecret;

src/postgraphile/__tests__/postgraphile-test.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ test('will use a created GraphQL schema to create the HTTP request handler and p
5050
'getGqlSchema',
5151
'pgPool',
5252
'_emitter',
53+
'shutdownActions',
5354
]);
5455
expect(createPostGraphileHttpRequestHandler.mock.calls[0][0].pgPool).toBe(pgPool);
5556
expect(createPostGraphileHttpRequestHandler.mock.calls[0][0].a).toBe(options.a);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
jest.unmock('postgraphile-core');
2+
3+
import pgPool, { poolConfig } from '../../__tests__/utils/pgPool';
4+
import { postgraphile } from '..';
5+
6+
const COMMON_OPTIONS = {
7+
exitOnFail: false,
8+
};
9+
10+
test('When the handler is created using a Pool object, it can be released without triggering an error', async () => {
11+
let handler = postgraphile(pgPool, COMMON_OPTIONS);
12+
await handler.release();
13+
// This would throw an error if the pool was released, it not throwing is what passes the test.
14+
await pgPool.query("select 'Pool is still valid';");
15+
});
16+
17+
test('When the handler is created using Pool config, it releases the pool it creates', async () => {
18+
let handler = postgraphile(poolConfig, COMMON_OPTIONS);
19+
let { pgPool } = handler;
20+
expect(pgPool).toHaveProperty('ended', false);
21+
await handler.release();
22+
expect(pgPool).toHaveProperty('ended', true);
23+
});
24+
25+
test('When the handler is created in watch mode using Pool config, it releases the pool it creates', async () => {
26+
let handler = postgraphile(poolConfig, { ...COMMON_OPTIONS, watchPg: true });
27+
let { pgPool } = handler;
28+
expect(pgPool).toHaveProperty('ended', false);
29+
await handler.release();
30+
expect(pgPool).toHaveProperty('ended', true);
31+
});

src/postgraphile/http/createPostGraphileHttpRequestHandler.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ export default function createPostGraphileHttpRequestHandler(
178178
pgPool,
179179
pgSettings,
180180
pgDefaultRole,
181+
shutdownActions,
181182
queryCacheMaxSize = 50 * MEGABYTE,
182183
extendedErrors,
183184
showErrorStack,
@@ -1120,9 +1121,14 @@ export default function createPostGraphileHttpRequestHandler(
11201121
middleware.faviconRouteHandler = graphiql ? faviconRouteHandler : null;
11211122
middleware.eventStreamRoute = eventStreamRoute;
11221123
middleware.eventStreamRouteHandler = watchPg ? eventStreamRouteHandler : null;
1124+
middleware.shutdownActions = shutdownActions;
1125+
// Experimental
1126+
middleware.release = (): Promise<void> => shutdownActions.invokeAll();
11231127

11241128
const hookedMiddleware = pluginHook('postgraphile:middleware', middleware, {
11251129
options,
1130+
// Experimental
1131+
shutdownActions,
11261132
});
11271133
// Sanity check:
11281134
if (!hookedMiddleware.getGraphQLSchema) {

src/postgraphile/postgraphile.ts

Lines changed: 92 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { pluginHookFromOptions } from './pluginHook';
99
import { PostGraphileOptions, mixed, HttpRequestHandler } from '../interfaces';
1010
import chalk from 'chalk';
1111
import { debugPgClient } from './withPostGraphileContext';
12+
import { ShutdownActions } from './shutdownActions';
1213

1314
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
1415

@@ -43,20 +44,8 @@ export function getPostgraphileSchemaBuilder<
4344
pgPool: Pool,
4445
schema: string | Array<string>,
4546
incomingOptions: PostGraphileOptions<Request, Response>,
46-
release: null | (() => void) = null,
47+
shutdownActions: ShutdownActions = new ShutdownActions(),
4748
): PostgraphileSchemaBuilder {
48-
let released = false;
49-
function releaseOnce() {
50-
if (released) {
51-
throw new Error(
52-
'Already released this PostGraphile schema builder; should not have attempted a second release',
53-
);
54-
}
55-
released = true;
56-
if (release) {
57-
release();
58-
}
59-
}
6049
if (incomingOptions.live && incomingOptions.subscriptions == null) {
6150
// live implies subscriptions
6251
incomingOptions.subscriptions = true;
@@ -100,22 +89,73 @@ export function getPostgraphileSchemaBuilder<
10089

10190
async function createGqlSchema(): Promise<GraphQLSchema> {
10291
let attempts = 0;
92+
93+
let isShuttingDown = false;
94+
shutdownActions.add(async () => {
95+
isShuttingDown = true;
96+
});
97+
/*
98+
* This function should be called after every `await` in the try{} block
99+
* below so that if a shutdown occurs whilst we're awaiting something else
100+
* we immediately clean up.
101+
*/
102+
const assertAlive = () => {
103+
if (isShuttingDown) {
104+
throw Object.assign(new Error('PostGraphile is shutting down'), { isShutdownAction: true });
105+
}
106+
};
107+
108+
// If we're in watch mode, cancel watch mode on shutdown
109+
let releaseWatchFnPromise: Promise<() => void> | null = null;
110+
shutdownActions.add(async () => {
111+
if (releaseWatchFnPromise) {
112+
try {
113+
const releaseWatchFn = await releaseWatchFnPromise;
114+
await releaseWatchFn();
115+
} catch (e) {
116+
// Ignore errors during shutdown.
117+
}
118+
}
119+
});
120+
121+
// If the server shuts down, make sure the schema has resolved or
122+
// rejected before signaling shutdown is complete. If it rejected,
123+
// don't propagate the error.
124+
let gqlSchemaPromise: Promise<GraphQLSchema> | null = null;
125+
shutdownActions.add(async () => {
126+
if (gqlSchemaPromise) {
127+
await gqlSchemaPromise.catch(() => null);
128+
}
129+
});
130+
103131
// eslint-disable-next-line no-constant-condition
104132
while (true) {
133+
assertAlive();
105134
try {
106135
if (options.watchPg) {
107-
await watchPostGraphileSchema(pgPool, pgSchemas, options, newSchema => {
136+
// We must register the value used by the shutdown action immediately to avoid a race condition.
137+
releaseWatchFnPromise = watchPostGraphileSchema(pgPool, pgSchemas, options, newSchema => {
108138
gqlSchema = newSchema;
109139
_emitter.emit('schemas:changed');
110140
exportGqlSchema(gqlSchema);
111141
});
142+
143+
// Wait for the watch to be set up before progressing.
144+
await releaseWatchFnPromise;
145+
assertAlive();
146+
112147
if (!gqlSchema) {
113148
throw new Error(
114149
"Consistency error: watchPostGraphileSchema promises to call the callback before the promise resolves; but this hasn't happened",
115150
);
116151
}
117152
} else {
118-
gqlSchema = await createPostGraphileSchema(pgPool, pgSchemas, options);
153+
// We must register the value used by the shutdown action immediately to avoid a race condition.
154+
gqlSchemaPromise = createPostGraphileSchema(pgPool, pgSchemas, options);
155+
156+
gqlSchema = await gqlSchemaPromise;
157+
assertAlive();
158+
119159
exportGqlSchema(gqlSchema);
120160
}
121161
if (attempts > 0) {
@@ -128,16 +168,36 @@ export function getPostgraphileSchemaBuilder<
128168
}
129169
return gqlSchema;
130170
} catch (error) {
171+
releaseWatchFnPromise = null;
172+
gqlSchemaPromise = null;
131173
attempts++;
132174
const delay = Math.min(100 * Math.pow(attempts, 2), 30000);
133-
if (typeof options.retryOnInitFail === 'function') {
134-
const start = process.hrtime();
175+
if (error.isShutdownAction) {
176+
throw error;
177+
} else if (isShuttingDown) {
178+
console.error(
179+
'An error occurred whilst building the schema. However, the server was shutting down, which might have caused it.',
180+
);
181+
console.error(error);
182+
throw error;
183+
} else if (typeof options.retryOnInitFail === 'function') {
135184
try {
185+
const start = process.hrtime();
136186
const retry = await options.retryOnInitFail(error, attempts);
137187
const diff = process.hrtime(start);
138188
const dur = diff[0] * 1e3 + diff[1] * 1e-6;
139-
if (!retry) {
140-
releaseOnce();
189+
190+
if (isShuttingDown) {
191+
throw error;
192+
} else if (!retry) {
193+
// Trigger a shutdown, and swallow any new errors so old error is still thrown
194+
await shutdownActions.invokeAll().catch(e => {
195+
console.error(
196+
'An additional error occured whilst calling shutdownActions.invokeAll():',
197+
);
198+
console.error(e);
199+
});
200+
141201
throw error;
142202
} else {
143203
if (dur < 50) {
@@ -256,9 +316,14 @@ export default function postgraphile<
256316
);
257317
}
258318

319+
const shutdownActions = new ShutdownActions();
320+
259321
// Do some things with `poolOrConfig` so that in the end, we actually get a
260322
// Postgres pool.
261-
const { pgPool, release } = toPgPool(poolOrConfig);
323+
const { pgPool, releasePgPool } = toPgPool(poolOrConfig);
324+
if (releasePgPool) {
325+
shutdownActions.add(releasePgPool);
326+
}
262327

263328
pgPool.on('error', err => {
264329
/*
@@ -282,14 +347,15 @@ export default function postgraphile<
282347
pgPool,
283348
schema,
284349
incomingOptions,
285-
release,
350+
shutdownActions,
286351
);
287352
return createPostGraphileHttpRequestHandler({
288353
...(typeof poolOrConfig === 'string' ? { ownerConnectionString: poolOrConfig } : {}),
289354
...options,
290355
getGqlSchema: getGraphQLSchema,
291356
pgPool,
292357
_emitter,
358+
shutdownActions,
293359
});
294360
}
295361

@@ -321,24 +387,24 @@ function constructorName(obj: mixed): string | null {
321387
}
322388

323389
// tslint:disable-next-line no-any
324-
function toPgPool(poolOrConfig: any): { pgPool: Pool; release: null | (() => void) } {
390+
function toPgPool(poolOrConfig: any): { pgPool: Pool; releasePgPool: null | (() => void) } {
325391
if (quacksLikePgPool(poolOrConfig)) {
326392
// If it is already a `Pool`, just use it.
327-
return { pgPool: poolOrConfig, release: null };
393+
return { pgPool: poolOrConfig, releasePgPool: null };
328394
}
329395

330396
if (typeof poolOrConfig === 'string') {
331397
// If it is a string, let us parse it to get a config to create a `Pool`.
332398
const pgPool = new Pool({ connectionString: poolOrConfig });
333-
return { pgPool, release: () => pgPool.end() };
399+
return { pgPool, releasePgPool: () => pgPool.end() };
334400
} else if (!poolOrConfig) {
335401
// Use an empty config and let the defaults take over.
336402
const pgPool = new Pool({});
337-
return { pgPool, release: () => pgPool.end() };
403+
return { pgPool, releasePgPool: () => pgPool.end() };
338404
} else if (isPlainObject(poolOrConfig)) {
339405
// The user handed over a configuration object, pass it through
340406
const pgPool = new Pool(poolOrConfig);
341-
return { pgPool, release: () => pgPool.end() };
407+
return { pgPool, releasePgPool: () => pgPool.end() };
342408
} else {
343409
throw new Error('Invalid connection string / Pool ');
344410
}

0 commit comments

Comments
 (0)