1+ --
2+ -- Licensed to the Apache Software Foundation (ASF) under one or more
3+ -- contributor license agreements. See the NOTICE file distributed with
4+ -- this work for additional information regarding copyright ownership.
5+ -- The ASF licenses this file to You under the Apache License, Version 2.0
6+ -- (the "License"); you may not use this file except in compliance with
7+ -- the License. You may obtain a copy of the License at
8+ --
9+ -- http://www.apache.org/licenses/LICENSE-2.0
10+ --
11+ -- Unless required by applicable law or agreed to in writing, software
12+ -- distributed under the License is distributed on an "AS IS" BASIS,
13+ -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ -- See the License for the specific language governing permissions and
15+ -- limitations under the License.
16+ --
17+
18+ local core = require (" apisix.core" )
19+ local utils = require (" apisix.discovery.zookeeper.utils" )
20+ local schema = require (" apisix.discovery.zookeeper.schema" )
21+ local ngx = ngx
22+ local ipairs = ipairs
23+ local pairs = pairs
24+ local tostring = tostring
25+
26+ local _M = {
27+ version = 0.1 ,
28+ priority = 1000 ,
29+ name = " zookeeper" ,
30+ schema = schema .schema ,
31+ }
32+
33+ -- Global Configuration
34+ local local_conf
35+ -- Service Instance Cache(service_name -> {nodes, expire_time})
36+ local instance_cache = core .lrucache .new ({
37+ ttl = 3600 ,
38+ count = 1024
39+ })
40+
41+ -- Timer Identifier
42+ local fetch_timer
43+
44+ -- The instance list of a single service from ZooKeeper
45+ local function fetch_service_instances (conf , service_name )
46+ -- 1. Init connect
47+ local client , err = utils .new_zk_client (conf )
48+ if not client then
49+ return nil , err
50+ end
51+
52+ -- 2. TODO: Create path
53+ local service_path = conf .root_path .. " /" .. service_name
54+ local ok , err = utils .create_zk_path (client , service_path )
55+ if not ok then
56+ utils .close_zk_client (client )
57+ return nil , err
58+ end
59+
60+ -- 3. All instance nodes under a service
61+ local children , err = client :get_children (service_path )
62+ if not children then
63+ utils .close_zk_client (client )
64+ if err == " not exists" then
65+ core .log .warn (" service path not exists: " , service_path )
66+ return {}
67+ end
68+ core .log .error (" get zk children failed: " , err )
69+ return nil , err
70+ end
71+
72+ -- 4. Parse the data of each instance node one by one
73+ local instances = {}
74+ for _ , child in ipairs (children ) do
75+ local instance_path = service_path .. " /" .. child
76+ local data , stat , err = client :get (instance_path )
77+ if not data then
78+ core .log .error (" get instance data failed: " , instance_path , " err: " , err )
79+ goto continue
80+ end
81+
82+ -- Parse instance data
83+ local instance = utils .parse_instance_data (data )
84+ if instance then
85+ table.insert (instances , instance )
86+ end
87+
88+ :: continue::
89+ end
90+
91+ -- 5. Close connects
92+ utils .close_zk_client (client )
93+
94+ core .log .debug (" fetch service instances: " , service_name , " count: " , # instances )
95+ return instances
96+ end
97+
98+ -- Scheduled fetch of all service instances (full cache update))
99+ local function fetch_all_services ()
100+ if not local_conf then
101+ core .log .warn (" zookeeper discovery config not loaded" )
102+ return
103+ end
104+
105+ -- 1. Init ZK client
106+ local client , err = utils .new_zk_client (local_conf )
107+ if not client then
108+ core .log .error (" init zk client failed: " , err )
109+ return
110+ end
111+
112+ -- 2. All instance nodes under a service
113+ local services , err = client :get_children (local_conf .root_path )
114+ if not services then
115+ utils .close_zk_client (client )
116+ core .log .error (" get zk root children failed: " , err )
117+ return
118+ end
119+
120+ -- 3. Fetch the instances of each service and update the cache
121+ local now = ngx .time ()
122+ for _ , service in ipairs (services ) do
123+ local instances , err = fetch_service_instances (local_conf , service )
124+ if instances then
125+ instance_cache :set (service , nil , {
126+ nodes = instances ,
127+ expire_time = now + local_conf .cache_ttl
128+ })
129+ else
130+ core .log .error (" fetch service instances failed: " , service , " err: " , err )
131+ end
132+ end
133+
134+ -- 4. Close connects
135+ utils .close_zk_client (client )
136+ end
137+
138+ function _M .nodes (service_name )
139+ if not service_name or service_name == " " then
140+ core .log .error (" service name is empty" )
141+ return nil
142+ end
143+
144+ -- 1. Get from cache
145+ local cache = instance_cache :get (service_name )
146+ local now = ngx .time ()
147+
148+ -- 2. If the cache is missed or expired, actively pull (the data))
149+ if not cache or cache .expire_time < now then
150+ core .log .debug (" cache miss or expired, fetch from zk: " , service_name )
151+ local instances , err = fetch_service_instances (local_conf , service_name )
152+ if not instances then
153+ core .log .error (" fetch instances failed: " , service_name , " err: " , err )
154+ -- Fallback: Return the old cache (if available))
155+ if cache then
156+ return cache .nodes
157+ end
158+ return nil
159+ end
160+
161+ -- Update the cache
162+ cache = {
163+ nodes = instances ,
164+ expire_time = now + local_conf .cache_ttl
165+ }
166+ instance_cache :set (service_name , nil , cache )
167+ end
168+
169+ return cache .nodes
170+ end
171+
172+ function _M .check_schema (conf )
173+ return schema .check (conf )
174+ end
175+
176+ function _M .init_worker ()
177+ -- Load configuration
178+ local_conf = core .config .local_conf .discovery and core .config .local_conf .discovery .zookeeper or {}
179+ local ok , err = schema .check (local_conf )
180+ if not ok then
181+ core .log .error (" invalid zookeeper discovery config: " , err )
182+ return
183+ end
184+
185+ -- The default values
186+ local_conf .connect_string = local_conf .connect_string or " 127.0.0.1:2181"
187+ local_conf .fetch_interval = local_conf .fetch_interval or 10
188+ local_conf .cache_ttl = local_conf .cache_ttl or 30
189+
190+ -- Start the timer
191+ if not fetch_timer then
192+ fetch_timer = ngx .timer .every (local_conf .fetch_interval , fetch_all_services )
193+ core .log .info (" zookeeper discovery fetch timer started, interval: " , local_conf .fetch_interval , " s" )
194+ end
195+
196+ -- Manually execute a full pull immediately
197+ ngx .timer .at (0 , fetch_all_services )
198+ end
199+
200+ function _M .destroy ()
201+ if fetch_timer then
202+ ngx .timer .cancel (fetch_timer )
203+ fetch_timer = nil
204+ end
205+ instance_cache :flush_all ()
206+ core .log .info (" zookeeper discovery destroyed" )
207+ end
208+
209+ return _M
0 commit comments