Skip to content

Commit 55bd686

Browse files
authored
Add NoChannel type, reduce static instabilites (#80)
1 parent ad578fd commit 55bd686

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

src/Curl.jl

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ let m = match(r"^libcurl/(\d+\.\d+\.\d+)\b", CURL_VERSION_STR)
77
const global USER_AGENT = "curl/$curl julia/$julia"
88
end
99

10+
struct NoChannel end
11+
const NOCHANNEL = NoChannel()
12+
13+
14+
Base.isopen(req::NoChannel) = false
15+
Base.isempty(req::NoChannel) = true
16+
Base.put!(req::NoChannel, ::IOBuffer) = false
17+
Base.take!(req::NoChannel) = nothing
18+
Base.close(req::NoChannel) = false
19+
Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))
20+
1021

1122
function write_callback(
1223
data::Ptr{Cchar},
@@ -122,10 +133,16 @@ function header_callback(
122133
header = unsafe_string(data, n)
123134
header = strip(header)
124135

125-
if (m_grpc_status = match(regex_grpc_status, header)) !== nothing
126-
req.grpc_status = parse(UInt64, m_grpc_status.captures[1])
127-
elseif (m_grpc_message = match(regex_grpc_message, header)) !== nothing
128-
req.grpc_message = m_grpc_message.captures[1]
136+
if (m_grpc_status = match(regex_grpc_status, header)) isa RegexMatch
137+
capture = m_grpc_status.captures[1]
138+
if capture !== nothing
139+
req.grpc_status = parse(UInt64, capture)
140+
end
141+
elseif (m_grpc_message = match(regex_grpc_message, header)) isa RegexMatch
142+
capture = m_grpc_message.captures[1]
143+
if capture !== nothing
144+
req.grpc_message = capture
145+
end
129146
end
130147

131148
return n
@@ -155,6 +172,7 @@ function grpc_timeout_header_val(timeout::Real)
155172
return "$(string(timeout_nanosecs))n"
156173
end
157174

175+
158176
mutable struct gRPCRequest
159177
# CURL multi lock for exclusive access to the easy handle after its added to the multi
160178
lock::ReentrantLock
@@ -179,8 +197,8 @@ mutable struct gRPCRequest
179197
response::IOBuffer
180198

181199
# These are only used when the request or response is streaming
182-
request_c::Union{Nothing,Channel{IOBuffer}}
183-
response_c::Union{Nothing,Channel{IOBuffer}}
200+
request_c::Union{Channel{IOBuffer}, NoChannel}
201+
response_c::Union{Channel{IOBuffer}, NoChannel}
184202

185203
# The task making the request can block on this until the request is complete
186204
ready::Event
@@ -210,11 +228,11 @@ mutable struct gRPCRequest
210228

211229
function gRPCRequest(
212230
grpc,
213-
url,
231+
url::String,
214232
request::IOBuffer,
215233
response::IOBuffer,
216-
request_c::Union{Nothing,Channel{IOBuffer}},
217-
response_c::Union{Nothing,Channel{IOBuffer}};
234+
request_c::Union{Channel{IOBuffer}, NoChannel},
235+
response_c::Union{Channel{IOBuffer}, NoChannel};
218236
deadline = 10,
219237
keepalive = 60,
220238
max_send_message_length = 4 * 1024 * 1024,
@@ -353,8 +371,10 @@ function handle_exception(req::gRPCRequest, ex; notify_ready = false)
353371
end
354372
end
355373

356-
isstreaming_request(req::gRPCRequest) = !isnothing(req.request_c)
357-
isstreaming_response(req::gRPCRequest) = !isnothing(req.response_c)
374+
375+
isstreaming_request(req::gRPCRequest) = !isa(req.request_c, NoChannel)
376+
isstreaming_response(req::gRPCRequest) = !isa(req.response_c, NoChannel)
377+
358378

359379
Base.wait(req::gRPCRequest) = wait(req.ready)
360380

@@ -683,7 +703,7 @@ mutable struct gRPCCURL
683703
# Allows for controlling the maximum number of concurrent gRPC requests/streams
684704
events::Channel{Event}
685705

686-
function gRPCCURL(max_streams = GRPC_MAX_STREAMS)
706+
function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS)
687707
grpc = new(
688708
Ptr{Cvoid}(0),
689709
ReentrantLock(),

src/Streaming.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ function grpc_async_stream_response(
8888
try
8989
while isnothing(req.ex)
9090
response_buf = take!(req.response_c)
91+
if response_buf === nothing
92+
continue
93+
end
9194
response = decode(ProtoDecoder(response_buf), TResponse)
9295
put!(channel, response)
9396
end
@@ -168,7 +171,7 @@ function grpc_async_request(
168171
IOBuffer(),
169172
IOBuffer(),
170173
Channel{IOBuffer}(16),
171-
nothing;
174+
NOCHANNEL;
172175
deadline = client.deadline,
173176
keepalive = client.keepalive,
174177
max_send_message_length = client.max_send_message_length,
@@ -255,7 +258,7 @@ function grpc_async_request(
255258
url(client),
256259
request_buf,
257260
IOBuffer(),
258-
nothing,
261+
NOCHANNEL,
259262
Channel{IOBuffer}(16);
260263
deadline = client.deadline,
261264
keepalive = client.keepalive,

src/Unary.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ function grpc_async_request(
7474
url(client),
7575
request_buf,
7676
IOBuffer(),
77-
nothing,
78-
nothing;
77+
NOCHANNEL,
78+
NOCHANNEL;
7979
deadline = client.deadline,
8080
keepalive = client.keepalive,
8181
max_send_message_length = client.max_send_message_length,
@@ -168,8 +168,8 @@ function grpc_async_request(
168168
url(client),
169169
request_buf,
170170
IOBuffer(),
171-
nothing,
172-
nothing;
171+
NOCHANNEL,
172+
NOCHANNEL;
173173
deadline = client.deadline,
174174
keepalive = client.keepalive,
175175
max_send_message_length = client.max_send_message_length,

0 commit comments

Comments
 (0)