I currently have a website written in PHP that uses curl_multi to poll external APIs. The server forks child processes that run independently of web requests. This works well, but it is limited to a per-process basis.
Occasionally it hits bandwidth bottlenecks, so it needs better, centralized queuing logic.
I am currently trying PHP IPC with a standalone background process that handles all outgoing requests, but I got stuck on things that casual programmers do not normally have to deal with: garbage collection, inter-process exception handling, request-response matching, and so on. Am I going the wrong way?
Is there a common practice (implementation theory) out there, or even libraries I could make use of?
EDIT
Using localhost TCP/IP communication would double the load of local traffic, which is definitely not a good approach.
I am currently working on an IPC message queue with a home-brewed protocol, but it does not look right at all. I would appreciate any help.
There are several different things to distinguish here:
jobs : you have N jobs to process. Executed tasks can crash or hang; either way, all jobs should be processed without any data loss.
resources : you are processing your jobs on a single machine and/or over a single connection, so you need to take care of your CPU and bandwidth.
synchronization : if your processes interact with each other, they need to share information, taking care of concurrent data access.

Because PHP has no built-in threads, we will need to simulate mutexes. The principle is quite simple:
1 All jobs are put in a queue
2 There are N resources available in a pool, and no more
3 We iterate over the queue (a while loop over the jobs)
4 Before execution, a job asks the pool for a resource
5 If a resource is available, the job is executed
6 If no resource is left, the pool blocks until a job has finished or is considered dead
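To make the principle concrete, here is a minimal single-process simulation of steps 1 to 6. It is only a sketch: the function name simulatePool and the idea of giving each job a fixed duration are assumptions made for illustration, no real process is launched, and $max is assumed to be at least 1.

```php
<?php
// Hypothetical simulation: each job is described by how long it runs.
// Returns the (simulated) time at which each job was allowed to start.
function simulatePool(array $durations, $max)
{
    $running = array(); // finish times of the jobs currently holding a resource
    $clock = 0;         // simulated current time
    $starts = array();

    // Steps 1 and 3: the jobs form a queue that we iterate in order.
    foreach ($durations as $job => $duration) {
        // Step 6: no resource left, wait until the earliest running job ends.
        if (count($running) >= $max) {
            $clock = max($clock, min($running));
            // Release every resource whose job has finished by now.
            $running = array_filter($running, function ($end) use ($clock) {
                return $end > $clock;
            });
        }
        // Steps 4 and 5: a resource is available, the job starts now.
        $starts[$job] = $clock;
        $running[] = $clock + $duration;
    }

    return $starts;
}

// Three jobs lasting 3, 1 and 2 time units, with 2 resources in the pool.
echo implode(',', simulatePool(array(3, 1, 2), 2)), "\n"; // prints 0,0,1
```

The third job has to wait until time 1, when the shortest of the first two jobs releases its resource. The real launcher does the same thing, except that it discovers finished jobs by polling shared state instead of knowing durations in advance.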

To proceed, we have several possibilities, but the principle is the same:
We have 2 programs: the process launcher and the child processes it runs.
The process launcher knows how many tasks should be run, and runs them without caring about their results. It only controls their execution (a process starts, finishes, or hangs, and N are already running).
I give you the idea here; I'll give you usable examples later:
<?php
// launcher.php
require_once("ProcessesPool.php");
// The label identifies your process pool, it should be unique for your process launcher and your process children
$multi = new ProcessesPool($label = 'test');
// Initialize a new pool (creates the right directory or file, cleans a database or whatever you want)
// 10 is the maximum number of simultaneously run processes
if ($multi->create($max = '10') == false)
{
echo "Pool creation failed ...\n";
exit();
}
// We need to launch N processes, stored in $count
$count = 100; // maybe count($jobs)
// We launch all processes, one by one
for ($i = 0; ($i < $count); $i++)
{
// The waitForResource method checks how many processes are already running,
// and blocks until a resource is free or the maximum execution time is reached.
$ret = $multi->waitForResource($timeout = 10, $interval = 500000);
if ($ret)
{
// A resource is free, so we can run a new process
echo "Execute new process: $i\n";
exec("/usr/bin/php ./child.php $i > /dev/null &");
}
else
{
// Timeout is reached, we consider all children as dead and we kill them.
echo "WaitForResources Timeout! Killing zombies...\n";
$multi->killAllResources();
break;
}
}
// All processes have been launched, but this does not mean they have finished their work.
// It is important to wait for the last launched processes to avoid zombies.
$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000);
if ($ret == false)
{
echo "WaitForTheEnd Timeout! Killing zombies...\n";
$multi->killAllResources();
}
// We destroy the process pool because we run all processes.
$multi->destroy();
echo "Finish.\n";
A child (executed job) has only 3 things to do:
1 tell the process launcher it started
2 do its job
3 tell the process launcher it finished
It could contain something like this:
<?php
// child.php
require_once("ProcessesPool.php");
// we create the *same* instance of the process pool
$multi = new ProcessesPool($label = 'test');
// child tells the launcher it started (there will be one more resource busy in pool)
$multi->start();
// here I simulate job's execution
sleep(rand() % 5 + 1);
// child tells the launcher it finished its job (there will be one more resource free in pool)
$multi->finish();
ProcessPool class?
There are a lot of ways to synchronize tasks, but the right one really depends on your requirements and constraints.
You can synchronize your tasks using, for example, files in a directory or a database table; both approaches are implemented further down.
As we have already seen, we need at least 7 methods:
1 create() creates an empty pool
2 start() takes a resource from the pool
3 finish() releases a resource
4 waitForResource() blocks while there is no free resource
5 killAllResources() gets all launched jobs in the pool and kills them
6 waitForTheEnd() blocks until there is no more busy resource
7 destroy() destroys the pool
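Both waiting methods boil down to the same pattern: poll a condition at a fixed interval until it holds or a timeout expires. The sketch below shows that pattern in isolation. The helper name waitUntil is hypothetical, and it deliberately uses a simpler rule than the abstract class in this answer: the deadline is counted from the start of the wait, whereas the class compares the timeout against the pool's age (the time since its last change).

```php
<?php
// Hypothetical helper: polls $condition until it returns a truthy value.
// $timeout is in seconds, $interval in microseconds (as for usleep()).
function waitUntil($condition, $timeout, $interval = 500000)
{
    $deadline = microtime(true) + $timeout;

    while (true) {
        if (call_user_func($condition)) {
            return true;  // the condition holds, stop waiting
        }
        if (microtime(true) >= $deadline) {
            return false; // timeout reached before the condition held
        }
        usleep($interval);
    }
}

// Usage: wait at most 5 seconds for a counter to reach 3, polling every millisecond.
$calls = 0;
$ok = waitUntil(function () use (&$calls) {
    return ++$calls >= 3;
}, 5, 1000);
var_dump($ok); // bool(true)
```

Counting from the start of the wait is easier to reason about, but it cannot tell a pool that is slowly making progress from one that is completely stuck, which is what the pool age is for.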
So let's begin by creating an abstract class; we'll be able to implement it with each of these approaches later.
PHP AbstractProcessesPool.php
<?php
// AbstractProcessesPool.php
abstract class AbstractProcessesPool
{
abstract protected function _createPool();
abstract protected function _cleanPool();
abstract protected function _destroyPool();
abstract protected function _getPoolAge();
abstract protected function _countPid();
abstract protected function _addPid($pid);
abstract protected function _removePid($pid);
abstract protected function _getPidList();
protected $_label;
protected $_max;
protected $_pid;
public function __construct($label)
{
$this->_max = 0;
$this->_label = $label;
$this->_pid = getmypid();
}
public function getLabel()
{
return ($this->_label);
}
public function create($max = 20)
{
$this->_max = $max;
$ret = $this->_createPool();
return $ret;
}
public function destroy()
{
$ret = $this->_destroyPool();
return $ret;
}
public function waitForResource($timeout = 120, $interval = 500000, $callback = null)
{
// give the children enough time to take a resource
usleep(200000);
while (true)
{
if (($callback != null) && (is_callable($callback)))
{
call_user_func($callback, $this);
}
$age = $this->_getPoolAge();
if ($age == -1)
{
return false;
}
if ($age > $timeout)
{
return false;
}
$count = $this->_countPid();
if ($count == -1)
{
return false;
}
if ($count < $this->_max)
{
break;
}
usleep($interval);
}
return true;
}
public function waitForTheEnd($timeout = 3600, $interval = 500000, $callback = null)
{
// give the last child enough time to take a resource
usleep(200000);
while (true)
{
if (($callback != null) && (is_callable($callback)))
{
call_user_func($callback, $this);
}
$age = $this->_getPoolAge();
if ($age == -1)
{
return false;
}
if ($age > $timeout)
{
return false;
}
$count = $this->_countPid();
if ($count == -1)
{
return false;
}
if ($count == 0)
{
break;
}
usleep($interval);
}
return true;
}
public function start()
{
$ret = $this->_addPid($this->_pid);
return $ret;
}
public function finish()
{
$ret = $this->_removePid($this->_pid);
return $ret;
}
public function killAllResources($code = 9)
{
$pids = $this->_getPidList();
if ($pids == false)
{
$this->_cleanPool();
return false;
}
foreach ($pids as $pid)
{
$pid = intval($pid);
posix_kill($pid, $code);
if ($this->_removePid($pid) == false)
{
return false;
}
}
return true;
}
}
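killAllResources() above signals every PID it finds in the pool, whether or not the process still exists. If you would rather check first, one option is to send signal 0 with posix_kill(), which performs the existence and permission checks without delivering a signal. This is a sketch only: the helper names isProcessAlive and filterAlive are hypothetical, the posix extension is assumed to be available (the class already relies on it), and a false result can also mean the process belongs to another user. PIDs can be reused by the system, so a positive answer is a hint, not a proof.

```php
<?php
// Hypothetical helper: true if a process with this PID exists and can be signalled.
// Signal 0 delivers nothing; only the existence/permission checks are performed.
function isProcessAlive($pid)
{
    return posix_kill((int) $pid, 0);
}

// Hypothetical helper: keeps only the PIDs that still belong to a running process.
function filterAlive(array $pids)
{
    return array_values(array_filter($pids, 'isProcessAlive'));
}

var_dump(isProcessAlive(getmypid())); // bool(true): the current process is running
```

A launcher could call filterAlive() on the PID list before deciding that the pool is stuck, and remove the entries of children that crashed without calling finish().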
If you want to use the directory method (on a /dev/ram1 partition, for example), the implementation will be:
1 create() creates an empty directory using the given $label
2 start() creates a file in the directory, named after the child's pid
3 finish() deletes the child's file
4 waitForResource() counts the files inside that directory
5 killAllResources() reads the directory content and kills all pids
6 waitForTheEnd() reads the directory until there are no more files
7 destroy() removes the directory
This method looks costly, but it is really efficient if you want to run hundreds of tasks simultaneously without taking as many database connections as there are jobs to execute.
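Before the full class, the core of the directory method can be tried in isolation: one empty file per busy process, and the number of files is the number of busy resources. The helper names below (markBusy, markFree, countBusy) and the temporary directory name are made up for this sketch and are not part of the classes in this answer.

```php
<?php
// Hypothetical helper: take a resource by creating an empty file named after the PID.
function markBusy($dir, $pid)
{
    return touch($dir . '/' . $pid);
}

// Hypothetical helper: release the resource by deleting the PID file (no-op if absent).
function markFree($dir, $pid)
{
    $file = $dir . '/' . $pid;
    return !is_file($file) || unlink($file);
}

// Hypothetical helper: busy resources = directory entries, ignoring "." and "..".
function countBusy($dir)
{
    return count(array_diff(scandir($dir), array('.', '..')));
}

// Small demo in a throwaway directory under the system temp dir.
$dir = sys_get_temp_dir() . '/pool_sketch_' . getmypid();
mkdir($dir, 0700);
markBusy($dir, getmypid());
echo countBusy($dir), "\n"; // prints 1
markFree($dir, getmypid());
echo countBusy($dir), "\n"; // prints 0
rmdir($dir);
```

Creating and deleting a file are single filesystem operations, so children never have to lock anything: each one only touches the file that carries its own PID.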
Implementation :
PHP ProcessesPoolFiles.php
<?php
// ProcessesPoolFiles.php
class ProcessesPoolFiles extends AbstractProcessesPool
{
protected $_dir;
public function __construct($label, $dir)
{
parent::__construct($label);
if ((!is_dir($dir)) || (!is_writable($dir)))
{
throw new Exception("Directory '{$dir}' does not exist or is not writable.");
}
$sha1 = sha1($label);
$this->_dir = "{$dir}/pool_{$sha1}";
}
protected function _createPool()
{
if ((!is_dir($this->_dir)) && (!mkdir($this->_dir, 0777)))
{
throw new Exception("Could not create '{$this->_dir}'");
}
if ($this->_cleanPool() == false)
{
return false;
}
return true;
}
protected function _cleanPool()
{
$dh = opendir($this->_dir);
if ($dh == false)
{
return false;
}
while (($file = readdir($dh)) !== false)
{
if (($file != '.') && ($file != '..'))
{
if (unlink($this->_dir . '/' . $file) == false)
{
return false;
}
}
}
closedir($dh);
return true;
}
protected function _destroyPool()
{
if ($this->_cleanPool() == false)
{
return false;
}
if (!rmdir($this->_dir))
{
return false;
}
return true;
}
protected function _getPoolAge()
{
$age = -1;
$count = 0;
$dh = opendir($this->_dir);
if ($dh == false)
{
// -1 is the error value that waitForResource() and waitForTheEnd() check for
return -1;
}
while (($file = readdir($dh)) !== false)
{
if (($file != '.') && ($file != '..'))
{
$stat = @stat($this->_dir . '/' . $file);
if ($stat === false)
{
// the child removed its file between readdir() and stat()
continue;
}
if ($stat['mtime'] > $age)
{
$age = $stat['mtime'];
}
$count++;
}
}
closedir($dh);
clearstatcache();
return (($count > 0) ? (@time() - $age) : (0));
}
protected function _countPid()
{
$count = 0;
$dh = opendir($this->_dir);
if ($dh == false)
{
return -1;
}
while (($file = readdir($dh)) !== false)
{
if (($file != '.') && ($file != '..'))
{
$count++;
}
}
closedir($dh);
return $count;
}
protected function _addPid($pid)
{
$file = $this->_dir . "/" . $pid;
if (is_file($file))
{
return true;
}
echo "{$file}\n";
$file = fopen($file, 'w');
if ($file == false)
{
return false;
}
fclose($file);
return true;
}
protected function _removePid($pid)
{
$file = $this->_dir . "/" . $pid;
if (!is_file($file))
{
return true;
}
if (unlink($file) == false)
{
return false;
}
return true;
}
protected function _getPidList()
{
$array = array ();
$dh = opendir($this->_dir);
if ($dh == false)
{
return false;
}
while (($file = readdir($dh)) !== false)
{
if (($file != '.') && ($file != '..'))
{
$array[] = $file;
}
}
closedir($dh);
return $array;
}
}
PHP demo, the process launcher:
<?php
// pool_files_launcher.php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolFiles.php");
$multi = new ProcessesPoolFiles($label = 'test', $dir = "/tmp");
if ($multi->create($max = '10') == false)
{
echo "Pool creation failed ...\n";
exit();
}
$count = 20;
for ($i = 0; ($i < $count); $i++)
{
$ret = $multi->waitForResource($timeout = 10, $interval = 500000, 'test_waitForResource');
if ($ret)
{
echo "Execute new process: $i\n";
exec("/usr/bin/php ./pool_files_calc.php $i > /dev/null &");
}
else
{
echo "WaitForResources Timeout! Killing zombies...\n";
$multi->killAllResources();
break;
}
}
$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000, 'test_waitForTheEnd');
if ($ret == false)
{
echo "WaitForTheEnd Timeout! Killing zombies...\n";
$multi->killAllResources();
}
$multi->destroy();
echo "Finish.\n";
function test_waitForResource($multi)
{
echo "Waiting for available resource ( {$multi->getLabel()} )...\n";
}
function test_waitForTheEnd($multi)
{
echo "Waiting for all resources to finish ( {$multi->getLabel()} )...\n";
}
PHP demo, the process child:
<?php
// pool_files_calc.php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolFiles.php");
$multi = new ProcessesPoolFiles($label = 'test', $dir = "/tmp");
$multi->start();
// here I simulate job's execution
sleep(rand() % 7 + 1);
$multi->finish();
MySQL If you prefer using the database method, you'll need a table like:
CREATE TABLE `processes_pool` (
`label` varchar(40) PRIMARY KEY,
`nb_launched` mediumint(6) unsigned NOT NULL,
`pid_list` varchar(2048) default NULL,
`updated` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Then, the implementation will be something like:
1 create() inserts a new row in the above table
2 start() adds a pid to the pid list
3 finish() removes one pid from the pid list
4 waitForResource() reads the nb_launched field
5 killAllResources() gets and kills every pid
6 waitForTheEnd() blocks and checks regularly until nb_launched equals 0
7 destroy() removes the row
Implementation:
PHP ProcessesPoolMySQL.php
<?php
// ProcessesPoolMySQL.php
class ProcessesPoolMySQL extends AbstractProcessesPool
{
protected $_sql;
public function __construct($label, PDO $sql)
{
parent::__construct($label);
$this->_sql = $sql;
$this->_label = sha1($label);
}
protected function _createPool()
{
$request = "
INSERT IGNORE INTO processes_pool
VALUES ( ?, ?, NULL, CURRENT_TIMESTAMP )
";
$this->_query($request, $this->_label, 0);
return $this->_cleanPool();
}
protected function _cleanPool()
{
$request = "
UPDATE processes_pool
SET
nb_launched = ?,
pid_list = NULL,
updated = CURRENT_TIMESTAMP
WHERE label = ?
";
$this->_query($request, 0, $this->_label);
return true;
}
protected function _destroyPool()
{
$request = "
DELETE FROM processes_pool
WHERE label = ?
";
$this->_query($request, $this->_label);
return true;
}
protected function _getPoolAge()
{
$request = "
SELECT TIMESTAMPDIFF(SECOND, updated, CURRENT_TIMESTAMP) AS age
FROM processes_pool
WHERE label = ?
";
$ret = $this->_query($request, $this->_label);
if ($ret === null)
{
return -1;
}
return $ret['age'];
}
protected function _countPid()
{
$req = "
SELECT nb_launched AS nb
FROM processes_pool
WHERE label = ?
";
$ret = $this->_query($req, $this->_label);
if ($ret === null)
{
return -1;
}
return $ret['nb'];
}
protected function _addPid($pid)
{
$request = "
UPDATE processes_pool
SET
nb_launched = (nb_launched + 1),
pid_list = CONCAT_WS(',', (SELECT IF(LENGTH(pid_list) = 0, NULL, pid_list )), ?),
updated = CURRENT_TIMESTAMP
WHERE label = ?
";
$this->_query($request, $pid, $this->_label);
return true;
}
protected function _removePid($pid)
{
$req = "
UPDATE processes_pool
SET
nb_launched = (nb_launched - 1),
pid_list =
CONCAT_WS(',', (SELECT IF (LENGTH(
SUBSTRING_INDEX(pid_list, ',', (FIND_IN_SET(?, pid_list) - 1))) = 0, null,
SUBSTRING_INDEX(pid_list, ',', (FIND_IN_SET(?, pid_list) - 1)))), (SELECT IF (LENGTH(
SUBSTRING_INDEX(pid_list, ',', (-1 * ((LENGTH(pid_list) - LENGTH(REPLACE(pid_list, ',', ''))) + 1 - FIND_IN_SET(?, pid_list))))) = 0, null,
SUBSTRING_INDEX(pid_list, ',', (-1 * ((LENGTH(pid_list) - LENGTH(REPLACE(pid_list, ',', ''))) + 1 - FIND_IN_SET(?, pid_list))
)
)
)
)
),
updated = CURRENT_TIMESTAMP
WHERE label = ?";
$this->_query($req, $pid, $pid, $pid, $pid, $this->_label);
return true;
}
protected function _getPidList()
{
$req = "
SELECT pid_list
FROM processes_pool
WHERE label = ?
";
$ret = $this->_query($req, $this->_label);
if ($ret === null)
{
return false;
}
if ($ret['pid_list'] == null)
{
return array();
}
$pid_list = explode(',', $ret['pid_list']);
return $pid_list;
}
protected function _query($request)
{
$return = null;
$stmt = $this->_sql->prepare($request);
if ($stmt === false)
{
return $return;
}
$params = func_get_args();
array_shift($params);
if ($stmt->execute($params) === false)
{
return $return;
}
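if (strncasecmp(trim($request), 'SELECT', 6) === 0)
{
// fetch() returns false when no row matches; callers test for null
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$return = ($row === false) ? null : $row;
}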
return $return;
}
}
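The long SUBSTRING_INDEX expression in _removePid() is hard to read, so here is what it is meant to do, written in plain PHP: drop one pid from a comma-separated list. This is a sketch of the intent, not a replacement for the query (doing it in SQL keeps the update atomic). The function name removePidFromList is hypothetical, and returning null for an empty list is an assumption of this sketch.

```php
<?php
// Hypothetical helper: removes the first occurrence of $pid from a
// comma-separated list such as "12,34,56". Returns null when nothing is left.
function removePidFromList($pidList, $pid)
{
    if ($pidList === null || $pidList === '') {
        return null;
    }

    $pids = explode(',', $pidList);

    // Strict string comparison, so "3" never matches "34".
    $index = array_search((string) $pid, $pids, true);
    if ($index !== false) {
        unset($pids[$index]);
    }

    return count($pids) > 0 ? implode(',', $pids) : null;
}

var_dump(removePidFromList('12,34,56', 34)); // string(5) "12,56"
```

If you are free to change the schema, storing one row per pid would make both _addPid() and _removePid() a plain INSERT and DELETE, at the cost of counting rows instead of reading nb_launched.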
PHP demo, the process launcher:
<?php
// pool_mysql_launcher.php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$multi = new ProcessesPoolMySQL($label = 'test', $dbh);
if ($multi->create($max = '10') == false)
{
echo "Pool creation failed ...\n";
exit();
}
$count = 20;
for ($i = 0; ($i < $count); $i++)
{
$ret = $multi->waitForResource($timeout = 10, $interval = 500000, 'test_waitForResource');
if ($ret)
{
echo "Execute new process: $i\n";
exec("/usr/bin/php ./pool_mysql_calc.php $i > /dev/null &");
}
else
{
echo "WaitForResources Timeout! Killing zombies...\n";
$multi->killAllResources();
break;
}
}
$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000, 'test_waitForTheEnd');
if ($ret == false)
{
echo "WaitForTheEnd Timeout! Killing zombies...\n";
$multi->killAllResources();
}
$multi->destroy();
echo "Finish.\n";
function test_waitForResource($multi)
{
echo "Waiting for available resource ( {$multi->getLabel()} )...\n";
}
function test_waitForTheEnd($multi)
{
echo "Waiting for all resources to finish ( {$multi->getLabel()} )...\n";
}
PHP demo, the process child:
<?php
// pool_mysql_calc.php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$multi = new ProcessesPoolMySQL($label = 'test', $dbh);
$multi->start();
// here I simulate job's execution
sleep(rand() % 7 + 1);
$multi->finish();
Demo output: both demos give, fortunately, about the same output. If the timeout is not reached (the ideal case), the output is:
KolyMac:TaskManager ninsuo$ php pool_files_launcher.php
Waiting for available resource ( test )...
Execute new process: 0
Waiting for available resource ( test )...
Execute new process: 1
Waiting for available resource ( test )...
Execute new process: 2
Waiting for available resource ( test )...
Execute new process: 3
Waiting for available resource ( test )...
Execute new process: 4
Waiting for available resource ( test )...
Execute new process: 5
Waiting for available resource ( test )...
Execute new process: 6
Waiting for available resource ( test )...
Execute new process: 7
Waiting for available resource ( test )...
Execute new process: 8
Waiting for available resource ( test )...
Execute new process: 9
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 10
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 11
Waiting for available resource ( test )...
Execute new process: 12
Waiting for available resource ( test )...
Execute new process: 13
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 14
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 15
Waiting for available resource ( test )...
Execute new process: 16
Waiting for available resource ( test )...
Execute new process: 17
Waiting for available resource ( test )...
Execute new process: 18
Waiting for available resource ( test )...
Execute new process: 19
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Finish.
Demo output: in a worse case (I changed sleep(rand() % 7 + 1); to sleep(rand() % 7 + 100);), this gives:
KolyMac:TaskManager ninsuo$ php pool_files_launcher.php
Waiting for available resource ( test )...
Execute new process: 0
Waiting for available resource ( test )...
Execute new process: 1
Waiting for available resource ( test )...
Execute new process: 2
Waiting for available resource ( test )...
Execute new process: 3
Waiting for available resource ( test )...
Execute new process: 4
Waiting for available resource ( test )...
Execute new process: 5
Waiting for available resource ( test )...
Execute new process: 6
Waiting for available resource ( test )...
Execute new process: 7
Waiting for available resource ( test )...
Execute new process: 8
Waiting for available resource ( test )...
Execute new process: 9
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
(...)
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
WaitForResources Timeout! Killing zombies...
Waiting for all resources to finish ( test )...
Finish.