Skip to content

Commit 31d15d7

Browse files
author
anonymous
committed
[opt] Network add buffer support.
1 parent 10445dc commit 31d15d7

18 files changed

+182
-88
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.tans.tfiletransporter.netty
2+
3+
import com.tans.tlrucache.memory.LruByteArrayPool
4+
import com.tans.tlrucache.memory.LruByteArrayPool.Companion.ByteArrayValue
5+
6+
class ByteArrayPool(val maxPoolSize: Long = DEFAULT_MAX_POOL_SIZE) {
7+
8+
private val pool: LruByteArrayPool by lazy {
9+
LruByteArrayPool(maxPoolSize)
10+
}
11+
12+
fun get(requestSize: Int): ByteArrayValue {
13+
return if (requestSize <= 0) {
14+
ByteArrayValue(
15+
ByteArray(0)
16+
)
17+
} else if (requestSize % 1024 == 0) {
18+
pool.get(requestSize)
19+
} else {
20+
val fixedRequestSize = requestSize - (requestSize % 1024) + 1024
21+
pool.get(fixedRequestSize)
22+
}
23+
}
24+
25+
fun put(value: ByteArrayValue) {
26+
if (value.value.isNotEmpty()) {
27+
pool.put(value)
28+
}
29+
}
30+
31+
fun clearMemory() {
32+
pool.clearMemory()
33+
}
34+
35+
companion object {
36+
// 5 MB
37+
const val DEFAULT_MAX_POOL_SIZE = 1024L * 1024L * 5L
38+
}
39+
}

net/src/main/java/com/tans/tfiletransporter/netty/INettyConnectionTask.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ interface INettyConnectionTask : Runnable {
1616

1717
val observers: LinkedBlockingDeque<NettyConnectionObserver>
1818

19+
val byteArrayPool: ByteArrayPool
20+
1921
fun isExecuted(): Boolean = isExecuted.get()
2022

2123
fun getCurrentState(): NettyTaskState = state.get()
@@ -73,6 +75,7 @@ interface INettyConnectionTask : Runnable {
7375
state.channel.close()
7476
}
7577
dispatchState(NettyTaskState.ConnectionClosed)
78+
byteArrayPool.clearMemory()
7679
}
7780
}
7881

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.tans.tfiletransporter.netty
2+
3+
import com.tans.tlrucache.memory.LruByteArrayPool.Companion.ByteArrayValue
4+
5+
class NetByteArray(
6+
val value: ByteArrayValue,
7+
val readSize: Int
8+
)
Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,8 @@
11
package com.tans.tfiletransporter.netty
22

3-
import io.netty.buffer.ByteBuf
43

54
data class PackageData(
65
val type: Int,
76
val messageId: Long,
8-
val body: ByteArray
9-
) {
10-
override fun equals(other: Any?): Boolean {
11-
if (this === other) return true
12-
if (javaClass != other?.javaClass) return false
13-
14-
other as PackageData
15-
16-
if (type != other.type) return false
17-
return body.contentEquals(other.body)
18-
}
19-
20-
override fun hashCode(): Int {
21-
var result = type
22-
result = 31 * result + body.contentHashCode()
23-
return result
24-
}
25-
}
26-
27-
fun ByteBuf.readBytes(): ByteArray {
28-
val size = writerIndex() - readerIndex()
29-
return if (size > 0) {
30-
ByteArray(size).apply {
31-
readBytes(this)
32-
}
33-
} else {
34-
ByteArray(0)
35-
}
36-
}
7+
val body: NetByteArray
8+
)

net/src/main/java/com/tans/tfiletransporter/netty/extensions/DefaultClientManager.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ class DefaultClientManager(
166166
val response = converter.convert(
167167
downloadData.type,
168168
responseClass,
169-
downloadData
169+
downloadData,
170+
connectionTask.byteArrayPool
170171
)
171172
if (response != null) {
172173
// 将当前任务从正在等待的任务队列中移除
@@ -213,7 +214,8 @@ class DefaultClientManager(
213214
type = type,
214215
messageId = messageId,
215216
data = request,
216-
dataClass = requestClass
217+
dataClass = requestClass,
218+
byteArrayPool = connectionTask.byteArrayPool
217219
)
218220
if (pckData != null) {
219221
// 等待 Server 回复超时 Task

net/src/main/java/com/tans/tfiletransporter/netty/extensions/DefaultConverterFactory.kt

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.tans.tfiletransporter.netty.extensions
22

33
import com.squareup.moshi.Moshi
4+
import com.tans.tfiletransporter.netty.ByteArrayPool
5+
import com.tans.tfiletransporter.netty.NetByteArray
46
import com.tans.tfiletransporter.netty.PackageData
7+
import com.tans.tlrucache.memory.LruByteArrayPool
58

69
open class DefaultConverterFactory : IConverterFactory {
710

@@ -38,7 +41,7 @@ open class DefaultConverterFactory : IConverterFactory {
3841
return dataClass === PackageData::class.java
3942
}
4043

41-
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T? {
44+
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T? {
4245
return packageData as? T
4346
}
4447
}
@@ -48,18 +51,21 @@ open class DefaultConverterFactory : IConverterFactory {
4851
override fun couldHandle(type: Int, dataClass: Class<*>): Boolean {
4952
return dataClass === String::class.java
5053
}
51-
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T {
52-
return packageData.body.toString(Charsets.UTF_8) as T
54+
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T {
55+
val body = packageData.body
56+
val ret = String(body.value.value, 0, body.readSize, Charsets.UTF_8)
57+
byteArrayPool.put(body.value)
58+
return ret as T
5359
}
5460
}
5561

5662
@Suppress("UNCHECKED_CAST")
5763
private class ByteArrayDataBodyConverter : IBodyConverter {
5864
override fun couldHandle(type: Int, dataClass: Class<*>): Boolean {
59-
return dataClass === ByteArray::class.java
65+
return dataClass === NetByteArray::class.java
6066
}
6167

62-
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T? {
68+
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T? {
6369
return packageData.body as T
6470
}
6571

@@ -71,7 +77,7 @@ open class DefaultConverterFactory : IConverterFactory {
7177
return dataClass === Unit::class.java
7278
}
7379

74-
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T {
80+
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T {
7581
return Unit as T
7682
}
7783

@@ -81,9 +87,12 @@ open class DefaultConverterFactory : IConverterFactory {
8187

8288
override fun couldHandle(type: Int, dataClass: Class<*>): Boolean = true
8389

84-
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T? {
90+
override fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T? {
91+
val body = packageData.body
92+
val str = String(body.value.value, 0, body.readSize, Charsets.UTF_8)
93+
byteArrayPool.put(body.value)
8594
return try {
86-
defaultMoshi.adapter(dataClass)?.fromJson(packageData.body.toString(Charsets.UTF_8))
95+
defaultMoshi.adapter(dataClass)?.fromJson(str)
8796
} catch (e: Throwable) {
8897
e.printStackTrace()
8998
null
@@ -100,12 +109,19 @@ open class DefaultConverterFactory : IConverterFactory {
100109
type: Int,
101110
messageId: Long,
102111
data: T,
103-
dataClass: Class<T>
112+
dataClass: Class<T>,
113+
byteArrayPool: ByteArrayPool
104114
): PackageData {
115+
val toWriteBytes = (data as String).toByteArray(Charsets.UTF_8)
116+
val byteArrayValue = byteArrayPool.get(toWriteBytes.size)
117+
System.arraycopy(toWriteBytes, 0, byteArrayValue.value, 0, toWriteBytes.size)
105118
return PackageData(
106119
type = type,
107120
messageId = messageId,
108-
body = (data as String).toByteArray(Charsets.UTF_8)
121+
body = NetByteArray(
122+
value = byteArrayValue,
123+
readSize = toWriteBytes.size
124+
)
109125
)
110126
}
111127
}
@@ -119,31 +135,36 @@ open class DefaultConverterFactory : IConverterFactory {
119135
type: Int,
120136
messageId: Long,
121137
data: T,
122-
dataClass: Class<T>
138+
dataClass: Class<T>,
139+
byteArrayPool: ByteArrayPool
123140
): PackageData {
124141
return PackageData(
125142
type = type,
126143
messageId = messageId,
127-
body = byteArrayOf()
144+
body = NetByteArray(
145+
LruByteArrayPool.Companion.ByteArrayValue(ByteArray(0)),
146+
0
147+
)
128148
)
129149
}
130150
}
131151

132152
private class ByteArrayPackageDataConverter : IPackageDataConverter {
133153
override fun couldHandle(type: Int, dataClass: Class<*>): Boolean {
134-
return dataClass === ByteArray::class.java
154+
return dataClass === NetByteArray::class.java
135155
}
136156

137157
override fun <T> convert(
138158
type: Int,
139159
messageId: Long,
140160
data: T,
141-
dataClass: Class<T>
161+
dataClass: Class<T>,
162+
byteArrayPool: ByteArrayPool
142163
): PackageData {
143164
return PackageData(
144165
type = type,
145166
messageId = messageId,
146-
body = data as ByteArray
167+
body = data as NetByteArray
147168
)
148169
}
149170

@@ -156,7 +177,7 @@ open class DefaultConverterFactory : IConverterFactory {
156177
return dataClass === PackageData::class.java
157178
}
158179

159-
override fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>): PackageData? {
180+
override fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>, byteArrayPool: ByteArrayPool): PackageData? {
160181
return data as? PackageData?
161182
}
162183
}
@@ -165,14 +186,20 @@ open class DefaultConverterFactory : IConverterFactory {
165186

166187
override fun couldHandle(type: Int, dataClass: Class<*>): Boolean = true
167188

168-
override fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>): PackageData? {
189+
override fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>, byteArrayPool: ByteArrayPool): PackageData? {
169190
return try {
170191
val json = defaultMoshi.adapter(dataClass)?.toJson(data)
171192
if (json != null) {
193+
val toWriteBytes = json.toByteArray(Charsets.UTF_8)
194+
val byteArrayValue = byteArrayPool.get(toWriteBytes.size)
195+
System.arraycopy(toWriteBytes, 0, byteArrayValue.value, 0, toWriteBytes.size)
172196
PackageData(
173197
type = type,
174198
messageId = messageId,
175-
body = json.toByteArray(Charsets.UTF_8)
199+
body = NetByteArray(
200+
byteArrayValue,
201+
toWriteBytes.size
202+
)
176203
)
177204
} else {
178205
null
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.tans.tfiletransporter.netty.extensions
22

3+
import com.tans.tfiletransporter.netty.ByteArrayPool
34
import com.tans.tfiletransporter.netty.PackageData
45

56
interface IBodyConverter {
67

78
fun couldHandle(type: Int, dataClass: Class<*>): Boolean
89

9-
fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData): T?
10+
fun <T> convert(type: Int, dataClass: Class<T>, packageData: PackageData, byteArrayPool: ByteArrayPool): T?
1011
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.tans.tfiletransporter.netty.extensions
22

3+
import com.tans.tfiletransporter.netty.ByteArrayPool
34
import com.tans.tfiletransporter.netty.PackageData
45

56
interface IPackageDataConverter {
67

78
fun couldHandle(type: Int, dataClass: Class<*>): Boolean
89

9-
fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>): PackageData?
10+
fun <T> convert(type: Int, messageId: Long, data: T, dataClass: Class<T>, byteArrayPool: ByteArrayPool): PackageData?
1011
}

net/src/main/java/com/tans/tfiletransporter/netty/extensions/IServer.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ interface IServer<Request, Response> {
3434
val convertedData = converter.convert(
3535
type = msg.type,
3636
dataClass = requestClass,
37-
packageData = msg
37+
packageData = msg,
38+
byteArrayPool = connectionTask.byteArrayPool
3839
)
3940
if (convertedData != null) {
4041
// 处理 request 的数据并获取 response
@@ -48,7 +49,8 @@ interface IServer<Request, Response> {
4849
type = replyType,
4950
messageId = msg.messageId,
5051
data = response,
51-
dataClass = responseClass
52+
dataClass = responseClass,
53+
byteArrayPool = connectionTask.byteArrayPool
5254
)
5355
if (pckData != null) {
5456
// 发送 response 数据

net/src/main/java/com/tans/tfiletransporter/netty/handlers/BytesToPackageDataDecoder.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
package com.tans.tfiletransporter.netty.handlers
22

3+
import com.tans.tfiletransporter.netty.ByteArrayPool
4+
import com.tans.tfiletransporter.netty.NetByteArray
35
import com.tans.tfiletransporter.netty.PackageData
4-
import com.tans.tfiletransporter.netty.readBytes
56
import io.netty.buffer.ByteBuf
67
import io.netty.channel.ChannelHandlerContext
78
import io.netty.handler.codec.ByteToMessageDecoder
89

9-
class BytesToPackageDataDecoder : ByteToMessageDecoder() {
10-
10+
class BytesToPackageDataDecoder(
11+
private val byteArrayPool: ByteArrayPool
12+
) : ByteToMessageDecoder() {
1113

1214
override fun decode(ctx: ChannelHandlerContext, buffer: ByteBuf, out: MutableList<Any>) {
1315
try {
1416
val type = buffer.readInt()
1517
val messageId = buffer.readLong()
16-
val body = buffer.readBytes()
17-
out.add(PackageData(type = type, messageId = messageId, body = body))
18+
val bodySize = buffer.writerIndex() - buffer.readerIndex()
19+
val byteArrayValue = byteArrayPool.get(bodySize)
20+
buffer.readBytes(byteArrayValue.value, 0, bodySize)
21+
out.add(PackageData(type = type, messageId = messageId, body = NetByteArray(byteArrayValue, bodySize)))
1822
} catch (e: Throwable) {
1923
e.printStackTrace()
2024
} finally {

0 commit comments

Comments
 (0)