Skip to content

Commit c0daf8d

Browse files
authored
Fix/error response on future (#513)
* forget to set error_response * forget to set error_response * forget to set error_response * forget to set error_response
1 parent c2f2a7f commit c0daf8d

File tree

3 files changed

+83
-7
lines changed

3 files changed

+83
-7
lines changed

jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,33 +216,35 @@ public void complete(final Object result, final Throwable err) {
216216

217217
if (err == null) {
218218
Status status = Status.OK();
219+
Message msg;
219220
if (result instanceof ErrorResponse) {
220221
status = handleErrorResponse((ErrorResponse) result);
222+
msg = (Message) result;
221223
} else if (result instanceof Message) {
222224
final Descriptors.FieldDescriptor fd = ((Message) result).getDescriptorForType() //
223225
.findFieldByNumber(RpcResponseFactory.ERROR_RESPONSE_NUM);
224226
if (fd != null && ((Message) result).hasField(fd)) {
225227
final ErrorResponse eResp = (ErrorResponse) ((Message) result).getField(fd);
226228
status = handleErrorResponse(eResp);
229+
msg = eResp;
227230
} else {
228-
if (done != null) {
229-
done.setResponse((T) result);
230-
}
231+
msg = (T) result;
231232
}
232233
} else {
233-
if (done != null) {
234-
done.setResponse((T) result);
235-
}
234+
msg = (T) result;
236235
}
237236
if (done != null) {
238237
try {
238+
if (status.isOk()) {
239+
done.setResponse((T) msg);
240+
}
239241
done.run(status);
240242
} catch (final Throwable t) {
241243
LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
242244
}
243245
}
244246
if (!future.isDone()) {
245-
future.setResult((Message) result);
247+
future.setResult(msg);
246248
}
247249
} else {
248250
if (done != null) {

jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/AbstractClientServiceTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,4 +243,41 @@ public void testInvokeWithDoneOnException() throws Exception {
243243
assertNotNull(done.status);
244244
assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
245245
}
246+
247+
@Test
248+
public void testInvokeWithDOneOnErrorResponse() throws Exception {
249+
final InvokeContext invokeCtx = new InvokeContext();
250+
invokeCtx.put(InvokeContext.CRC_SWITCH, false);
251+
final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
252+
final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
253+
.setGroupId("id") //
254+
.setLeaderId("127.0.0.1:8001") //
255+
.build();
256+
257+
MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
258+
Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, invokeCtx, done, -1);
259+
Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), eq(invokeCtx),
260+
callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
261+
InvokeCallback cb = callbackArg.getValue();
262+
assertNotNull(cb);
263+
assertNotNull(future);
264+
265+
assertNull(done.getResponse());
266+
assertNull(done.status);
267+
assertFalse(future.isDone());
268+
269+
final Message resp = this.rpcResponseFactory.newResponse(CliRequests.GetPeersResponse.getDefaultInstance(),
270+
new Status(-1, "failed"));
271+
cb.complete(resp, null);
272+
273+
final Message msg = future.get();
274+
275+
assertTrue(msg instanceof ErrorResponse);
276+
assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");
277+
278+
done.latch.await();
279+
assertNotNull(done.status);
280+
assertTrue(!done.status.isOk());
281+
assertEquals(done.status.getErrorMsg(), "failed");
282+
}
246283
}

jraft-extension/rpc-grpc-impl/src/test/java/com/alipay/sofa/jraft/rpc/AbstractClientServiceTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,4 +243,41 @@ public void testInvokeWithDoneOnException() throws Exception {
243243
assertNotNull(done.status);
244244
assertEquals(RaftError.ETIMEDOUT.getNumber(), done.status.getCode());
245245
}
246+
247+
@Test
248+
public void testInvokeWithDOneOnErrorResponse() throws Exception {
249+
final InvokeContext invokeCtx = new InvokeContext();
250+
invokeCtx.put(InvokeContext.CRC_SWITCH, false);
251+
final ArgumentCaptor<InvokeCallback> callbackArg = ArgumentCaptor.forClass(InvokeCallback.class);
252+
final CliRequests.GetPeersRequest request = CliRequests.GetPeersRequest.newBuilder() //
253+
.setGroupId("id") //
254+
.setLeaderId("127.0.0.1:8001") //
255+
.build();
256+
257+
MockRpcResponseClosure<ErrorResponse> done = new MockRpcResponseClosure<>();
258+
Future<Message> future = this.clientService.invokeWithDone(this.endpoint, request, invokeCtx, done, -1);
259+
Mockito.verify(this.rpcClient).invokeAsync(eq(this.endpoint), eq(request), eq(invokeCtx),
260+
callbackArg.capture(), eq((long) this.rpcOptions.getRpcDefaultTimeout()));
261+
InvokeCallback cb = callbackArg.getValue();
262+
assertNotNull(cb);
263+
assertNotNull(future);
264+
265+
assertNull(done.getResponse());
266+
assertNull(done.status);
267+
assertFalse(future.isDone());
268+
269+
final Message resp = this.rpcResponseFactory.newResponse(CliRequests.GetPeersResponse.getDefaultInstance(),
270+
new Status(-1, "failed"));
271+
cb.complete(resp, null);
272+
273+
final Message msg = future.get();
274+
275+
assertTrue(msg instanceof ErrorResponse);
276+
assertEquals(((ErrorResponse) msg).getErrorMsg(), "failed");
277+
278+
done.latch.await();
279+
assertNotNull(done.status);
280+
assertTrue(!done.status.isOk());
281+
assertEquals(done.status.getErrorMsg(), "failed");
282+
}
246283
}

0 commit comments

Comments
 (0)