Skip to content

Commit cf958ab

Browse files
Add async ref count
1 parent d38dd0a commit cf958ab

File tree

5 files changed

+88
-17
lines changed

5 files changed

+88
-17
lines changed

src/async.c

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,24 @@ static void luv_async_cb(uv_async_t* handle) {
3333
static int luv_new_async(lua_State* L) {
3434
uv_async_t* handle;
3535
luv_handle_t* data;
36+
luv_ref_t *ref;
3637
int ret;
3738
luv_ctx_t* ctx = luv_context(L);
3839
luaL_checktype(L, 1, LUA_TFUNCTION);
39-
handle = (uv_async_t*)luv_newuserdata(L, uv_handle_size(UV_ASYNC));
40+
ref = (luv_ref_t*)luv_newuserdata(L, sizeof(luv_ref_t));
41+
handle = &ref->handle.async;
42+
ret = uv_mutex_init(&ref->mutex);
43+
if (ret < 0) {
44+
lua_pop(L, 1);
45+
return luv_error(L, ret);
46+
}
4047
ret = uv_async_init(ctx->loop, handle, luv_async_cb);
4148
if (ret < 0) {
49+
uv_mutex_destroy(&ref->mutex);
4250
lua_pop(L, 1);
4351
return luv_error(L, ret);
4452
}
53+
ref->count = 1;
4554
data = luv_setup_handle(L, ctx);
4655
data->extra = (luv_thread_arg_t*)malloc(sizeof(luv_thread_arg_t));
4756
data->extra_gc = free;
@@ -51,6 +60,22 @@ static int luv_new_async(lua_State* L) {
5160
return 1;
5261
}
5362

63+
static int luv_handle_gc(lua_State* L);
64+
65+
static int luv_async_gc(lua_State* L) {
66+
luv_ref_t** udata = (luv_ref_t**)lua_touserdata(L, 1);
67+
luv_ref_t* ref = *udata;
68+
uv_mutex_t *mutex = &ref->mutex;
69+
uv_mutex_lock(mutex);
70+
ref->count--;
71+
uv_mutex_unlock(mutex);
72+
if (ref->count > 0) {
73+
return 0;
74+
}
75+
uv_mutex_destroy(mutex);
76+
return luv_handle_gc(L);
77+
}
78+
5479
static int luv_async_send(lua_State* L) {
5580
int ret;
5681
uv_async_t* handle = luv_check_async(L, 1);
@@ -61,3 +86,10 @@ static int luv_async_send(lua_State* L) {
6186
luv_thread_arg_clear(L, arg, LUVF_THREAD_SIDE_CHILD);
6287
return luv_result(L, ret);
6388
}
89+
90+
static void luv_async_init(lua_State* L) {
91+
luaL_getmetatable(L, "uv_async");
92+
lua_pushcfunction(L, luv_async_gc);
93+
lua_setfield(L, -2, "__gc");
94+
lua_pop(L, 1);
95+
}

src/lthreadpool.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ typedef struct {
4949
luv_val_t argv[LUV_THREAD_MAXNUM_ARG];
5050
} luv_thread_arg_t;
5151

52+
typedef struct {
53+
union {
54+
uv_async_t async;
55+
} handle;
56+
uv_mutex_t mutex;
57+
int count;
58+
} luv_ref_t;
59+
5260
//luajit miss LUA_OK
5361
#ifndef LUA_OK
5462
#define LUA_OK 0

src/luv.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ LUALIB_API int luaopen_luv (lua_State* L) {
892892

893893
luv_req_init(L);
894894
luv_handle_init(L);
895+
luv_async_init(L);
895896
#if LUV_UV_VERSION_GEQ(1, 28, 0)
896897
luv_dir_init(L);
897898
#endif

src/thread.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,25 @@ static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags)
189189
case LUA_TUSERDATA:
190190
if (arg->val.udata.size)
191191
{
192+
int r = 1;
192193
char *p = lua_newuserdata(L, arg->val.udata.size);
193194
memcpy(p, arg->val.udata.data, arg->val.udata.size);
194195
if (arg->val.udata.metaname)
195196
{
196197
luaL_getmetatable(L, arg->val.udata.metaname);
197198
lua_setmetatable(L, -2);
199+
if (strcmp(arg->val.udata.metaname, "uv_async") == 0) {
200+
luv_ref_t* ref = *(luv_ref_t**)p;
201+
uv_mutex_lock(&ref->mutex);
202+
ref->count++;
203+
uv_mutex_unlock(&ref->mutex);
204+
r = 0;
205+
}
206+
}
207+
if (r) {
208+
lua_pushvalue(L, -1);
209+
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
198210
}
199-
lua_pushvalue(L, -1);
200-
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
201211
}else{
202212
lua_pushlightuserdata(L, (void*)arg->val.udata.data);
203213
}

tests/test-async.lua

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
return require('lib/tap')(function (test)
22

33
test("test pass async between threads", function(p, p, expect, uv)
4-
local before = os.time()
54
local async
65
async = uv.new_async(expect(function (a,b,c)
76
p('in async notify callback')
@@ -11,22 +10,43 @@ return require('lib/tap')(function (test)
1110
assert(c==250)
1211
uv.close(async)
1312
end))
14-
local args = {500, 'string', nil, false, 5, "helloworld",async}
15-
local unpack = unpack or table.unpack
16-
uv.new_thread(function(num,s,null,bool,five,hw,asy)
13+
uv.new_thread(function(asy)
1714
local uv = require'luv'
18-
assert(type(num) == "number")
19-
assert(type(s) == "string")
20-
assert(null == nil)
21-
assert(bool == false)
22-
assert(five == 5)
23-
assert(hw == 'helloworld')
2415
assert(type(asy)=='userdata')
2516
assert(uv.async_send(asy,'a',true,250)==0)
26-
uv.sleep(1000)
27-
end, unpack(args)):join()
28-
local elapsed = (os.time() - before) * 1000
29-
assert(elapsed >= 1000, "elapsed should be at least delay ")
17+
uv.run()
18+
end, async):join()
19+
end)
20+
21+
test("test pass back async between threads", function(p, p, expect, uv)
22+
local async
23+
async = uv.new_async(expect(function (a)
24+
uv.close(async)
25+
p('in async notify callback')
26+
p(a)
27+
assert(type(a)=='userdata')
28+
local timer = uv.new_timer()
29+
uv.timer_start(timer, 10, 0, expect(function()
30+
uv.close(timer)
31+
p("timeout", timer)
32+
assert(uv.async_send(a,'a',true,250)==0)
33+
end))
34+
end))
35+
local t = uv.new_thread(function(asy)
36+
local uv = require'luv'
37+
assert(type(asy)=='userdata', 'bad aync type')
38+
local as
39+
as = uv.new_async(function (a,b,c)
40+
uv.close(as)
41+
assert(a=='a', 'bad string')
42+
assert(b==true, 'bad boolean')
43+
assert(c==250, 'bad number')
44+
end)
45+
assert(uv.async_send(asy,as)==0)
46+
uv.run()
47+
end, async)
48+
uv.run()
49+
t:join()
3050
end)
3151

3252
end)

0 commit comments

Comments
 (0)