Frontmatter
| id | 9564 |
| title | Finalize Data Pipeline Push Integration & UI Reactivity |
| state | Closed |
| labels | enhancement, ai, architecture, core |
| assignees | tobiu |
| createdAt | Mar 26, 2026, 5:13 PM |
| updatedAt | Mar 26, 2026, 6:12 PM |
| githubUrl | https://github.com/neomjs/neo/issues/9564 |
| author | tobiu |
| commentsCount | 1 |
| parentIssue | 9449 |
| subIssues | [] |
| subIssuesCompleted | 0 |
| subIssuesTotal | 0 |
| blockedBy | [] |
| blocking | [] |
| closedAt | Mar 26, 2026, 6:12 PM |
Finalize Data Pipeline Push Integration & UI Reactivity
tobiu added parent issue #9449 on Mar 26, 2026, 5:14 PM
tobiu assigned to @tobiu on Mar 26, 2026, 5:14 PM

tobiu
Mar 26, 2026, 6:11 PM
Input from Gemini 3.1 Pro:
✦ I have implemented the final integration for the push-based data pipeline.
Summary of Changes:
- `src/data/connection/Base.mjs`: Made the base connection observable so that all connection types (including WebSocket) can emit custom events like `push`.
- `src/data/connection/WebSocket.mjs`: Now fires a `push` event when unsolicited data (Progressive Hydration) arrives without a matching callback or Promise `mId`.
- `src/data/Pipeline.mjs`: Implemented `onConnectionPush` to relay the push event from its underlying connection.
  - It passes the pushed data through the `parser` and `normalizer`.
  - If running in the Data Worker, it forwards the shaped data via IPC to the App Worker Pipeline.
- `src/worker/App.mjs`: Added `onPipelinePush(msg)` to catch the IPC forward and re-emit the `push` event on the correct proxy pipeline instance inside the App Worker.
- `src/data/Store.mjs`: The Store now listens to `push` events from its configured pipeline.
  - Added `onPipelinePush(data)`: It looks up the record by its primary key, and if it exists, it executes `record.set(data)`. This completes the flow and triggers the surgical VDOM update without reloading the grid or collection.
- Unit Tests: Added `PipelinePush.spec.mjs` and `StorePush.spec.mjs` to rigorously verify the event bubbling and the surgical `record.set()` updates using mock connections.

These changes have been committed and pushed to the repository. I will now close this issue.
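The connection-to-pipeline part of this summary can be sketched roughly as follows. This is a framework-free illustration under stated assumptions, not the committed Neo.mjs code: `MiniObservable`, `MockConnection`, `MiniPipeline`, the `{mId, data}` message shape, and the sample parser and normalizer are all hypothetical. Only the `push` event name and the `onConnectionPush` method name come from the summary, and the IPC forwarding step between workers is left out.

```javascript
// Hypothetical stand-in for an observable base class (not the Neo.mjs API).
class MiniObservable {
    #listeners = new Map();

    on(name, fn) {
        if (!this.#listeners.has(name)) {
            this.#listeners.set(name, []);
        }
        this.#listeners.get(name).push(fn);
    }

    fire(name, data) {
        (this.#listeners.get(name) || []).forEach(fn => fn(data));
    }
}

// Mock connection: a message whose mId matches a pending callback resolves it,
// anything else is treated as an unsolicited push.
class MockConnection extends MiniObservable {
    callbacks = new Map();

    onMessage(message) {
        const callback = this.callbacks.get(message.mId);

        if (callback) {
            this.callbacks.delete(message.mId);
            callback(message.data);
        } else {
            this.fire('push', message.data);
        }
    }
}

// Pipeline sketch: shapes pushed data, then relays it as its own push event.
class MiniPipeline extends MiniObservable {
    constructor({connection, parser, normalizer}) {
        super();
        this.parser     = parser;
        this.normalizer = normalizer;
        connection.on('push', data => this.onConnectionPush(data));
    }

    onConnectionPush(data) {
        const shapedData = this.normalizer(this.parser(data));
        this.fire('push', shapedData);
    }
}

const connection = new MockConnection();
const pipeline   = new MiniPipeline({
    connection,
    parser    : raw => JSON.parse(raw),                // assumption: payload is a JSON string
    normalizer: obj => ({...obj, id: Number(obj.id)})  // assumption: ids are coerced to numbers
});

const received = [];
pipeline.on('push', data => received.push(data));

// No pending callback matches, so the message is relayed as a push.
connection.onMessage({data: '{"id": "123", "status": "done"}'});
```

In this sketch a listener on the pipeline only ever sees shaped data, while request/response traffic with a matching `mId` never reaches the `push` path.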
tobiu closed this issue on Mar 26, 2026, 6:12 PM
Goal
Connect the lower-level WebSocket/Stream RPC mechanisms to the `Store` so that unsolicited server pushes surgically update UI components via `record.set()`.
Context
In issue #9454, we implemented the plumbing for Progressive Hydration (WebSocket persistent subscriptions, RPC routing, IPC forwarding). However, we missed the final integration step: the `Store` itself is not listening for these pushes, and the pipeline abstraction lacks a dedicated `push` or `update` event to notify the store.
If a server pushes `{ id: 123, status: 'done' }` right now, it traverses the worker boundary but dies in the App Worker because the Store is only listening to `load` and `progress` (for chunking), not unsolicited `push` events.
Acceptance Criteria
Connection Event Emission:
- `src/data/connection/WebSocket.mjs` (and potentially `Rpc`) must fire a `push` event when unsolicited data arrives, rather than just executing hardcoded callbacks.
Pipeline Event Bridging:
- `src/data/Pipeline.mjs` must relay `push` events from its connection to the outside world.
- Pushed data must pass through the `parser` and `normalizer` before emitting `pipeline.fire('push', shapedData)`.
- The relay must also cover IPC forwarding when `workerExecution: 'data'`.
Store Integration (`record.set()`):
- `src/data/Store.mjs` must listen to the `push` event on its pipeline.
- Add `onPipelinePush(data)`. This method must locate the existing record via its primary key and call `record.set(data)` to trigger a surgical VDOM update without reloading the entire collection.
Unit Testing (`test/playwright/unit/data/`):
- Add `PipelinePush.spec.mjs` to verify event bubbling, shaping, and IPC forwarding using mock connections.
- Add `StorePush.spec.mjs` to verify that a Store correctly interprets pipeline pushes and applies surgical updates via `record.set()`, triggering a `recordChange` event.
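As a rough illustration of what the store-side criterion asks for, the sketch below applies a pushed payload to an existing record. It is not the Neo.mjs `Store` implementation: `MiniRecord`, `MiniStore`, the `keyProperty` option, and the shape of the recorded change entry are assumptions made for the example. The names `onPipelinePush`, `record.set()`, and `recordChange` are taken from the criteria above.

```javascript
// Hypothetical record: set() reports only the fields that actually changed.
class MiniRecord {
    constructor(data, onChange) {
        this.data     = {...data};
        this.onChange = onChange;
    }

    set(fields) {
        const changed = {};

        for (const [key, value] of Object.entries(fields)) {
            if (this.data[key] !== value) {
                this.data[key] = value;
                changed[key]   = value;
            }
        }

        if (Object.keys(changed).length > 0) {
            this.onChange(this, changed);
        }
    }
}

// Hypothetical store: keeps records in a Map keyed by the primary key.
class MiniStore {
    constructor({keyProperty = 'id', items = []}) {
        this.keyProperty = keyProperty;
        this.events      = [];
        this.map         = new Map();

        for (const item of items) {
            const record = new MiniRecord(item, (rec, changed) => {
                // Stand-in for firing a recordChange event.
                this.events.push({name: 'recordChange', record: rec, changed});
            });
            this.map.set(item[keyProperty], record);
        }
    }

    // Applies a push to a known record; pushes for unknown keys are ignored.
    onPipelinePush(data) {
        const record = this.map.get(data[this.keyProperty]);

        if (record) {
            record.set(data);
        }

        return Boolean(record);
    }
}

const store = new MiniStore({
    items: [{id: 123, status: 'pending'}, {id: 124, status: 'pending'}]
});

const applied = store.onPipelinePush({id: 123, status: 'done'});
const ignored = store.onPipelinePush({id: 999, status: 'done'});
```

Here only the record with id 123 is touched and a single change entry is recorded, while the push for the unknown id 999 is dropped. Whether the real Store should ignore unknown keys or insert a new record is a design decision this sketch does not settle.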