Frontmatter
id: 9453
title: Implement Pipeline IPC and Remote Execution Routing
state: Closed
labels: enhancement, ai, architecture, core
assignees: tobiu
createdAt: Mar 12, 2026, 7:23 PM
updatedAt: Mar 24, 2026, 9:12 PM
githubUrl: https://github.com/neomjs/neo/issues/9453
author: tobiu
commentsCount: 1
parentIssue: 9449
subIssues: []
subIssuesCompleted: 0
subIssuesTotal: 0
blockedBy: []
blocking: []
closedAt: Mar 24, 2026, 9:12 PM

Implement Pipeline IPC and Remote Execution Routing

Closed · v12.1.0 · enhancement, ai, architecture, core
tobiu commented on Mar 12, 2026, 7:23 PM

Goal

Enable the App Worker's Pipeline instance to seamlessly execute its counterpart inside the Data Worker.

Context

In issue #9451, we created Neo.data.Pipeline, which instantiates a mirror of itself in the Data Worker if workerExecution: 'data' is set. However, calling pipeline.read() does not yet trigger any remote execution. We need to implement the IPC messaging that triggers it.

Acceptance Criteria

  • Implement read(), create(), update(), and destroy() methods on Neo.data.Pipeline.
  • When invoked in the App Worker with workerExecution: 'data', these methods must send an IPC message (e.g., { action: 'pipeline.execute', id: this.remoteId, operation: 'read', params: ... }) to the Data Worker.
  • Add an onPipelineExecute message handler in src/worker/Data.mjs.
  • This handler must look up the correct Pipeline instance from this.instances, invoke the requested operation (which will run the local Connection, Parser, and Normalizer in the Data Worker), and then send the finalized record array back to the App Worker's Promise resolver.
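
The message flow described in the acceptance criteria can be sketched as follows. This is a minimal, self-contained illustration and not the code in src/data/Pipeline.mjs or src/worker/Data.mjs: FakeDataWorker, LocalPipeline, RemotePipeline, and the send callback are hypothetical stand-ins, and the worker channel is simulated with a direct function call. Only the message shape, the this.instances lookup, and the onPipelineExecute name come from the issue text.

```javascript
const OPERATIONS = new Set(['read', 'create', 'update', 'destroy']);

// Hypothetical stand-in for the Data Worker (not the real src/worker/Data.mjs)
class FakeDataWorker {
    constructor() {
        this.instances = new Map(); // remoteId -> pipeline instance living in this worker
    }

    // Handles { action: 'pipeline.execute', id, operation, params } messages
    async onPipelineExecute({id, operation, params}) {
        const pipeline = this.instances.get(id);

        if (!pipeline) {
            throw new Error(`No pipeline registered for id "${id}"`);
        }

        if (!OPERATIONS.has(operation) || typeof pipeline[operation] !== 'function') {
            throw new Error(`Unsupported operation "${operation}"`);
        }

        // Runs the local stages and resolves with the finalized record array
        return pipeline[operation](params);
    }
}

// Hypothetical Data Worker side pipeline; the three stages are stubs
class LocalPipeline {
    async read({limit} = {}) {
        const raw    = '[{"ID":1,"NAME":"a"},{"ID":2,"NAME":"b"}]'; // "connection"
        const parsed = JSON.parse(raw);                             // "parser"
        const rows   = parsed.map(r => ({id: r.ID, name: r.NAME})); // "normalizer"

        return rows.slice(0, limit);
    }
}

// Hypothetical App Worker side pipeline: it only builds and sends messages
class RemotePipeline {
    constructor(remoteId, send) {
        this.remoteId = remoteId;
        this.send     = send; // stand-in for the worker messaging layer
    }

    execute(operation, params) {
        return this.send({action: 'pipeline.execute', id: this.remoteId, operation, params});
    }

    read(params)    { return this.execute('read', params); }
    create(params)  { return this.execute('create', params); }
    update(params)  { return this.execute('update', params); }
    destroy(params) { return this.execute('destroy', params); }
}

const dataWorker = new FakeDataWorker();
dataWorker.instances.set('pipeline-1', new LocalPipeline());

const appPipeline = new RemotePipeline('pipeline-1', msg => dataWorker.onPipelineExecute(msg));
```

In this sketch, appPipeline.read({limit: 1}) returns a Promise that resolves with the normalized records produced on the simulated Data Worker side, and a message carrying an unknown id rejects instead of failing silently.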
tobiu added the enhancement label on Mar 12, 2026, 7:23 PM
tobiu added the ai label on Mar 12, 2026, 7:23 PM
tobiu added the architecture label on Mar 12, 2026, 7:23 PM
tobiu added the core label on Mar 12, 2026, 7:23 PM
tobiu added parent issue #9449 on Mar 12, 2026, 7:23 PM
tobiu cross-referenced by #9449 on Mar 12, 2026, 7:24 PM
tobiu assigned to @tobiu on Mar 12, 2026, 7:25 PM
tobiu changed title from Wire Data Worker Normalizer Execution Pipeline to Implement Pipeline IPC and Remote Execution Routing on Mar 12, 2026, 10:03 PM
tobiu cross-referenced by #9451 on Mar 12, 2026, 10:14 PM
tobiu cross-referenced by #9543 on Mar 24, 2026, 5:07 PM
tobiu referenced in commit 2b9b44b - "feat(data): implement Pipeline IPC and remote execution routing (#9453)" on Mar 24, 2026, 9:09 PM
tobiu Mar 24, 2026, 9:09 PM

Input from Gemini 3.1 Pro:

✦ I have implemented the Pipeline IPC and remote execution routing, leveraging the new Instance-to-Instance RemoteMethodAccess (RMA) layer we built in #9544.

Implementation Details:

  1. Pipeline Operations (src/data/Pipeline.mjs):
    • Updated read() to use Neo.currentWorker.generateRemote to create a proxy to its Data Worker counterpart.
    • Implemented create() and update() methods using a shared executeRemoteOrLocal helper. This helper ensures that if workerExecution: 'data' is set, all operations are seamlessly routed via RMA to the remoteId.
  2. Data Worker Cleanup (src/worker/Data.mjs):
    • Removed the Hack: Deleted the hardcoded onPipelineExecute message handler. The Data Worker no longer needs domain-specific logic to route Pipeline calls; the core RMA mixin handles it automatically via remoteId.
    • Memory Management: Added an onDestroyInstance handler. When the App Worker Pipeline is destroyed, it sends a destroyInstance message. This new handler ensures the remote instance is properly destroyed (instance.destroy()) and removed from the Data Worker's this.instances cache, preventing memory leaks.
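
The routing helper and the cleanup handler described above can be illustrated with a small sketch. It makes several assumptions: SketchPipeline, SketchDataWorker, getRemoteProxy, and the *Local methods are hypothetical names, and the proxy is simulated by handing back the remote instance directly rather than going through Neo.currentWorker.generateRemote, whose signature is not shown in this issue. Only workerExecution, remoteId, this.instances, executeRemoteOrLocal, and onDestroyInstance are taken from the comment.

```javascript
// Hypothetical sketch, not the actual src/data/Pipeline.mjs
class SketchPipeline {
    constructor({workerExecution = null, remoteId = null, getRemoteProxy = null, source = []} = {}) {
        Object.assign(this, {workerExecution, remoteId, getRemoteProxy, source});
        this.isDestroyed = false;
    }

    // Route to the Data Worker counterpart when configured, otherwise run in place
    async executeRemoteOrLocal(operation, params) {
        if (this.workerExecution === 'data') {
            const proxy = this.getRemoteProxy(this.remoteId); // stand-in for the RMA proxy

            return proxy[operation](params);
        }

        return this[`${operation}Local`](params);
    }

    read(params)   { return this.executeRemoteOrLocal('read', params); }
    create(params) { return this.executeRemoteOrLocal('create', params); }
    update(params) { return this.executeRemoteOrLocal('update', params); }

    async readLocal() { return [...this.source]; }

    async createLocal(record) {
        this.source.push(record);
        return record;
    }

    async updateLocal(record) {
        const existing = this.source.find(r => r.id === record.id);

        return existing ? Object.assign(existing, record) : null;
    }

    destroy() { this.isDestroyed = true; }
}

// Hypothetical stand-in for the Data Worker
class SketchDataWorker {
    constructor() {
        this.instances = {};
    }

    // Invoked when the App Worker reports that its pipeline was destroyed
    onDestroyInstance({id}) {
        const instance = this.instances[id];

        if (instance) {
            instance.destroy();
            delete this.instances[id]; // drop the cache entry so it can be collected
        }
    }
}

const worker = new SketchDataWorker();
worker.instances['p1'] = new SketchPipeline({source: [{id: 1, name: 'a'}]});

const appSide = new SketchPipeline({
    workerExecution: 'data',
    remoteId       : 'p1',
    getRemoteProxy : id => worker.instances[id]
});
```

Because the same helper backs every operation, the App Worker side needs no per-operation routing code; calling worker.onDestroyInstance({id: 'p1'}) destroys the counterpart and removes it from the instances cache.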

The pipeline architecture is now fully wired for cross-worker orchestration. I will close this sub-issue.

tobiu closed this issue on Mar 24, 2026, 9:12 PM
tobiu cross-referenced by #9546 on Mar 24, 2026, 9:19 PM