Simple PHP Task Queue

Here’s a simple task queue implementation that I’ve started using. It’s great when you need to send an email when you process some heavy forms and don’t want the email to slow you down. It’s also great for background syncing of data: for example I needed to persist a local change to a remote db over an API when this little queue came handy. Here are the details, adjust for your needs.

<?php
/*
Simple PHP Task Queue implementation.
Author: Raivo Ratsep http://raivoratsep.com on 6/01/2011
Version: 0.1a
*/

class Queue {
   
    public function run() {
        foreach ($this->get_tasks() as $task) {
            $result = $this->execute_script($task['run_script'], $task['method'], unserialize($task['script_params']));
            if($result === true) {
                $this->mark_complete($task['id']);
                echo "Task id {$task['id']} complete.<br>";
            } else {
                echo "Task id {$task['id']} not complete.<br>";
            }
        }
    }

    private function mark_complete($task_id) {
        $db = DB::get_instance();
        $sql = "UPDATE queue SET completed = 1, completed_datetime = NOW() WHERE id = ? LIMIT 1;";
        $db->stmtPrepare($sql);
        $db->bindParameters("i", array($task_id));
        $db->stmtExecute();            
    }  
   
    private function execute_script($script_to_run, $method, $param_array) {
        $query_string = http_build_query($param_array);
        switch ($method) {
            case 'POST':
                $urlConn = curl_init ("http://".CRON_DIR_USERNAME.":".CRON_DIR_PASSWORD."@{$_SERVER['HTTP_HOST']}/cron/queues/{$script_to_run}.php");
                curl_setopt ($urlConn, CURLOPT_POST, 1);
                curl_setopt ($urlConn, CURLOPT_POSTFIELDS, $query_string);  //submitting an array did not work :(
            break;

            case 'GET':                
                $urlConn = curl_init ("http://".CRON_DIR_USERNAME.":".CRON_DIR_PASSWORD."@{$_SERVER['HTTP_HOST']}/cron/queues/{$script_to_run}.php?$query_string");
                curl_setopt ($urlConn, CURLOPT_HTTPGET, 1);
            break;     
        }

        ob_start(); // prevent the buffer from being displayed
        curl_exec($urlConn);
        $raw_response = ob_get_contents();
        ob_end_clean();    
        curl_close($urlConn);       // close the connection
//echo $raw_response;      
        $result_array = json_decode($raw_response, true);
        if(isset($result_array['status'])) {
            return $result_array['status'];
        } else {
            return -1;
        }
    }
   
    private function get_tasks() {
        $db = DB::get_instance();
        $sql = "SELECT * FROM queue WHERE completed = 0;";
        $db->stmtPrepare($sql);
        $db->stmtExecute();        
        $out = array();
        while($row = $db->stmtFetch()) {
            $out[] = $row;
        }
        return $out;
    }
   
    public static function add($run_script, Array $params, $method = 'GET') {
        $db = DB::get_instance();
        $sql = "INSERT INTO queue (run_script, script_params, inserted_datetime, task_md5_hash, method) VALUES (?,?, NOW(),?,?);";
        $db->stmtPrepare($sql);
        $serialized_params = serialize($params);
        $db->bindParameters("ssss", array($run_script, $serialized_params, md5($run_script.$serialized_params), strtoupper($method)));
        $db->stmtExecute();
    }  
   
    public static function exists($run_script, Array $params) {
        $db = DB::get_instance();
        $sql = "SELECT id FROM queue WHERE task_md5_hash = ?;";
        $db->stmtPrepare($sql);
        $serialized_params = serialize($params);
        $db->bindParameters("s", array(md5($run_script.$serialized_params)));
        $db->stmtExecute();
        if($db->stmtGetRowCount() > 0) {
            return true;
        } else {
            return false;
        }
    }
}

First, you’ll need to plug in your own db access code.

Then, create two constants, CRON_DIR_USERNAME and CRON_DIR_PASSWORD. You probably want to protect access to the folder and its contents that will be running the task queue. If not, leave the constants empty or remove from code.

Next, as you probably guessed, you need to create a directory where you will place a file called init.php. Set the cron to run that file every 5 or so minutes. This will launch the process that will execute the pending tasks.

Here’s the code for init.php:

<?php

$Tasks = new Queue();
$Tasks->run();

Now, to add tasks to the queue call:

Queue::add($run_script, Array $params, 'GET' or 'POST');

Here, $run_script will be the name of the script that does the work. You will need to implement that in the same folder where you placed the init.php file. The extension .php will be added automatically. $params is an array of parameters that will be given to your implementation script, and the last argument is whether you want the $param array POSTed or GET it. GET is default.

For example:

Queue::add('email-customer', array('id' => 'Raivo', 'email' => 'raivo@php.net'), 'POST');

There’s also a convenience function to check if a task with given params already exists:

Queue::exists($run_script, Array $params);

Next, you will need to implement the script that will process your task. The script will receive your params as either GET or POST.

One way to enhance this Task Queue implementation would be to also supply a task id to your script, so that if something goes wrong you could email the id to yourself to follow up on.

The implementation of the script that completes your task is up to you, of course. The only thing you need to do is to return JSON object that sets the ‘status’ to either true or false at the end of your script. An easy way to do this in PHP is:

echo json_encode(array('status' => false));
//or
echo json_encode(array('status' => true));

When the ‘status’ is true the task is marked as completed; if false, it’s still pending and will be run again.

And finally, here’s the db schema:

CREATE TABLE `queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `run_script` varchar(255) NOT NULL,
  `script_params` text NOT NULL,
  `completed` int(1) NOT NULL DEFAULT '0',
  `inserted_datetime` datetime NOT NULL,
  `completed_datetime` datetime NOT NULL,
  `task_md5_hash` varchar(32) NOT NULL,
  `method` enum('POST','GET') NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=latin1

RSS feed for comments on this post. TrackBack URI

Leave a Reply