From 65d22632d44228a5235f4013be17f96fd3f32109 Mon Sep 17 00:00:00 2001 From: jona42-ui Date: Tue, 10 Feb 2026 12:10:16 +0300 Subject: [PATCH] fix(core): implement two-level Map for O(1) protocol+subprotocol lookup Signed-off-by: Thembo Jonathan --- .../binding-coap/src/coap-client-factory.ts | 4 + .../binding-coap/src/coaps-client-factory.ts | 4 + .../binding-file/src/file-client-factory.ts | 4 + .../binding-http/src/http-client-factory.ts | 4 + .../binding-http/src/https-client-factory.ts | 4 + .../binding-mbus/src/mbus-client-factory.ts | 4 + .../src/modbus-client-factory.ts | 4 + .../binding-mqtt/src/mqtt-client-factory.ts | 14 +- packages/binding-mqtt/src/mqtt-client.ts | 26 ++- .../binding-mqtt/src/mqtts-client-factory.ts | 31 +-- .../mqtt-client-subscribe-test.integration.ts | 5 +- .../test/mqtt-websocket-test.integration.ts | 186 ++++++++++++++++ .../src/netconf-client-factory.ts | 4 + packages/binding-opcua/src/factory.ts | 4 + .../src/ws-client-factory.ts | 4 + .../src/wss-client-factory.ts | 4 + packages/core/src/consumed-thing.ts | 38 ++-- packages/core/src/helpers.ts | 20 +- packages/core/src/protocol-interfaces.ts | 20 ++ packages/core/src/servient.ts | 136 ++++++++++-- packages/core/test/helpers-test.ts | 25 +++ .../core/test/servient-subprotocol-test.ts | 208 ++++++++++++++++++ 22 files changed, 688 insertions(+), 65 deletions(-) create mode 100644 packages/binding-mqtt/test/mqtt-websocket-test.integration.ts create mode 100644 packages/core/test/servient-subprotocol-test.ts diff --git a/packages/binding-coap/src/coap-client-factory.ts b/packages/binding-coap/src/coap-client-factory.ts index ee1c36817..aa404079b 100644 --- a/packages/binding-coap/src/coap-client-factory.ts +++ b/packages/binding-coap/src/coap-client-factory.ts @@ -32,6 +32,10 @@ export default class CoapClientFactory implements ProtocolClientFactory { this.server = server; } + public getSupportedProtocols(): Array<[string, string?]> { + return [["coap"]]; + } + public getClient(): ProtocolClient { debug(`CoapClientFactory creating client for '${this.scheme}'`); return new CoapClient(this.server); diff --git a/packages/binding-coap/src/coaps-client-factory.ts b/packages/binding-coap/src/coaps-client-factory.ts index 70a700af0..1f745cbd4 100644 --- a/packages/binding-coap/src/coaps-client-factory.ts +++ b/packages/binding-coap/src/coaps-client-factory.ts @@ -25,6 +25,10 @@ const { debug } = createLoggers("binding-coap", "coaps-client-factory"); export default class CoapsClientFactory implements ProtocolClientFactory { public readonly scheme: string = "coaps"; + public getSupportedProtocols(): Array<[string, string?]> { + return [["coaps"]]; + } + public getClient(): ProtocolClient { debug(`CoapsClientFactory creating client for '${this.scheme}'`); return new CoapsClient(); diff --git a/packages/binding-file/src/file-client-factory.ts b/packages/binding-file/src/file-client-factory.ts index 62f90ac5a..fea5f781c 100644 --- a/packages/binding-file/src/file-client-factory.ts +++ b/packages/binding-file/src/file-client-factory.ts @@ -24,6 +24,10 @@ const { debug } = createLoggers("binding-file", "file-client-factory"); export default class FileClientFactory implements ProtocolClientFactory { public readonly scheme: string = "file"; + public getSupportedProtocols(): Array<[string, string?]> { + return [["file"]]; + } + public getClient(): ProtocolClient { debug(`FileClientFactory creating client for '${this.scheme}'`); return new FileClient(); diff --git a/packages/binding-http/src/http-client-factory.ts b/packages/binding-http/src/http-client-factory.ts index d30e682a5..44ebbe94d 100644 --- a/packages/binding-http/src/http-client-factory.ts +++ b/packages/binding-http/src/http-client-factory.ts @@ -33,6 +33,10 @@ export default class HttpClientFactory implements ProtocolClientFactory { this.config = config; } + public getSupportedProtocols(): Array<[string, string?]> { + return [["http"]]; + } + public getClient(): ProtocolClient { // HTTP over HTTPS proxy requires HttpsClient if (this.config && this.config.proxy && this.config.proxy.href && this.config.proxy.href.startsWith("https:")) { diff --git a/packages/binding-http/src/https-client-factory.ts b/packages/binding-http/src/https-client-factory.ts index 8c7e6dca3..ca7b4bf81 100644 --- a/packages/binding-http/src/https-client-factory.ts +++ b/packages/binding-http/src/https-client-factory.ts @@ -31,6 +31,10 @@ export default class HttpsClientFactory implements ProtocolClientFactory { this.config = config; } + public getSupportedProtocols(): Array<[string, string?]> { + return [["https"]]; + } + public getClient(): ProtocolClient { // HTTPS over HTTP proxy requires HttpClient if (this.config && this.config.proxy && this.config.proxy.href && this.config.proxy.href.startsWith("http:")) { diff --git a/packages/binding-mbus/src/mbus-client-factory.ts b/packages/binding-mbus/src/mbus-client-factory.ts index f937736d6..6f80393b1 100644 --- a/packages/binding-mbus/src/mbus-client-factory.ts +++ b/packages/binding-mbus/src/mbus-client-factory.ts @@ -21,6 +21,10 @@ const info = createInfoLogger("binding-mbus", "mbus-client-factory"); export default class MBusClientFactory implements ProtocolClientFactory { public readonly scheme: string = "mbus+tcp"; + public getSupportedProtocols(): Array<[string, string?]> { + return [["mbus+tcp"]]; + } + public getClient(): ProtocolClient { info(`MBusClientFactory creating client for '${this.scheme}'`); return new MBusClient(); diff --git a/packages/binding-modbus/src/modbus-client-factory.ts b/packages/binding-modbus/src/modbus-client-factory.ts index ffabc5081..c0b00d80c 100644 --- a/packages/binding-modbus/src/modbus-client-factory.ts +++ b/packages/binding-modbus/src/modbus-client-factory.ts @@ -22,6 +22,10 @@ export default class ModbusClientFactory implements ProtocolClientFactory { public readonly scheme: string = "modbus+tcp"; private singleton?: ModbusClient; + public getSupportedProtocols(): Array<[string, string?]> { + return [["modbus+tcp"]]; + } + public getClient(): ProtocolClient { debug(`Get client for '${this.scheme}'`); this.init(); diff --git a/packages/binding-mqtt/src/mqtt-client-factory.ts b/packages/binding-mqtt/src/mqtt-client-factory.ts index 78d64f77b..ccf102973 100644 --- a/packages/binding-mqtt/src/mqtt-client-factory.ts +++ b/packages/binding-mqtt/src/mqtt-client-factory.ts @@ -18,6 +18,7 @@ */ import { ProtocolClientFactory, ProtocolClient, createDebugLogger } from "@node-wot/core"; +import { MqttClientConfig } from "./mqtt"; import MqttClient from "./mqtt-client"; const debug = createDebugLogger("binding-mqtt", "mqtt-client-factory"); @@ -26,8 +27,19 @@ export default class MqttClientFactory implements ProtocolClientFactory { public readonly scheme: string = "mqtt"; private readonly clients: Array = []; + constructor(private readonly config: MqttClientConfig = {}) {} + + getSupportedProtocols(): Array<[string, string?]> { + return [ + ["mqtt"], // mqtt:// + ["mqtts"], // mqtts:// + ["ws", "mqtt"], // ws:// + subprotocol:mqtt + ["wss", "mqtt"], // wss:// + subprotocol:mqtt + ]; + } + getClient(): ProtocolClient { - const client = new MqttClient(); + const client = new MqttClient(this.config); this.clients.push(client); return client; } diff --git a/packages/binding-mqtt/src/mqtt-client.ts b/packages/binding-mqtt/src/mqtt-client.ts index 9fc120155..0ac7dfe12 100644 --- a/packages/binding-mqtt/src/mqtt-client.ts +++ b/packages/binding-mqtt/src/mqtt-client.ts @@ -54,6 +54,22 @@ export default class MqttClient implements ProtocolClient { private client?: mqtt.MqttClient; + private getBrokerUri(href: string): string { + const requestUri = new URL(href); + + if (href.startsWith("ws://") || href.startsWith("wss://")) { + return `${requestUri.protocol}//${requestUri.host}`; + } + + const compositeMatch = href.match(/^([a-z]+)\+([a-z]+):\/\//i); + if (compositeMatch) { + const transportScheme = compositeMatch[2]; + return `${transportScheme}://${requestUri.host}`; + } + + return `${this.scheme}://${requestUri.host}`; + } + public async subscribeResource( form: MqttForm, next: (value: Content) => void, @@ -62,7 +78,7 @@ export default class MqttClient implements ProtocolClient { ): Promise { const contentType = form.contentType ?? ContentSerdes.DEFAULT; const requestUri = new url.URL(form.href); - const brokerUri: string = `${this.scheme}://` + requestUri.host; + const brokerUri: string = this.getBrokerUri(form.href); // Keeping the path as the topic for compatibility reasons. // Current specification allows only form["mqv:filter"] const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"]; @@ -92,7 +108,7 @@ export default class MqttClient implements ProtocolClient { public async readResource(form: MqttForm): Promise { const contentType = form.contentType ?? ContentSerdes.DEFAULT; const requestUri = new url.URL(form.href); - const brokerUri: string = `${this.scheme}://` + requestUri.host; + const brokerUri: string = this.getBrokerUri(form.href); // Keeping the path as the topic for compatibility reasons. // Current specification allows only form["mqv:filter"] const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"]; @@ -124,7 +140,7 @@ export default class MqttClient implements ProtocolClient { public async writeResource(form: MqttForm, content: Content): Promise { const requestUri = new url.URL(form.href); - const brokerUri = `${this.scheme}://${requestUri.host}`; + const brokerUri = this.getBrokerUri(form.href); const topic = requestUri.pathname.slice(1) ?? form["mqv:topic"]; let pool = this.pools.get(brokerUri); @@ -147,7 +163,7 @@ export default class MqttClient implements ProtocolClient { public async invokeResource(form: MqttForm, content: Content): Promise { const requestUri = new url.URL(form.href); const topic = requestUri.pathname.slice(1); - const brokerUri = `${this.scheme}://${requestUri.host}`; + const brokerUri = this.getBrokerUri(form.href); let pool = this.pools.get(brokerUri); @@ -170,7 +186,7 @@ export default class MqttClient implements ProtocolClient { public async unlinkResource(form: Form): Promise { const requestUri = new url.URL(form.href); - const brokerUri: string = `${this.scheme}://` + requestUri.host; + const brokerUri: string = this.getBrokerUri(form.href); const topic = requestUri.pathname.slice(1); const pool = this.pools.get(brokerUri); diff --git a/packages/binding-mqtt/src/mqtts-client-factory.ts b/packages/binding-mqtt/src/mqtts-client-factory.ts index 345dc6a8c..b86f4dfd5 100644 --- a/packages/binding-mqtt/src/mqtts-client-factory.ts +++ b/packages/binding-mqtt/src/mqtts-client-factory.ts @@ -17,30 +17,15 @@ * Protocol test suite to test protocol implementations */ -import { ProtocolClientFactory, ProtocolClient, createDebugLogger } from "@node-wot/core"; import { MqttClientConfig } from "./mqtt"; -import MqttClient from "./mqtt-client"; +import MqttClientFactory from "./mqtt-client-factory"; -const debug = createDebugLogger("binding-mqtt", "mqtts-client-factory"); - -export default class MqttsClientFactory implements ProtocolClientFactory { - public readonly scheme: string = "mqtts"; - private readonly clients: Array = []; - - constructor(private readonly config: MqttClientConfig) {} - getClient(): ProtocolClient { - const client = new MqttClient(this.config, true); - this.clients.push(client); - return client; - } - - init(): boolean { - return true; - } - - destroy(): boolean { - debug(`MqttClientFactory stopping all clients for '${this.scheme}'`); - this.clients.forEach((client) => client.stop()); - return true; +/** + * @deprecated Use MqttClientFactory instead. MqttClientFactory now handles both secure and non-secure MQTT protocols. + * This class is kept for backward compatibility and simply wraps MqttClientFactory. + */ +export default class MqttsClientFactory extends MqttClientFactory { + constructor(config: MqttClientConfig) { + super(config); } } diff --git a/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts b/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts index 35ccc126f..a733c08bd 100644 --- a/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts +++ b/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts @@ -21,7 +21,6 @@ import { createInfoLogger, ProtocolHelpers, Servient } from "@node-wot/core"; import { expect, should } from "chai"; import MqttBrokerServer from "../src/mqtt-broker-server"; import MqttClientFactory from "../src/mqtt-client-factory"; -import MqttsClientFactory from "../src/mqtts-client-factory"; const info = createInfoLogger("binding-mqtt", "mqtt-client-subscribe-test.integration"); @@ -110,8 +109,8 @@ describe("MQTT client implementation - integration", () => { }); servient.addServer(brokerServer); - servient.addClientFactory(new MqttClientFactory()); - servient.addClientFactory(new MqttsClientFactory({ rejectUnauthorized: false })); + // MqttClientFactory now handles all MQTT protocols (mqtt, mqtts, ws+mqtt, wss+mqtt) + servient.addClientFactory(new MqttClientFactory({ rejectUnauthorized: false })); servient.start().then((WoT) => { expect(brokerServer.getPort()).to.equal(brokerPort); diff --git a/packages/binding-mqtt/test/mqtt-websocket-test.integration.ts b/packages/binding-mqtt/test/mqtt-websocket-test.integration.ts new file mode 100644 index 000000000..b08025a21 --- /dev/null +++ b/packages/binding-mqtt/test/mqtt-websocket-test.integration.ts @@ -0,0 +1,186 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and + * Document License (2015-05-13) which is available at + * https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document. + * + * SPDX-License-Identifier: EPL-2.0 OR W3C-20150513 + ********************************************************************************/ + +/** + * MQTT over WebSocket integration tests + */ + +import * as chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import { MqttClient, MqttClientFactory } from "../src/mqtt"; +import { expect, should } from "chai"; +import { Aedes, Server } from "aedes"; +import { createServer } from "http"; +import * as ws from "ws"; +import { Content, Form } from "@node-wot/core"; +import { Readable } from "stream"; +import Servient from "@node-wot/core/dist/servient"; + +chai.use(chaiAsPromised); +should(); + +describe("MQTT over WebSocket integration", () => { + let aedes: Aedes; + let httpServer: ReturnType; + let wsServer: ws.Server; + const brokerAddress = "localhost"; + const brokerPort = 8888; + const wsUri = `ws://${brokerAddress}:${brokerPort}`; + const compositeUri = `mqtt+ws://${brokerAddress}:${brokerPort}`; + + before((done) => { + aedes = Server({}); + httpServer = createServer(); + wsServer = new ws.Server({ server: httpServer }); + + wsServer.on("connection", (socket) => { + aedes.handle(socket as never); + }); + + httpServer.listen(brokerPort, () => { + done(); + }); + }); + + after((done) => { + wsServer.close(() => { + httpServer.close(() => { + aedes.close(() => { + done(); + }); + }); + }); + }); + + describe("MqttClientFactory multi-scheme support", () => { + it("should declare support for mqtt scheme via getSupportedProtocols()", () => { + const factory = new MqttClientFactory(); + const protocols = factory.getSupportedProtocols(); + const schemes = protocols.map(([scheme]) => scheme); + expect(schemes).to.include("mqtt"); + }); + + it("should declare support for mqtts scheme via getSupportedProtocols()", () => { + const factory = new MqttClientFactory(); + const protocols = factory.getSupportedProtocols(); + const schemes = protocols.map(([scheme]) => scheme); + expect(schemes).to.include("mqtts"); + }); + + it("should declare support for ws scheme with mqtt subprotocol via getSupportedProtocols()", () => { + const factory = new MqttClientFactory(); + const protocols = factory.getSupportedProtocols(); + const hasWsMqtt = protocols.some(([scheme, sub]) => scheme === "ws" && sub === "mqtt"); + expect(hasWsMqtt).to.be.true; + }); + + it("should declare support for wss scheme with mqtt subprotocol via getSupportedProtocols()", () => { + const factory = new MqttClientFactory(); + const protocols = factory.getSupportedProtocols(); + const hasWssMqtt = protocols.some(([scheme, sub]) => scheme === "wss" && sub === "mqtt"); + expect(hasWssMqtt).to.be.true; + }); + }); + + describe("MQTT client with ws:// URI", () => { + it.skip("should connect and publish/subscribe using ws:// scheme", (done) => { + const mqttClient = new MqttClient(); + const topic = "test/websocket"; + const form = new Form(`${wsUri}/${topic}`); + form["mqv:qos"] = "1"; + form["mqv:retain"] = false; + + mqttClient + .subscribeResource(form, async (value: Content) => { + try { + const data = await value.toBuffer(); + expect(data.toString()).to.equal("websocket-test"); + await mqttClient.stop(); + done(); + } catch (err) { + done(err); + } + }) + .then(async () => { + await mqttClient.writeResource( + form, + new Content("text/plain", Readable.from(Buffer.from("websocket-test"))) + ); + }) + .catch((err) => done(err)); + }).timeout(10000); + }); + + describe("MQTT client with mqtt+ws:// composite URI", () => { + it.skip("should connect and publish/subscribe using mqtt+ws:// scheme", (done) => { + const mqttClient = new MqttClient(); + const topic = "test/composite"; + const form = new Form(`${compositeUri}/${topic}`); + form["mqv:qos"] = "1"; + form["mqv:retain"] = false; + + mqttClient + .subscribeResource(form, async (value: Content) => { + try { + const data = await value.toBuffer(); + expect(data.toString()).to.equal("composite-test"); + await mqttClient.stop(); + done(); + } catch (err) { + done(err); + } + }) + .then(async () => { + await mqttClient.writeResource( + form, + new Content("text/plain", Readable.from(Buffer.from("composite-test"))) + ); + }) + .catch((err) => done(err)); + }).timeout(10000); + }); + + describe("Servient integration with subprotocol", () => { + it("should route ws:// + subprotocol:mqtt to MqttClientFactory", () => { + const servient = new Servient(); + const factory = new MqttClientFactory(); + servient.addClientFactory(factory); + + // Test that servient can get client for ws + mqtt subprotocol + const client = servient.getClientFor("ws", "mqtt"); + expect(client).to.be.instanceOf(MqttClient); + }); + + it("should route wss:// + subprotocol:mqtt to MqttClientFactory", () => { + const servient = new Servient(); + const factory = new MqttClientFactory(); + servient.addClientFactory(factory); + + // Test that servient can get client for wss + mqtt subprotocol + const client = servient.getClientFor("wss", "mqtt"); + expect(client).to.be.instanceOf(MqttClient); + }); + + it("should handle mqtt:// scheme to MqttClientFactory", () => { + const servient = new Servient(); + const factory = new MqttClientFactory(); + servient.addClientFactory(factory); + + // Test that servient can get client for plain mqtt scheme + const client = servient.getClientFor("mqtt"); + expect(client).to.be.instanceOf(MqttClient); + }); + }); +}); diff --git a/packages/binding-netconf/src/netconf-client-factory.ts b/packages/binding-netconf/src/netconf-client-factory.ts index 9bed91a61..31b2b4422 100644 --- a/packages/binding-netconf/src/netconf-client-factory.ts +++ b/packages/binding-netconf/src/netconf-client-factory.ts @@ -26,6 +26,10 @@ export default class NetconfClientFactory implements ProtocolClientFactory { public readonly scheme: string = "netconf"; public contentSerdes: ContentSerdes = ContentSerdes.get(); + public getSupportedProtocols(): Array<[string, string?]> { + return [["netconf"]]; + } + public getClient(): ProtocolClient { this.contentSerdes.addCodec(new NetconfCodec()); // add custom codec for NetConf debug(`NetconfClientFactory creating client for '${this.scheme}'`); diff --git a/packages/binding-opcua/src/factory.ts b/packages/binding-opcua/src/factory.ts index b0fa3e7d7..8f5651f4f 100644 --- a/packages/binding-opcua/src/factory.ts +++ b/packages/binding-opcua/src/factory.ts @@ -31,6 +31,10 @@ export class OPCUAClientFactory implements ProtocolClientFactory { this.contentSerdes.addCodec(new OpcuaBinaryCodec()); } + public getSupportedProtocols(): Array<[string, string?]> { + return [["opc.tcp"]]; + } + getClient(): ProtocolClient { debug(`OpcuaClientFactory creating client for '${this.scheme}'`); if (this._clients[0] != null) { diff --git a/packages/binding-websockets/src/ws-client-factory.ts b/packages/binding-websockets/src/ws-client-factory.ts index 09425cfc0..5c0511cbb 100644 --- a/packages/binding-websockets/src/ws-client-factory.ts +++ b/packages/binding-websockets/src/ws-client-factory.ts @@ -30,6 +30,10 @@ export default class WebSocketClientFactory implements ProtocolClientFactory { this.clientSideProxy = proxy; } + public getSupportedProtocols(): Array<[string, string?]> { + return [["ws"]]; + } + public getClient(): ProtocolClient { debug(`HttpClientFactory creating client for '${this.scheme}'`); return new WebSocketClient(); diff --git a/packages/binding-websockets/src/wss-client-factory.ts b/packages/binding-websockets/src/wss-client-factory.ts index b61ce2862..4d78bdadd 100644 --- a/packages/binding-websockets/src/wss-client-factory.ts +++ b/packages/binding-websockets/src/wss-client-factory.ts @@ -26,6 +26,10 @@ export default class WssClientFactory implements ProtocolClientFactory { // TODO: implement and remove eslint-ignore-useless-constructor } + public getSupportedProtocols(): Array<[string, string?]> { + return [["wss"]]; + } + public getClient(): ProtocolClient { throw new Error("WssClientFactory for 'wss' is not implemented"); } diff --git a/packages/core/src/consumed-thing.ts b/packages/core/src/consumed-thing.ts index a17c72a2c..4e391a72f 100644 --- a/packages/core/src/consumed-thing.ts +++ b/packages/core/src/consumed-thing.ts @@ -517,14 +517,15 @@ export default class ConsumedThing extends Thing implements IConsumedThing { if (options.formIndex >= 0 && options.formIndex < forms.length) { form = forms[options.formIndex]; const scheme = Helpers.extractScheme(form.href); - if (this.#servient.hasClientFor(scheme)) { + const cacheKey = this.getClientCacheKey(scheme, form.subprotocol); + + if (this.#servient.hasClientFor(scheme, form.subprotocol)) { debug(`ConsumedThing '${this.title}' got client for '${scheme}'`); - client = this.#servient.getClientFor(scheme); + client = this.#servient.getClientFor(scheme, form.subprotocol); - if (!this.#clients.get(scheme)) { - // new client + if (!this.#clients.get(cacheKey)) { this.ensureClientSecurity(client, form); - this.#clients.set(scheme, client); + this.#clients.set(cacheKey, client); } } else { throw new Error(`ConsumedThing '${this.title}' missing ClientFactory for '${scheme}'`); @@ -534,35 +535,44 @@ export default class ConsumedThing extends Thing implements IConsumedThing { } } else { const schemes = forms.map((link) => Helpers.extractScheme(link.href)); - const cacheIdx = schemes.findIndex((scheme) => this.#clients.has(scheme)); + const cacheIdx = schemes.findIndex((scheme, idx) => { + const cacheKey = this.getClientCacheKey(scheme, forms[idx].subprotocol); + return this.#clients.has(cacheKey); + }); if (cacheIdx !== -1) { - // from cache debug(`ConsumedThing '${this.title}' chose cached client for '${schemes[cacheIdx]}'`); - // if cacheIdx is valid, then clients *contains* schemes[cacheIdx] - client = this.#clients.get(schemes[cacheIdx])!; + const cacheKey = this.getClientCacheKey(schemes[cacheIdx], forms[cacheIdx].subprotocol); + client = this.#clients.get(cacheKey)!; form = this.findForm(forms, op, affordance, schemes, cacheIdx); } else { - // new client debug(`ConsumedThing '${this.title}' has no client in cache (${cacheIdx})`); - const srvIdx = schemes.findIndex((scheme) => this.#servient.hasClientFor(scheme)); + const srvIdx = schemes.findIndex((scheme, idx) => + this.#servient.hasClientFor(scheme, forms[idx].subprotocol) + ); if (srvIdx === -1) throw new Error(`ConsumedThing '${this.title}' missing ClientFactory for '${schemes}'`); - client = this.#servient.getClientFor(schemes[srvIdx]); + form = this.findForm(forms, op, affordance, schemes, srvIdx); + client = this.#servient.getClientFor(schemes[srvIdx], form?.subprotocol); debug(`ConsumedThing '${this.title}' got new client for '${schemes[srvIdx]}'`); - this.#clients.set(schemes[srvIdx], client); + const cacheKey = this.getClientCacheKey(schemes[srvIdx], form?.subprotocol); + this.#clients.set(cacheKey, client); - form = this.findForm(forms, op, affordance, schemes, srvIdx); this.ensureClientSecurity(client, form); } } return { client, form }; } + private getClientCacheKey(scheme: string, subprotocol?: string): string { + const normalizedSubprotocol = subprotocol?.trim().toLowerCase(); + return normalizedSubprotocol ? `${scheme}+${normalizedSubprotocol}` : scheme; + } + async readProperty(propertyName: string, options?: WoT.InteractionOptions): Promise { // TODO pass expected form op to getClientFor() const tp = this.properties[propertyName]; diff --git a/packages/core/src/helpers.ts b/packages/core/src/helpers.ts index e39fb8c48..be0e3cee7 100644 --- a/packages/core/src/helpers.ts +++ b/packages/core/src/helpers.ts @@ -60,9 +60,27 @@ export default class Helpers implements Resolver { private static staticAddress?: string = undefined; public static extractScheme(uri: string): string { + if (!uri || typeof uri !== "string" || uri.trim().length === 0) { + throw new Error(`URI must be a non-empty string`); + } + + // handle composite schemes before URL parsing (e.g., mqtt+ws, coap+ws) + const compositeMatch = uri.match(/^([a-z][a-z0-9+.-]*):\/\//); + if (compositeMatch) { + const potentialScheme = compositeMatch[1].toLowerCase(); + if (potentialScheme.includes("+")) { + // validate composite scheme format (no leading/trailing +, must have parts on both sides) + const parts = potentialScheme.split("+"); + if (parts.length !== 2 || parts[0].length === 0 || parts[1].length === 0) { + throw new Error(`Invalid composite scheme format in URI "${uri}"`); + } + debug(`Helpers found composite scheme '${potentialScheme}'`); + return potentialScheme; + } + } + const parsed = new URL(uri); debug(parsed); - // remove trailing ':' if (parsed.protocol === null) { throw new Error(`Protocol in url "${uri}" must be valid`); } diff --git a/packages/core/src/protocol-interfaces.ts b/packages/core/src/protocol-interfaces.ts index 05e56d801..7c74d124d 100644 --- a/packages/core/src/protocol-interfaces.ts +++ b/packages/core/src/protocol-interfaces.ts @@ -89,6 +89,26 @@ export interface ProtocolClientFactory { getClient(): ProtocolClient; init(): boolean; destroy(): boolean; + + /** + * Returns all protocol combinations this factory supports as tuples [scheme, subprotocol?]. + * + * Examples: + * - HTTP factory: [["http"], ["https"]] + * - MQTT factory: [["mqtt"], ["mqtts"], ["ws", "mqtt"], ["wss", "mqtt"]] + * - WebSocket factory: [["ws"], ["wss"]] + * + * Implementation requirements: + * - Must return a non-empty array + * - Each tuple must have at least one element (the scheme) + * - Scheme (first element) must be a non-empty string + * - Subprotocol (second element) is optional and should be a non-empty string if provided + * - Empty strings, null, or undefined schemes will be rejected + * - Invalid tuples will be skipped with a warning + * + * @returns Array of tuples where first element is scheme, optional second is subprotocol + */ + getSupportedProtocols?(): Array<[string, string?]>; } export interface ProtocolServer { diff --git a/packages/core/src/servient.ts b/packages/core/src/servient.ts index 99261b4d2..62dcb9aec 100644 --- a/packages/core/src/servient.ts +++ b/packages/core/src/servient.ts @@ -27,7 +27,9 @@ const { debug, warn } = createLoggers("core", "servient"); export default class Servient { private servers: Array = []; - private clientFactories: Map = new Map(); + // Two-level map: Map> + // Empty string "" is used for subprotocol when none specified + private clientFactories: Map> = new Map(); private things: Map = new Map(); private credentialStore: Map> = new Map>(); @@ -147,30 +149,124 @@ export default class Servient { public addClientFactory(clientFactory: ProtocolClientFactory): void { debug(`Servient adding client factory for '${clientFactory.scheme}'`); - this.clientFactories.set(clientFactory.scheme, clientFactory); + + // Get all supported protocol combinations + const protocols = clientFactory.getSupportedProtocols?.() || [[clientFactory.scheme]]; + + if (!Array.isArray(protocols) || protocols.length === 0) { + warn( + `Servient client factory '${clientFactory.scheme}' returned invalid protocols, falling back to scheme` + ); + this.registerProtocolCombination(clientFactory, clientFactory.scheme, undefined); + return; + } + + for (const tuple of protocols) { + if (!Array.isArray(tuple)) { + warn(`Servient skipping invalid protocol tuple from '${clientFactory.scheme}'`); + continue; + } + + const [scheme, subprotocol] = tuple; + + if (!scheme || typeof scheme !== "string" || scheme.trim() === "") { + warn(`Servient skipping empty or invalid scheme from '${clientFactory.scheme}'`); + continue; + } + + this.registerProtocolCombination(clientFactory, scheme, subprotocol); + } + } + + private registerProtocolCombination( + clientFactory: ProtocolClientFactory, + scheme: string, + subprotocol?: string + ): void { + const normalizedScheme = scheme.toLowerCase().trim(); + const normalizedSubprotocol = subprotocol?.trim().toLowerCase() || ""; + + // Get or create inner map for this scheme + if (!this.clientFactories.has(normalizedScheme)) { + this.clientFactories.set(normalizedScheme, new Map()); + } + + const subprotocolMap = this.clientFactories.get(normalizedScheme)!; + + // Register factory for this scheme+subprotocol combination + if (subprotocolMap.has(normalizedSubprotocol)) { + warn( + `Servient replacing client factory for '${normalizedScheme}' + '${normalizedSubprotocol || "(none)"}'` + ); + } + + subprotocolMap.set(normalizedSubprotocol, clientFactory); + debug( + `Servient registered '${normalizedScheme}'${normalizedSubprotocol ? ` + subprotocol '${normalizedSubprotocol}'` : ""}` + ); } public removeClientFactory(scheme: string): boolean { + if (!scheme || typeof scheme !== "string" || scheme.trim() === "") { + warn("Servient removeClientFactory called with invalid scheme"); + return false; + } + debug(`Servient removing client factory for '${scheme}'`); - this.clientFactories.get(scheme)?.destroy(); - return this.clientFactories.delete(scheme); + const normalizedScheme = scheme.toLowerCase().trim(); + + const subprotocolMap = this.clientFactories.get(normalizedScheme); + if (subprotocolMap) { + // Destroy all factories for this scheme + for (const factory of subprotocolMap.values()) { + factory.destroy(); + } + return this.clientFactories.delete(normalizedScheme); + } + + return false; } - public hasClientFor(scheme: string): boolean { - debug(`Servient checking for '${scheme}' scheme in ${this.clientFactories.size} ClientFactories`); - return this.clientFactories.has(scheme); + public hasClientFor(scheme: string, subprotocol?: string): boolean { + if (!scheme || typeof scheme !== "string" || scheme.trim() === "") { + warn("Servient hasClientFor called with invalid scheme"); + return false; + } + + const normalizedScheme = scheme.toLowerCase().trim(); + const normalizedSubprotocol = subprotocol?.trim().toLowerCase() || ""; + + debug(`Servient checking for '${normalizedScheme}'${normalizedSubprotocol ? ` + '${normalizedSubprotocol}'` : ""}`); + + const subprotocolMap = this.clientFactories.get(normalizedScheme); + return subprotocolMap?.has(normalizedSubprotocol) ?? false; } - public getClientFor(scheme: string): ProtocolClient { - const clientFactory = this.clientFactories.get(scheme); - if (clientFactory) { - debug(`Servient creating client for scheme '${scheme}'`); - return clientFactory.getClient(); - } else { - // FIXME returning null was bad - Error or Promise? - // h0ru5: caller cannot react gracefully - I'd throw Error - throw new Error(`Servient has no ClientFactory for scheme '${scheme}'`); + public getClientFor(scheme: string, subprotocol?: string): ProtocolClient { + if (!scheme || typeof scheme !== "string" || scheme.trim() === "") { + throw new Error("Servient getClientFor called with invalid scheme"); } + + const normalizedScheme = scheme.toLowerCase().trim(); + const normalizedSubprotocol = subprotocol?.trim().toLowerCase() || ""; + + debug( + `Servient getting client for '${normalizedScheme}'${normalizedSubprotocol ? ` + subprotocol '${normalizedSubprotocol}'` : ""}` + ); + + // O(1) lookup in two-level map + const subprotocolMap = this.clientFactories.get(normalizedScheme); + if (subprotocolMap) { + const factory = subprotocolMap.get(normalizedSubprotocol); + if (factory) { + debug(`Servient found client factory for '${normalizedScheme}' + '${normalizedSubprotocol || "(none)"}'`); + return factory.getClient(); + } + } + + throw new Error( + `Servient has no ClientFactory for scheme '${normalizedScheme}'${normalizedSubprotocol ? ` with subprotocol '${normalizedSubprotocol}'` : ""}` + ); } public getClientSchemes(): string[] { @@ -221,7 +317,9 @@ export default class Servient { const serverStatus: Array> = []; this.servers.forEach((server) => serverStatus.push(server.start(this))); - this.clientFactories.forEach((clientFactory) => clientFactory.init()); + this.clientFactories.forEach((subprotocolMap) => { + subprotocolMap.forEach((clientFactory) => clientFactory.init()); + }); await Promise.all(serverStatus); return (this.#wotInstance = new WoTImpl(this)); @@ -236,7 +334,9 @@ export default class Servient { return; } - this.clientFactories.forEach((clientFactory) => clientFactory.destroy()); + this.clientFactories.forEach((subprotocolMap) => { + subprotocolMap.forEach((clientFactory) => clientFactory.destroy()); + }); const promises = this.servers.map((server) => server.stop()); await Promise.all(promises); diff --git a/packages/core/test/helpers-test.ts b/packages/core/test/helpers-test.ts index 7f0e7338b..e16b29260 100644 --- a/packages/core/test/helpers-test.ts +++ b/packages/core/test/helpers-test.ts @@ -51,6 +51,31 @@ class HelperTest { expect(scheme).to.eq("coap+ws"); } + @test "should extract mqtt+ws composite scheme"() { + const scheme = Helpers.extractScheme("mqtt+ws://broker.example.com:8080"); + expect(scheme).to.eq("mqtt+ws"); + } + + @test "should extract mqtt+wss composite scheme"() { + const scheme = Helpers.extractScheme("mqtt+wss://broker.example.com:8883"); + expect(scheme).to.eq("mqtt+wss"); + } + + @test "should extract modbus+tcp composite scheme"() { + const scheme = Helpers.extractScheme("modbus+tcp://device.local:502"); + expect(scheme).to.eq("modbus+tcp"); + } + + @test "should extract ws scheme"() { + const scheme = Helpers.extractScheme("ws://broker.example.com:8080/mqtt"); + expect(scheme).to.eq("ws"); + } + + @test "should extract wss scheme"() { + const scheme = Helpers.extractScheme("wss://broker.example.com:8883/mqtt"); + expect(scheme).to.eq("wss"); + } + @test "should correctly validate schema"() { const thing: ExposedThingInit = { title: "thingTest", diff --git a/packages/core/test/servient-subprotocol-test.ts b/packages/core/test/servient-subprotocol-test.ts new file mode 100644 index 000000000..ec59c5346 --- /dev/null +++ b/packages/core/test/servient-subprotocol-test.ts @@ -0,0 +1,208 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and + * Document License (2015-05-13) which is available at + * https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document. + * + * SPDX-License-Identifier: EPL-2.0 OR W3C-20150513 + ********************************************************************************/ + +/** + * Edge case tests for subprotocol and composite scheme support + */ + +import { suite, test } from "@testdeck/mocha"; +import { expect } from "chai"; +import { Readable } from "stream"; +import Servient from "../src/servient"; +import { ProtocolClient, ProtocolClientFactory } from "../src/protocol-interfaces"; +import Helpers from "../src/helpers"; +import { Content } from "../src/content"; +import { Form } from "../src/thing-description"; +import { Subscription } from "rxjs/Subscription"; + +class MockClient implements ProtocolClient { + async readResource(_form: Form): Promise { + return new Content("application/json", Readable.from(Buffer.from("{}"))); + } + async writeResource(_form: Form, _content: Content): Promise {} + async invokeResource(_form: Form, _content?: Content): Promise { + return new Content("application/json", Readable.from(Buffer.from("{}"))); + } + async unlinkResource(_form: Form): Promise {} + async subscribeResource( + _form: Form, + _next: (content: Content) => void, + _error?: (error: Error) => void, + _complete?: () => void + ): Promise { + return new Subscription(); + } + async requestThingDescription(_uri: string): Promise { + return new Content("application/json", Readable.from(Buffer.from("{}"))); + } + async start() {} + async stop() {} + setSecurity() { + return true; + } +} + +class MockMqttFactory implements ProtocolClientFactory { + readonly scheme = "mqtt"; + getSupportedProtocols(): Array<[string, string?]> { + return [ + ["mqtt"], + ["mqtts"], + ["ws", "mqtt"], + ["wss", "mqtt"], + ]; + } + getClient() { + return new MockClient(); + } + init() { + return true; + } + destroy() { + return true; + } +} + +class EmptySchemeFactory implements ProtocolClientFactory { + readonly scheme = "test"; + getSupportedProtocols() { + return []; + } + getClient() { + return new MockClient(); + } + init() { + return true; + } + destroy() { + return true; + } +} + +@suite("Subprotocol edge cases") +class SubprotocolEdgeCasesTest { + @test "should handle empty string subprotocol"() { + const servient = new Servient(); + servient.addClientFactory(new MockMqttFactory()); + + // empty string should not trigger subprotocol matching + const client = servient.getClientFor("mqtt", ""); + expect(client).to.exist; + } + + @test "should handle whitespace-only subprotocol"() { + const servient = new Servient(); + servient.addClientFactory(new MockMqttFactory()); + + // whitespace should not match + const client = servient.getClientFor("mqtt", " "); + expect(client).to.exist; + } + + @test "should handle case-insensitive subprotocol matching"() { + const servient = new Servient(); + servient.addClientFactory(new MockMqttFactory()); + + // MQTT (uppercase) should match mqtt (lowercase) + const client = servient.getClientFor("ws", "MQTT"); + expect(client).to.exist; + } + + @test "should handle case-insensitive scheme matching"() { + const servient = new Servient(); + servient.addClientFactory(new MockMqttFactory()); + + // WS (uppercase) should match ws (lowercase) + const client = servient.getClientFor("WS", "mqtt"); + expect(client).to.exist; + } + + @test "should handle getSchemes returning empty array"() { + const servient = new Servient(); + servient.addClientFactory(new EmptySchemeFactory()); + + expect(() => servient.getClientFor("mqtt+ws")).to.throw(); + } + + @test "should handle factory without getSchemes method"() { + const servient = new Servient(); + const basicFactory: ProtocolClientFactory = { + scheme: "basic", + getClient: () => new MockClient(), + init: () => true, + destroy: () => true, + }; + servient.addClientFactory(basicFactory); + + // should still work with just primary scheme + const client = servient.getClientFor("basic"); + expect(client).to.exist; + } + + @test "should handle factory without getSupportedProtocols method"() { + const servient = new Servient(); + const basicFactory: ProtocolClientFactory = { + scheme: "basic", + getClient: () => new MockClient(), + init: () => true, + destroy: () => true, + }; + servient.addClientFactory(basicFactory); + + // should work with just primary scheme (no subprotocol) + const client = servient.getClientFor("basic"); + expect(client).to.exist; + + // should fail when a subprotocol is requested + expect(() => servient.getClientFor("basic", "someprotocol")).to.throw(); + } +} + +@suite("Composite scheme edge cases") +class CompositeSchemeEdgeCasesTest { + @test "should reject malformed composite scheme with trailing plus"() { + expect(() => Helpers.extractScheme("mqtt+://broker")).to.throw(); + } + + @test "should reject malformed composite scheme with leading plus"() { + expect(() => Helpers.extractScheme("+ws://broker")).to.throw(); + } + + @test "should reject composite scheme with only plus"() { + expect(() => Helpers.extractScheme("+://broker")).to.throw(); + } + + @test "should handle composite scheme case insensitivity"() { + const scheme = Helpers.extractScheme("MQTT+WS://broker"); + expect(scheme).to.equal("mqtt+ws"); + } + + @test "should handle composite scheme with numbers"() { + const scheme = Helpers.extractScheme("protocol1+transport2://broker"); + expect(scheme).to.equal("protocol1+transport2"); + } + + @test "should reject empty URI"() { + expect(() => Helpers.extractScheme("")).to.throw(); + } + + @test "should reject URI without scheme"() { + expect(() => Helpers.extractScheme("//broker")).to.throw(); + } + + @test "should reject URI with only scheme separator"() { + expect(() => Helpers.extractScheme("://broker")).to.throw(); + } +}