@@ -198,10 +198,11 @@ func (p *dispatcherPool[Q, R]) Execute(ctx context.Context, q Message[Q]) (Messa
198198}
199199
200200func (d * dispatcher [Q , R ]) newCall (q Message [Q ]) (* call [Q , R ], error ) {
201- responseCountdown := 1 // Default is JSON response only.
201+ responseCountdown := & atomic.Int32 {}
202+ responseCountdown .Add (1 ) // Default is JSON response only.
202203 if _ , ok := any (d .zeroQ .Data ).(DestinationProvider ); ok {
203204 // Expecting JSON followed by binary blob.
204- responseCountdown ++
205+ responseCountdown . Add ( 1 )
205206 }
206207 call := & call [Q , R ]{
207208 donec : make (chan * call [Q , R ], 1 ),
@@ -262,14 +263,15 @@ func (d *dispatcher[Q, R]) inputBlobs() {
262263 inputErr = err
263264 break
264265 }
265- hdebug . Printf ( "=== === === Read blob header %d %d" , id , length )
266+
266267 lr := & io.LimitedReader {
267268 R : d .inOut .stdoutBinary ,
268269 N : int64 (length ),
269270 }
270271
271272 call := d .pendingCall (id )
272- call .responseCountdown --
273+
274+ hdebug .Printf ("START === === === Read blob header id: %d len: %d countdown: %d" , id , length , call .responseCountdown .Load ())
273275
274276 if err := call .handleBlob (lr ); err != nil {
275277 inputErr = err
@@ -279,12 +281,14 @@ func (d *dispatcher[Q, R]) inputBlobs() {
279281 inputErr = fmt .Errorf ("blob %d: expected to read %d more bytes" , id , lr .N )
280282 break
281283 }
282- if call .responseCountdown <= 0 {
284+ call .responseCountdown .Add (- 1 )
285+ if call .responseCountdown .Load () <= 0 {
283286 d .mu .Lock ()
284287 delete (d .pending , id )
285288 d .mu .Unlock ()
286289 call .done ()
287290 }
291+ hdebug .Printf ("END === === === Read blob header id: %d len: %d countdown: %d" , id , length , call .responseCountdown .Load ())
288292 }
289293
290294 if inputErr != nil && inputErr != io .EOF && inputErr != io .ErrClosedPipe {
@@ -304,17 +308,17 @@ func (d *dispatcher[Q, R]) inputJSON() {
304308
305309 call := d .pendingCall (r .GetID ())
306310
307- call .responseCountdown --
308- if call .responseCountdown < 0 {
309- call .err = fmt .Errorf ("protocol error: JSON response must be sent first." )
310- }
311+ hdebug .Printf ("END === === === get JSON id: %d countdown: %d" , r .GetID (), call .responseCountdown .Load ())
312+
313+ call .responseCountdown .Add (- 1 )
311314 call .response = r
312- if call .responseCountdown == 0 {
315+ if call .responseCountdown . Load () <= 0 || r . Header . Err != "" {
313316 d .mu .Lock ()
314317 delete (d .pending , r .GetID ())
315318 d .mu .Unlock ()
316- call .done ()
319+ call .done () // TODO1 check that this can be called multiple times safely.
317320 }
321+ hdebug .Printf ("END === === === get JSON id: %d countdown: %d" , r .GetID (), call .responseCountdown .Load ())
318322
319323 }
320324
@@ -352,13 +356,12 @@ func (d *dispatcher[Q, R]) pendingCall(id uint32) *call[Q, R] {
352356type call [Q , R any ] struct {
353357 request Message [Q ]
354358 response Message [R ]
355- responseCountdown int
359+ responseCountdown * atomic. Int32
356360 err error
357361 donec chan * call [Q , R ]
358362}
359363
360364func (c * call [Q , R ]) handleBlob (r io.Reader ) error {
361- hdebug .Printf ("call ====> handleBlob" )
362365 dest := any (c .request .Data ).(DestinationProvider ).GetDestination ()
363366 if dest == nil {
364367 panic ("blob destination is not set" )
@@ -522,29 +525,29 @@ func newDispatcher[Q, R any](opts Options) (*dispatcherPool[Q, R], error) {
522525
523526 inOuts := make ([]* inOut , opts .PoolSize )
524527 for i := range opts .PoolSize {
525- var stdin , stdoutBinary hugio.ReadWriteCloser
528+ var stdinPipe , stdoutBinary hugio.ReadWriteCloser
526529 var stdout io.WriteCloser
527530 var jsonr io.Reader
528531
529- stdin = hugio .NewPipeReadWriteCloser ()
530- stdoutRwc := hugio .NewPipeReadWriteCloser ()
531- stdout = stdoutRwc
532+ stdinPipe = hugio .NewPipeReadWriteCloser ()
533+ stdoutPipe := hugio .NewPipeReadWriteCloser ()
534+ stdout = stdoutPipe
532535
533536 var zero Q
534537 if _ , ok := any (zero ).(DestinationProvider ); ok {
535538 stdoutBinary = hugio .NewPipeReadWriteCloser ()
536- jsonr = stdoutRwc
539+ jsonr = stdoutPipe
537540 stdout = textandbinaryreader .NewWriter (stdout , stdoutBinary )
538541 } else {
539- jsonr = stdoutRwc
542+ jsonr = stdoutPipe
540543 }
541544
542545 inOuts [i ] = & inOut {
543- stdin : stdin ,
546+ stdin : stdinPipe ,
544547 stdout : stdout ,
545548 stdoutBinary : stdoutBinary ,
546549 dec : json .NewDecoder (jsonr ),
547- enc : json .NewEncoder (stdin ),
550+ enc : json .NewEncoder (stdinPipe ),
548551 }
549552 }
550553
@@ -748,8 +751,13 @@ func AllDispatchers(opts Options) *Dispatchers {
748751 _ , err = data .ReadFrom (r )
749752 webOpts .Main = Binary {Name : "webp" , Data : data .Bytes ()}
750753
751- return & Dispatchers {
754+ dispatchers := & Dispatchers {
752755 katex : & lazyDispatcher [KatexInput , KatexOutput ]{opts : katexOpts },
753756 webp : & lazyDispatcher [WebpInput , WebpOutput ]{opts : webOpts },
754757 }
758+
759+ // TODO1 better way to do this?
760+ // image.RegisterFormat("webp", "RIFF????WEBPVP8", dispatchers.DecodeWebp, dispatchers.DecodeWebpConfig)
761+
762+ return dispatchers
755763}
0 commit comments