Frontmatter
| id | 9453 |
| title | Implement Pipeline IPC and Remote Execution Routing |
| state | Closed |
| labels | enhancement, ai, architecture, core |
| assignees | tobiu |
| createdAt | Mar 12, 2026, 7:23 PM |
| updatedAt | Mar 24, 2026, 9:12 PM |
| githubUrl | https://github.com/neomjs/neo/issues/9453 |
| author | tobiu |
| commentsCount | 1 |
| parentIssue | 9449 |
| subIssues | [] |
| subIssuesCompleted | 0 |
| subIssuesTotal | 0 |
| blockedBy | [] |
| blocking | [] |
| closedAt | Mar 24, 2026, 9:12 PM |
Implement Pipeline IPC and Remote Execution Routing
tobiu added parent issue #9449 on Mar 12, 2026, 7:23 PM
tobiu cross-referenced by #9449 on Mar 12, 2026, 7:24 PM
tobiu assigned to @tobiu on Mar 12, 2026, 7:25 PM
tobiu changed title from Wire Data Worker Normalizer Execution Pipeline to Implement Pipeline IPC and Remote Execution Routing on Mar 12, 2026, 10:03 PM
tobiu cross-referenced by #9451 on Mar 12, 2026, 10:14 PM
tobiu cross-referenced by #9543 on Mar 24, 2026, 5:07 PM

tobiu
Mar 24, 2026, 9:09 PM
Input from Gemini 3.1 Pro:
✦ I have implemented the Pipeline IPC and remote execution routing, leveraging the new Instance-to-Instance `RemoteMethodAccess` (RMA) layer we built in #9544.

Implementation Details:
- Pipeline Operations (`src/data/Pipeline.mjs`):
  - Updated `read()` to use `Neo.currentWorker.generateRemote` to create a proxy to its Data Worker counterpart.
  - Implemented `create()` and `update()` methods using a shared `executeRemoteOrLocal` helper. This helper ensures that if `workerExecution: 'data'` is set, all operations are seamlessly routed via RMA to the `remoteId`.
- Data Worker Cleanup (`src/worker/Data.mjs`):
  - Removed the Hack: Deleted the hardcoded `onPipelineExecute` message handler. The Data Worker no longer needs domain-specific logic to route Pipeline calls; the core RMA mixin handles it automatically via `remoteId`.
  - Memory Management: Added an `onDestroyInstance` handler. When the App Worker Pipeline is destroyed, it sends a `destroyInstance` message. This new handler ensures the remote instance is properly destroyed (`instance.destroy()`) and removed from the Data Worker's `this.instances` cache, preventing memory leaks.

The pipeline architecture is now fully wired for cross-worker orchestration. I will close this sub-issue.
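
For readers who want to see the shape of the routing and cleanup described in this comment, here is a minimal, framework-independent sketch. It does not use the actual Neo.mjs API: `DataWorkerStub`, `PipelineSketch`, `callRemote`, and the `-remote` id suffix are hypothetical names chosen for illustration, and the cross-worker call is simulated in-process rather than going through RMA.

```javascript
// Hypothetical stand-in for the Data Worker side. Not the Neo.mjs API.
class DataWorkerStub {
    constructor() {
        this.instances = new Map(); // remoteId -> instance
    }

    // Simulates a routed remote method call: resolve the instance, invoke the method.
    async callRemote(remoteId, method, params) {
        const instance = this.instances.get(remoteId);

        if (!instance) {
            throw new Error(`No instance registered for remoteId "${remoteId}"`);
        }

        return instance[method](params);
    }

    // Destroys the remote instance and evicts it from the cache.
    onDestroyInstance({id}) {
        const instance = this.instances.get(id);

        if (instance) {
            instance.destroy();
            this.instances.delete(id);
        }
    }
}

// Hypothetical pipeline that either runs in place or delegates to its mirror.
class PipelineSketch {
    constructor({id, worker = null, workerExecution = 'app'}) {
        this.id              = id;
        this.worker          = worker;
        this.workerExecution = workerExecution;
        this.remoteId        = null;
        this.isDestroyed     = false;

        if (workerExecution === 'data') {
            // Register a mirror instance inside the stubbed Data Worker.
            this.remoteId = `${id}-remote`;
            worker.instances.set(this.remoteId, new PipelineSketch({id: this.remoteId}));
        }
    }

    // Shared helper: route to the remote counterpart, or execute in place.
    async executeRemoteOrLocal(operation, params) {
        if (this.workerExecution === 'data') {
            return this.worker.callRemote(this.remoteId, operation, params);
        }

        // Placeholder for the real work (connection, parsing, normalizing).
        return {operation, executedBy: this.id, params};
    }

    read(params)   { return this.executeRemoteOrLocal('read',   params); }
    create(params) { return this.executeRemoteOrLocal('create', params); }
    update(params) { return this.executeRemoteOrLocal('update', params); }

    destroy() {
        if (this.workerExecution === 'data') {
            // In a real setup this would be a message sent to the Data Worker.
            this.worker.onDestroyInstance({id: this.remoteId});
        }

        this.isDestroyed = true;
    }
}

// Usage: the read is executed by the mirror, and destroy() empties the cache.
const demoWorker   = new DataWorkerStub();
const demoPipeline = new PipelineSketch({id: 'users', worker: demoWorker, workerExecution: 'data'});

demoPipeline.read({page: 1}).then(result => {
    console.log(result.executedBy);        // 'users-remote'
    demoPipeline.destroy();
    console.log(demoWorker.instances.size); // 0
});
```

The point of the single helper is that `read()`, `create()`, and `update()` share one routing decision, so the Data Worker side needs no operation-specific handler.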
tobiu closed this issue on Mar 24, 2026, 9:12 PM
tobiu cross-referenced by #9546 on Mar 24, 2026, 9:19 PM
Goal
Enable the App Worker's `Pipeline` instance to seamlessly execute its counterpart inside the Data Worker.

Context
In issue #9451, we created `Neo.data.Pipeline` which handles instantiating a mirror of itself in the Data Worker if `workerExecution: 'data'` is set. However, calling `pipeline.read()` currently does nothing remotely. We need to implement the IPC messaging that triggers the remote execution.

Acceptance Criteria
- Implement `read()`, `create()`, `update()`, and `destroy()` methods on `Neo.data.Pipeline`.
- With `workerExecution: 'data'`, these methods must send an IPC message (e.g., `{ action: 'pipeline.execute', id: this.remoteId, operation: 'read', params: ... }`) to the Data Worker.
- Add an `onPipelineExecute` message handler in `src/worker/Data.mjs`.
- The handler must look up the instance in `this.instances`, invoke the requested operation (which will run the local Connection, Parser, and Normalizer in the Data Worker), and then send the finalized record array back to the App Worker's Promise resolver.
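
The message flow these criteria describe can be sketched as follows. This illustrates the originally proposed design only; per the closing comment, the shipped implementation routes through RMA and removed the `onPipelineExecute` handler. All class names here are hypothetical, as are the `mid` correlation id and the `reply` action, and `queueMicrotask` stands in for a real `postMessage` channel between workers.

```javascript
// Hypothetical Data Worker side. Names and message shapes are illustrative only.
class DataWorkerSketch {
    constructor() {
        this.instances = {};   // id -> pipeline instance living in this worker
        this.postToApp = null; // assigned when both sides get wired together
    }

    onMessage(msg) {
        if (msg.action === 'pipeline.execute') {
            return this.onPipelineExecute(msg);
        }
    }

    async onPipelineExecute({id, mid, operation, params}) {
        try {
            const instance = this.instances[id];

            if (!instance) {
                throw new Error(`Unknown pipeline instance: ${id}`);
            }

            // Runs Connection, Parser, and Normalizer inside this worker.
            const data = await instance[operation](params);

            this.postToApp({action: 'reply', mid, data});
        } catch (error) {
            this.postToApp({action: 'reply', mid, error: error.message});
        }
    }
}

// Hypothetical App Worker side: one pending Promise per outgoing message.
class AppPipelineSketch {
    constructor({remoteId, postToData}) {
        this.remoteId   = remoteId;
        this.postToData = postToData;
        this.pending    = new Map(); // mid -> {resolve, reject}
        this.nextMid    = 1;
    }

    execute(operation, params) {
        return new Promise((resolve, reject) => {
            const mid = this.nextMid++;

            this.pending.set(mid, {resolve, reject});
            this.postToData({action: 'pipeline.execute', id: this.remoteId, mid, operation, params});
        });
    }

    // Settles the Promise that belongs to the incoming reply.
    onReply({mid, data, error}) {
        const entry = this.pending.get(mid);

        if (!entry) return;

        this.pending.delete(mid);

        if (error) {
            entry.reject(new Error(error));
        } else {
            entry.resolve(data);
        }
    }

    read(params) { return this.execute('read', params); }
}

// Wires both sides with an in-process, asynchronous stand-in for postMessage.
function createDemo(remoteId = 'pipeline-1') {
    const dataWorker = new DataWorkerSketch();
    const pipeline   = new AppPipelineSketch({
        remoteId,
        postToData: msg => queueMicrotask(() => dataWorker.onMessage(msg))
    });

    dataWorker.postToApp = msg => queueMicrotask(() => pipeline.onReply(msg));

    dataWorker.instances['pipeline-1'] = {
        async read() {
            const raw    = '[{"ID":1,"NAME":"Ada"},{"ID":2,"NAME":"Linus"}]'; // "Connection"
            const parsed = JSON.parse(raw);                                    // "Parser"

            return parsed.map(item => ({id: item.ID, name: item.NAME}));       // "Normalizer"
        }
    };

    return {dataWorker, pipeline};
}
```

In this sketch, `createDemo().pipeline.read()` resolves with the two normalized records, while a `remoteId` that has no registered instance rejects with the handler's error message. The correlation id is what lets several in-flight operations share one channel without their replies getting mixed up.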