@@ -10,6 +10,7 @@ import (
1010 "io"
1111 "net"
1212 "net/http"
13+ "strings"
1314 "sync"
1415
1516 "github.com/soheilhy/cmux"
@@ -36,18 +37,19 @@ import (
3637 "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
3738)
3839
39- var ErrSameGRPCandHTTPPort = errors .New ("cannot use same port for grpc and http server" )
40-
4140// Server runs HTTP, Mux and a grpc server
4241type Server struct {
4342 querySvc * querysvc.QueryService
4443 queryOptions * QueryOptions
4544
46- grpcConn net.Listener
47- httpConn net.Listener
48- grpcServer * grpc.Server
49- httpServer * httpServer
50- bgFinished sync.WaitGroup
45+ conn net.Listener
46+ grpcConn net.Listener
47+ httpConn net.Listener
48+ cmuxServer cmux.CMux
49+ grpcServer * grpc.Server
50+ httpServer * httpServer
51+ separatePorts bool
52+ bgFinished sync.WaitGroup
5153 telemetery.Setting
5254}
5355
@@ -71,8 +73,8 @@ func NewServer(
7173 }
7274 separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"
7375
74- if ! separatePorts {
75- return nil , ErrSameGRPCandHTTPPort
76+ if ( options . HTTP . TLSSetting != nil || options . GRPC . TLSSetting != nil ) && ! separatePorts {
77+ return nil , errors . New ( "server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead" )
7678 }
7779
7880 grpcServer , err := createGRPCServer (ctx , host , querySvc , metricsQuerySvc , options , tm , telset )
@@ -86,11 +88,12 @@ func NewServer(
8688 }
8789
8890 return & Server {
89- querySvc : querySvc ,
90- queryOptions : options ,
91- grpcServer : grpcServer ,
92- httpServer : httpServer ,
93- Setting : telset ,
91+ querySvc : querySvc ,
92+ queryOptions : options ,
93+ grpcServer : grpcServer ,
94+ httpServer : httpServer ,
95+ separatePorts : separatePorts ,
96+ Setting : telset ,
9497 }, nil
9598}
9699
@@ -233,32 +236,70 @@ func (hS httpServer) Close() error {
233236}
234237
235238// initListener initialises listeners of the server
236- func (s * Server ) initListener (ctx context.Context ) error {
237- var err error
238- s .grpcConn , err = s .queryOptions .GRPC .NetAddr .Listen (ctx )
239- if err != nil {
240- return err
239+ func (s * Server ) initListener (ctx context.Context ) (cmux.CMux , error ) {
240+ if s .separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
241+ var err error
242+ s .grpcConn , err = s .queryOptions .GRPC .NetAddr .Listen (ctx )
243+ if err != nil {
244+ return nil , err
245+ }
246+
247+ s .httpConn , err = s .queryOptions .HTTP .ToListener (ctx )
248+ if err != nil {
249+ return nil , err
250+ }
251+ s .Logger .Info (
252+ "Query server started" ,
253+ zap .String ("http_addr" , s .HTTPAddr ()),
254+ zap .String ("grpc_addr" , s .GRPCAddr ()),
255+ )
256+ return nil , nil
241257 }
242258
243- s .httpConn , err = s .queryOptions .HTTP .ToListener (ctx )
259+ // old behavior using cmux
260+ conn , err := net .Listen ("tcp" , s .queryOptions .HTTP .Endpoint )
244261 if err != nil {
245- return err
262+ return nil , err
246263 }
264+
265+ s .conn = conn
266+
267+ var tcpPort int
268+ if port , err := netutils .GetPort (s .conn .Addr ()); err == nil {
269+ tcpPort = port
270+ }
271+
247272 s .Logger .Info (
248273 "Query server started" ,
249- zap .String ("http_addr" , s .HTTPAddr ()),
250- zap .String ("grpc_addr" , s .GRPCAddr ()),
274+ zap .Int ("port" , tcpPort ),
275+ zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
276+
277+ // cmux server acts as a reverse-proxy between HTTP and GRPC backends.
278+ cmuxServer := cmux .New (s .conn )
279+
280+ s .grpcConn = cmuxServer .MatchWithWriters (
281+ cmux .HTTP2MatchHeaderFieldSendSettings ("content-type" , "application/grpc" ),
282+ cmux .HTTP2MatchHeaderFieldSendSettings ("content-type" , "application/grpc+proto" ),
251283 )
252- return nil
284+ s . httpConn = cmuxServer . Match ( cmux . Any ())
253285
286+ return cmuxServer , nil
254287}
255288
256289// Start http, GRPC and cmux servers concurrently
257290func (s * Server ) Start (ctx context.Context ) error {
258- err := s .initListener (ctx )
291+ cmuxServer , err := s .initListener (ctx )
259292 if err != nil {
260293 return fmt .Errorf ("query server failed to initialize listener: %w" , err )
261294 }
295+ s .cmuxServer = cmuxServer
296+
297+ var tcpPort int
298+ if ! s .separatePorts {
299+ if port , err := netutils .GetPort (s .conn .Addr ()); err == nil {
300+ tcpPort = port
301+ }
302+ }
262303
263304 var httpPort int
264305 if port , err := netutils .GetPort (s .httpConn .Addr ()); err == nil {
@@ -303,6 +344,23 @@ func (s *Server) Start(ctx context.Context) error {
303344 s .Logger .Info ("GRPC server stopped" , zap .Int ("port" , grpcPort ), zap .String ("addr" , s .queryOptions .GRPC .NetAddr .Endpoint ))
304345 }()
305346
347+ // Start cmux server concurrently.
348+ if ! s .separatePorts {
349+ s .bgFinished .Add (1 )
350+ go func () {
351+ defer s .bgFinished .Done ()
352+ s .Logger .Info ("Starting CMUX server" , zap .Int ("port" , tcpPort ), zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
353+
354+ err := cmuxServer .Serve ()
355+ // TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
356+ if err != nil && ! strings .Contains (err .Error (), "use of closed network connection" ) {
357+ s .Logger .Error ("Could not start multiplexed server" , zap .Error (err ))
358+ s .ReportStatus (componentstatus .NewFatalErrorEvent (err ))
359+ return
360+ }
361+ s .Logger .Info ("CMUX server stopped" , zap .Int ("port" , tcpPort ), zap .String ("addr" , s .queryOptions .HTTP .Endpoint ))
362+ }()
363+ }
306364 return nil
307365}
308366
@@ -326,6 +384,10 @@ func (s *Server) Close() error {
326384 s .Logger .Info ("Stopping gRPC server" )
327385 s .grpcServer .Stop ()
328386
387+ if ! s .separatePorts {
388+ s .Logger .Info ("Closing CMux server" )
389+ s .cmuxServer .Close ()
390+ }
329391 s .bgFinished .Wait ()
330392
331393 s .Logger .Info ("Server stopped" )
0 commit comments