1515 */
1616package io.rsocket.android
1717
18- import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
19-
20- import io.netty.buffer.*
18+ import com.sun.org.apache.xpath.internal.operations.Bool
19+ import io.netty.buffer.ByteBuf
20+ import io.netty.buffer.ByteBufAllocator
21+ import io.netty.buffer.ByteBufHolder
22+ import io.netty.buffer.Unpooled
2123import io.netty.util.IllegalReferenceCountException
2224import io.netty.util.Recycler
2325import io.netty.util.Recycler.Handle
2426import io.netty.util.ResourceLeakDetector
25- import io.rsocket.android.frame.ErrorFrameFlyweight
26- import io.rsocket.android.frame.FrameHeaderFlyweight
27- import io.rsocket.android.frame.KeepaliveFrameFlyweight
28- import io.rsocket.android.frame.LeaseFrameFlyweight
29- import io.rsocket.android.frame.RequestFrameFlyweight
30- import io.rsocket.android.frame.RequestNFrameFlyweight
31- import io.rsocket.android.frame.SetupFrameFlyweight
32- import io.rsocket.android.frame.VersionFlyweight
27+ import io.rsocket.android.frame.*
28+ import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
29+ import org.slf4j.LoggerFactory
3330import java.nio.ByteBuffer
3431import java.nio.charset.StandardCharsets
35- import org.slf4j.LoggerFactory
3632
3733/* *
3834 * Represents a Frame sent over a [DuplexConnection].
@@ -51,10 +47,12 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
5147
5248 /* * Return the content which is held by this [Frame]. */
5349 override fun content (): ByteBuf {
54- if (content!! .refCnt() <= 0 ) {
55- throw IllegalReferenceCountException (content!! .refCnt())
56- }
57- return content as ByteBuf
50+ val c = content
51+ return if (c == null ) {
52+ throw IllegalReferenceCountException (0 )
53+ } else if (c.refCnt() <= 0 ) {
54+ throw IllegalReferenceCountException (c.refCnt())
55+ } else content as ByteBuf
5856 }
5957
6058 /* * Creates a deep copy of this [Frame]. */
@@ -79,7 +77,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
7977 * Returns the reference count of this object. If `0`, it means this object has been
8078 * deallocated.
8179 */
82- override fun refCnt (): Int = content!! .refCnt()
80+ override fun refCnt (): Int = content? .refCnt() ? : 0
8381
8482 /* * Increases the reference count by `1`. */
8583 override fun retain (): Frame {
@@ -210,11 +208,17 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
210208 */
211209 fun flags (): Int = FrameHeaderFlyweight .flags(content!! )
212210
213- fun hasMetadata (): Boolean = isFlagSet(this .flags(), FLAGS_M )
211+ fun isFlagSet (flag : Int ): Boolean {
212+ return isFlagSet(this .flags(), flag)
213+ }
214+
215+ fun hasMetadata (): Boolean = isFlagSet(FLAGS_M )
214216
215217 val dataUtf8: String
216218 get() = StandardCharsets .UTF_8 .decode(data).toString()
217219
220+ val isFragmentable: Boolean
221+ get() = type.isFragmentable
218222 /* TODO:
219223 *
220224 * fromRequest(type, id, payload)
@@ -227,6 +231,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
227231
228232 fun from (
229233 flags : Int ,
234+ version : Int ,
230235 keepaliveInterval : Int ,
231236 maxLifetime : Int ,
232237 metadataMimeType : String ,
@@ -250,6 +255,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
250255 SetupFrameFlyweight .encode(
251256 frame.content!! ,
252257 flags,
258+ version,
253259 keepaliveInterval,
254260 maxLifetime,
255261 metadataMimeType,
@@ -259,6 +265,25 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
259265 return frame
260266 }
261267
268+ fun from (
269+ flags : Int ,
270+ keepaliveInterval : Int ,
271+ maxLifetime : Int ,
272+ metadataMimeType : String ,
273+ dataMimeType : String ,
274+ payload : Payload ): Frame {
275+
276+ return from(
277+ flags,
278+ SetupFrameFlyweight .CURRENT_VERSION ,
279+ keepaliveInterval,
280+ maxLifetime,
281+ metadataMimeType,
282+ dataMimeType,
283+ payload)
284+ }
285+
286+
262287 fun getFlags (frame : Frame ): Int {
263288 ensureFrameType(FrameType .SETUP , frame)
264289 val flags = FrameHeaderFlyweight .flags(frame.content!! )
@@ -271,6 +296,20 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
271296 return SetupFrameFlyweight .version(frame.content!! )
272297 }
273298
299+ fun resumeEnabled (frame : Frame ): Boolean {
300+ ensureFrameType(FrameType .SETUP , frame)
301+ return Frame .isFlagSet(
302+ frame.flags(),
303+ SetupFrameFlyweight .FLAGS_RESUME_ENABLE )
304+ }
305+
306+ fun leaseEnabled (frame : Frame ): Boolean {
307+ ensureFrameType(FrameType .SETUP , frame)
308+ return Frame .isFlagSet(
309+ frame.flags(),
310+ SetupFrameFlyweight .FLAGS_WILL_HONOR_LEASE )
311+ }
312+
274313 fun keepaliveInterval (frame : Frame ): Int {
275314 ensureFrameType(FrameType .SETUP , frame)
276315 return SetupFrameFlyweight .keepaliveInterval(frame.content!! )
@@ -429,14 +468,14 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
429468 fun from (
430469 streamId : Int ,
431470 type : FrameType ,
432- metadata : ByteBuf ,
471+ metadata : ByteBuf ? ,
433472 data : ByteBuf ,
434473 initialRequestN : Int ,
435474 flags : Int ): Frame {
436475 val frame = RECYCLER .get()
437476 frame.content = ByteBufAllocator .DEFAULT .buffer(
438477 RequestFrameFlyweight .computeFrameLength(
439- type, metadata.readableBytes(), data.readableBytes()))
478+ type, metadata? .readableBytes(), data.readableBytes()))
440479 frame.content!! .writerIndex(
441480 RequestFrameFlyweight .encode(
442481 frame.content!! ,
@@ -449,6 +488,21 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
449488 return frame
450489 }
451490
491+ fun from (streamId : Int ,
492+ type : FrameType ,
493+ metadata : ByteBuf ? ,
494+ data : ByteBuf ,
495+ flags : Int ): Frame {
496+
497+ return PayloadFrame .from(
498+ streamId,
499+ type,
500+ metadata,
501+ data,
502+ flags)
503+ }
504+
505+
452506 fun initialRequestN (frame : Frame ): Int {
453507 val type = frame.type
454508 if (! type.isRequestType) {
@@ -538,6 +592,66 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
538592 }
539593 }
540594
595+ object Fragmentation {
596+
597+ fun assembleFrame (blueprintFrame : Frame ,
598+ metadata : ByteBuf ,
599+ data : ByteBuf ): Frame =
600+
601+ create(blueprintFrame,
602+ metadata,
603+ data,
604+ { it and FrameHeaderFlyweight .FLAGS_F .inv () })
605+
606+ fun sliceFrame (blueprintFrame : Frame ,
607+ metadata : ByteBuf ? ,
608+ data : ByteBuf ,
609+ additionalFlags : Int ): Frame =
610+
611+ create(blueprintFrame,
612+ metadata,
613+ data,
614+ { it or additionalFlags })
615+
616+ private inline fun create (blueprintFrame : Frame ,
617+ metadata : ByteBuf ? ,
618+ data : ByteBuf ,
619+ modifyFlags : (Int ) -> Int ): Frame =
620+ when (blueprintFrame.type) {
621+ FrameType .FIRE_AND_FORGET ,
622+ FrameType .REQUEST_RESPONSE -> {
623+ Frame .Request .from(
624+ blueprintFrame.streamId,
625+ blueprintFrame.type,
626+ metadata,
627+ data,
628+ modifyFlags(blueprintFrame.flags()))
629+ }
630+ FrameType .NEXT ,
631+ FrameType .NEXT_COMPLETE -> {
632+ Frame .PayloadFrame .from(
633+ blueprintFrame.streamId,
634+ blueprintFrame.type,
635+ metadata,
636+ data,
637+ modifyFlags(blueprintFrame.flags()))
638+ }
639+
640+ FrameType .REQUEST_STREAM ,
641+ FrameType .REQUEST_CHANNEL -> {
642+ Frame .Request .from(
643+ blueprintFrame.streamId,
644+ blueprintFrame.type,
645+ metadata,
646+ data,
647+ Frame .Request .initialRequestN(blueprintFrame),
648+ modifyFlags(blueprintFrame.flags()))
649+ }
650+ else -> throw AssertionError (" Non-fragmentable frame: " +
651+ " ${blueprintFrame.type} " )
652+ }
653+ }
654+
541655 override fun toString (): String {
542656 val type = FrameHeaderFlyweight .frameType(content!! )
543657 val payload = StringBuilder ()
@@ -587,7 +701,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
587701 }
588702
589703 companion object {
590- val NULL_BYTEBUFFER : ByteBuffer = ByteBuffer .allocateDirect(0 )
704+ val NULL_BYTEBUFFER : ByteBuffer = ByteBuffer .allocateDirect(0 )
591705
592706 private val RECYCLER = object : Recycler <Frame >() {
593707 override fun newObject (handle : Handle <Frame >): Frame {
0 commit comments