Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
356fead
Refactor pool management system
ArnabChatterjee20k Dec 16, 2025
5467696
Refactor connection pool management and improve concurrency handling
ArnabChatterjee20k Dec 17, 2025
d3519cb
Implement withLock method for thread-safe operations and fix constant…
ArnabChatterjee20k Dec 17, 2025
51e7c0f
Merge remote-tracking branch 'origin/main' into dat-966
ArnabChatterjee20k Dec 17, 2025
d8ae4c9
Fix parameter type in withLock method and enhance connection pool man…
ArnabChatterjee20k Dec 17, 2025
5347ce0
fix new connection creation while pool is empty and now coonnections …
ArnabChatterjee20k Dec 17, 2025
b861f5b
linting
ArnabChatterjee20k Dec 17, 2025
a703aaf
typo fix
ArnabChatterjee20k Dec 17, 2025
2f12968
Refactor connection creation and destruction logic to improve concurr…
ArnabChatterjee20k Dec 17, 2025
22a8b38
Enhance documentation for withLock method and improve Pool class logi…
ArnabChatterjee20k Dec 17, 2025
1fc36c9
doc strings added
ArnabChatterjee20k Dec 17, 2025
fdd750f
Refactor Adapter methods to standardize initialization and locking; r…
ArnabChatterjee20k Dec 18, 2025
a70164f
fix active connection was not getting set under lock leading to concu…
ArnabChatterjee20k Dec 18, 2025
05d27a9
Add retry mechanism to use method for connection handling with tests
ArnabChatterjee20k Dec 22, 2025
c32a92a
linting
ArnabChatterjee20k Dec 22, 2025
c77ddd8
Revert "linting"
ArnabChatterjee20k Jan 5, 2026
c9d98e9
Revert "Add retry mechanism to use method for connection handling wit…
ArnabChatterjee20k Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
"require-dev": {
"phpunit/phpunit": "11.*",
"laravel/pint": "1.*",
"phpstan/phpstan": "1.*"
"phpstan/phpstan": "1.*",
"swoole/ide-helper": "5.1.2"
},
"suggests": {
"ext-mongodb": "Needed to support MongoDB database pools",
"ext-redis": "Needed to support Redis cache pools",
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools"
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools",
"ext-swoole" : "Needed to support Swoole based pool adapter"
},
"config": {
"platform": {
Expand Down
32 changes: 32 additions & 0 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Utopia\Pools;

abstract class Adapter
{
abstract public function initialize(int $size): static;

abstract public function push(mixed $connection): static;

/**
* @param int $timeout
* @return mixed
*/
abstract public function pop(int $timeout): mixed;

abstract public function count(): int;

/**
* Execute a callback with lock protection if the adapter supports it
*
* @param callable $callback
* @param int $timeout Timeout in seconds
* @return mixed
*/
abstract public function synchronized(callable $callback, int $timeout): mixed;
}
68 changes: 68 additions & 0 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;

class Stack extends Adapter
{
/** @var array<mixed> $pool */
protected array $pool = [];

/**
* Initialize the stack-based pool.
*
* Note:
* - `$size` is accepted for API compatibility with other pool adapters.
* - The stack adapter does NOT enforce capacity limits.
* - `$size` is ignored because the pool is backed by a simple array.
*
* @param int $size Ignored by the stack adapter.
*/
public function initialize(int $size): static
{
$this->pool = [];
return $this;
}

public function push(mixed $connection): static
{
// Push connection to pool
$this->pool[] = $connection;
return $this;
}

/**
* Pop an item from the stack.
*
* Note: The stack adapter does not support blocking operations.
* The `$timeout` parameter is ignored.
*
* @param int $timeout Ignored by the stack adapter.
* @return mixed|null Returns the popped item, or null if the stack is empty.
*/
public function pop(int $timeout): mixed
{
return array_pop($this->pool);
}

public function count(): int
{
return count($this->pool);
}

/**
* Executes the callback without acquiring a lock.
*
* This implementation does not provide mutual exclusion.
* The `$timeout` parameter is ignored.
*
* @param callable $callback Callback to execute.
* @param int $timeout Ignored.
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback, int $timeout): mixed
{
return $callback();
}
}
75 changes: 75 additions & 0 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Lock;

class Swoole extends Adapter
{
protected Channel $pool;

/** @var Lock $lock */
protected Lock $lock;
public function initialize(int $size): static
{
$this->pool = new Channel($size);

$this->lock = new Lock(SWOOLE_MUTEX);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this lock the whole worker? Or just the coroutine? Which is desired?

Copy link
Author

@ArnabChatterjee20k ArnabChatterjee20k Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections)
so all operations on the pool will be synchronized


return $this;
}

public function push(mixed $connection): static
{
// Push connection to channel
$this->pool->push($connection);
return $this;
}

/**
* Pop an item from the pool.
*
* @param int $timeout Timeout in seconds. Use 0 for non-blocking pop.
* @return mixed|false Returns the pooled value, or false if the pool is empty
* or the timeout expires.
*/
public function pop(int $timeout): mixed
{
return $this->pool->pop($timeout);
}

public function count(): int
{
$length = $this->pool->length();
return is_int($length) ? $length : 0;
}

/**
* Executes a callback while holding a lock.
*
* The lock is acquired before invoking the callback and is always released
* afterward, even if the callback throws an exception.
*
* @param callable $callback Callback to execute within the critical section.
* @param int $timeout Maximum time (in seconds) to wait for the lock.
* @return mixed The value returned by the callback.
*
* @throws \RuntimeException If the lock cannot be acquired within the timeout.
*/
public function synchronized(callable $callback, int $timeout): mixed
{
$acquired = $this->lock->lockwait($timeout);

if (!$acquired) {
throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds");
}

try {
return $callback();
} finally {
$this->lock->unlock();
}
}
}
Loading