Frontmatter
| id | 9451 |
| title | Create Pipeline Cornerstone and Refactor Store Implementation |
| state | Closed |
| labels | enhancementaiarchitecturecore |
| assignees | tobiu |
| createdAt | Mar 12, 2026, 7:22 PM |
| updatedAt | Mar 17, 2026, 7:57 PM |
| githubUrl | https://github.com/neomjs/neo/issues/9451 |
| author | tobiu |
| commentsCount | 0 |
| parentIssue | 9449 |
| subIssues | [] |
| subIssuesCompleted | 0 |
| subIssuesTotal | 0 |
| blockedBy | [] |
| blocking | [] |
| closedAt | Mar 17, 2026, 7:57 PM |
Create Pipeline Cornerstone and Refactor Store Implementation
tobiu added parent issue #9449 on Mar 12, 2026, 7:23 PM
tobiu cross-referenced by #9449 on Mar 12, 2026, 7:24 PM
tobiu assigned to @tobiu on Mar 12, 2026, 7:25 PM
tobiu changed title from Create Connection.Base and Establish Connection -> Parser Hierarchy to Create Pipeline Cornerstone and Refactor Store Implementation on Mar 12, 2026, 10:02 PM
tobiu cross-referenced by #9453 on Mar 12, 2026, 10:03 PM
tobiu cross-referenced by #9502 on Mar 17, 2026, 6:46 PM
tobiu closed this issue on Mar 17, 2026, 7:57 PM
Goal
Establish
Neo.data.Pipelineas the central orchestrator for data transformation and remote execution, and remove the brittle remote instantiation logic fromNeo.data.Store.Context
Currently,
Neo.data.Storeattempts to directly manage the cross-worker instantiation of its Normalizer (viaafterSetNormalizer). This is an abstraction leak; a Store should not be hardcoded toNeo.worker.Dataor manage remote IDs.We need a dedicated
Neo.data.Pipelineclass. The Store will aggregate aPipelineusingClassSystemUtil.beforeSetInstance(). The Pipeline takes over the responsibility of owning theConnection,Parser, andNormalizer, and importantly, orchestrating whether they run locally in the App Worker or remotely in the Data Worker.Acceptance Criteria
src/data/Pipeline.mjsextendingNeo.core.Base.Pipelinethe following reactive configs:workerExecution(default'app'),connection_,parser_, andnormalizer_.ClassSystemUtil.beforeSetInstanceinside the Pipeline to instantiate these sub-components.workerExecution: 'data', the Pipeline should useNeo.worker.Data.createInstanceto spawn the actual Connection, Parser, and Normalizer instances exclusively inside the Data Worker (meaning the App Worker Pipeline only holds the configs, not the instances), storing the remote ID.Neo.data.Store: RemoveafterSetNormalizerandafterSetParser. Introduce apipeline_config that usesClassSystemUtil.beforeSetInstanceto create the Pipeline.load()method should delegate tothis.pipeline.read().