Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions src/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ let m = match(r"^libcurl/(\d+\.\d+\.\d+)\b", CURL_VERSION_STR)
const global USER_AGENT = "curl/$curl julia/$julia"
end

struct NoChannel end
const NOCHANNEL = NoChannel()


Base.isopen(req::NoChannel) = false
Base.isempty(req::NoChannel) = true
Base.put!(req::NoChannel, ::IOBuffer) = false
Base.take!(req::NoChannel) = nothing
Base.close(req::NoChannel) = false
Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))


function write_callback(
data::Ptr{Cchar},
Expand Down Expand Up @@ -122,10 +133,16 @@ function header_callback(
header = unsafe_string(data, n)
header = strip(header)

if (m_grpc_status = match(regex_grpc_status, header)) !== nothing
req.grpc_status = parse(UInt64, m_grpc_status.captures[1])
elseif (m_grpc_message = match(regex_grpc_message, header)) !== nothing
req.grpc_message = m_grpc_message.captures[1]
if (m_grpc_status = match(regex_grpc_status, header)) isa RegexMatch
capture = m_grpc_status.captures[1]
if capture !== nothing
req.grpc_status = parse(UInt64, capture)
end
elseif (m_grpc_message = match(regex_grpc_message, header)) isa RegexMatch
capture = m_grpc_message.captures[1]
if capture !== nothing
req.grpc_message = capture
end
end

return n
Expand Down Expand Up @@ -155,6 +172,7 @@ function grpc_timeout_header_val(timeout::Real)
return "$(string(timeout_nanosecs))n"
end


mutable struct gRPCRequest
# CURL multi lock for exclusive access to the easy handle after its added to the multi
lock::ReentrantLock
Expand All @@ -179,8 +197,8 @@ mutable struct gRPCRequest
response::IOBuffer

# These are only used when the request or response is streaming
request_c::Union{Nothing,Channel{IOBuffer}}
response_c::Union{Nothing,Channel{IOBuffer}}
request_c::Union{Channel{IOBuffer}, NoChannel}
response_c::Union{Channel{IOBuffer}, NoChannel}

# The task making the request can block on this until the request is complete
ready::Event
Expand Down Expand Up @@ -210,11 +228,11 @@ mutable struct gRPCRequest

function gRPCRequest(
grpc,
url,
url::String,
request::IOBuffer,
response::IOBuffer,
request_c::Union{Nothing,Channel{IOBuffer}},
response_c::Union{Nothing,Channel{IOBuffer}};
request_c::Union{Channel{IOBuffer}, NoChannel},
response_c::Union{Channel{IOBuffer}, NoChannel};
deadline = 10,
keepalive = 60,
max_send_message_length = 4 * 1024 * 1024,
Expand Down Expand Up @@ -353,8 +371,10 @@ function handle_exception(req::gRPCRequest, ex; notify_ready = false)
end
end

isstreaming_request(req::gRPCRequest) = !isnothing(req.request_c)
isstreaming_response(req::gRPCRequest) = !isnothing(req.response_c)

isstreaming_request(req::gRPCRequest) = !isa(req.request_c, NoChannel)
isstreaming_response(req::gRPCRequest) = !isa(req.response_c, NoChannel)


Base.wait(req::gRPCRequest) = wait(req.ready)

Expand Down Expand Up @@ -683,7 +703,7 @@ mutable struct gRPCCURL
# Allows for controlling the maximum number of concurrent gRPC requests/streams
events::Channel{Event}

function gRPCCURL(max_streams = GRPC_MAX_STREAMS)
function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS)
grpc = new(
Ptr{Cvoid}(0),
ReentrantLock(),
Expand Down
7 changes: 5 additions & 2 deletions src/Streaming.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ function grpc_async_stream_response(
try
while isnothing(req.ex)
response_buf = take!(req.response_c)
if response_buf === nothing
continue
end
response = decode(ProtoDecoder(response_buf), TResponse)
put!(channel, response)
end
Expand Down Expand Up @@ -168,7 +171,7 @@ function grpc_async_request(
IOBuffer(),
IOBuffer(),
Channel{IOBuffer}(16),
nothing;
NOCHANNEL;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
Expand Down Expand Up @@ -255,7 +258,7 @@ function grpc_async_request(
url(client),
request_buf,
IOBuffer(),
nothing,
NOCHANNEL,
Channel{IOBuffer}(16);
deadline = client.deadline,
keepalive = client.keepalive,
Expand Down
8 changes: 4 additions & 4 deletions src/Unary.jl
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ function grpc_async_request(
url(client),
request_buf,
IOBuffer(),
nothing,
nothing;
NOCHANNEL,
NOCHANNEL;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
Expand Down Expand Up @@ -168,8 +168,8 @@ function grpc_async_request(
url(client),
request_buf,
IOBuffer(),
nothing,
nothing;
NOCHANNEL,
NOCHANNEL;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
Expand Down