Skip to content

Commit 62011cf

Browse files
author
anonymous
committed
[opt] Add some comments.
1 parent f2b61e2 commit 62011cf

File tree

5 files changed

+63
-10
lines changed

5 files changed

+63
-10
lines changed

net/src/main/java/com/tans/tfiletransporter/netty/extensions/DefaultClientManager.kt

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class DefaultClientManager(
2020
val log: ILog
2121
) : IClientManager, NettyConnectionObserver {
2222

23-
private val waitingRspTasks: LinkedBlockingDeque<Task<*, *>> by lazy {
24-
LinkedBlockingDeque()
23+
private val waitingRspTasks: ConcurrentHashMap<Task<*, *>, Unit> by lazy {
24+
ConcurrentHashMap()
2525
}
2626

2727
private val ioExecutor: Executor by lazy {
@@ -38,13 +38,15 @@ class DefaultClientManager(
3838

3939
override fun onNewState(nettyState: NettyTaskState, task: INettyConnectionTask) {}
4040

41+
// 收到来自 server 的回复消息
4142
override fun onNewMessage(
4243
localAddress: InetSocketAddress?,
4344
remoteAddress: InetSocketAddress?,
4445
msg: PackageData,
4546
task: INettyConnectionTask
4647
) {
47-
for (t in waitingRspTasks) {
48+
// 通知正在等待 server 回复消息的 Task
49+
for (t in waitingRspTasks.keys()) {
4850
t.onNewDownloadData(
4951
localAddress = localAddress,
5052
remoteAddress = remoteAddress,
@@ -53,6 +55,7 @@ class DefaultClientManager(
5355
}
5456
}
5557

58+
// 请求 server
5659
override fun <Request, Response> request(
5760
type: Int,
5861
request: Request,
@@ -62,6 +65,7 @@ class DefaultClientManager(
6265
retryTimeout: Long,
6366
callback: IClientManager.RequestCallback<Response>
6467
) {
68+
// 将请求消息封装到 Task,然后在后台线程执行任务.
6569
enqueueTask(
6670
Task(
6771
type = type,
@@ -119,7 +123,7 @@ class DefaultClientManager(
119123
}
120124

121125
private fun <Req, Resp> addWaitingTask(t: Task<Req, Resp>) {
122-
waitingRspTasks.add(t)
126+
waitingRspTasks[t] = Unit
123127
}
124128

125129
private fun <Req, Resp> removeWaitingTask(t: Task<Req, Resp>) {
@@ -144,15 +148,19 @@ class DefaultClientManager(
144148

145149
private val timeoutTask: AtomicReference<ScheduledFuture<*>?> = AtomicReference(null)
146150

151+
// 收到来自 Server 的回复消息
147152
fun onNewDownloadData(
148153
localAddress: InetSocketAddress?,
149154
remoteAddress: InetSocketAddress?,
150155
downloadData: PackageData
151156
) {
152-
if (downloadData.messageId == this.messageId) {
157+
if (downloadData.messageId == this.messageId) { // 是当前的任务的回复消息
158+
// 移除超时信息
153159
timeoutTask.get()?.cancel(true)
154160
timeoutTask.set(null)
161+
155162
// log.d(TAG, "Received response: msgId -> ${downloadData.messageId}, type -> $type")
163+
// 找到回复的消息的转换器
156164
val converter = converterFactory.findBodyConverter(downloadData.type, responseClass)
157165
if (converter != null) {
158166
val response = converter.convert(
@@ -161,8 +169,10 @@ class DefaultClientManager(
161169
downloadData
162170
)
163171
if (response != null) {
172+
// 将当前任务从正在等待的任务队列中移除
164173
removeWaitingTask(this)
165174
if (taskIsDone.compareAndSet(false, true)) {
175+
// 回调成功.
166176
callback.onSuccess(
167177
type = type,
168178
messageId = messageId,
@@ -184,9 +194,12 @@ class DefaultClientManager(
184194
}
185195
}
186196

197+
// 发送信息到 Server
187198
override fun run() {
199+
// 将当前任务添加到等待回复的队列
188200
addWaitingTask(this)
189201
// log.d(TAG, "Sending request: msgId -> $messageId, cmdType -> $type")
202+
// 获取 request 的 converter
190203
val converter = converterFactory.findPackageDataConverter(
191204
type = type,
192205
dataClass = requestClass
@@ -203,6 +216,7 @@ class DefaultClientManager(
203216
dataClass = requestClass
204217
)
205218
if (pckData != null) {
219+
// 等待 Server 回复超时 Task
206220
val timeoutTask = taskScheduleExecutor.schedule(
207221
{
208222
handleError("Waiting Response timeout: $retryTimeout ms.")
@@ -211,6 +225,8 @@ class DefaultClientManager(
211225
TimeUnit.MILLISECONDS
212226
)
213227
this.timeoutTask.set(timeoutTask)
228+
229+
// 发送数据到 Server
214230
if (udpTargetAddress != null) {
215231
connectionTask.sendData(
216232
data = PackageDataWithAddress(
@@ -246,11 +262,14 @@ class DefaultClientManager(
246262
}
247263
}
248264

265+
// 发送失败,处理异常
249266
private fun handleError(e: String) {
267+
// 从等待队列中移除当前任务
250268
removeWaitingTask(this)
251269
log.e(TAG, "Send request error: msgId -> $messageId, cmdType -> $type, error -> $e")
252-
if (retryTimes > 0) {
253-
if (taskIsDone.compareAndSet(false, true)) {
270+
if (taskIsDone.compareAndSet(false, true)) {
271+
// 判断是否需要重试,如果需要重试,构建一个新的任务继续请求,反之直接回调异常.
272+
if (retryTimes > 0) {
254273
log.e(TAG, "Retry send request")
255274
enqueueTask(
256275
Task(
@@ -267,9 +286,7 @@ class DefaultClientManager(
267286
udpSenderAddress = udpSenderAddress
268287
)
269288
)
270-
}
271-
} else {
272-
if (taskIsDone.compareAndSet(false, true)) {
289+
} else {
273290
callback.onFail(e)
274291
}
275292
}

net/src/main/java/com/tans/tfiletransporter/netty/extensions/DefaultServerManager.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,20 @@ class DefaultServerManager(
2929

3030
override fun onNewState(nettyState: NettyTaskState, task: INettyConnectionTask) {}
3131

32+
// 收到 client 发送的请求数据
3233
override fun onNewMessage(
3334
localAddress: InetSocketAddress?,
3435
remoteAddress: InetSocketAddress?,
3536
msg: PackageData,
3637
task: INettyConnectionTask
3738
) {
39+
// 找一个 server 来处理这种类型的数据
3840
val server = servers.find { it.couldHandle(msg.type) }
3941
if (server != null) {
42+
// 是否已经处理过这个 message
4043
val isNew = !handledMessageId.containsKey(msg.messageId)
4144
handledMessageId[msg.messageId] = Unit
45+
// 交给 server 处理
4246
server.dispatchRequest(
4347
localAddress = localAddress,
4448
remoteAddress = remoteAddress,

net/src/main/java/com/tans/tfiletransporter/netty/extensions/IServer.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,31 @@ interface IServer<Request, Response> {
2727
connectionTask: INettyConnectionTask,
2828
isNewRequest: Boolean
2929
) {
30+
// 找到 request 的 body 转换器
3031
val converter = converterFactory.findBodyConverter(msg.type, requestClass)
3132
if (converter != null) {
33+
// 转换 request 的数据
3234
val convertedData = converter.convert(
3335
type = msg.type,
3436
dataClass = requestClass,
3537
packageData = msg
3638
)
3739
if (convertedData != null) {
40+
// 处理 request 的数据并获取 response
3841
val response = onRequest(localAddress, remoteAddress, convertedData, isNewRequest)
3942
if (response != null) {
43+
// 找到 response 的 pkt 转换器
4044
val responseConverter = converterFactory.findPackageDataConverter(replyType, responseClass)
4145
if (responseConverter != null) {
46+
// 转换 response 到 pkt
4247
val pckData = responseConverter.convert(
4348
type = replyType,
4449
messageId = msg.messageId,
4550
data = response,
4651
dataClass = responseClass
4752
)
4853
if (pckData != null) {
54+
// 发送 response 数据
4955
if (connectionTask is NettyUdpConnectionTask) {
5056
if (remoteAddress != null) {
5157
connectionTask.sendData(

net/src/main/java/com/tans/tfiletransporter/transferproto/broadcastconn/BroadcastReceiver.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class BroadcastReceiver(
5959
LinkedBlockingDeque()
6060
}
6161

62+
// 链接断开监控
6263
private val closeObserver: NettyConnectionObserver by lazy {
6364
object : NettyConnectionObserver {
6465
override fun onNewState(nettyState: NettyTaskState, task: INettyConnectionTask) {
@@ -86,6 +87,7 @@ class BroadcastReceiver(
8687
responseType = BroadcastDataType.BroadcastMsg.type,
8788
log = log,
8889
onRequest = { _, rr, r, isNewRequest ->
90+
// 扫描到一个 Server
8991
if (rr != null && isNewRequest) {
9092
dispatchBroadcast(rr, r)
9193
}
@@ -111,6 +113,7 @@ class BroadcastReceiver(
111113
}
112114
newState(BroadcastReceiverState.Requesting)
113115
// Receive server broadcast information task.
116+
// 扫描广播消息任务
114117
val receiverTask = NettyUdpConnectionTask(
115118
connectionType = NettyUdpConnectionTask.Companion.ConnectionType.Bind(
116119
address = broadcastAddress,
@@ -121,6 +124,7 @@ class BroadcastReceiver(
121124
this.receiverTask.get()?.stopTask()
122125
this.receiverTask.set(receiverTask)
123126
// Request server to transfer file task.
127+
// 请求 Server 链接任务
124128
val transferRequestTask = NettyUdpConnectionTask(
125129
connectionType = NettyUdpConnectionTask.Companion.ConnectionType.Bind(
126130
address = localAddress,
@@ -133,16 +137,19 @@ class BroadcastReceiver(
133137
val hasInvokeCallback = AtomicBoolean(false)
134138

135139
receiverTask.addObserver(object : NettyConnectionObserver {
140+
136141
override fun onNewMessage(
137142
localAddress: InetSocketAddress?,
138143
remoteAddress: InetSocketAddress?,
139144
msg: PackageData,
140145
task: INettyConnectionTask
141146
) {}
147+
142148
override fun onNewState(receiverTaskState: NettyTaskState, task: INettyConnectionTask) {
143149
if (receiverTaskState is NettyTaskState.Error
144150
|| receiverTaskState is NettyTaskState.ConnectionClosed
145151
|| getCurrentState() !is BroadcastReceiverState.Requesting) {
152+
// 扫描广播消息任务启动失败
146153
// Receive broadcast task fail.
147154
log.e(TAG, "Bind receiver task error: $receiverTaskState, ${getCurrentState()}")
148155
if (hasInvokeCallback.compareAndSet(false, true)) {
@@ -152,6 +159,7 @@ class BroadcastReceiver(
152159
receiverTask.removeObserver(this)
153160
onNewState(BroadcastReceiverState.NoConnection)
154161
} else {
162+
// 扫描广播消息任务启动成功
155163
if (receiverTaskState is NettyTaskState.ConnectionActive) {
156164
// Receive broadcast task start success.
157165
log.d(TAG, "Bind receiver task success")
@@ -173,6 +181,7 @@ class BroadcastReceiver(
173181
|| receiverTask.getCurrentState() !is NettyTaskState.ConnectionActive
174182
|| getCurrentState() !is BroadcastReceiverState.Requesting) {
175183
// Request server task fail.
184+
// 请求链接任务启动失败
176185
log.e(TAG, "Bind transfer req task error: $transferTaskState, ${receiverTask.getCurrentState()}, ${getCurrentState()}")
177186
if (hasInvokeCallback.compareAndSet(false, true)) {
178187
simpleCallback.onError(transferTaskState.toString())
@@ -182,6 +191,7 @@ class BroadcastReceiver(
182191
receiverTask.stopTask()
183192
onNewState(BroadcastReceiverState.NoConnection)
184193
} else {
194+
// 请求链接任务启动成功
185195
if (transferTaskState is NettyTaskState.ConnectionActive) {
186196
// Request server task success.
187197
log.d(TAG, "Bind transfer req task success")
@@ -201,6 +211,7 @@ class BroadcastReceiver(
201211
/**
202212
* Step2: Start request server task.
203213
*/
214+
// 启动请求链接任务
204215
transferRequestTask.startTask()
205216
}
206217
}
@@ -210,6 +221,7 @@ class BroadcastReceiver(
210221
/**
211222
* Step1: Start receive server broadcast task.
212223
*/
224+
// 启动扫描广播消息任务
213225
receiverTask.startTask()
214226
}
215227

net/src/main/java/com/tans/tfiletransporter/transferproto/broadcastconn/BroadcastSender.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class BroadcastSender(
5858
responseType = BroadcastDataType.TransferFileResp.type,
5959
log = log,
6060
onRequest = { _, rr, r, isNewRequest ->
61+
// 收到 client 的请求链接的信息
6162
if (rr == null || r.version != TransferProtoConstant.VERSION) {
6263
null
6364
} else {
@@ -82,6 +83,7 @@ class BroadcastSender(
8283
AtomicReference(null)
8384
}
8485

86+
// 监控链接断开
8587
private val closeObserver: NettyConnectionObserver by lazy {
8688
object : NettyConnectionObserver {
8789
override fun onNewState(nettyState: NettyTaskState, task: INettyConnectionTask) {
@@ -102,6 +104,7 @@ class BroadcastSender(
102104
/**
103105
* Broadcast send task.
104106
*/
107+
// 发送广播消息任务
105108
private val senderBroadcastTask: Runnable by lazy {
106109
Runnable {
107110
val state = getCurrentState()
@@ -149,6 +152,7 @@ class BroadcastSender(
149152
newState(BroadcastSenderState.Requesting)
150153
val hasInvokeCallback = AtomicBoolean(false)
151154
// Broadcast send task.
155+
// 发送 Udp 广播信息的任务
152156
val senderTask = NettyUdpConnectionTask(
153157
connectionType = ConnectionType.Connect(
154158
address = broadcastAddress,
@@ -160,6 +164,7 @@ class BroadcastSender(
160164
this.broadcastSenderTask.set(senderTask)
161165

162166
// Receive client transfer file request task.
167+
// 接受 Client 请求的 Server
163168
val requestReceiverTask = NettyUdpConnectionTask(
164169
connectionType = ConnectionType.Bind(
165170
address = localAddress,
@@ -182,6 +187,7 @@ class BroadcastSender(
182187
|| senderState is NettyTaskState.Error
183188
|| getCurrentState() !is BroadcastSenderState.Requesting
184189
) {
190+
// Udp 广播发送任务启动失败
185191
// Broadcast sender task fail.
186192
log.e(TAG, "Sender task error: $senderState, ${getCurrentState()}")
187193
if (hasInvokeCallback.compareAndSet(false, true)) {
@@ -191,6 +197,7 @@ class BroadcastSender(
191197
senderTask.removeObserver(this)
192198
senderTask.stopTask()
193199
} else {
200+
// Udp 广播发送任务启动成功
194201
// Broadcast sender task success.
195202
if (senderState is NettyTaskState.ConnectionActive) {
196203
log.d(TAG, "Sender task connect success")
@@ -212,6 +219,7 @@ class BroadcastSender(
212219
|| senderTask.getCurrentState() !is NettyTaskState.ConnectionActive
213220
|| getCurrentState() !is BroadcastSenderState.Requesting
214221
) {
222+
// 接受 Client 请求的任务启动失败
215223
// Receive client request task fail.
216224
log.d(TAG, "Request task bind fail: $receiverState, ${senderTask.getCurrentState()}, ${getCurrentState()}")
217225
if (hasInvokeCallback.compareAndSet(false, true)) {
@@ -222,13 +230,15 @@ class BroadcastSender(
222230
requestReceiverTask.stopTask()
223231
senderTask.stopTask()
224232
} else {
233+
// 接受 Client 请求的任务启动成功
225234
// Receive client request task success.
226235
if (receiverState is NettyTaskState.ConnectionActive) {
227236
log.d(TAG, "Request task bind success")
228237
if (hasInvokeCallback.compareAndSet(false, true)) {
229238
simpleCallback.onSuccess(Unit)
230239
}
231240
// Send one broadcast each second (default)
241+
// 定时循环发送广播消息
232242
val senderFuture = taskScheduleExecutor.scheduleWithFixedDelay(
233243
senderBroadcastTask,
234244
500,
@@ -240,6 +250,7 @@ class BroadcastSender(
240250
BroadcastSenderState.Active(
241251
broadcastAddress = broadcastAddress)
242252
)
253+
// 监控链接断开
243254
senderTask.addObserver(closeObserver)
244255
requestReceiverTask.addObserver(closeObserver)
245256
}
@@ -249,15 +260,18 @@ class BroadcastSender(
249260
/**
250261
* Step2: Start Receive client transfer file request task.
251262
*/
263+
// 启动接受 Client 请求链接的任务
252264
requestReceiverTask.startTask()
253265
senderTask.removeObserver(this)
254266
}
255267
}
256268
}
257269
})
270+
258271
/**
259272
* Step1: Start broadcast sender task.
260273
*/
274+
// 启动 Udp 广播信息任务
261275
senderTask.startTask()
262276
}
263277

0 commit comments

Comments
 (0)