Support MQTT over WebSocket via composite scheme resolution#1487
Support MQTT over WebSocket via composite scheme resolution#1487Pranav-0440 wants to merge 2 commits intoeclipse-thingweb:masterfrom
Conversation
- Added composite scheme resolution (scheme + subprotocol) in ConsumedThing - Extended MqttClientFactory to support ws+mqtt and wss+mqtt - Added integration test for MQTT over WebSocket - Preserved backward compatibility Signed-off-by: Pranav-0440 <pranavghorpade61@gmail.com>
…resolution - Add composite scheme resolution (scheme + subprotocol) - Extend MqttClientFactory to support ws+mqtt and wss+mqtt - Fix client caching logic for composite schemes - Add integration test for MQTT over WebSocket - Preserve backward compatibility Signed-off-by: Pranav-0440 <pranavghorpade61@gmail.com>
There was a problem hiding this comment.
Pull request overview
This pull request adds support for MQTT over WebSocket by introducing composite scheme resolution in ConsumedThing. The implementation allows forms with WebSocket schemes (ws://, wss://) and MQTT subprotocol to be handled by dedicated MQTT client factories registered with composite schemes (ws+mqtt, wss+mqtt).
Changes:
- Added composite scheme resolution logic in ConsumedThing to combine URI schemes with subprotocols
- Extended MqttClientFactory to accept scheme parameter for composite schemes
- Added integration test for MQTT over WebSocket functionality
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/core/src/consumed-thing.ts | Implements composite scheme resolution by checking for scheme+subprotocol combinations when selecting protocol clients |
| packages/binding-mqtt/src/mqtt-client-factory.ts | Updated to accept a scheme parameter in constructor to support composite schemes like ws+mqtt and wss+mqtt |
| packages/binding-mqtt/test/mqtt-ws-test.integration.ts | New integration test for MQTT over WebSocket using composite scheme resolution |
| packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts | Updated to register all MQTT client factories including composite schemes |
| packages/binding-mqtt/test/mqtt-broker-server-interaction-test.integration.ts | Updated to register all MQTT client factories including composite schemes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| getClient(): ProtocolClient { | ||
| const client = new MqttClient(); |
There was a problem hiding this comment.
The MqttClientFactory now accepts and stores composite schemes like "ws+mqtt" and "wss+mqtt", but the MqttClient (in mqtt-client.ts) doesn't accept a scheme parameter in its constructor. It hardcodes the scheme to "mqtt" or "mqtts" based on the secure boolean. This means when the factory creates a client for "ws+mqtt", the client will still use "mqtt" as its scheme and construct invalid broker URIs. The MqttClient needs to be updated to: 1) Accept an optional scheme parameter from the factory, 2) Extract the base transport scheme from composite schemes (e.g., "ws" from "ws+mqtt") when constructing broker URIs, since mqtt.js only understands standard transport schemes like "ws", "wss", "mqtt", and "mqtts".
| const client = new MqttClient(); | |
| const client = new MqttClient(this.scheme); |
| client = this.#servient.getClientFor(schemes[srvIdx]); | ||
| client = this.#servient.getClientFor(resolvedSchemes[srvIdx]); | ||
|
|
||
| debug(`ConsumedThing '${this.title}' got new client for '${schemes[srvIdx]}'`); |
There was a problem hiding this comment.
The debug statement logs the base scheme from the schemes array instead of the resolved scheme that was actually selected. When using composite schemes like "ws+mqtt", this will log "ws" instead of "ws+mqtt", making debugging confusing. This should be changed to use resolvedSchemes[srvIdx] to accurately reflect which client was selected.
| debug(`ConsumedThing '${this.title}' got new client for '${schemes[srvIdx]}'`); | |
| debug(`ConsumedThing '${this.title}' got new client for '${resolvedSchemes[srvIdx]}'`); |
| await new Promise<void>((resolve, reject) => { | ||
| broker.publish( | ||
| { | ||
| cmd: "publish", | ||
| topic: "test/status", | ||
| payload: Buffer.from("online"), | ||
| qos: 0, | ||
| retain: false, | ||
| dup: false, | ||
| }, | ||
| (err?: Error) => { | ||
| if (err) reject(err); | ||
| else resolve(); | ||
| } | ||
| ); | ||
| }); |
There was a problem hiding this comment.
The test publishes a message to the broker before subscribing to it. Since the message has retain: false (line 84), it won't be stored by the broker. When readProperty is called (line 94), the MqttClient will subscribe to the topic, but the message has already been published and discarded. This test likely has a race condition and may not work reliably. The message should either be published after calling readProperty (in a separate async context), or the retain flag should be set to true so the message is retained by the broker until the client subscribes.
This PR adds support for MQTT over WebSocket using composite scheme resolution (ws+mqtt, wss+mqtt).
Changes
Added composite scheme resolution in ConsumedThing
Extended MqttClientFactory to support ws+mqtt and wss+mqtt
Added integration test for MQTT over WebSocket
Preserved backward compatibility for existing MQTT schemes
Result
All existing and new tests pass successfully.
Closes - #1475