Frontmatter
| id | 9449 |
| title | Epic: Unified Data Pipeline Architecture (Pipeline -> Connection -> Parser -> Normalizer) |
| state | Closed |
| labels | epic, ai, architecture, core |
| assignees | tobiu |
| createdAt | Mar 12, 2026, 3:20 PM |
| updatedAt | Mar 31, 2026, 5:03 PM |
| githubUrl | https://github.com/neomjs/neo/issues/9449 |
| author | tobiu |
| commentsCount | 3 |
| parentIssue | null |
| subIssues | 9418 Create Data Normalizer Architecture (`Neo.data.normalizer.Base` & `Tree`) · 9419 Implement Dynamic Module Loading in `Neo.worker.Data` · 9420 Migrate Data Pipeline to Connection -> Parser -> Normalizer flow · 9450 Enhance Data Worker to Instantiate Dynamically Loaded Modules · 9451 Create Pipeline Cornerstone and Refactor Store Implementation · 9452 Connection Foundation and Parser Refactoring · 9453 Implement Pipeline IPC and Remote Execution Routing · 9454 Implement Push-Based WebSocket Integration in Data Pipeline · 9455 Integrate RPC API into Pipeline Architecture (Connection.Rpc) · 9502 Migrate existing Stores to the new Pipeline architecture · 9543 Store Pipeline Instantiation and Legacy Parser Compatibility · 9544 Enhance RemoteMethodAccess to Support Instance-to-Instance Routing · 9546 Refactor Pipeline IPC to use Declarative Remote Configs (Instance Proxies) · 9547 Fix RemoteMethodAccess for Main Thread Addons and Instance-to-Instance ID collision · 9550 Refactor(data): Implement Store-to-Pipeline Legacy Bridge · 9551 Examples: Implement unified Data Pipeline showcases · 9552 Docs: Create Learning Guide for the Unified Data Pipeline Architecture · 9564 Finalize Data Pipeline Push Integration & UI Reactivity |
| subIssuesCompleted | 18 |
| subIssuesTotal | 18 |
| blockedBy | [] |
| blocking | [] |
| closedAt | Mar 31, 2026, 5:03 PM |
Epic: Unified Data Pipeline Architecture (Pipeline -> Connection -> Parser -> Normalizer)

Input from Gemini 3.1 Pro:
✦ ### 🤖 AI Agent / Developer Briefing: Architectural Guidelines for Implementation
Before beginning work on any phase of this Epic, it is critical to understand the nuanced architecture of the Neo.mjs data layer to avoid significant design flaws. This Epic merges the Pipeline architecture with the RPC/WebSocket architecture.
1. Critical Trap: Records vs. Models
- The Trap: Assuming that `Neo.data.Store` items are instances of `Neo.data.Model`. They are NOT.
- The Reality: The Store uses `Neo.data.RecordFactory` to create lightweight, enhanced objects (Records) that map back to a Model definition, but they are not Model instances themselves.
- Architectural Implication: Never attempt to bind a WebSocket subscription directly to a Record instance or a Model. Subscriptions belong to the `Pipeline` or are initiated by a `ViewController` via `Neo.remotes.Api`. The Pipeline parses the incoming push into a delta, and the Store executes `record.set(delta)` on the lightweight object, triggering a surgical VDOM update.
2. The Pipeline Boundary (App Worker vs. Data Worker)
- The `Neo.data.Pipeline` class is primarily an App Worker construct.
- When `workerExecution: 'data'` is used, the App Worker Pipeline acts as an orchestrator. It holds the configs but uses IPC to instruct the Data Worker to instantiate the actual `Connection`, `Parser`, and `Normalizer`.
- An App Worker Pipeline cannot natively "listen" to events from a Data Worker Connection. All cross-worker communication for streams and data fetching must explicitly bridge via IPC (`Neo.worker.Data` message handlers).
3. Key Files & Subsystems Affected: To successfully implement this Epic, you must be familiar with the following files:
- The Data/Store Layer:
  - `src/data/Store.mjs` (Understand `add()`, `recordChange`, and `autoInitRecords`)
  - `src/data/RecordFactory.mjs` (Understand how `set()` updates a record and increments `versionSymbol`)
- The RPC/Remotes Layer:
  - `src/remotes/Api.mjs` (Where `remotes-api.json` is parsed and proxy functions are generated. This needs to wrap proxies in Pipelines.)
  - `src/manager/rpc/Api.mjs` & `src/manager/rpc/Message.mjs` (Needs enhancement for persistent `callbackId` subscriptions instead of just single-use Promise `mIds`.)
- The Connection Layer:
  - `src/data/connection/WebSocket.mjs` (Needs enhancement to route unsolicited pushes to subscribed channels/pipelines.)
4. The "Progressive Hydration" Testing Standard: When implementing Parsers for streams (#9454), tests should explicitly prove that a pushed Delta (e.g., `{ action: 'update', id: 1, changes: { heavyField: 'data' } }`) routes through the Pipeline and correctly triggers `record.set()` in the Store without causing a full collection reset.
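
The testing standard above can be sketched as a minimal fixture in plain JavaScript. All names here (`applyDelta`, the store/record shapes) are illustrative stand-ins, not Neo.mjs APIs; the point is only the assertion pattern: a delta mutates one record in place while the collection itself is untouched.

```javascript
// A minimal stand-in for a Store holding lightweight record objects.
const store = {
    items: [
        {id: 1, title: 'Quick win', heavyField: null, set(changes) {Object.assign(this, changes)}}
    ],
    resetCount: 0, // would be incremented by a full collection reset; must stay 0
    get(id) {return this.items.find(item => item.id === id)}
};

// Hypothetical pipeline step: route a pushed delta to record.set() on the match.
function applyDelta(store, delta) {
    if (delta.action === 'update') {
        store.get(delta.id)?.set(delta.changes)
    }
}

applyDelta(store, {action: 'update', id: 1, changes: {heavyField: 'data'}});

console.log(store.get(1).heavyField); // 'data'
console.log(store.resetCount);        // 0 => surgical update, no reset
```

A real test would additionally assert that the record's version counter incremented, per the `RecordFactory` notes above.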

Input from Gemini 3.1 Pro:
✦ Added sub-issue #9502 for Phase 6: Migration & Regression Testing.
This phase focuses on scanning the
examplesandappsfolders for anyStoreinstances currently using directapiorurlconfigs. We need to ensure these are systematically migrated to the newpipelinearchitecture to prevent any regression bugs as the old patterns are phased out.
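
As a hedged before/after sketch of that migration: the exact `pipeline` config keys below are assumptions for illustration (as is the `Neo.data.parser.Json` name), not the final `Neo.data.Pipeline` API.

```javascript
// Legacy pattern: a Store talking to the backend directly via `url` (or `api`).
const legacyStoreConfig = {
    url: '/api/users.json'
};

// Target pattern: the Store aggregates a Pipeline, which owns the transport.
// Key names here are illustrative assumptions.
const pipelineStoreConfig = {
    pipeline: {
        connection: {url: '/api/users.json'},  // transport moves into the Connection
        parser    : 'Neo.data.parser.Json',    // illustrative parser name
        normalizer: 'Neo.data.normalizer.Base' // see sub-issue 9418
    }
};
```

The migration scan would flag any config matching the first shape and rewrite it into the second.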

Already resolved in v12.1.
Goal
Modernize the framework's data architecture by implementing a unified, thread-agnostic data pipeline orchestrated by a new cornerstone class: `Neo.data.Pipeline`. This epic resolves the fragmentation between local data fetching (Fetch/XHR) and the RPC API, ensuring all incoming data—whether pulled via REST, returned from an RPC call, or pushed spontaneously via WebSockets—flows through a standardized transformation and ingestion process.
Context & The Architectural Schism
Currently, Neo.mjs has two competing data layers:
- The Tree Grid pipeline (`Connection -> Parser -> Normalizer -> Store`), designed to handle complex datasets (like Tree Grids). However, `parser.Stream` incorrectly handles fetching natively, breaking the single responsibility principle. Furthermore, the `Store` is currently hardcoded to orchestrate remote Data Worker instantiation for its Normalizer.
- The RPC layer: when a `Store` uses the `api` config (RPC), it bypasses any Parsers or Normalizers.
The Solution: Introduce `Neo.data.Pipeline` as an architectural cornerstone. The Store will exclusively aggregate a `Pipeline` instance. The `Pipeline` encapsulates the cross-worker orchestration, and the RPC API becomes a Transport Mechanism (`Connection.Rpc`).
The "Merged Universe": RPC + Pipelines
Data shaping (Parsers and Normalizers) should not be exclusive to Stores. A `ViewController` making a direct RPC call shouldn't have to manually format raw JSON. We are merging these concepts:
- `remotes-api.json` configurations will be enhanced to optionally define `parser` and `normalizer` configs for specific endpoints.
- When `Neo.remotes.Api` detects these configs, the generated proxy function will automatically pipe the raw JSON response through a Pipeline inside the Data Worker before returning the perfectly shaped data to the App Worker.
Progressive Hydration & Delta-Aware Pipelines
Modern applications require high Time-To-Interactive (TTI). A common backend pattern is to push "Quick Wins" (lightweight data like IDs and titles) immediately, and then stream the heavy, processed fields (like complex summaries or aggregations) later via WebSockets as Operational Transforms (Opcodes / Deltas).
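
The "Quick Wins first, heavy fields later" pattern can be sketched with plain objects. The opcode shape, `parseOpcode`, and the record layout below are illustrative assumptions, not Neo.mjs APIs or a defined wire format:

```javascript
// 1. Quick Win: a lightweight record is pushed first for fast TTI.
const record = {id: 1, title: 'Q3 Report', summary: null};

// 2. Later: the backend streams the heavy field as a proprietary opcode.
const opcode = {op: 'patch', rowId: 1, fields: {summary: 'Aggregated result'}};

// A stream Parser translates the opcode into a standardized delta shape.
function parseOpcode(opcode) {
    return {action: 'update', id: opcode.rowId, changes: opcode.fields};
}

// The Store would then apply the delta surgically, e.g. via record.set();
// Object.assign stands in for that here.
const delta = parseOpcode(opcode);
Object.assign(record, delta.changes);

console.log(record.summary); // 'Aggregated result'
```

The Parser is the only layer that knows the backend's opcode dialect; everything downstream sees the standardized delta.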
To support this natively:
- Connections are no longer limited to `read()` (Full Loads). They must support continuous stream handlers that output partial record updates.
- For a `read()`, the Parser shapes a bulk array. For a `stream`, a specialized Parser translates proprietary backend Opcodes into standardized Neo.mjs Deltas (Insert, Update, Delete).
- The Store applies each delta via `record.set({ field: 'new value' })`. The `RecordFactory` increments the record version, and the VDOM Worker calculates a minimal patch, updating just a single grid cell without replacing the whole row or losing local UI state.
Implementation Phasing
Phase 1: The Pipeline Cornerstone (#9451)
- Create `Neo.data.Pipeline` to manage `workerExecution` state.
- Refactor the `Store` to delegate to `this.pipeline.read()`.
Phase 2: Connection Foundation & Refactoring (#9452)
- Move the fetching logic out of `Neo.data.parser.Stream` and into Connections.
Phase 3: IPC & Remote Execution (#9453)
Phase 4: RPC Integration & The Merged Universe (#9455)
- Implement `Connection.Rpc` to wrap proxy calls.
- Enhance `Neo.remotes.Api` to support pipelines for standalone RPC calls.
Phase 5: Push-Based WebSocket Integration (#9454)
- Add `streams` to `remotes-api.json`.
- Implement persistent `callbackId` subscriptions in `manager.rpc.Message`.
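
The persistent-subscription idea can be sketched as follows: unlike a single-use Promise resolved once per message id, a stream subscription stays registered and fires for every pushed frame. Every name here (`subscribe`, `onPush`, the frame shape) is an illustrative assumption, not the shipped `manager.rpc.Message` API:

```javascript
// callbackId -> handler; the entry survives across many push frames.
const subscriptions = new Map();

let nextCallbackId = 1;

function subscribe(handler) {
    const callbackId = nextCallbackId++;
    subscriptions.set(callbackId, handler);
    return callbackId
}

// Invoked for every unsolicited WebSocket push frame.
function onPush({callbackId, delta}) {
    subscriptions.get(callbackId)?.(delta)
}

// One subscription receives multiple deltas over time.
const received = [];
const id       = subscribe(delta => received.push(delta));

onPush({callbackId: id, delta: {action: 'update', id: 1}});
onPush({callbackId: id, delta: {action: 'insert', id: 2}});

console.log(received.length); // 2 => persistent, not single-use
```

A single-use design would have deleted the entry after the first frame; keeping it registered is what lets the Pipeline feed continuous record updates into the Store.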