Frontmatter
| id | 9454 |
| title | Implement Push-Based WebSocket Integration in Data Pipeline |
| state | Closed |
| labels | enhancement, ai, architecture, core |
| assignees | tobiu |
| createdAt | Mar 12, 2026, 9:16 PM |
| updatedAt | Mar 25, 2026, 11:40 AM |
| githubUrl | https://github.com/neomjs/neo/issues/9454 |
| author | tobiu |
| commentsCount | 1 |
| parentIssue | 9449 |
| subIssues | [] |
| subIssuesCompleted | 0 |
| subIssuesTotal | 0 |
| blockedBy | [] |
| blocking | [] |
| closedAt | Mar 25, 2026, 11:40 AM |
Implement Push-Based WebSocket Integration in Data Pipeline
tobiu added parent issue #9449 on Mar 12, 2026, 9:18 PM
tobiu cross-referenced by #9449 on Mar 12, 2026, 10:00 PM
tobiu assigned to @tobiu on Mar 12, 2026, 10:04 PM

tobiu
Mar 25, 2026, 11:39 AM
Input from Gemini 3.1 Pro:
✦ I have committed the implementation for the WebSocket integration as discussed.
Actions Taken:
- `src/data/connection/WebSocket.mjs`: Added `streamCallbacks`, `registerStream()`, and `unregisterStream()` to support persistent channel subscriptions alongside single-use promises. Updated `onMessage` to route incoming pushes (`data.stream` or `data.method`) to these callbacks.
- `src/manager/rpc/Api.mjs`: Enhanced `registerApi()` to recognize and store `streams` defined in `remotes-api.json`.
- `src/manager/rpc/Message.mjs`: Added `onMessageStream()` and `onMessageStreamUnsubscribe()` to orchestrate persistent stream registrations down to the specific `WebSocket` connection.
- `src/worker/App.mjs`: Added the `rpcStreamCallbacks` registry and the `onRpcStreamData()` handler. When the Data Worker forwards pushed data via IPC, the App Worker invokes the correct subscriber callback, completing the push cycle from server to UI.

The foundation is in place. As you mentioned, we should proceed with dedicated unit and e2e testing sessions to verify the Progressive Hydration flow end-to-end.
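
For readers following along, the routing idea described for `WebSocket.mjs` can be sketched roughly as follows. This is a minimal illustration and not the committed code: the `StreamRouter` class, the `expectResponse()` helper, the `promiseCallbacks` map, the method signatures, and the string return values of `onMessage()` are all assumptions made for the example. Only the names `streamCallbacks`, `registerStream()`, `unregisterStream()`, `onMessage`, `mId`, `data.stream`, and `data.method` come from the comment above.

```javascript
// Hypothetical sketch: NOT the actual src/data/connection/WebSocket.mjs implementation.
class StreamRouter {
    static idCounter = 0;

    constructor() {
        // Single-use request/response resolvers, keyed by message id (mId)
        this.promiseCallbacks = new Map();
        // Persistent subscriptions: stream name -> Map(callbackId -> callback)
        this.streamCallbacks = new Map();
    }

    // Register a persistent callback; returns an id that can be used to unsubscribe
    registerStream(stream, callback) {
        if (!this.streamCallbacks.has(stream)) {
            this.streamCallbacks.set(stream, new Map());
        }
        const callbackId = `${stream}:${++StreamRouter.idCounter}`;
        this.streamCallbacks.get(stream).set(callbackId, callback);
        return callbackId;
    }

    // Remove a persistent callback; returns true if something was removed
    unregisterStream(stream, callbackId) {
        const callbacks = this.streamCallbacks.get(stream);
        return callbacks ? callbacks.delete(callbackId) : false;
    }

    // Track a one-off request (assumed helper): resolves when a message with this mId arrives
    expectResponse(mId) {
        return new Promise(resolve => this.promiseCallbacks.set(mId, resolve));
    }

    // Route an already-parsed incoming message
    onMessage(data) {
        // 1. Replies to pending requests are matched by mId and consumed once
        if (data.mId !== undefined && this.promiseCallbacks.has(data.mId)) {
            this.promiseCallbacks.get(data.mId)(data);
            this.promiseCallbacks.delete(data.mId);
            return 'response';
        }
        // 2. Unsolicited pushes are matched by data.stream, falling back to data.method
        const callbacks = this.streamCallbacks.get(data.stream ?? data.method);
        if (callbacks && callbacks.size > 0) {
            callbacks.forEach(callback => callback(data));
            return 'stream';
        }
        return 'unhandled';
    }
}

// Usage: subscribe, receive a push, then unsubscribe
const demoRouter = new StreamRouter();
const demoId = demoRouter.registerStream('tasks', push => console.log('push received', push));
demoRouter.onMessage({stream: 'tasks', changes: {status: 'done'}});
demoRouter.unregisterStream('tasks', demoId);
```

Keeping the two registries separate means a reply is consumed exactly once, while a stream subscription survives until it is explicitly unregistered.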
tobiu closed this issue on Mar 25, 2026, 11:40 AM
Goal
Natively support server-pushed data streams and Progressive Hydration via WebSockets within the Data Pipeline architecture.
Context
Modern web apps use WebSockets to push data to clients (e.g., real-time dashboards, live task boards). Often, backends push "Operations" or "Deltas" (e.g., `{ target: 123, changes: { status: 'done' } }`) rather than full state objects. Currently, Neo's `WebSocket` connection handles request/response mapping via `mId` (Promises), but lacks a robust mechanism for persistent channel subscriptions and routing unsolicited pushes through the shaping pipeline.
Acceptance Criteria
- `remotes-api.json` Enhancement:
  - … `methods` (Request/Response) and `streams` (Persistent Subscriptions).
  - … `streams` to define their own `parser` and `normalizer` configs.
- Persistent Subscriptions (`manager.rpc.Message`):
  - … `mId`, the manager must register a persistent `callbackId` for a specific stream signature.
- Delta-Aware Parsing (Progressive Hydration):
  - … `{ action: 'update', recordId: '123', data: { status: 'done' } }`).
- Worker Boundary Routing:
  - … `push` or `update`) that a `Store` or `ViewController` can subscribe to.
  - … `Store` receiving a delta should call `record.set()` on the existing `Neo.data.Record` to trigger highly localized, surgical VDOM updates.
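
The last criterion, applying a pushed delta to an existing record instead of replacing it, might look roughly like the sketch below. `MiniRecord`, `MiniStore`, `applyDelta()`, and `changeLog` are hypothetical stand-ins invented for this example and do not reflect the real `Neo.data.Record` or `Store` APIs. Only the delta shape `{ action, recordId, data }` and the idea of calling `set()` on the existing record are taken from the criteria above.

```javascript
// Stand-ins for illustration only; the real Neo.data.Record and Store classes differ.
class MiniRecord {
    constructor(fields) {
        this.fields = {...fields};
        this.changeLog = []; // names of fields that actually changed
    }

    // Merge only the supplied fields, mirroring the idea behind record.set()
    set(changes) {
        for (const [key, value] of Object.entries(changes)) {
            if (this.fields[key] !== value) {
                this.fields[key] = value;
                this.changeLog.push(key);
            }
        }
    }
}

class MiniStore {
    constructor() {
        this.records = new Map();
    }

    add(fields) {
        this.records.set(String(fields.id), new MiniRecord(fields));
    }

    get(id) {
        return this.records.get(String(id));
    }

    // Apply a pushed delta of the shape { action, recordId, data }.
    // Returns true if an existing record was updated, false otherwise.
    applyDelta({action, recordId, data}) {
        const record = this.get(recordId);
        if (action !== 'update' || !record) {
            return false; // other actions and unknown records are out of scope here
        }
        record.set(data); // touch only the changed fields, keep the record instance
        return true;
    }
}

// Usage: a server push changes one field of an already loaded record
const demoStore = new MiniStore();
demoStore.add({id: '123', title: 'Write tests', status: 'open'});
demoStore.applyDelta({action: 'update', recordId: '123', data: {status: 'done'}});
console.log(demoStore.get('123').fields.status); // prints "done"
```

Because the record instance is kept and only `status` is written, anything observing that record can react to a single field change rather than a full reload.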