Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 150 additions & 118 deletions src/chain.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { Buffer } from 'node:buffer';
import { Buffer } from 'node:buffer';
import type dns from 'node:dns';
import type { EventEmitter } from 'node:events';
import http from 'node:http';
import https from 'node:https';
import type http from 'node:http';
import type https from 'node:https';
import net from 'node:net';
import tls from 'node:tls';
import type { URL } from 'node:url';

import type { Socket } from './socket.js';
Expand All @@ -11,15 +13,6 @@ import type { SocketWithPreviousStats } from './utils/count_target_bytes.js';
import { countTargetBytes } from './utils/count_target_bytes.js';
import { getBasicAuthorizationHeader } from './utils/get_basic.js';

interface Options {
method: string;
headers: Record<string, string>;
path?: string;
localAddress?: string;
family?: number;
lookup?: typeof dns['lookup'];
}

export interface HandlerOpts {
upstreamProxyUrlParsed: URL;
ignoreUpstreamProxyCertificate: boolean;
Expand All @@ -44,6 +37,13 @@ interface ChainOpts {
* Passes the traffic to upstream HTTP proxy server.
* Client -> Apify -> Upstream -> Web
* Client <- Apify <- Upstream <- Web
*
* Uses raw TCP/TLS sockets to establish the upstream CONNECT tunnel rather
* than `http.request().on('connect', ...)`. In Bun 1.3 the latter is
* implemented on top of `fetch()` and (a) rejects non-URL CONNECT paths like
* `:443` with "fetch() URL is invalid", and (b) silently swallows the
* 407/590-class upstream responses that the tests rely on. Speaking CONNECT
* directly over a socket sidesteps both quirks without changing the
* behaviour on Node.
*/
export const chain = (
{
Expand All @@ -67,151 +67,183 @@ export const chain = (
}

const { proxyChainId } = sourceSocket;

const { upstreamProxyUrlParsed: proxy, customTag } = handlerOpts;

const options: Options = {
method: 'CONNECT',
path: request.url,
headers: {
host: request.url!,
},
const isHttps = proxy.protocol === 'https:';
const proxyHost = proxy.hostname;
const proxyPort = Number(proxy.port) || (isHttps ? 443 : 80);

let connectRequest = `CONNECT ${request.url} HTTP/1.1\r\nHost: ${request.url}\r\n`;
if (proxy.username || proxy.password) {
connectRequest += `Proxy-Authorization: ${getBasicAuthorizationHeader(proxy)}\r\n`;
}
connectRequest += '\r\n';

const socketOptions: net.TcpNetConnectOpts = {
host: proxyHost,
port: proxyPort,
localAddress: handlerOpts.localAddress,
family: handlerOpts.ipFamily,
family: handlerOpts.ipFamily as 4 | 6 | undefined,
lookup: handlerOpts.dnsLookup,
};

if (proxy.username || proxy.password) {
options.headers['proxy-authorization'] = getBasicAuthorizationHeader(proxy);
}
let targetSocket: net.Socket;

const client = proxy.protocol === 'https:'
? https.request(proxy.origin, {
...options,
rejectUnauthorized: !handlerOpts.ignoreUpstreamProxyCertificate,
agent: handlerOpts.httpsAgent,
})
: http.request(proxy.origin, {
...options,
agent: handlerOpts.httpAgent,
});

client.once('socket', (targetSocket: SocketWithPreviousStats) => {
// Socket can be re-used by multiple requests.
// That's why we need to track the previous stats.
targetSocket.previousBytesRead = targetSocket.bytesRead;
targetSocket.previousBytesWritten = targetSocket.bytesWritten;
countTargetBytes(sourceSocket, targetSocket);
});
const onPreConnectError = (error: NodeJS.ErrnoException): void => {
server.log(proxyChainId, `Failed to connect to upstream proxy: ${error.stack}`);

client.on('connect', (response, targetSocket, clientHead) => {
if (sourceSocket.readyState !== 'open') {
// Sanity check, should never reach.
targetSocket.destroy();
return;
if (sourceSocket.readyState === 'open') {
if (isPlain) {
sourceSocket.end();
} else {
const statusCode = errorCodeToStatusCode[error.code!] ?? badGatewayStatusCodes.GENERIC_ERROR;
const response = createCustomStatusHttpResponse(statusCode, error.code ?? 'Upstream Closed Early');
sourceSocket.end(response);
}
}
};

targetSocket.on('error', (error) => {
server.log(proxyChainId, `Chain Destination Socket Error: ${error.stack}`);
const onProxyConnected = (): void => {
targetSocket.write(connectRequest);

sourceSocket.destroy();
});
let responseBuffer = Buffer.alloc(0);

sourceSocket.on('error', (error) => {
server.log(proxyChainId, `Chain Source Socket Error: ${error.stack}`);
const onData = (chunk: Buffer): void => {
responseBuffer = Buffer.concat([responseBuffer, chunk]);

targetSocket.destroy();
});
const headerEnd = responseBuffer.indexOf('\r\n\r\n');
if (headerEnd === -1) return;

if (response.statusCode !== 200) {
server.log(proxyChainId, `Failed to authenticate upstream proxy: ${response.statusCode}`);
targetSocket.removeListener('data', onData);

if (isPlain) {
sourceSocket.end();
} else {
const { statusCode } = response;
const status = statusCode === 401 || statusCode === 407
? badGatewayStatusCodes.AUTH_FAILED
: badGatewayStatusCodes.NON_200;
const headerStr = responseBuffer.subarray(0, headerEnd).toString();
const remaining = responseBuffer.subarray(headerEnd + 4);

const statusMatch = headerStr.match(/^HTTP\/\d+(?:\.\d+)? (\d+)(?: (.*))?/);
const statusCode = statusMatch ? parseInt(statusMatch[1], 10) : 0;
const statusMessage = statusMatch ? (statusMatch[2] || '') : '';

const headers: Record<string, string> = {};
const rawHeaders: string[] = [];
for (const line of headerStr.split('\r\n').slice(1)) {
if (!line) continue;
const colonIdx = line.indexOf(':');
if (colonIdx > 0) {
const name = line.slice(0, colonIdx).trim();
const value = line.slice(colonIdx + 1).trim();
headers[name.toLowerCase()] = value;
rawHeaders.push(name, value);
}
}

const response = { statusCode, statusMessage, headers, rawHeaders } as unknown as http.IncomingMessage;

if (sourceSocket.readyState !== 'open') {
targetSocket.destroy();
return;
}

targetSocket.removeListener('error', onPreConnectError);

targetSocket.on('error', (error) => {
server.log(proxyChainId, `Chain Destination Socket Error: ${error.stack}`);
sourceSocket.destroy();
});

sourceSocket.end(createCustomStatusHttpResponse(status, `UPSTREAM${statusCode}`));
sourceSocket.on('error', (error) => {
server.log(proxyChainId, `Chain Source Socket Error: ${error.stack}`);
targetSocket.destroy();
});

if (statusCode !== 200) {
server.log(proxyChainId, `Failed to authenticate upstream proxy: ${statusCode}`);

if (isPlain) {
sourceSocket.end();
} else {
const status = statusCode === 401 || statusCode === 407
? badGatewayStatusCodes.AUTH_FAILED
: badGatewayStatusCodes.NON_200;

sourceSocket.end(createCustomStatusHttpResponse(status, `UPSTREAM${statusCode}`));
}

targetSocket.end();

server.emit('tunnelConnectFailed', {
proxyChainId,
response,
customTag,
socket: targetSocket,
head: remaining,
});

return;
}

targetSocket.end();
if (remaining.length > 0) {
// See comment above re: pre-response CONNECT payload
targetSocket.unshift(remaining);
}

server.emit('tunnelConnectFailed', {
server.emit('tunnelConnectResponded', {
proxyChainId,
response,
customTag,
socket: targetSocket,
head: clientHead,
head: remaining,
});

return;
}

if (clientHead.length > 0) {
// See comment above
targetSocket.unshift(clientHead);
}
sourceSocket.write(isPlain ? '' : `HTTP/1.1 200 Connection Established\r\n\r\n`);

server.emit('tunnelConnectResponded', {
proxyChainId,
response,
customTag,
socket: targetSocket,
head: clientHead,
});
sourceSocket.pipe(targetSocket);
targetSocket.pipe(sourceSocket);

sourceSocket.write(isPlain ? '' : `HTTP/1.1 200 Connection Established\r\n\r\n`);
// Once the target socket closes forcibly, the source socket gets paused.
// We need to put it back into flowing mode; otherwise the socket would remain open indefinitely.
// Nothing would consume the data; we just want to close the socket.
targetSocket.on('close', () => {
sourceSocket.resume();

sourceSocket.pipe(targetSocket);
targetSocket.pipe(sourceSocket);
if (sourceSocket.writable) {
sourceSocket.end();
}
});

// Once the target socket closes forcibly, the source socket gets paused.
// We need to put it back into flowing mode; otherwise the socket would remain open indefinitely.
// Nothing would consume the data; we just want to close the socket.
targetSocket.on('close', () => {
sourceSocket.resume();
// Same here.
sourceSocket.on('close', () => {
targetSocket.resume();

if (sourceSocket.writable) {
sourceSocket.end();
}
});
if (targetSocket.writable) {
targetSocket.end();
}
});
};

// Same here.
sourceSocket.on('close', () => {
targetSocket.resume();
targetSocket.on('data', onData);
};

if (targetSocket.writable) {
targetSocket.end();
}
});
});
if (isHttps) {
targetSocket = tls.connect({
...socketOptions,
rejectUnauthorized: !handlerOpts.ignoreUpstreamProxyCertificate,
}, onProxyConnected);
} else {
targetSocket = net.createConnection(socketOptions, onProxyConnected);
}

client.on('error', (error: NodeJS.ErrnoException) => {
server.log(proxyChainId, `Failed to connect to upstream proxy: ${error.stack}`);
(targetSocket as SocketWithPreviousStats).previousBytesRead = 0;
(targetSocket as SocketWithPreviousStats).previousBytesWritten = 0;
countTargetBytes(sourceSocket, targetSocket);

// The end socket may get connected after the client to proxy one gets disconnected.
if (sourceSocket.readyState === 'open') {
if (isPlain) {
sourceSocket.end();
} else {
const statusCode = errorCodeToStatusCode[error.code!] ?? badGatewayStatusCodes.GENERIC_ERROR;
const response = createCustomStatusHttpResponse(statusCode, error.code ?? 'Upstream Closed Early');
sourceSocket.end(response);
}
}
});
targetSocket.on('error', onPreConnectError);

sourceSocket.on('error', () => {
client.destroy();
targetSocket.destroy();
});

// In case the client ends the socket too early
sourceSocket.on('close', () => {
client.destroy();
targetSocket.destroy();
});

client.end();
};
9 changes: 9 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,15 @@ export class Server extends EventEmitter {
* Handles normal HTTP request by forwarding it to target host or the upstream proxy.
*/
async onRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
// Some runtimes (Bun 1.3) deliver HTTP CONNECT requests through the
// generic 'request' event rather than the dedicated 'connect' event.
// Route them through onConnect explicitly so CONNECT tunnelling keeps
// working on those runtimes.
if (request.method === 'CONNECT') {
await this.onConnect(request, request.socket as Socket, Buffer.alloc(0));
return;
}

try {
const handlerOpts = await this.prepareRequestHandling(request);
handlerOpts.srcResponse = response;
Expand Down
Loading
Loading