Skip to content

Commit 58b6426

Browse files
committed
changes onClose subscription style
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8c928ec commit 58b6426

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public abstract class BaseDuplexConnection implements DuplexConnection {
1111
protected UnboundedProcessor sender = new UnboundedProcessor();
1212

1313
public BaseDuplexConnection() {
14-
onClose.doFinally(s -> doOnClose()).subscribe();
14+
onClose.subscribe(null, t -> doOnClose(), this::doOnClose);
1515
}
1616

1717
@Override

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public KeepAliveFramesAcceptor start(
2727
KeepAliveSupport keepAliveSupport,
2828
Consumer<ByteBuf> onSendKeepAliveFrame,
2929
Consumer<KeepAlive> onTimeout) {
30-
duplexConnection.onClose().doFinally(s -> keepAliveSupport.stop()).subscribe();
30+
duplexConnection
31+
.onClose()
32+
.subscribe(null, __ -> keepAliveSupport.stop(), keepAliveSupport::stop);
3133
return keepAliveSupport
3234
.onSendKeepAliveFrame(onSendKeepAliveFrame)
3335
.onTimeout(onTimeout)

0 commit comments

Comments
 (0)