@@ -24,6 +24,7 @@ THE SOFTWARE.
2424#include < iostream>
2525#include < algorithm>
2626#include < future>
27+ #include < thread>
2728#include " TreeDecomposition.hpp"
2829#include " IFlowCutter.hpp"
2930#include " arjun/arjun.h"
@@ -113,11 +114,14 @@ void run_arjun(ArjunNS::SimplifiedCNF& cnf) {
113114 arjun.set_oracle_find_bins (0 );
114115 arjun.set_cms_glob_mult (0.0001 );
115116
116- /* ArjunNS::Arjun::ElimToFileConf etof_conf; */
117+ ArjunNS::SimpConf simp_conf;
118+ /* simp_conf.iter1 = 1; */
119+ simp_conf.iter1 = 0 ;
120+ ArjunNS::Arjun::ElimToFileConf etof_conf;
117121 arjun.standalone_minimize_indep (cnf, false );
118- /* if (cnf.get_sampl_vars().size() >= arjun_further_min_cutoff && do_puura ) { */
119- /* arjun.standalone_elim_to_file(cnf, etof_conf, simp_conf); */
120- /* } else cnf.renumber_sampling_vars_for_ganak(); */
122+ if (cnf.get_sampl_vars ().size () >= 10 ) {
123+ arjun.standalone_elim_to_file (cnf, etof_conf, simp_conf);
124+ } else cnf.renumber_sampling_vars_for_ganak ();
121125 /* verb_print(1, "Arjun T: " << (cpu_time()-my_time)); */
122126}
123127
@@ -211,7 +215,6 @@ FF OuterCounter::count_with_td_parallel(uint8_t bits_threads) {
211215 std::cout << std::endl;
212216 }
213217
214- std::vector<std::future<FF>> futures;
215218 auto worker = [&](uint64_t num) -> FF {
216219 ArjunNS::SimplifiedCNF cnf (fg);
217220 cnf.new_vars (nvars);
@@ -233,28 +236,72 @@ FF OuterCounter::count_with_td_parallel(uint8_t bits_threads) {
233236 for (const auto & [cl, lbd] : red_cls)
234237 cnf.add_red_clause (ganak_to_cms_cl (cl));
235238 if (true ) run_arjun (cnf);
236- cnf.renumber_sampling_vars_for_ganak ();
237- auto conf_verb0 = conf;
238- conf.verb = 0 ; // disable verb for threads
239- auto counter = std::make_unique<OuterCounter>(conf, fg);
240- setup_ganak (cnf, *counter);
241- auto ret = counter->count ();
242- num_cache_lookups += counter->get_num_cache_lookups ();
243- max_cache_elems = std::max (max_cache_elems, counter->get_max_cache_elems ());
244- count_is_approximate |= counter->get_is_approximate ();
245- return ret;
239+ if (cnf.multiplier_weight != fg->zero ()) {
240+ auto local_conf = conf;
241+ local_conf.verb = 0 ; // disable verb for threads
242+ auto counter = std::make_unique<OuterCounter>(local_conf, fg);
243+ setup_ganak (cnf, *counter);
244+ auto ret = counter->count ();
245+ num_cache_lookups += counter->get_num_cache_lookups ();
246+ max_cache_elems = std::max (max_cache_elems, counter->get_max_cache_elems ());
247+ count_is_approximate |= counter->get_is_approximate ();
248+ *ret *= *cnf.multiplier_weight ;
249+ return ret;
250+ } else {
251+ return fg->zero ();
252+ }
246253 };
247254
248- for (uint32_t t = 0 ; t < nthreads; t++) {
249- futures.push_back (std::async (std::launch::async, worker, t));
250- }
255+ // Get number of hardware cores and limit concurrent threads
256+ uint32_t num_cores = std::thread::hardware_concurrency ();
257+ if (num_cores == 0 ) num_cores = 1 ; // fallback if detection fails
258+ uint32_t max_concurrent = std::min ((uint64_t )num_cores, nthreads);
259+
260+ verb_print (1 , " [par] Using " << max_concurrent << " concurrent threads (out of "
261+ << nthreads << " total tasks) on " << num_cores << " cores" );
251262
252- // Collect results
263+ // Thread pool: maintain at most max_concurrent active threads
264+ std::vector<std::pair<uint64_t , std::future<FF>>> active_futures;
265+ uint64_t next_task = 0 ;
253266 FF total_count = fg->zero ();
254- for (auto & future : futures) {
255- FF partial = future.get ();
256- *total_count += *partial;
267+
268+ // Launch initial batch of threads
269+ for (uint32_t i = 0 ; i < max_concurrent && next_task < nthreads; i++) {
270+ active_futures.push_back ({next_task, std::async (std::launch::async, worker, next_task)});
271+ next_task++;
257272 }
273+
274+ // Process results and launch new tasks as threads complete
275+ while (!active_futures.empty ()) {
276+ for (size_t i = 0 ; i < active_futures.size (); ) {
277+ auto & [task_id, future] = active_futures[i];
278+ // Check if this future is ready
279+ if (future.wait_for (std::chrono::milliseconds (0 )) == std::future_status::ready) {
280+ // Collect result
281+ FF partial = future.get ();
282+ *total_count += *partial;
283+ verb_print (1 , " [par] Task " << task_id << " completed" );
284+
285+ // Remove from active list
286+ active_futures.erase (active_futures.begin () + i);
287+
288+ // Launch next task if available
289+ if (next_task < nthreads) {
290+ active_futures.push_back ({next_task, std::async (std::launch::async, worker, next_task)});
291+ verb_print (1 , " [par] Launched task " << next_task);
292+ next_task++;
293+ }
294+ } else {
295+ i++;
296+ }
297+ }
298+
299+ // Small sleep to avoid busy waiting
300+ if (!active_futures.empty ()) {
301+ std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
302+ }
303+ }
304+
258305 return total_count;
259306}
260307
0 commit comments