@@ -8,8 +8,8 @@ protocol PSQLChannelHandlerNotificationDelegate: AnyObject {
88}
99
1010final class PSQLChannelHandler : ChannelDuplexHandler {
11- typealias InboundIn = PSQLBackendMessage
1211 typealias OutboundIn = PSQLTask
12+ typealias InboundIn = ByteBuffer
1313 typealias OutboundOut = PSQLFrontendMessage
1414
1515 private let logger : Logger
@@ -24,6 +24,7 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
2424 /// The context is captured in `handlerAdded` and released` in `handlerRemoved`
2525 private var handlerContext : ChannelHandlerContext !
2626 private var rowStream : PSQLRowStream ?
27+ private var decoder : NIOSingleStepByteToMessageProcessor < PSQLBackendMessageDecoder >
2728 private let authentificationConfiguration : PSQLConnection . Configuration . Authentication ?
2829 private let configureSSLCallback : ( ( Channel ) throws -> Void ) ?
2930
@@ -38,6 +39,7 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
3839 self . authentificationConfiguration = authentification
3940 self . configureSSLCallback = configureSSLCallback
4041 self . logger = logger
42+ self . decoder = NIOSingleStepByteToMessageProcessor ( PSQLBackendMessageDecoder ( ) )
4143 }
4244
4345 #if DEBUG
@@ -51,6 +53,7 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
5153 self . authentificationConfiguration = authentification
5254 self . configureSSLCallback = configureSSLCallback
5355 self . logger = logger
56+ self . decoder = NIOSingleStepByteToMessageProcessor ( PSQLBackendMessageDecoder ( ) )
5457 }
5558 #endif
5659
@@ -91,54 +94,62 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
9194 }
9295
9396 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
94- let incomingMessage = self . unwrapInboundIn ( data)
97+ let buffer = self . unwrapInboundIn ( data)
9598
96- self . logger. trace ( " Backend message received " , metadata: [ . message: " \( incomingMessage) " ] )
97-
98- let action : ConnectionStateMachine . ConnectionAction
99-
100- switch incomingMessage {
101- case . authentication( let authentication) :
102- action = self . state. authenticationMessageReceived ( authentication)
103- case . backendKeyData( let keyData) :
104- action = self . state. backendKeyDataReceived ( keyData)
105- case . bindComplete:
106- action = self . state. bindCompleteReceived ( )
107- case . closeComplete:
108- action = self . state. closeCompletedReceived ( )
109- case . commandComplete( let commandTag) :
110- action = self . state. commandCompletedReceived ( commandTag)
111- case . dataRow( let dataRow) :
112- action = self . state. dataRowReceived ( dataRow)
113- case . emptyQueryResponse:
114- action = self . state. emptyQueryResponseReceived ( )
115- case . error( let errorResponse) :
116- action = self . state. errorReceived ( errorResponse)
117- case . noData:
118- action = self . state. noDataReceived ( )
119- case . notice( let noticeResponse) :
120- action = self . state. noticeReceived ( noticeResponse)
121- case . notification( let notification) :
122- action = self . state. notificationReceived ( notification)
123- case . parameterDescription( let parameterDescription) :
124- action = self . state. parameterDescriptionReceived ( parameterDescription)
125- case . parameterStatus( let parameterStatus) :
126- action = self . state. parameterStatusReceived ( parameterStatus)
127- case . parseComplete:
128- action = self . state. parseCompleteReceived ( )
129- case . portalSuspended:
130- action = self . state. portalSuspendedReceived ( )
131- case . readyForQuery( let transactionState) :
132- action = self . state. readyForQueryReceived ( transactionState)
133- case . rowDescription( let rowDescription) :
134- action = self . state. rowDescriptionReceived ( rowDescription)
135- case . sslSupported:
136- action = self . state. sslSupportedReceived ( )
137- case . sslUnsupported:
138- action = self . state. sslUnsupportedReceived ( )
99+ do {
100+ try self . decoder. process ( buffer: buffer) { message in
101+ self . logger. trace ( " Backend message received " , metadata: [ . message: " \( message) " ] )
102+ let action : ConnectionStateMachine . ConnectionAction
103+
104+ switch message {
105+ case . authentication( let authentication) :
106+ action = self . state. authenticationMessageReceived ( authentication)
107+ case . backendKeyData( let keyData) :
108+ action = self . state. backendKeyDataReceived ( keyData)
109+ case . bindComplete:
110+ action = self . state. bindCompleteReceived ( )
111+ case . closeComplete:
112+ action = self . state. closeCompletedReceived ( )
113+ case . commandComplete( let commandTag) :
114+ action = self . state. commandCompletedReceived ( commandTag)
115+ case . dataRow( let dataRow) :
116+ action = self . state. dataRowReceived ( dataRow)
117+ case . emptyQueryResponse:
118+ action = self . state. emptyQueryResponseReceived ( )
119+ case . error( let errorResponse) :
120+ action = self . state. errorReceived ( errorResponse)
121+ case . noData:
122+ action = self . state. noDataReceived ( )
123+ case . notice( let noticeResponse) :
124+ action = self . state. noticeReceived ( noticeResponse)
125+ case . notification( let notification) :
126+ action = self . state. notificationReceived ( notification)
127+ case . parameterDescription( let parameterDescription) :
128+ action = self . state. parameterDescriptionReceived ( parameterDescription)
129+ case . parameterStatus( let parameterStatus) :
130+ action = self . state. parameterStatusReceived ( parameterStatus)
131+ case . parseComplete:
132+ action = self . state. parseCompleteReceived ( )
133+ case . portalSuspended:
134+ action = self . state. portalSuspendedReceived ( )
135+ case . readyForQuery( let transactionState) :
136+ action = self . state. readyForQueryReceived ( transactionState)
137+ case . rowDescription( let rowDescription) :
138+ action = self . state. rowDescriptionReceived ( rowDescription)
139+ case . sslSupported:
140+ action = self . state. sslSupportedReceived ( )
141+ case . sslUnsupported:
142+ action = self . state. sslUnsupportedReceived ( )
143+ }
144+
145+ self . run ( action, with: context)
146+ }
147+ } catch let error as PSQLDecodingError {
148+ let action = self . state. errorHappened ( . decoding( error) )
149+ self . run ( action, with: context)
150+ } catch {
151+ preconditionFailure ( " Expected to only get PSQLDecodingErrors from the PSQLBackendMessageDecoder. " )
139152 }
140-
141- self . run ( action, with: context)
142153 }
143154
144155 func channelReadComplete( context: ChannelHandlerContext ) {
0 commit comments