diff --git a/lib/base/connection.js b/lib/base/connection.js index 77da3d0f37..62d84796f0 100644 --- a/lib/base/connection.js +++ b/lib/base/connection.js @@ -28,6 +28,7 @@ const Packets = require('../packets/index.js'); const Commands = require('../commands/index.js'); const ConnectionConfig = require('../connection_config.js'); const CharsetToEncoding = require('../constants/charset_encodings.js'); +const { ER_UNKNOWN_STMT_HANDLER } = require('../constants/errors.js'); const { traceCallback, tracePromise, @@ -38,6 +39,10 @@ const { connectChannel, } = require('../tracing.js'); +const returnNull = function () { + return null; +}; + let _connectionId = 0; let convertNamedPlaceholders = null; @@ -801,11 +806,8 @@ class BaseConnection extends EventEmitter { const prepareCommand = new Commands.Prepare(options, (err, stmt) => { if (err) { // skip execute command if prepare failed - executeCommand.start = function () { - return null; - }; + executeCommand.next = returnNull; errorCb(err); - executeCommand.emit('end'); return; } executeCommand.statement = stmt; @@ -814,13 +816,49 @@ class BaseConnection extends EventEmitter { this.addCommand(executeCommand); }; + // We need to intercept and retry prepareAndExecute if we had a stale prepared statement in the cache that the server already released + const key = BaseConnection.statementKey(options); + const cacheHasKey = this._statements.has(key); + if (executeCommand.onResult) { // Callback mode: traceCallback wraps the callback with tracing lifecycle, or calls through directly when no subscribers are registered const origExecCb = executeCommand.onResult; traceCallback( executeChannel, (wrappedCb) => { - executeCommand.onResult = wrappedCb; + if (cacheHasKey) { + executeCommand.onResult = (err, ...rest) => { + if (err && err.errno === ER_UNKNOWN_STMT_HANDLER) { + const origEmit = executeCommand.emit.bind(executeCommand); + executeCommand.emit = (eventName, ...rest) => { + if (eventName === 'end') { + // Intercept the 'end' event that will be emitted after this 'error' event is emitted + executeCommand.emit = origEmit; + return false; + } + // In this case there currently will not be any other events emitted before 'end', but leaving + // this here in case that changes in the future... + return origEmit(eventName, ...rest); + }; + + // Listeners may have been added to this execute command, so we reuse it + executeCommand.next = null; + + // We know that the statement does not exist on the server, so there is no need to close it + executeCommand.statement.close = returnNull; + this._statements.delete(key); + + executeCommand.onResult = wrappedCb; + prepareAndExecute(wrappedCb); + return; + } + + wrappedCb(err, ...rest); + }; + } else { + executeCommand.onResult = wrappedCb; + } + prepareAndExecute(wrappedCb); }, 0, @@ -837,7 +875,47 @@ class BaseConnection extends EventEmitter { null, origExecCb ); - } else if (shouldTrace(executeChannel)) { + + return executeCommand; + } + + if (cacheHasKey) { + const origEmit = executeCommand.emit.bind(executeCommand); + executeCommand.emit = (eventName, firstArg, ...rest) => { + if ( + eventName === 'error' && + firstArg && + firstArg.errno === ER_UNKNOWN_STMT_HANDLER + ) { + executeCommand.emit = (eventName, ...rest) => { + if (eventName === 'end') { + // Intercept the 'end' event that will be emitted after this 'error' event is emitted + executeCommand.emit = origEmit; + return false; + } + // In this case there currently will not be any other events emitted before 'end', but leaving + // this here in case that changes in the future... + return origEmit(eventName, ...rest); + }; + + // Listeners may have been added to this execute command, so we reuse it + executeCommand.next = null; + + // We know that the statement does not exist on the server, so there is no need to close it + executeCommand.statement.close = returnNull; + this._statements.delete(key); + + prepareAndExecute((err) => { + executeCommand.emit('error', err); + }); + return false; + } + + return origEmit(eventName, firstArg, ...rest); + }; + } + + if (shouldTrace(executeChannel)) { // Event-emitter mode: tracePromise wraps the async lifecycle tracePromise( executeChannel, diff --git a/test/integration/connection/test-execute-cached.test.mts b/test/integration/connection/test-execute-cached.test.mts index d8da7abc25..3bcacd1c81 100644 --- a/test/integration/connection/test-execute-cached.test.mts +++ b/test/integration/connection/test-execute-cached.test.mts @@ -1,5 +1,6 @@ -import type { RowDataPacket } from '../../../index.js'; +import type { QueryError, RowDataPacket } from '../../../index.js'; import { describe, it, strict } from 'poku'; +import { ER_PARSE_ERROR } from '../../../lib/constants/errors.js'; import { createConnection } from '../../common.test.mjs'; type TestRow = RowDataPacket & { test: number }; @@ -10,7 +11,7 @@ await describe('Execute Cached', async () => { const q = 'select 1 + ? as test'; const key = `undefined/undefined/undefined${q}`; - await it('should cache prepared statements', async () => { + await it('should cache prepared statements (Callback API)', async () => { const rows1 = await new Promise((resolve, reject) => { connection.execute(q, [123], (err, _rows) => err ? reject(err) : resolve(_rows) @@ -41,5 +42,258 @@ await describe('Execute Cached', async () => { strict.deepEqual(rows3, [{ test: 126 }]); }); + await it('should discard cached prepared statements that no longer exist on the server and retry automatically (Callback API)', async () => { + // Remove on the server but leave it in the cache + // @ts-expect-error: internal access + connection._statements.get(key).close(); + + // Remember the id + // @ts-expect-error: internal access + const { id: oldStatementId } = connection._statements.get(key); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).id === oldStatementId); + + const rows1 = await new Promise((resolve, reject) => { + connection.execute(q, [123], (err, _rows) => + err ? reject(err) : resolve(_rows) + ); + }); + + const rows2 = await new Promise((resolve, reject) => { + connection.execute(q, [124], (err, _rows) => + err ? reject(err) : resolve(_rows) + ); + }); + + const rows3 = await new Promise((resolve, reject) => { + connection.execute(q, [125], (err, _rows) => + err ? reject(err) : resolve(_rows) + ); + }); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).id !== oldStatementId); + + strict.deepEqual(rows1, [{ test: 124 }]); + strict.deepEqual(rows2, [{ test: 125 }]); + strict.deepEqual(rows3, [{ test: 126 }]); + }); + + await it('should properly forward prepare command errors to the executeCommand event listeners (Callback API)', async () => { + // Remove on the server but leave it in the cache + // @ts-expect-error: internal access + connection._statements.get(key).close(); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + + // Intercept addCommand so we can access the prepare command instance + let numAddCommandCalls = 0; + // @ts-expect-error: internal access + const origAddCommand = connection.addCommand.bind(connection); + // @ts-expect-error: internal access + connection.addCommand = function (command: { query: string }) { + if (++numAddCommandCalls === 3) { + // The third command added will be the prepare command issued while retrying the query + command.query = 'ASDF'; // Force a failure with an invalid query + + // Restore the original addCommand function to the connection + // @ts-expect-error: internal access + connection.addCommand = origAddCommand; + } + + origAddCommand(command); + }; + + const error = await new Promise((resolve, reject) => { + connection.execute(q, [123], (err, _rows) => + err ? resolve(err) : reject(_rows) + ); + }); + + // @ts-expect-error: internal access + strict(connection._statements.size === 0); + // @ts-expect-error: internal access + strict(!connection._statements.has(key)); + + strict(error.errno === ER_PARSE_ERROR); + }); + + await it('should cache prepared statements (EventEmitter API)', async () => { + const rows1 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [123]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + const rows2 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [124]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + const rows3 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [125]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + + strict.deepEqual(rows1, [{ test: 124 }]); + strict.deepEqual(rows2, [{ test: 125 }]); + strict.deepEqual(rows3, [{ test: 126 }]); + }); + + await it('should discard cached prepared statements that no longer exist on the server and retry automatically (EventEmitter API)', async () => { + // Remove on the server but leave it in the cache + // @ts-expect-error: internal access + connection._statements.get(key).close(); + + // Remember the id + // @ts-expect-error: internal access + const { id: oldStatementId } = connection._statements.get(key); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).id === oldStatementId); + + const rows1 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [123]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + const rows2 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [124]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + const rows3 = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [125]); + const _rows: TestRow[] = []; + + executeCommand.once('error', (err) => reject(err)); + executeCommand.on('result', (row: TestRow) => { + _rows.push(row); + }); + executeCommand.once('end', () => resolve(_rows)); + }); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).id !== oldStatementId); + + strict.deepEqual(rows1, [{ test: 124 }]); + strict.deepEqual(rows2, [{ test: 125 }]); + strict.deepEqual(rows3, [{ test: 126 }]); + }); + + await it('should properly forward prepare command errors to the executeCommand event listeners (EventEmitter API)', async () => { + // Remove on the server but leave it in the cache + // @ts-expect-error: internal access + connection._statements.get(key).close(); + + // @ts-expect-error: internal access + strict(connection._statements.size === 1); + // @ts-expect-error: internal access + strict(connection._statements.get(key).query === q); + // @ts-expect-error: internal access + strict(connection._statements.get(key).parameters.length === 1); + + // Intercept addCommand so we can access the prepare command instance + let numAddCommandCalls = 0; + // @ts-expect-error: internal access + const origAddCommand = connection.addCommand.bind(connection); + // @ts-expect-error: internal access + connection.addCommand = function (command: { query: string }) { + if (++numAddCommandCalls === 3) { + // The third command added will be the prepare command issued while retrying the query + command.query = 'ASDF'; // Force a failure with an invalid query + + // Restore the original addCommand function to the connection + // @ts-expect-error: internal access + connection.addCommand = origAddCommand; + } + + origAddCommand(command); + }; + + const errors = await new Promise((resolve, reject) => { + const executeCommand = connection.execute(q, [123]); + + const errors: QueryError[] = []; + executeCommand.on('error', (err) => errors.push(err)); + executeCommand.once('result', (row) => reject(row)); + executeCommand.once('end', () => resolve(errors)); + }); + + // @ts-expect-error: internal access + strict(connection._statements.size === 0); + // @ts-expect-error: internal access + strict(!connection._statements.has(key)); + + strict(errors.length === 1); + strict(errors[0].errno === ER_PARSE_ERROR); + }); + connection.end(); });