|
1 | 1 | use super::misc::{self, Counter, StreamID}; |
2 | 2 | use super::spi::*; |
3 | | -use crate::errors::{ErrorKind, RSocketError, ERR_APPLICATION}; |
| 3 | +use crate::errors::{self, ErrorKind, RSocketError}; |
4 | 4 | use crate::frame::{self, Body, Frame}; |
5 | 5 | use crate::payload::{Payload, SetupPayload}; |
6 | 6 | use crate::result::RSocketResult; |
@@ -103,7 +103,17 @@ impl DuplexSocket { |
103 | 103 | let flag = msg.get_flag(); |
104 | 104 | misc::debug_frame(false, &msg); |
105 | 105 | match msg.get_body() { |
106 | | - Body::Setup(v) => self.on_setup(&acceptor, sid, flag, SetupPayload::from(v)), |
| 106 | + Body::Setup(v) => { |
| 107 | + if let Err(e) = self.on_setup(&acceptor, sid, flag, SetupPayload::from(v)) { |
| 108 | + let errmsg = format!("{}", e); |
| 109 | + let sending = frame::Error::builder(0, 0) |
| 110 | + .set_code(errors::ERR_REJECT_SETUP) |
| 111 | + .set_data(Bytes::from(errmsg)) |
| 112 | + .build(); |
| 113 | + self.tx.send(sending).unwrap(); |
| 114 | + return; |
| 115 | + } |
| 116 | + } |
107 | 117 | Body::Resume(v) => { |
108 | 118 | // TODO: support resume |
109 | 119 | } |
@@ -225,16 +235,28 @@ impl DuplexSocket { |
225 | 235 | } |
226 | 236 |
|
227 | 237 | #[inline] |
228 | | - fn on_setup(&self, acceptor: &Acceptor, sid: u32, flag: u16, setup: SetupPayload) { |
| 238 | + fn on_setup( |
| 239 | + &self, |
| 240 | + acceptor: &Acceptor, |
| 241 | + sid: u32, |
| 242 | + flag: u16, |
| 243 | + setup: SetupPayload, |
| 244 | + ) -> Result<(), Box<dyn Error>> { |
229 | 245 | match acceptor { |
230 | 246 | Acceptor::Simple(gen) => { |
231 | 247 | self.responder.set(gen()); |
| 248 | + Ok(()) |
232 | 249 | } |
233 | | - Acceptor::Generate(gen) => { |
234 | | - self.responder.set(gen(setup, Box::new(self.clone()))); |
235 | | - } |
| 250 | + Acceptor::Generate(gen) => match gen(setup, Box::new(self.clone())) { |
| 251 | + Ok(it) => { |
| 252 | + self.responder.set(it); |
| 253 | + Ok(()) |
| 254 | + } |
| 255 | + Err(e) => Err(e), |
| 256 | + }, |
236 | 257 | Acceptor::Empty() => { |
237 | 258 | self.responder.set(Box::new(EmptyRSocket)); |
| 259 | + Ok(()) |
238 | 260 | } |
239 | 261 | } |
240 | 262 | } |
@@ -277,7 +299,7 @@ impl DuplexSocket { |
277 | 299 | bu.build() |
278 | 300 | } |
279 | 301 | Err(e) => frame::Error::builder(sid, 0) |
280 | | - .set_code(ERR_APPLICATION) |
| 302 | + .set_code(errors::ERR_APPLICATION) |
281 | 303 | .set_data(Bytes::from("TODO: should be error details")) |
282 | 304 | .build(), |
283 | 305 | }; |
|
0 commit comments