Skip to content

Commit f547f83

Browse files
committed
add experimental curlshare locks
Added an as of now experimental option to enable locking while using curl context concurrently. It can be switched on by passing `true` for the `enable_shared_locks` flag to gRPCController.
1 parent 08cf23a commit f547f83

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ gRPCController(;
143143
[ max_message_length = DEFAULT_MAX_MESSAGE_LENGTH, ]
144144
[ max_recv_message_length = 0, ]
145145
[ max_send_message_length = 0, ]
146+
[ enable_shared_locks = false, ]
146147
[ verbose::Bool = false, ]
147148
)
148149
```
@@ -163,6 +164,8 @@ gRPCController(;
163164
`max_message_length`, same as setting this to 0)
164165
- `max_send_message_length`: maximum message length to send (default is
165166
`max_message_length`, same as setting this to 0)
167+
- `enable_shared_locks`: whether to enable locks for using gRPCClient across
168+
tasks/threads concurrently (experimental, default is false)
166169
- `verbose`: whether to print out verbose communication logs (default false)
167170

168171
### `gRPCChannel`

src/curl.jl

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,57 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType
4242
end
4343
=#
4444

45+
function share_lock(easy_p::Ptr{Cvoid}, data::curl_lock_data, access::curl_lock_access, userptr::Ptr{Cvoid})
46+
share = unsafe_pointer_to_objref(Ptr{CurlShare}(userptr))::CurlShare
47+
lock(share.locks[data])
48+
nothing
49+
end
50+
51+
function share_unlock(easy_p::Ptr{Cvoid}, data::curl_lock_data, userptr::Ptr{Cvoid})
52+
share = unsafe_pointer_to_objref(Ptr{CurlShare}(userptr))::CurlShare
53+
unlock(share.locks[data])
54+
nothing
55+
end
56+
57+
mutable struct CurlShare
58+
shptr::Ptr{CURLSH}
59+
locks::Vector{ReentrantLock}
60+
closed::Bool
61+
62+
function CurlShare()
63+
shptr = curl_share_init()
64+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_SHARE)
65+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_COOKIE)
66+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS)
67+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_PSL)
68+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT)
69+
70+
share_lock_cb = @cfunction(share_lock, Cvoid, (Ptr{Cvoid}, Cuint, Cuint, Ptr{Cvoid}))
71+
share_unlock_cb = @cfunction(share_unlock, Cvoid, (Ptr{Cvoid}, Cuint, Ptr{Cvoid}))
72+
73+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_LOCKFUNC::CURLSHoption; share_lock_cb::Ptr{Cvoid})::CURLSHcode
74+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_UNLOCKFUNC::CURLSHoption; share_unlock_cb::Ptr{Cvoid})::CURLSHcode
75+
76+
locks = Vector(undef, CURL_LOCK_DATA_LAST)
77+
for idx in 1:CURL_LOCK_DATA_LAST
78+
locks[idx] = ReentrantLock()
79+
end
80+
81+
obj = new(shptr, locks, false)
82+
userptr = pointer_from_objref(obj)
83+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_USERDATA::CURLSHoption; userptr::Ptr{Cvoid})::CURLSHcode
84+
obj
85+
end
86+
end
87+
88+
function close(share::CurlShare)
89+
if share.closed
90+
curl_share_cleanup(share.shptr)
91+
share.closed = true
92+
end
93+
nothing
94+
end
95+
4596
function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType
4697
while true
4798
yield()
@@ -95,7 +146,7 @@ function grpc_request_header(request_timeout::Real)
95146
end
96147
end
97148

98-
function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
149+
function easy_handle(curlshare::Union{Nothing,Ptr{CURLSH}}, maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
99150
easy = Curl.Easy()
100151
http_version = (negotiation === :http2) ? CURL_HTTP_VERSION_2_0 :
101152
(negotiation === :http2_tls) ? CURL_HTTP_VERSION_2TLS :
@@ -105,6 +156,9 @@ function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revoc
105156
Curl.setopt(easy, CURLOPT_PIPEWAIT, Clong(1))
106157
Curl.setopt(easy, CURLOPT_POST, Clong(1))
107158
Curl.setopt(easy, CURLOPT_HTTPHEADER, grpc_request_header(request_timeout))
159+
if curlshare !== nothing
160+
Curl.setopt(easy, CURLOPT_SHARE, curlshare)
161+
end
108162
if !revocation
109163
Curl.setopt(easy, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NO_REVOKE)
110164
end
@@ -200,7 +254,7 @@ function get_grpc_status(easy::Curl.Easy)
200254
return grpc_status, grpc_message
201255
end
202256

203-
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
257+
function grpc_request(curlshare::Union{Nothing,Ptr{CURLSH}}, downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
204258
maxage::Clong = typemax(Clong),
205259
keepalive::Clong = 60,
206260
negotiation::Symbol = :http2_prior_knowledge,
@@ -210,7 +264,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
210264
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
211265
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
212266
verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
213-
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do easy
267+
Curl.with_handle(easy_handle(curlshare, maxage, keepalive, negotiation, revocation, request_timeout)) do easy
214268
# setup the request
215269
Curl.set_url(easy, url)
216270
Curl.set_timeout(easy, request_timeout)

src/gRPCClient.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ using ProtoBuf
66

77
import Downloads: Curl
88
import ProtoBuf: call_method
9+
import Base: close
910

1011
export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck, StatusCode
1112

src/grpc.jl

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ Contains settings to control the behavior of gRPC requests.
9797
`max_message_length`, same as setting this to 0)
9898
- `max_send_message_length`: maximum message length to send (default is
9999
`max_message_length`, same as setting this to 0)
100+
- `enable_shared_locks`: whether to enable locks for using gRPCClient across
101+
tasks/threads concurrently (experimental, default is false)
100102
- `verbose`: whether to print out verbose communication logs (default false)
101103
"""
102104
struct gRPCController <: ProtoRpcController
@@ -108,6 +110,7 @@ struct gRPCController <: ProtoRpcController
108110
connect_timeout::Real
109111
max_recv_message_length::Int
110112
max_send_message_length::Int
113+
enable_shared_locks::Bool
111114
verbose::Bool
112115

113116
function gRPCController(;
@@ -120,6 +123,7 @@ struct gRPCController <: ProtoRpcController
120123
max_message_length::Integer = DEFAULT_MAX_MESSAGE_LENGTH,
121124
max_recv_message_length::Integer = 0,
122125
max_send_message_length::Integer = 0,
126+
enable_shared_locks::Bool = false,
123127
verbose::Bool = false
124128
)
125129
if maxage < 0 || keepalive < 0 || request_timeout < 0 || connect_timeout < 0 ||
@@ -128,7 +132,17 @@ struct gRPCController <: ProtoRpcController
128132
end
129133
(max_recv_message_length == 0) && (max_recv_message_length = max_message_length)
130134
(max_send_message_length == 0) && (max_send_message_length = max_message_length)
131-
new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, max_recv_message_length, max_send_message_length, verbose)
135+
new(maxage,
136+
keepalive,
137+
negotiation,
138+
revocation,
139+
request_timeout,
140+
connect_timeout,
141+
max_recv_message_length,
142+
max_send_message_length,
143+
enable_shared_locks,
144+
verbose,
145+
)
132146
end
133147
end
134148

@@ -146,16 +160,22 @@ the server.
146160
struct gRPCChannel <: ProtoRpcChannel
147161
downloader::Downloader
148162
baseurl::String
163+
curlshare::CurlShare
149164

150165
function gRPCChannel(baseurl::String)
151166
downloader = Downloader(; grace=Inf)
152167
Curl.init!(downloader.multi)
153168
Curl.setopt(downloader.multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX)
154169
endswith(baseurl, '/') && (baseurl = baseurl[1:end-1])
155-
new(downloader, baseurl)
170+
new(downloader, baseurl, CurlShare())
156171
end
157172
end
158173

174+
function close(channel::gRPCChannel)
175+
close(channel.curlshare)
176+
nothing
177+
end
178+
159179
function to_delimited_message_bytes(msg, max_message_length::Int)
160180
iob = IOBuffer()
161181
limitiob = LimitIO(iob, max_message_length)
@@ -193,7 +213,8 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
193213
end
194214
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
195215
url = string(channel.baseurl, "/", service.name, "/", method.name)
196-
status_future = @async grpc_request(channel.downloader, url, input, outchannel;
216+
shptr = controller.enable_shared_locks ? channel.curlshare.shptr : nothing
217+
status_future = @async grpc_request(shptr, channel.downloader, url, input, outchannel;
197218
maxage = controller.maxage,
198219
keepalive = controller.keepalive,
199220
negotiation = controller.negotiation,

0 commit comments

Comments
 (0)