11'use strict'
22
3- import { workerData , BroadcastChannel , isMainThread } from 'worker_threads'
4- import { modelsInDomain } from './use-cases'
3+ import { workerData , BroadcastChannel } from 'worker_threads'
54
65export class PortEventRouter {
76 constructor ( models , broker ) {
87 this . models = models
98 this . broker = broker
109 }
1110
12- getThreadLocalPorts ( ) {
13- const localSpec = this . models . getModelSpec (
14- workerData . poolName . toUpperCase ( )
15- )
16- return this . models
11+ get localSpec ( ) {
12+ if ( this . __localSpec ) return this . __localSpec
13+ this . __localSpec = this . models . getModelSpec ( workerData . poolName )
14+ return this . __localSpec
15+ }
16+
17+ get threadLocalPorts ( ) {
18+ if ( this . __threadLocalPorts ) return this . __threadLocalPorts
19+ this . __threadLocalPorts = this . models
1720 . getModelSpecs ( )
1821 . filter (
1922 spec =>
2023 spec . ports &&
21- ( spec . domain . toUpperCase ( ) === localSpec . domain . toUpperCase ( ) ||
22- spec . modelName . toUpperCase ( ) === localSpec . modelName . toUpperCase ( ) )
24+ ( spec . domain === this . localSpec . domain ||
25+ spec . modelName === this . localSpec . modelName )
2326 )
2427 . flatMap ( spec =>
2528 Object . values ( spec . ports )
2629 . filter ( port => port . consumesEvent || port . producesEvent )
27- . map ( port => ( { ...port , modelName : spec . modelName } ) )
30+ . map ( port => ( {
31+ ...port ,
32+ modelName : spec . modelName ,
33+ domain : spec . domain
34+ } ) )
2835 )
36+ return this . __threadLocalPorts
2937 }
3038
31- getThreadRemotePorts ( ) {
32- return this . models
39+ get threadRemotePorts ( ) {
40+ if ( this . __threadRemotePorts ) return this . __threadRemotePorts
41+ this . __threadRemotePorts = this . models
3342 . getModelSpecs ( )
3443 . filter (
3544 spec =>
3645 spec . ports &&
37- ! this . getThreadLocalPorts ( ) . find ( l => l . modelName === spec . modelName )
46+ ! this . threadLocalPorts . find ( l => l . modelName === spec . modelName )
3847 )
3948 . flatMap ( spec =>
4049 Object . values ( spec . ports )
4150 . filter ( port => port . consumesEvent || port . producesEvent )
42- . map ( port => ( { ...port , modelName : spec . modelName } ) )
51+ . map ( port => ( {
52+ ...port ,
53+ modelName : spec . modelName ,
54+ domain : spec . domain
55+ } ) )
4356 )
57+ return this . __threadRemotePorts
58+ }
59+
60+ get publisherPorts ( ) {
61+ if ( this . __publisherPorts ) this . __publisherPorts
62+ this . __publisherPorts = this . threadRemotePorts . filter ( remote =>
63+ this . threadLocalPorts . find (
64+ local => local . producesEvent === remote . consumesEvent
65+ )
66+ )
67+ return this . __publisherPorts
4468 }
4569
46- handleChannelEvent ( msg ) {
47- if ( msg . data . eventName ) this . broker . notify ( msg . data . eventName , msg . data )
70+ get subscriberPorts ( ) {
71+ if ( this . __subscriberPorts ) return this . __subscriberPorts
72+ this . __subscriberPorts = this . threadRemotePorts . filter ( remote =>
73+ this . threadLocalPorts . find (
74+ local => local . consumesEvent === remote . producesEvent
75+ )
76+ )
77+ return this . __subscriberPorts
78+ }
79+
80+ get unhandledPorts ( ) {
81+ if ( this . __unhandledPorts ) return this . __unhandledPorts
82+ this . __unhandledPorts = this . threadLocalPorts . filter (
83+ local =>
84+ ! this . threadRemotePorts . find (
85+ remote => local . producesEvent === remote . consumesEvent
86+ ) && ! this . localPorts . find ( l => local . producesEvent === l . consumesEvent )
87+ )
88+ return this . __unhandledPorts
89+ }
90+
91+ handleBroadcastEvent ( msg ) {
92+ if ( msg ?. data ?. eventName ) this . broker . notify ( msg . data . eventName , msg . data )
4893 else {
4994 console . log ( 'missing eventName' , msg . data )
5095 this . broker . notify ( 'missingEventName' , msg . data )
@@ -55,78 +100,56 @@ export class PortEventRouter {
55100 * Listen for producer events from other thread pools and invoke
56101 * local ports that consume them. Listen for local producer events
57102 * and forward to pools that consume them. If a producer event is
58- * not consumed by any local thread, foward to service mesh.
103+ * not consumed by any local thread, foward to the service mesh.
59104 */
60105 listen ( ) {
61- const localPorts = this . getThreadLocalPorts ( )
62- const remotePorts = this . getThreadRemotePorts ( )
63-
64- console . debug ( { localPorts } )
65- console . debug ( { remotePorts } )
66-
67- const publishPorts = remotePorts . filter ( remote =>
68- localPorts . find ( local => local . producesEvent === remote . consumesEvent )
69- )
70- const subscribePorts = remotePorts . filter ( remote =>
71- localPorts . find ( local => local . consumesEvent === remote . producesEvent )
72- )
73- const unhandledPorts = localPorts . filter (
74- local =>
75- ! remotePorts . find (
76- remote => local . producesEvent === remote . consumesEvent
77- ) && ! localPorts . find ( l => local . producesEvent === l . consumesEvent )
78- )
79-
80106 const services = new Set ( )
81107 const channels = new Map ( )
82108
83- publishPorts . forEach ( port => services . add ( port . modelName ) )
84- subscribePorts . forEach ( port => services . add ( port . modelName ) )
109+ this . publisherPorts . forEach ( port => services . add ( port . modelName ) )
110+ this . subscriberPorts . forEach ( port => services . add ( port . modelName ) )
85111
86112 services . forEach ( service =>
87113 channels . set ( service , new BroadcastChannel ( service ) )
88114 )
89115
90- console . log ( 'publishPorts ' , publishPorts )
91- console . log ( 'subscribePorts ' , subscribePorts )
92- console . log ( 'unhandledPorts' , unhandledPorts )
116+ console . log ( 'publisherPorts ' , this . publisherPorts )
117+ console . log ( 'subscriberPorts ' , this . subscriberPorts )
118+ console . log ( 'unhandledPorts' , this . unhandledPorts )
93119 console . log ( 'channels' , channels )
94120
95- // dispatch outgoing events
96- publishPorts . forEach ( port =>
121+ // dispatch outgoing events to local pools
122+ this . publisherPorts . forEach ( port =>
97123 this . broker . on ( port . consumesEvent , event => {
98124 console . log ( 'broadcasting...' , { port, event } )
99125 channels
100126 . get ( port . modelName )
101- . postMessage (
102- JSON . parse (
103- JSON . stringify ( { ...event , route : 'balanceEventConsumer' } )
104- )
105- )
127+ . postMessage ( JSON . parse ( JSON . stringify ( event ) ) )
106128 } )
107129 )
108130
109- // listen for incoming events
110- subscribePorts . forEach ( port => {
131+ // listen for incoming events from local pools
132+ this . subscriberPorts . forEach ( port => {
111133 channels . get ( port . modelName ) . onmessage = msg => {
112134 console . log ( 'subscribePorts.onmessage' , msg . data )
113- this . handleChannelEvent ( msg )
135+ this . handleBroadcastEvent ( msg )
114136 }
115137 } )
116-
117- unhandledPorts . forEach ( port => {
138+
139+ // send ports not handled by local pool to mesh
140+ this . unhandledPorts . forEach ( port => {
118141 this . broker . on ( port . producesEvent , event => {
119142 this . broker . notify ( 'to_main' , {
120143 ...event ,
121- route : 'balanceEventConsumer'
144+ route : 'balanceEventConsumer' // mesh routing algo
122145 } )
123146 } )
124147 } )
125148
126149 // listen to this model's channel
127- new BroadcastChannel ( workerData . poolName . toUpperCase ( ) ) . onmessage = msg => {
150+ new BroadcastChannel ( workerData . poolName ) . onmessage = msg => {
128151 console . log ( 'onmessage' , msg . data )
129- this . handleChannelEvent ( msg )
152+ this . handleBroadcastEvent ( msg )
130153 }
131154 }
132155}
0 commit comments