diff --git a/src/Curl.jl b/src/Curl.jl index 470a6b9..7d4a725 100644 --- a/src/Curl.jl +++ b/src/Curl.jl @@ -7,6 +7,17 @@ let m = match(r"^libcurl/(\d+\.\d+\.\d+)\b", CURL_VERSION_STR) const global USER_AGENT = "curl/$curl julia/$julia" end +struct NoChannel end +const NOCHANNEL = NoChannel() + + +Base.isopen(req::NoChannel) = false +Base.isempty(req::NoChannel) = true +Base.put!(req::NoChannel, ::IOBuffer) = false +Base.take!(req::NoChannel) = nothing +Base.close(req::NoChannel) = false +Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0))) + function write_callback( data::Ptr{Cchar}, @@ -122,10 +133,16 @@ function header_callback( header = unsafe_string(data, n) header = strip(header) - if (m_grpc_status = match(regex_grpc_status, header)) !== nothing - req.grpc_status = parse(UInt64, m_grpc_status.captures[1]) - elseif (m_grpc_message = match(regex_grpc_message, header)) !== nothing - req.grpc_message = m_grpc_message.captures[1] + if (m_grpc_status = match(regex_grpc_status, header)) isa RegexMatch + capture = m_grpc_status.captures[1] + if capture !== nothing + req.grpc_status = parse(UInt64, capture) + end + elseif (m_grpc_message = match(regex_grpc_message, header)) isa RegexMatch + capture = m_grpc_message.captures[1] + if capture !== nothing + req.grpc_message = capture + end end return n @@ -155,6 +172,7 @@ function grpc_timeout_header_val(timeout::Real) return "$(string(timeout_nanosecs))n" end + mutable struct gRPCRequest # CURL multi lock for exclusive access to the easy handle after its added to the multi lock::ReentrantLock @@ -179,8 +197,8 @@ mutable struct gRPCRequest response::IOBuffer # These are only used when the request or response is streaming - request_c::Union{Nothing,Channel{IOBuffer}} - response_c::Union{Nothing,Channel{IOBuffer}} + request_c::Union{Channel{IOBuffer}, NoChannel} + response_c::Union{Channel{IOBuffer}, NoChannel} # The task making the request can block on this until the request is complete ready::Event @@ -210,11 +228,11 @@ mutable struct gRPCRequest function gRPCRequest( grpc, - url, + url::String, request::IOBuffer, response::IOBuffer, - request_c::Union{Nothing,Channel{IOBuffer}}, - response_c::Union{Nothing,Channel{IOBuffer}}; + request_c::Union{Channel{IOBuffer}, NoChannel}, + response_c::Union{Channel{IOBuffer}, NoChannel}; deadline = 10, keepalive = 60, max_send_message_length = 4 * 1024 * 1024, @@ -353,8 +371,10 @@ function handle_exception(req::gRPCRequest, ex; notify_ready = false) end end -isstreaming_request(req::gRPCRequest) = !isnothing(req.request_c) -isstreaming_response(req::gRPCRequest) = !isnothing(req.response_c) + +isstreaming_request(req::gRPCRequest) = !isa(req.request_c, NoChannel) +isstreaming_response(req::gRPCRequest) = !isa(req.response_c, NoChannel) + Base.wait(req::gRPCRequest) = wait(req.ready) @@ -683,7 +703,7 @@ mutable struct gRPCCURL # Allows for controlling the maximum number of concurrent gRPC requests/streams events::Channel{Event} - function gRPCCURL(max_streams = GRPC_MAX_STREAMS) + function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS) grpc = new( Ptr{Cvoid}(0), ReentrantLock(), diff --git a/src/Streaming.jl b/src/Streaming.jl index 89a4828..ec50c84 100644 --- a/src/Streaming.jl +++ b/src/Streaming.jl @@ -88,6 +88,9 @@ function grpc_async_stream_response( try while isnothing(req.ex) response_buf = take!(req.response_c) + if response_buf === nothing + continue + end response = decode(ProtoDecoder(response_buf), TResponse) put!(channel, response) end @@ -168,7 +171,7 @@ function grpc_async_request( IOBuffer(), IOBuffer(), Channel{IOBuffer}(16), - nothing; + NOCHANNEL; deadline = client.deadline, keepalive = client.keepalive, max_send_message_length = client.max_send_message_length, @@ -255,7 +258,7 @@ function grpc_async_request( url(client), request_buf, IOBuffer(), - nothing, + NOCHANNEL, Channel{IOBuffer}(16); deadline = client.deadline, keepalive = client.keepalive, diff --git a/src/Unary.jl b/src/Unary.jl index 4031fdb..936ad9c 100644 --- a/src/Unary.jl +++ b/src/Unary.jl @@ -74,8 +74,8 @@ function grpc_async_request( url(client), request_buf, IOBuffer(), - nothing, - nothing; + NOCHANNEL, + NOCHANNEL; deadline = client.deadline, keepalive = client.keepalive, max_send_message_length = client.max_send_message_length, @@ -168,8 +168,8 @@ function grpc_async_request( url(client), request_buf, IOBuffer(), - nothing, - nothing; + NOCHANNEL, + NOCHANNEL; deadline = client.deadline, keepalive = client.keepalive, max_send_message_length = client.max_send_message_length,