Jowens_JobQueue - Version 0.1.0

Version Notes

First release.

Download this release

Release Info

Developer Jordan Owens
Extension Jowens_JobQueue
Version 0.1.0
Comparing to
See all releases


Version 0.1.0

app/code/community/Jowens/JobQueue/Block/Adminhtml/Job/View.php ADDED
@@ -0,0 +1,75 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Adminhtml container block for the read-only job detail page.
 *
 * The job is read from the 'jowens_jobqueue_job' registry key. Before
 * rendering, HTML-escaped job attributes are stored on the block through
 * magic setters so the template can print them directly.
 */
class Jowens_JobQueue_Block_Adminhtml_Job_View extends Mage_Adminhtml_Block_Widget_Form_Container
{
    /**
     * Job taken from the registry in the constructor.
     *
     * @var Jowens_JobQueue_Model_Job
     */
    protected $_job;

    /**
     * Configure the container and its buttons.
     *
     * The 'save' and 'reset' buttons are removed. A "Resubmit" button is
     * always added; a "Cancel" button is added only while the job has no
     * failed_at value.
     */
    public function __construct()
    {
        $this->_job = Mage::registry('jowens_jobqueue_job');

        $this->_blockGroup = 'jobqueue';
        $this->_controller = 'adminhtml_job';

        parent::__construct();
        $this->_removeButton('save');
        $this->_removeButton('reset');

        $this->_addButton('resubmit', array(
            'label'   => $this->__('Resubmit'),
            'onclick' => $this->_getConfirmOnclick('*/*/resubmit'),
        ), 0, -10);

        if (!$this->_job->getFailedAt()) {
            $this->_addButton('cancel', array(
                'label'   => $this->__('Cancel'),
                'onclick' => $this->_getConfirmOnclick('*/*/cancel'),
            ), 0, -5);
        }
    }

    /**
     * Build the onclick handler that asks for confirmation before navigating
     * to the given route for the current job.
     *
     * Both values are escaped for a single-quoted JavaScript string so that a
     * translation containing an apostrophe cannot break the handler.
     *
     * @param string $route Route path passed to getUrl()
     * @return string
     */
    protected function _getConfirmOnclick($route)
    {
        $message = $this->jsQuoteEscape($this->__('Are you sure you want to do this?'));
        $url     = $this->jsQuoteEscape($this->getUrl($route, array('id' => $this->_job->getId())));

        return "confirmSetLocation('{$message}', '{$url}')";
    }

    /**
     * Header text for the page.
     *
     * The job name is HTML-escaped here so it is safe to print as markup.
     *
     * @return string
     */
    public function getHeaderText()
    {
        return $this->__('Job: "%s"', $this->escapeHtml($this->_job->getName()));
    }

    /**
     * Format a job date for display, or return the translated 'N/A'
     * placeholder when the value cannot be parsed by strtotime().
     *
     * @param string|null $value
     * @return string
     */
    protected function _formatJobDate($value)
    {
        if (!strtotime($value)) {
            return $this->__('N/A');
        }

        return $this->formatDate($value, Mage_Core_Model_Locale::FORMAT_TYPE_MEDIUM, true);
    }

    /**
     * Translated status label: "Failed" when failed_at is set, otherwise
     * "In Process" when locked_at is set, otherwise "Pending".
     *
     * @return string
     */
    protected function _getStatusLabel()
    {
        if ($this->_job->getFailedAt()) {
            return $this->__('Failed');
        }
        if ($this->_job->getLockedAt()) {
            return $this->__('In Process');
        }

        return $this->__('Pending');
    }

    /**
     * Store escaped job attributes on the block, then render.
     *
     * @return string
     */
    protected function _toHtml()
    {
        $job = $this->_job;

        $this->setJobIdHtml($this->escapeHtml($job->getId()));
        $this->setJobNameHtml($this->escapeHtml($job->getName()));

        $store = Mage::app()->getStore($job->getStoreId());
        $this->setStoreNameHtml($this->escapeHtml($store->getName()));

        $this->setJobQueueHtml($this->escapeHtml($job->getQueue()));
        $this->setAttemptsHtml($this->escapeHtml($job->getAttempts()));

        $this->setRunAtHtml($this->escapeHtml($this->_formatJobDate($job->getRunAt())));
        $this->setStatusHtml($this->escapeHtml($this->_getStatusLabel()));
        $this->setErrorHtml($this->escapeHtml($job->getError()));
        $this->setCreatedAtHtml($this->escapeHtml($this->_formatJobDate($job->getCreatedAt())));

        return parent::_toHtml();
    }
}
app/code/community/Jowens/JobQueue/Block/Adminhtml/Queue.php ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Adminhtml grid container for the job queue listing.
 */
class Jowens_JobQueue_Block_Adminhtml_Queue extends Mage_Adminhtml_Block_Widget_Grid_Container
{
    /**
     * Set the header text and the block group/controller pair, run the
     * parent constructor, then remove the 'add' button.
     */
    public function __construct()
    {
        $this->_headerText = $this->__('JobQueue');
        $this->_controller = 'adminhtml_queue';
        $this->_blockGroup = 'jobqueue';

        parent::__construct();

        $this->removeButton('add');
    }
}
app/code/community/Jowens/JobQueue/Block/Adminhtml/Queue/Grid.php ADDED
@@ -0,0 +1,178 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Adminhtml grid listing queued jobs.
 *
 * Adds a computed "status" column to the collection select:
 *   2 = locked_at is set, 1 = failed_at is null, 0 = otherwise.
 * The status filter is translated back into locked_at / failed_at conditions.
 */
class Jowens_JobQueue_Block_Adminhtml_Queue_Grid extends Mage_Adminhtml_Block_Widget_Grid
{
    /**
     * Grid defaults: newest jobs first, filter parameters kept in the session.
     */
    public function __construct()
    {
        parent::__construct();

        $this->setDefaultSort('created_at');
        $this->setId('jowens_jobqueue_grid');
        $this->setDefaultDir('desc');
        $this->setSaveParametersInSession(true);
    }

    /**
     * Resource model alias of the job collection.
     *
     * @return string
     */
    protected function _getCollectionClass()
    {
        return 'jobqueue/job_collection';
    }

    /**
     * Load the job collection and append the computed status column.
     *
     * @return Mage_Adminhtml_Block_Widget_Grid
     */
    protected function _prepareCollection()
    {
        $collection = Mage::getModel('jobqueue/job')->getCollection();
        $collection->getSelect()->columns("(case when main_table.locked_at is not null then 2 when main_table.failed_at is null then 1 else 0 end) as status");
        $this->setCollection($collection);

        return parent::_prepareCollection();
    }

    /**
     * Apply a column filter to the collection.
     *
     * "status" is a computed column, so its filter is expressed on the
     * underlying fields instead: value 2 requires locked_at to be set;
     * value 1 requires failed_at and locked_at to be null; any other value
     * requires failed_at to be set and locked_at to be null.
     *
     * @param Mage_Adminhtml_Block_Widget_Grid_Column $column
     * @return Jowens_JobQueue_Block_Adminhtml_Queue_Grid
     */
    protected function _addColumnFilterToCollection($column)
    {
        if ($column->getId() != 'status') {
            parent::_addColumnFilterToCollection($column);
            return $this;
        }

        $value      = $column->getFilter()->getValue();
        $collection = $this->getCollection();

        if ($value == '2') {
            $collection->addFieldToFilter('locked_at', array('notnull' => true));
            return $this;
        }

        $failedCondition = ($value == '1') ? 'null' : 'notnull';
        $collection->addFieldToFilter('failed_at', array($failedCondition => true));
        $collection->addFieldToFilter('locked_at', array('null' => true));

        return $this;
    }

    /**
     * Declare the grid columns.
     *
     * The store column is only added when the installation is not in single
     * store mode.
     *
     * @return Mage_Adminhtml_Block_Widget_Grid
     */
    protected function _prepareColumns()
    {
        $this->addColumn('id', array(
            'header' => $this->__('ID'),
            'align'  => 'right',
            'type'   => 'number',
            'width'  => '50px',
            'index'  => 'id',
        ));

        if (!Mage::app()->isSingleStoreMode()) {
            $this->addColumn('store_id', array(
                'header'     => $this->__('Store'),
                'index'      => 'store_id',
                'type'       => 'store',
                'store_view' => true,
                'width'      => '200px',
            ));
        }

        $this->addColumn('name', array(
            'header' => $this->__('Name'),
            'index'  => 'name',
        ));

        $this->addColumn('queue', array(
            'header' => $this->__('Queue'),
            'index'  => 'queue',
            'align'  => 'center',
            'width'  => '80px',
        ));

        $this->addColumn('created_at', array(
            'header' => $this->__('Created At'),
            'index'  => 'created_at',
            'type'   => 'datetime',
            'width'  => '175px',
            'align'  => 'center',
        ));

        $this->addColumn('run_at', array(
            'header' => $this->__('Run At'),
            'index'  => 'run_at',
            'type'   => 'datetime',
            'align'  => 'center',
        ));

        $this->addColumn('attempts', array(
            'header' => $this->__('Attempts'),
            'index'  => 'attempts',
            'type'   => 'number',
            'align'  => 'center',
            'width'  => '100px',
        ));

        // Option keys match the values produced by the computed status
        // column; labels are translated like every other label in this grid.
        $this->addColumn('status', array(
            'header'  => $this->__('Status'),
            'index'   => 'status',
            'type'    => 'options',
            'options' => array(
                '1' => $this->__('Pending'),
                '2' => $this->__('In Process'),
                '0' => $this->__('Failed'),
            ),
            'align'   => 'center',
            'width'   => '80px',
        ));

        $this->addColumn('action', array(
            'header'   => $this->__('Action'),
            'width'    => '50px',
            'type'     => 'action',
            'getter'   => 'getId',
            'actions'  => array(
                array(
                    'caption' => $this->__('View'),
                    'url'     => array('base' => '*/*/view'),
                    'field'   => 'id',
                ),
            ),
            'filter'   => false,
            'sortable' => false,
            'align'    => 'center',
        ));

        return parent::_prepareColumns();
    }

    /**
     * Register the resubmit, cancel and delete mass actions. Selected ids
     * are submitted in the 'job_id' form field.
     *
     * @return Jowens_JobQueue_Block_Adminhtml_Queue_Grid
     */
    protected function _prepareMassaction()
    {
        $this->setMassactionIdField('id');

        $massaction = $this->getMassactionBlock();
        $massaction->setFormFieldName('job_id');

        $massaction->addItem('resubmit_job', array(
            'label'   => $this->__('Resubmit Job'),
            'url'     => $this->getUrl('*/*/massResubmitJob'),
            'confirm' => $this->__('Are you sure?'),
        ));

        $massaction->addItem('cancel_job', array(
            'label'   => $this->__('Cancel Job'),
            'url'     => $this->getUrl('*/*/massCancelJob'),
            'confirm' => $this->__('Are you sure?'),
        ));

        $massaction->addItem('delete_job', array(
            'label'   => $this->__('Delete Job'),
            'url'     => $this->getUrl('*/*/massDeleteJob'),
            'confirm' => $this->__('Are you sure?'),
        ));

        return $this;
    }

    /**
     * URL of the detail page for a grid row.
     *
     * @param Varien_Object $row
     * @return string
     */
    public function getRowUrl($row)
    {
        return $this->getUrl('*/*/view', array('id' => $row->getId()));
    }
}
app/code/community/Jowens/JobQueue/Helper/Data.php ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Module helper. Declares no methods of its own; everything is inherited
 * from Mage_Core_Helper_Abstract.
 */
class Jowens_JobQueue_Helper_Data extends Mage_Core_Helper_Abstract
{
}
app/code/community/Jowens/JobQueue/Model/Job.php ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Model for a single queued job row.
 */
class Jowens_JobQueue_Model_Job extends Mage_Core_Model_Abstract
{
    /**
     * Bind the model to the 'jobqueue/job' resource.
     */
    protected function _construct()
    {
        $this->_init('jobqueue/job');
    }

    /**
     * Reset the failure state and save: failed_at, run_at and error are
     * cleared and attempts is set back to 0.
     *
     * NOTE(review): locked_at / locked_by are left untouched here - confirm
     * that is intended for jobs that are still locked.
     */
    public function resubmit()
    {
        $this->addData(array(
            'failed_at' => null,
            'run_at'    => null,
            'attempts'  => 0,
            'error'     => null,
        ));
        $this->save();
    }

    /**
     * Mark the job as failed with a "Job canceled" error and save.
     */
    public function cancel()
    {
        $failedAt = Mage::getModel('core/date')->timestamp(time());
        $reason   = Mage::helper('jobqueue')->__("Job canceled");

        $this->addData(array(
            'failed_at' => $failedAt,
            'error'     => $reason,
        ));
        $this->save();
    }
}
app/code/community/Jowens/JobQueue/Model/Job/Abstract.php ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Base class for job handlers.
 *
 * A concrete job implements perform() and is queued with enqueue(), which
 * stores the serialized handler object in a 'jobqueue/job' row.
 */
abstract class Jowens_JobQueue_Model_Job_Abstract extends Mage_Core_Model_Abstract
{
    /** @var string Display name of the job */
    private $name;

    /** @var int Id of the store that was current when the job was created */
    private $storeId;

    /**
     * @param string|null $name Job name; the handler's class name is used
     *                          when this is empty
     */
    public function __construct($name=null)
    {
        if ($name) {
            $this->name = $name;
        } else {
            $this->name = $this->getType();
        }

        $currentStore = Mage::app()->getStore();
        $this->setStoreId($currentStore->getStoreId());
    }

    /**
     * The work to run for this job.
     */
    abstract public function perform();

    /**
     * Persist this handler as a new job row.
     *
     * @param string      $queue  Queue name
     * @param string|null $run_at Value stored in the row's run_at field
     */
    public function enqueue($queue="default", $run_at=null)
    {
        $row = Mage::getModel('jobqueue/job');
        $row->addData(array(
            'store_id'   => $this->getStoreId(),
            'name'       => $this->getName(),
            'handler'    => serialize($this),
            'queue'      => $queue,
            'run_at'     => $run_at,
            'created_at' => now(),
        ));
        $row->save();
    }

    /**
     * @param string $name
     * @return Jowens_JobQueue_Model_Job_Abstract
     */
    public function setName($name)
    {
        $this->name = $name;

        return $this;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @param int $storeId
     * @return Jowens_JobQueue_Model_Job_Abstract
     */
    public function setStoreId($storeId)
    {
        $this->storeId = $storeId;

        return $this;
    }

    /**
     * @return int
     */
    public function getStoreId()
    {
        return $this->storeId;
    }

    /**
     * Class name of the concrete handler.
     *
     * @return string
     */
    public function getType()
    {
        return get_class($this);
    }
}
app/code/community/Jowens/JobQueue/Model/Resource/Job.php ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Resource model for job rows.
 */
class Jowens_JobQueue_Model_Resource_Job extends Mage_Core_Model_Resource_Db_Abstract
{
    /**
     * Bind to the 'jobqueue/job' table entity with 'id' as the id field.
     */
    protected function _construct()
    {
        $mainTable = 'jobqueue/job';
        $idField   = 'id';

        $this->_init($mainTable, $idField);
    }
}
app/code/community/Jowens/JobQueue/Model/Resource/Job/Collection.php ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Collection of job models.
 */
class Jowens_JobQueue_Model_Resource_Job_Collection extends Mage_Core_Model_Resource_Db_Collection_Abstract
{
    /**
     * Bind the collection to the 'jobqueue/job' model.
     */
    protected function _construct()
    {
        $model = 'jobqueue/job';

        $this->_init($model);
    }
}
app/code/community/Jowens/JobQueue/Model/Resource/Setup.php ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Setup resource for the module's install scripts. Declares no methods of
 * its own; everything is inherited from Mage_Core_Model_Resource_Setup.
 */
class Jowens_JobQueue_Model_Resource_Setup extends Mage_Core_Model_Resource_Setup
{
}
app/code/community/Jowens/JobQueue/Model/Worker.php ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
+ set_include_path(get_include_path().PS.Mage::getBaseDir('lib').DS.'DJJob');
4
+
5
+ require_once('DJJob.php');
6
+
7
/**
 * Cron-driven worker that selects runnable jobs from one queue and runs
 * them through DJJob.
 */
class Jowens_JobQueue_Model_Worker extends Mage_Core_Model_Abstract
{
    /** Queue name used when none is configured. */
    const DEFAULT_QUEUE = 'default';

    /** @var string Identifier built from host name and process id */
    private $workerName;

    /** @var string Name of the queue this worker processes */
    private $queue;

    /**
     * Build the worker name and resolve the queue from configuration.
     *
     * The second argument of Mage::getStoreConfig() is the store, not a
     * fallback value, so the fallback to DEFAULT_QUEUE is applied explicitly
     * when the setting is empty.
     */
    public function __construct() {
        list($hostname, $pid) = array(trim(`hostname`), getmypid());
        $this->workerName = "host::$hostname pid::$pid";

        $configuredQueue = (string)Mage::getStoreConfig('jobqueue/config/queue');
        $this->queue = ($configuredQueue !== '') ? $configuredQueue : self::DEFAULT_QUEUE;
    }

    /**
     * @return string
     */
    public function getQueue() {
        return $this->queue;
    }

    /**
     * @param string $queue
     */
    public function setQueue($queue) {
        $this->queue = $queue;
    }

    /**
     * @return string
     */
    public function getWorkerName() {
        return $this->workerName;
    }

    /**
     * Run every job of the current queue that is due.
     *
     * Does nothing when 'jobqueue/config/enabled' is off. When a schedule is
     * given and its crontab job node has a <queue> child, that queue
     * replaces the configured one. Selected jobs have: matching queue,
     * run_at null or not in the future, no lock or a lock held by this
     * worker, no failed_at, and fewer attempts than the configured maximum.
     * Exceptions are logged and not rethrown.
     *
     * @param Mage_Cron_Model_Schedule|null $schedule
     */
    public function executeJobs($schedule=null) {
        if(!Mage::getStoreConfig('jobqueue/config/enabled')) {
            return;
        }

        if($schedule) {
            $jobsRoot = Mage::getConfig()->getNode('crontab/jobs');
            $jobConfig = $jobsRoot->{$schedule->getJobCode()};
            // Cast the XML node so a plain string, not an element object,
            // ends up in the collection filter.
            $queue = (string)$jobConfig->queue;
            if($queue !== '') {
                $this->setQueue($queue);
            }
        }

        $this->setupDJJob();

        try {
            $maxAttempts = Mage::getStoreConfig('jobqueue/config/max_attempts');
            $now = date('Y-m-d H:i:s', Mage::app()->getLocale()->storeTimeStamp());

            $collection = Mage::getModel('jobqueue/job')->getCollection();
            $collection->addFieldToFilter('queue', array('eq' => $this->getQueue()))
                ->addFieldToFilter('run_at', array(
                    array('null' => true),
                    array('lteq'=> $now)
                ))
                ->addFieldToFilter(array('locked_at', 'locked_by'), array(
                    array('locked_at', 'null' => true),
                    array('locked_by', 'eq' => $this->workerName)
                ))
                ->addFieldToFilter('failed_at', array('null' => true))
                ->addFieldToFilter('attempts', array('lt' => (int)$maxAttempts));

            // randomly order to prevent lock contention among workers
            $collection->getSelect()->order(new Zend_Db_Expr('RAND()'));
            $collection->load();

            foreach($collection as $row) {
                $job = new DJJob($this->workerName, $row->getId(), array(
                    "max_attempts" => $maxAttempts
                ));
                if ($job->acquireLock()) {
                    $job->run();
                }
            }
        } catch (Exception $e) {
            // Use the dedicated exception logger instead of dumping the
            // exception object through Mage::log().
            Mage::logException($e);
        }
    }

    /**
     * Point DJJob at the database described by the "default_setup"
     * resource connection. The port segment is only added to the DSN when a
     * port is configured.
     */
    protected function setupDJJob() {
        $config = Mage::getConfig()->getResourceConnectionConfig("default_setup");

        $dsn = "mysql:host=" . $config->host . ";dbname=" . $config->dbname;
        $port = (string)$config->port;
        if ($port !== '') {
            $dsn .= ";port=" . $port;
        }

        DJJob::configure(
            $dsn,
            array('mysql_user' => $config->username, 'mysql_pass' => $config->password)
        );
    }
}
app/code/community/Jowens/JobQueue/controllers/Adminhtml/QueueController.php ADDED
@@ -0,0 +1,213 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Admin controller for the job queue: listing, detail view, and single or
 * mass resubmit / cancel / delete of jobs.
 */
class Jowens_JobQueue_Adminhtml_QueueController extends Mage_Adminhtml_Controller_Action
{
    /**
     * Restrict access to admins granted the 'system/jowens_jobqueue_queue'
     * ACL resource declared in the module's adminhtml.xml.
     *
     * @return bool
     */
    protected function _isAllowed()
    {
        return Mage::getSingleton('admin/session')->isAllowed('system/jowens_jobqueue_queue');
    }

    /**
     * Job grid.
     */
    public function indexAction()
    {
        $this->_init()
            ->renderLayout();
    }

    /**
     * Load the layout and set the active menu, page titles and breadcrumbs.
     *
     * @return Jowens_JobQueue_Adminhtml_QueueController
     */
    protected function _init()
    {
        $this->loadLayout()
            ->_setActiveMenu('system/jowens_jobqueue_queue')
            ->_title($this->__('System'))->_title($this->__('JobQueue'))
            ->_addBreadcrumb($this->__('System'), $this->__('System'))
            ->_addBreadcrumb($this->__('JobQueue'), $this->__('JobQueue'));

        return $this;
    }

    /**
     * Load the job named by the 'id' request parameter.
     *
     * Adds a "no longer exists" session error when an id was given but no
     * such job is found. Callers are expected to redirect when null is
     * returned.
     *
     * @return Jowens_JobQueue_Model_Job|null Null when no id was given or
     *                                        the job does not exist
     */
    protected function _loadRequestedJob()
    {
        $id = $this->getRequest()->getParam('id');
        if (!$id) {
            return null;
        }

        $job = Mage::getModel('jobqueue/job')->load($id);
        if (!$job->getId()) {
            $this->_getSession()->addError($this->__('This job no longer exists.'));
            return null;
        }

        return $job;
    }

    /**
     * Ids submitted by a grid mass action in the 'job_id' parameter.
     *
     * Adds a session error when nothing usable was submitted, so the mass
     * actions never iterate over a non-array value.
     *
     * @return array
     */
    protected function _getSelectedJobIds()
    {
        $jobIds = $this->getRequest()->getParam('job_id');
        if (!is_array($jobIds) || !$jobIds) {
            $this->_getSession()->addError($this->__('Please select at least one job.'));
            return array();
        }

        return $jobIds;
    }

    /**
     * Job detail page.
     */
    public function viewAction()
    {
        $id = $this->getRequest()->getParam('id');
        $job = Mage::getModel('jobqueue/job');

        if ($id) {
            $job->load($id);

            if (!$job->getId()) {
                $this->_getSession()->addError($this->__('This job no longer exists.'));
                $this->_redirect('*/*/index');
                return;
            }
        }

        $this->_title($job->getId() ? $job->getName() : "Job Details");

        $data = $this->_getSession()->getJobData(true);
        if (!empty($data)) {
            $job->setData($data);
        }

        Mage::register('jowens_jobqueue_job', $job);

        $this->_init()
            ->renderLayout();
    }

    /**
     * Resubmit a single job, then return to the grid.
     */
    public function resubmitAction()
    {
        $job = $this->_loadRequestedJob();
        if ($job) {
            try {
                $job->resubmit();
                $this->_getSession()->addSuccess($this->__('Job "%s" has been resubmitted', $job->getName()));
            } catch (Exception $e) {
                Mage::logException($e);
                $this->_getSession()->addError($this->__('Job "%s" could not be resubmitted', $job->getName()));
            }
        }

        $this->_redirect('*/*/index');
    }

    /**
     * Cancel a single job, then return to the grid.
     */
    public function cancelAction()
    {
        $job = $this->_loadRequestedJob();
        if ($job) {
            try {
                $job->cancel();
                $this->_getSession()->addSuccess($this->__('Job "%s" has been canceled', $job->getName()));
            } catch (Exception $e) {
                Mage::logException($e);
                $this->_getSession()->addError($this->__('Job "%s" could not be canceled', $job->getName()));
            }
        }

        $this->_redirect('*/*/index');
    }

    /**
     * Delete a single job, then return to the grid.
     */
    public function deleteAction()
    {
        $job = $this->_loadRequestedJob();
        if ($job) {
            try {
                $job->delete();
                $this->_getSession()->addSuccess($this->__('Job "%s" has been deleted', $job->getName()));
            } catch (Exception $e) {
                Mage::logException($e);
                $this->_getSession()->addError($this->__('Job "%s" could not be deleted', $job->getName()));
            }
        }

        $this->_redirect('*/*/index');
    }

    /**
     * Resubmit every selected job.
     *
     * Ids that no longer load are counted as errors; saving an unloaded
     * model would otherwise insert a new, empty job row.
     */
    public function massResubmitJobAction()
    {
        $success = 0;
        $error = 0;

        foreach ($this->_getSelectedJobIds() as $jobId) {
            try {
                $job = Mage::getModel('jobqueue/job')->load($jobId);
                if (!$job->getId()) {
                    $error++;
                    continue;
                }
                $job->resubmit();
                $success++;
            } catch (Exception $e) {
                Mage::logException($e);
                $error++;
            }
        }

        if ($error) {
            $this->_getSession()->addError($this->__('%s job(s) could not be resubmitted', $error));
        }

        if ($success) {
            $this->_getSession()->addSuccess($this->__('%s job(s) resubmitted', $success));
        }

        $this->_redirect('*/*/index');
    }

    /**
     * Cancel every selected job. Jobs that no longer exist or that already
     * have a failed_at value are counted as errors.
     */
    public function massCancelJobAction()
    {
        $success = 0;
        $error = 0;

        foreach ($this->_getSelectedJobIds() as $jobId) {
            try {
                $job = Mage::getModel('jobqueue/job')->load($jobId);
                if (!$job->getId() || $job->getFailedAt()) {
                    $error++;
                    continue;
                }
                $job->cancel();
                $success++;
            } catch (Exception $e) {
                Mage::logException($e);
                $error++;
            }
        }

        if ($error) {
            $this->_getSession()->addError($this->__('%s job(s) could not be canceled', $error));
        }

        if ($success) {
            $this->_getSession()->addSuccess($this->__('%s job(s) canceled', $success));
        }

        $this->_redirect('*/*/index');
    }

    /**
     * Delete every selected job. Ids that no longer load are counted as
     * errors instead of successful deletions.
     */
    public function massDeleteJobAction()
    {
        $success = 0;
        $error = 0;

        foreach ($this->_getSelectedJobIds() as $jobId) {
            try {
                $job = Mage::getModel('jobqueue/job')->load($jobId);
                if (!$job->getId()) {
                    $error++;
                    continue;
                }
                $job->delete();
                $success++;
            } catch (Exception $e) {
                Mage::logException($e);
                $error++;
            }
        }

        if ($error) {
            $this->_getSession()->addError($this->__('%s job(s) could not be deleted', $error));
        }

        if ($success) {
            $this->_getSession()->addSuccess($this->__('%s job(s) deleted', $success));
        }

        $this->_redirect('*/*/index');
    }
}
app/code/community/Jowens/JobQueue/etc/adminhtml.xml ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0"?>
2
+ <config>
3
+ <menu>
4
+ <system>
5
+ <children>
6
+ <jowens_jobqueue_queue translate="title" module="jobqueue">
7
+ <title>JobQueue</title>
8
+ <action>adminhtml/queue</action>
9
+ </jowens_jobqueue_queue>
10
+ </children>
11
+ </system>
12
+ </menu>
13
+
14
+ <acl>
15
+ <resources>
16
+ <admin>
17
+ <children>
18
+ <system>
19
+ <children>
20
+ <jowens_jobqueue_queue>
21
+ <title>JobQueue</title>
22
+ </jowens_jobqueue_queue>
23
+ <config>
24
+ <children>
25
+ <jobqueue translate="title" module="jobqueue">
26
+ <title>JobQueue Configuration</title>
27
+ </jobqueue>
28
+ </children>
29
+ </config>
30
+ </children>
31
+ </system>
32
+ </children>
33
+ </admin>
34
+ </resources>
35
+ </acl>
36
+ </config>
app/code/community/Jowens/JobQueue/etc/config.xml ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0" encoding="utf-8" ?>
2
+ <config>
3
+ <modules>
4
+ <Jowens_JobQueue>
5
+ <version>0.1.0</version>
6
+ </Jowens_JobQueue>
7
+ </modules>
8
+ <global>
9
+ <models>
10
+ <jobqueue>
11
+ <class>Jowens_JobQueue_Model</class>
12
+ <resourceModel>jobqueue_resource</resourceModel>
13
+ </jobqueue>
14
+ <jobqueue_resource>
15
+ <class>Jowens_JobQueue_Model_Resource</class>
16
+ <entities>
17
+ <job>
18
+ <table>jobs</table>
19
+ </job>
20
+ </entities>
21
+ </jobqueue_resource>
22
+ </models>
23
+ <blocks>
24
+ <jobqueue>
25
+ <class>Jowens_JobQueue_Block</class>
26
+ </jobqueue>
27
+ </blocks>
28
+ <helpers>
29
+ <jobqueue>
30
+ <class>Jowens_JobQueue_Helper</class>
31
+ </jobqueue>
32
+ </helpers>
33
+ <resources>
34
+ <jobqueue_setup>
35
+ <setup>
36
+ <module>Jowens_JobQueue</module>
37
+ <class>Jowens_JobQueue_Model_Resource_Setup</class>
38
+ </setup>
39
+ </jobqueue_setup>
40
+ </resources>
41
+ </global>
42
+ <frontend>
43
+ <routers>
44
+ <jowens_jobqueue>
45
+ <use>standard</use>
46
+ <args>
47
+ <module>Jowens_JobQueue</module>
48
+ <frontName>jobqueue</frontName>
49
+ </args>
50
+ </jowens_jobqueue>
51
+ </routers>
52
+ </frontend>
53
+ <admin>
54
+ <routers>
55
+ <adminhtml>
56
+ <args>
57
+ <modules>
58
+ <Jowens_JobQueue before="Mage_Adminhtml">Jowens_JobQueue_Adminhtml</Jowens_JobQueue>
59
+ </modules>
60
+ </args>
61
+ </adminhtml>
62
+ </routers>
63
+ </admin>
64
+ <adminhtml>
65
+ <layout>
66
+ <updates>
67
+ <jowens_jobqueue>
68
+ <file>jowens/jobqueue.xml</file>
69
+ </jowens_jobqueue>
70
+ </updates>
71
+ </layout>
72
+ </adminhtml>
73
+ <crontab>
74
+ <jobs>
75
+ <jobqueue_default>
76
+ <schedule>
77
+ <config_path>jobqueue/config/cron_expr</config_path>
78
+ </schedule>
79
+ <run>
80
+ <model>jobqueue/worker::executeJobs</model>
81
+ </run>
82
+ </jobqueue_default>
83
+ </jobs>
84
+ </crontab>
85
+ <default>
86
+ <jobqueue>
87
+ <config>
88
+ <enabled>1</enabled>
89
+ <cron_expr>*/5 * * * *</cron_expr>
90
+ <max_attempts>10</max_attempts>
91
+ <queue>default</queue>
92
+ </config>
93
+ </jobqueue>
94
+ </default>
95
+ </config>
app/code/community/Jowens/JobQueue/etc/system.xml ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0"?>
2
+ <config>
3
+ <sections>
4
+ <jobqueue translate="label" module="jobqueue">
5
+ <label>JobQueue</label>
6
+ <tab>general</tab>
7
+ <frontend_type>text</frontend_type>
8
+ <sort_order>10000</sort_order>
9
+ <show_in_default>1</show_in_default>
10
+ <groups>
11
+ <config translate="label" module="jobqueue">
12
+ <label>Configuration</label>
13
+ <frontend_type>text</frontend_type>
14
+ <sort_order>2</sort_order>
15
+ <show_in_default>1</show_in_default>
16
+ <fields>
17
+ <enabled translate="label comment">
18
+ <label>Cron Worker Enabled</label>
19
+ <frontend_type>select</frontend_type>
20
+ <source_model>adminhtml/system_config_source_yesno</source_model>
21
+ <show_in_default>1</show_in_default>
22
+ </enabled>
23
+ <cron_expr>
24
+ <label>How often do you want the cron to run?</label>
25
+ <frontend_type>text</frontend_type>
26
+ <sort_order>40</sort_order>
27
+ <comment>Use Crontab Format (Eg. "*/5 * * * *" for every 5 minutes)</comment>
28
+ <show_in_default>1</show_in_default>
29
+ </cron_expr>
30
+ <max_attempts>
31
+ <label>Max Attempts</label>
32
+ <frontend_type>text</frontend_type>
33
+ <sort_order>50</sort_order>
34
+ <show_in_default>1</show_in_default>
35
+ </max_attempts>
36
+ <queue>
37
+ <label>Queue</label>
38
+ <frontend_type>text</frontend_type>
39
+ <sort_order>60</sort_order>
40
+ <show_in_default>1</show_in_default>
41
+ </queue>
42
+ </fields>
43
+ </config>
44
+ </groups>
45
+ </jobqueue>
46
+ </sections>
47
+ </config>
app/code/community/Jowens/JobQueue/sql/jobqueue_setup/mysql4-install-0.1.0.php ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
+ $installer = $this;
4
+
5
+ $installer->startSetup();
6
+
7
+ $installer->run(
8
+ "CREATE TABLE " . $installer->getTable('jobqueue/job')." (
9
+ `id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
10
+ `store_id` INT UNSIGNED NOT NULL DEFAULT 0,
11
+ `name` VARCHAR(255),
12
+ `handler` TEXT NOT NULL,
13
+ `queue` VARCHAR(255) NOT NULL DEFAULT 'default',
14
+ `attempts` INT UNSIGNED NOT NULL DEFAULT 0,
15
+ `run_at` DATETIME NULL,
16
+ `locked_at` DATETIME NULL,
17
+ `locked_by` VARCHAR(255) NULL,
18
+ `failed_at` DATETIME NULL,
19
+ `error` TEXT NULL,
20
+ `created_at` DATETIME NOT NULL
21
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8;"
22
+ );
23
+
24
+ $installer->endSetup();
app/design/adminhtml/default/default/layout/jowens/jobqueue.xml ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0"?>
2
+ <layout>
3
+ <adminhtml_queue_index>
4
+ <reference name="content">
5
+ <block type="jobqueue/adminhtml_queue" name="jowens_jobqueue_queue" />
6
+ </reference>
7
+ </adminhtml_queue_index>
8
+
9
+ <adminhtml_queue_view>
10
+ <reference name="content">
11
+ <block type="jobqueue/adminhtml_job_view" name="jowens_jobqueue_job" template="jowens/jobqueue/job.phtml"/>
12
+ </reference>
13
+ </adminhtml_queue_view>
14
+ </layout>
app/design/adminhtml/default/default/template/jowens/jobqueue/job.phtml ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <div class="content-header">
2
+ <table cellspacing="0">
3
+ <tr>
4
+ <td style="<?php echo $this->getHeaderWidth() ?>"><?php echo $this->getHeaderHtml() ?></td>
5
+ <td class="form-buttons"><?php echo $this->getButtonsHtml() ?></td>
6
+ </tr>
7
+ </table>
8
+ </div>
9
+
10
+ <div class="entry-edit">
11
+ <div class="entry-edit">
12
+ <div class="entry-edit-head">
13
+ <h4 class="icon-head head-edit-form fieldset-legend"><?php echo $this->__('Job Details'); ?></h4>
14
+ </div>
15
+ <div id="log_details_fieldset" class="log-details">
16
+ <table cellspacing="0" class="log-info table">
17
+ <col width="25%" />
18
+ <col />
19
+ <tbody>
20
+ <tr>
21
+ <th><?php echo $this->__('Job ID'); ?></th>
22
+ <td><?php echo $this->getJobIdHtml(); ?></td>
23
+ </tr>
24
+ <tr>
25
+ <th><?php echo $this->__('Job Name'); ?></th>
26
+ <td><?php echo $this->getJobNameHtml(); ?></td>
27
+ </tr>
28
+ <tr>
29
+ <th><?php echo $this->__('Store'); ?></th>
30
+ <td><?php echo $this->getStoreNameHtml(); ?></td>
31
+ </tr>
32
+ <tr>
33
+ <th><?php echo $this->__('Queue'); ?></th>
34
+ <td><?php echo $this->getJobQueueHtml(); ?></td>
35
+ </tr>
36
+ <tr>
37
+ <th><?php echo $this->__('Run At'); ?></th>
38
+ <td><?php echo $this->getRunAtHtml(); ?></td>
39
+ </tr>
40
+ <tr>
41
+ <th><?php echo $this->__('Attempts'); ?></th>
42
+ <td><?php echo $this->getAttemptsHtml(); ?></td>
43
+ </tr>
44
+ <tr>
45
+ <th><?php echo $this->__('Status'); ?></th>
46
+ <td><?php echo $this->getStatusHtml(); ?></td>
47
+ </tr>
48
+ <tr>
49
+ <th><?php echo $this->__('Created At'); ?></th>
50
+ <td><?php echo $this->getCreatedAtHtml(); ?></td>
51
+ </tr>
52
+ <?php if ($this->getErrorHtml()): ?>
53
+ <tr>
54
+ <th><?php echo $this->__('Error'); ?></th>
55
+ <td><?php echo $this->getErrorHtml(); ?></td>
56
+ </tr>
57
+ <?php endif; ?>
58
+ </tbody>
59
+ </table>
60
+ </div>
61
+ </div>
62
+ </div>
app/etc/modules/Jowens_JobQueue.xml ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0"?>
2
+ <config>
3
+ <modules>
4
+ <Jowens_JobQueue>
5
+ <active>true</active>
6
+ <codePool>community</codePool>
7
+ </Jowens_JobQueue>
8
+ </modules>
9
+ </config>
lib/DJJob/DJJob.php ADDED
@@ -0,0 +1,401 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
+ # This system is mostly a port of delayed_job: http://github.com/tobi/delayed_job
4
+
5
/**
 * Base exception type for errors raised by the DJJob library itself.
 * It adds nothing to Exception; it exists so callers can catch
 * library errors separately from everything else.
 */
class DJException extends Exception {
}
6
+
7
/**
 * Exception that carries a delay value alongside the usual message.
 * The delay starts at 7200 and can be read or replaced through the
 * accessor pair below.
 */
class DJRetryException extends DJException {

    // delay carried by this exception; 7200 unless setDelay() is called
    private $delay_seconds = 7200;

    /**
     * @return mixed the delay currently stored on this exception
     */
    public function getDelay() {
        return $this->delay_seconds;
    }

    /**
     * @param mixed $delay replacement for the stored delay (not validated)
     */
    public function setDelay($delay) {
        $this->delay_seconds = $delay;
    }
}
18
+
19
/**
 * Shared plumbing for the DJJob classes: holds one process-wide PDO
 * connection, runs SQL through it, and prints log lines.
 */
class DJBase {

    // error severity levels (higher number = more severe)
    const CRITICAL = 4;
    const ERROR = 3;
    const WARN = 2;
    const INFO = 1;
    const DEBUG = 0;

    // log() drops every message whose severity is below this level
    private static $log_level = self::DEBUG;

    // PDO handle; either injected via setConnection() or created lazily
    // by getConnection() from $dsn / $options
    private static $db = null;

    private static $dsn = "";
    private static $options = array(
        "mysql_user" => null,
        "mysql_pass" => null,
    );

    /**
     * Stores the DSN and options used to open the connection on first use.
     *
     * Use either `configure` or `setConnection`, depending on if you
     * already have a PDO object you can re-use.
     *
     * @param string $dsn     PDO data source name
     * @param array  $options merged over the defaults; "mysql_user" and
     *                        "mysql_pass" are the keys read by getConnection()
     */
    public static function configure($dsn, $options = array()) {
        self::$dsn = $dsn;
        self::$options = array_merge(self::$options, $options);
    }

    /**
     * @param int $const one of the severity constants declared on this class
     */
    public static function setLogLevel($const) {
        self::$log_level = $const;
    }

    /**
     * Re-uses an already open PDO connection instead of creating one.
     * The handle is stored as given; its error mode is not touched.
     */
    public static function setConnection(PDO $db) {
        self::$db = $db;
    }

    /**
     * Returns the shared PDO handle, opening it on the first call.
     *
     * @return PDO
     * @throws DJException when no DSN was configured or PDO cannot connect
     */
    protected static function getConnection() {
        if (self::$db === null) {
            if (!self::$dsn) {
                throw new DJException("Please tell DJJob how to connect to your database by calling DJJob::configure(\$dsn, [\$options = array()]) or re-using an existing PDO connection by calling DJJob::setConnection(\$pdoObject). If you're using MySQL you'll need to pass the db credentials as separate 'mysql_user' and 'mysql_pass' options. This is a PDO limitation, see [http://stackoverflow.com/questions/237367/why-is-php-pdo-dsn-a-different-format-for-mysql-versus-postgresql] for an explanation.");
            }
            try {
                // http://stackoverflow.com/questions/237367/why-is-php-pdo-dsn-a-different-format-for-mysql-versus-postgresql
                if (self::$options["mysql_user"] !== null) {
                    self::$db = new PDO(self::$dsn, self::$options["mysql_user"], self::$options["mysql_pass"]);
                } else {
                    self::$db = new PDO(self::$dsn);
                }
                self::$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            } catch (PDOException $e) {
                // DJException extends Exception, so callers that catch
                // Exception keep working; the type now matches the
                // "not configured" failure above.
                throw new DJException("DJJob couldn't connect to the database. PDO said [{$e->getMessage()}]");
            }
        }
        return self::$db;
    }

    /**
     * Prepares and executes a statement and returns its rows.
     *
     * @param string $sql    SQL with positional placeholders
     * @param array  $params values bound to the placeholders
     * @return array list of associative rows; empty when the statement
     *               produced no result set or matched no rows
     */
    public static function runQuery($sql, $params = array()) {
        $stmt = self::getConnection()->prepare($sql);
        $stmt->execute($params);

        $ret = array();
        // Calling fetchAll on a statement that has no result set throws a
        // "general error" exception. columnCount() is the check for
        // "is there a result set": rowCount() is only defined for
        // INSERT/UPDATE/DELETE and reports 0 for SELECTs on some drivers.
        if ($stmt->columnCount() > 0) {
            foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $row) {
                $ret[] = $row;
            }
        }

        $stmt->closeCursor();
        return $ret;
    }

    /**
     * Prepares and executes a data-modifying statement.
     *
     * @param string $sql    SQL with positional placeholders
     * @param array  $params values bound to the placeholders
     * @return int row count reported by the driver for the statement
     */
    public static function runUpdate($sql, $params = array()) {
        $stmt = self::getConnection()->prepare($sql);
        $stmt->execute($params);
        return $stmt->rowCount();
    }

    /**
     * Prints a timestamped line to standard output when $severity is at
     * or above the configured log level.
     *
     * @param string $mesg
     * @param int    $severity one of the severity constants
     */
    protected static function log($mesg, $severity=self::CRITICAL) {
        if ($severity >= self::$log_level) {
            printf("[%s] %s\n", date('c'), $mesg);
        }
    }
}
100
+
101
/**
 * Polls one queue of the `jobs` table, locks a runnable job and runs it.
 */
class DJWorker extends DJBase {
    # This is a singleton-ish thing. It wouldn't really make sense to
    # instantiate more than one in a single request (or commandline task)

    // Declared explicitly (they used to be created on the fly in the
    // constructor) and kept public so existing external reads still work.

    /** @var string queue this worker pulls jobs from */
    public $queue;
    /** @var int polling iterations start() performs before returning; 0 = no limit */
    public $count;
    /** @var int seconds to sleep after an iteration that found no job */
    public $sleep;
    /** @var int only jobs with fewer attempts than this are selected */
    public $max_attempts;
    /** @var string "host::<hostname> pid::<pid>", the value written to locked_by */
    public $name;

    /**
     * @param array $options recognised keys: "queue" (default "default"),
     *                       "count" (0), "sleep" (5), "max_attempts" (5)
     */
    public function __construct($options = array()) {
        $options = array_merge(array(
            "queue" => "default",
            "count" => 0,
            "sleep" => 5,
            "max_attempts" => 5
        ), $options);

        $this->queue = $options["queue"];
        $this->count = $options["count"];
        $this->sleep = $options["sleep"];
        $this->max_attempts = $options["max_attempts"];

        $hostname = trim(`hostname`);
        $pid = getmypid();
        $this->name = "host::$hostname pid::$pid";

        // signal handling is optional: only wired up when pcntl is loaded
        if (function_exists("pcntl_signal")) {
            pcntl_signal(SIGTERM, array($this, "handleSignal"));
            pcntl_signal(SIGINT, array($this, "handleSignal"));
        }
    }

    /**
     * Signal callback: logs, releases this worker's locks and exits.
     *
     * @param int $signo signal number delivered by pcntl
     */
    public function handleSignal($signo) {
        $signals = array(
            SIGTERM => "SIGTERM",
            SIGINT => "SIGINT"
        );
        // only the two signals above are registered, but do not emit an
        // undefined-index notice if the handler is ever reused for another
        $signal = isset($signals[$signo]) ? $signals[$signo] : "signal {$signo}";

        $this->log("[WORKER] Received {$signal}... Shutting down", self::INFO);
        $this->releaseLocks();
        die(0);
    }

    /**
     * Clears locked_at / locked_by on every job locked under this
     * worker's name.
     */
    public function releaseLocks() {
        $this->runUpdate("
            UPDATE jobs
            SET locked_at = NULL, locked_by = NULL
            WHERE locked_by = ?",
            array($this->name)
        );
    }

    /**
     * Returns a new job ordered by most recent first
     * why this?
     *     run newest first, some jobs get left behind
     *     run oldest first, all jobs get left behind
     *
     * Up to ten candidate ids are fetched and tried in random order; the
     * first one whose lock can be acquired is returned.
     *
     * @return DJJob|false the locked job, or false when none could be locked
     */
    public function getNewJob() {
        # we can grab a locked job if we own the lock
        $rs = $this->runQuery("
            SELECT id
            FROM   jobs
            WHERE  queue = ?
            AND    (run_at IS NULL OR NOW() >= run_at)
            AND    (locked_at IS NULL OR locked_by = ?)
            AND    failed_at IS NULL
            AND    attempts < ?
            ORDER BY created_at DESC
            LIMIT  10
        ", array($this->queue, $this->name, $this->max_attempts));

        // randomly order the 10 to prevent lock contention among workers
        shuffle($rs);

        foreach ($rs as $r) {
            $job = new DJJob($this->name, $r["id"], array(
                "max_attempts" => $this->max_attempts
            ));
            if ($job->acquireLock()) return $job;
        }

        return false;
    }

    /**
     * Main loop: repeatedly fetches and runs jobs, sleeping when none is
     * available. Returns after `count` iterations (never, when count is
     * 0) or as soon as any exception escapes an iteration.
     */
    public function start() {
        $this->log("[JOB] Starting worker {$this->name} on queue::{$this->queue}", self::INFO);

        $count = 0;
        $job_count = 0;
        try {
            while ($this->count == 0 || $count < $this->count) {
                // deliver pending signals between jobs when pcntl supports it
                if (function_exists("pcntl_signal_dispatch")) pcntl_signal_dispatch();

                $count += 1;
                $job = $this->getNewJob();

                if (!$job) {
                    $this->log("[JOB] Failed to get a job, queue::{$this->queue} may be empty", self::DEBUG);
                    sleep($this->sleep);
                    continue;
                }

                $job_count += 1;
                $job->run();
            }
        } catch (Exception $e) {
            // any exception ends the loop; the worker then shuts down below
            $this->log("[JOB] unhandled exception::\"{$e->getMessage()}\"", self::ERROR);
        }

        $this->log("[JOB] worker shutting down after running {$job_count} jobs, over {$count} polling iterations", self::INFO);
    }
}
207
+
208
/**
 * One row of the `jobs` table: locking, running, retrying and failing a
 * single job, plus the static helpers that enqueue jobs and report queue
 * status.
 */
class DJJob extends DJBase {

    // Declared explicitly (they used to be created on the fly in the
    // constructor) and kept public so existing external reads still work.

    /** @var string value written to locked_by while this job is held */
    public $worker_name;
    /** @var mixed primary key of the row in `jobs` */
    public $job_id;
    /** @var int attempt count at which the job is marked as failed */
    public $max_attempts;

    /**
     * @param string $worker_name name of the worker handling the job
     * @param mixed  $job_id      id of the row in `jobs`
     * @param array  $options     recognised key: "max_attempts" (default 5)
     */
    public function __construct($worker_name, $job_id, $options = array()) {
        $options = array_merge(array(
            "max_attempts" => 5
        ), $options);
        $this->worker_name = $worker_name;
        $this->job_id = $job_id;
        $this->max_attempts = $options["max_attempts"];
    }

    /**
     * Loads the handler and calls its perform() method.
     *
     * @return bool true when perform() returned normally and the row was
     *              deleted; false on a bad handler, a retry request or
     *              any other exception
     */
    public function run() {
        # pull the handler from the db
        $handler = $this->getHandler();
        if (!is_object($handler)) {
            $this->log("[JOB] bad handler for job::{$this->job_id}", self::ERROR);
            $this->finishWithError("bad handler for job::{$this->job_id}");
            return false;
        }

        # run the handler
        try {
            $handler->perform();

            # cleanup
            $this->finish();
            return true;

        } catch (DJRetryException $e) {
            # attempts hasn't been incremented yet.
            $attempts = $this->getAttempts()+1;

            $msg = "Caught DJRetryException \"{$e->getMessage()}\" on attempt $attempts/{$this->max_attempts}.";

            // ">=" rather than "==": a job that is already past the limit
            // must give up too instead of being rescheduled indefinitely
            if ($attempts >= $this->max_attempts) {
                $this->log("[JOB] job::{$this->job_id} $msg Giving up.");
                $this->finishWithError($msg);
            } else {
                $this->log("[JOB] job::{$this->job_id} $msg Try again in {$e->getDelay()} seconds.", self::WARN);
                $this->retryLater($e->getDelay());
            }
            return false;

        } catch (Exception $e) {

            $this->finishWithError($e->getMessage());
            return false;

        }
    }

    /**
     * Tries to stamp this job as locked by the current worker. Succeeds
     * only for a job that has not failed and is unlocked or already
     * locked by the same worker.
     *
     * @return bool whether the UPDATE touched a row
     */
    public function acquireLock() {
        $this->log("[JOB] attempting to acquire lock for job::{$this->job_id} on {$this->worker_name}", self::INFO);

        $lock = $this->runUpdate("
            UPDATE jobs
            SET    locked_at = NOW(), locked_by = ?
            WHERE  id = ? AND (locked_at IS NULL OR locked_by = ?) AND failed_at IS NULL
        ", array($this->worker_name, $this->job_id, $this->worker_name));

        if (!$lock) {
            $this->log("[JOB] failed to acquire lock for job::{$this->job_id}", self::INFO);
            return false;
        }

        return true;
    }

    /**
     * Clears the lock columns of this job regardless of who holds the lock.
     */
    public function releaseLock() {
        $this->runUpdate("
            UPDATE jobs
            SET locked_at = NULL, locked_by = NULL
            WHERE id = ?",
            array($this->job_id)
        );
    }

    /**
     * Deletes the job row after a successful run.
     */
    public function finish() {
        $this->runUpdate(
            "DELETE FROM jobs WHERE id = ?",
            array($this->job_id)
        );
        $this->log("[JOB] completed job::{$this->job_id}", self::INFO);
    }

    /**
     * Counts a failed attempt and releases the lock. failed_at and error
     * are only filled in once the attempt count reaches max_attempts;
     * before that both are written as NULL.
     *
     * NOTE(review): the IF() expressions appear to rely on the SET
     * assignments being evaluated left to right, so that `attempts` is
     * already the incremented value when compared — confirm for the
     * target database.
     *
     * @param string $error message to store when the job is given up on
     */
    public function finishWithError($error) {
        $this->runUpdate("
            UPDATE jobs
            SET attempts = attempts + 1,
                failed_at = IF(attempts >= ?, NOW(), NULL),
                error = IF(attempts >= ?, ?, NULL)
            WHERE id = ?",
            array(
                $this->max_attempts,
                $this->max_attempts,
                $error,
                $this->job_id
            )
        );
        $this->log("[JOB] failure in job::{$this->job_id}", self::ERROR);
        $this->releaseLock();
    }

    /**
     * Counts an attempt, pushes run_at into the future and releases the lock.
     *
     * @param int $delay seconds from now until the job may run again
     */
    public function retryLater($delay) {
        $this->runUpdate("
            UPDATE jobs
            SET run_at = DATE_ADD(NOW(), INTERVAL ? SECOND),
                attempts = attempts + 1
            WHERE id = ?",
            array(
                $delay,
                $this->job_id
            )
        );
        $this->releaseLock();
    }

    /**
     * Loads and unserializes the handler stored for this job.
     *
     * NOTE(review): unserialize() instantiates whatever classes the stored
     * payload names, so everything able to write to the jobs table must
     * be trusted.
     *
     * @return mixed the unserialized handler, or false when the row is missing
     */
    public function getHandler() {
        $rs = $this->runQuery(
            "SELECT handler FROM jobs WHERE id = ?",
            array($this->job_id)
        );
        foreach ($rs as $r) return unserialize($r["handler"]);
        return false;
    }

    /**
     * @return mixed the stored attempts value, or false when the row is missing
     */
    public function getAttempts() {
        $rs = $this->runQuery(
            "SELECT attempts FROM jobs WHERE id = ?",
            array($this->job_id)
        );
        foreach ($rs as $r) return $r["attempts"];
        return false;
    }

    /**
     * Serializes $handler and inserts it as a new job.
     *
     * @param object      $handler object exposing a perform() method
     * @param string      $queue   queue name
     * @param string|null $run_at  earliest run time, or null for "as soon as possible"
     * @return bool false when the INSERT affected no row
     */
    public static function enqueue($handler, $queue = "default", $run_at = null) {
        $affected = self::runUpdate(
            "INSERT INTO jobs (handler, queue, run_at, created_at) VALUES(?, ?, ?, NOW())",
            array(serialize($handler), (string) $queue, $run_at)
        );

        if ($affected < 1) {
            self::log("[JOB] failed to enqueue new job", self::ERROR);
            return false;
        }

        return true;
    }

    /**
     * Inserts several jobs with one multi-row INSERT.
     *
     * @param array       $handlers objects exposing a perform() method
     * @param string      $queue    queue name shared by all of them
     * @param string|null $run_at   earliest run time shared by all of them
     * @return bool true when there was nothing to insert or at least one
     *              row was inserted (a partial insert is logged); false
     *              when no row was inserted
     */
    public static function bulkEnqueue($handlers, $queue = "default", $run_at = null) {
        // with no handlers the statement would end in a bare "VALUES",
        // which is not valid SQL; there is simply nothing to do
        if (count($handlers) === 0) {
            return true;
        }

        $sql = "INSERT INTO jobs (handler, queue, run_at, created_at) VALUES";
        $sql .= implode(",", array_fill(0, count($handlers), "(?, ?, ?, NOW())"));

        $parameters = array();
        foreach ($handlers as $handler) {
            $parameters []= serialize($handler);
            $parameters []= (string) $queue;
            $parameters []= $run_at;
        }
        $affected = self::runUpdate($sql, $parameters);

        if ($affected < 1) {
            self::log("[JOB] failed to enqueue new jobs", self::ERROR);
            return false;
        }

        if ($affected != count($handlers))
            self::log("[JOB] failed to enqueue some new jobs", self::ERROR);

        return true;
    }

    /**
     * Counts the jobs of one queue.
     *
     * @param string $queue queue name
     * @return array keys "outstanding" (total minus locked minus failed),
     *               "locked", "failed" and "total"
     */
    public static function status($queue = "default") {
        $rs = self::runQuery("
            SELECT COUNT(*) as total, COUNT(failed_at) as failed, COUNT(locked_at) as locked
            FROM `jobs`
            WHERE queue = ?
        ", array($queue));
        $rs = $rs[0];

        $failed = $rs["failed"];
        $locked = $rs["locked"];
        $total  = $rs["total"];
        $outstanding = $total - $locked - $failed;

        return array(
            "outstanding" => $outstanding,
            "locked" => $locked,
            "failed" => $failed,
            "total"  => $total
        );
    }

}
lib/DJJob/README.textile ADDED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ h1. DJJob
2
+
3
+ p. DJJob allows PHP web applications to process long-running tasks asynchronously. It is a PHP port of "delayed_job":http://github.com/tobi/delayed_job (developed at Shopify), which has been used in production at SeatGeek since April 2010.
4
+
5
+ p. Like delayed_job, DJJob uses a @jobs@ table for persisting and tracking pending, in-progress, and failed jobs.
6
+
7
+ h2. Requirements
8
+
9
+ * PHP5
10
+ * PDO (Ships with PHP >= 5.1)
11
+ * (Optional) PCNTL library
12
+
13
+ h2. Setup
14
+
15
+ bc. mysql db < jobs.sql
16
+
17
+ p. The @jobs@ table structure looks like:
18
+
19
+ bc. CREATE TABLE `jobs` (
20
+ `id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
21
+ `handler` TEXT NOT NULL,
22
+ `queue` VARCHAR(255) NOT NULL DEFAULT 'default',
23
+ `attempts` INT UNSIGNED NOT NULL DEFAULT 0,
24
+ `run_at` DATETIME NULL,
25
+ `locked_at` DATETIME NULL,
26
+ `locked_by` VARCHAR(255) NULL,
27
+ `failed_at` DATETIME NULL,
28
+ `error` TEXT NULL,
29
+ `created_at` DATETIME NOT NULL
30
+ ) ENGINE = INNODB;
31
+
32
+ p. Tell DJJob how to connect to your database:
33
+
34
+ bc. DJJob::configure("mysql:host=127.0.0.1;dbname=djjob_test;port=3306", array('mysql_user' => "root", 'mysql_pass' => "topsecret"));
35
+
36
+ p. If you're using mysql, you'll need to pass the database credentials separately. Otherwise, you can provide those in the connection string, see "http://stackoverflow.com/questions/237367/why-is-php-pdo-dsn-a-different-format-for-mysql-versus-postgresql":http://stackoverflow.com/questions/237367/why-is-php-pdo-dsn-a-different-format-for-mysql-versus-postgresql for an explanation.
37
+
38
+
39
+ h2. Usage
40
+
41
+ p. Jobs are PHP objects that respond to a method @perform@. Jobs are serialized and stored in the database.
42
+
43
+ bc.. class HelloWorldJob {
44
+ public function __construct($name) {
45
+ $this->name = $name;
46
+ }
47
+ public function perform() {
48
+ echo "Hello {$this->name}!\n";
49
+ }
50
+ }
51
+
52
+ DJJob::enqueue(new HelloWorldJob("delayed_job"));
53
+
54
+ p. Unlike delayed_job, DJJob does not have the concept of task priority (not yet at least). Instead, it supports multiple queues. By default, jobs are placed on the "default" queue. You can specify an alternative queue like this:
55
+
56
+ bc. DJJob::enqueue(new SignupEmailJob("dev@seatgeek.com"), "email");
57
+
58
+ p. At SeatGeek, we run an email-specific queue. Emails have a @sendLater@ method which places a job on the @email@ queue. Here's a simplified version of our base @Email@ class:
59
+
60
+ bc.. class Email {
61
+ public function __construct($recipient) {
62
+ $this->recipient = $recipient;
63
+ }
64
+ public function send() {
65
+ ...do some expensive work to build the email: geolocation, etc..
66
+ ...use mail api to send this email
67
+ }
68
+ public function perform() {
69
+ $this->send();
70
+ }
71
+ public function sendLater() {
72
+ DJJob::enqueue($this, "email");
73
+ }
74
+ }
75
+
76
+ p. Because @Email@ has a @perform@ method, all instances of the email class are also jobs.
77
+
78
+ h2. Running the jobs
79
+
80
+ p. Running a worker is as simple as:
81
+
82
+ bc. $worker = new DJWorker($options);
83
+ $worker->start();
84
+
85
+ p. Initializing your environment, connecting to the database, etc. is up to you. We use symfony's task system to run workers, here's an example of our jobs:worker task:
86
+
87
+ bc.. class jobsWorkerTask extends sfPropelBaseTask {
88
+ protected function configure() {
89
+ $this->namespace = 'jobs';
90
+ $this->name = 'worker';
91
+ $this->briefDescription = '';
92
+ $this->detailedDescription = <<<EOF
93
+ The [jobs:worker|INFO] task runs jobs created by the DJJob system.
94
+ Call it with:
95
+
96
+ [php symfony jobs:worker|INFO]
97
+ EOF;
98
+ $this->addArgument('application', sfCommandArgument::OPTIONAL, 'The application name', 'customer');
99
+ $this->addOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'dev');
100
+ $this->addOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel');
101
+ $this->addOption('queue', null, sfCommandOption::PARAMETER_REQUIRED, 'The queue to pull jobs from', 'default');
102
+ $this->addOption('count', null, sfCommandOption::PARAMETER_REQUIRED, 'The number of jobs to run before exiting (0 for unlimited)', 0);
103
+ $this->addOption('sleep', null, sfCommandOption::PARAMETER_REQUIRED, 'Seconds to sleep after finding no new jobs', 5);
104
+ }
105
+
106
+ protected function execute($arguments = array(), $options = array()) {
107
+ // Database initialization
108
+ $databaseManager = new sfDatabaseManager($this->configuration);
109
+ $connection = Propel::getConnection($options['connection'] ? $options['connection'] : '');
110
+
111
+ $worker = new DJWorker($options);
112
+ $worker->start();
113
+ }
114
+ }
115
+
116
+ p. The worker will exit if the database has any connectivity problems. We use "god":http://god.rubyforge.org/ to manage our workers, including restarting them when they exit for any reason.
117
+
118
+ h3. Changes
119
+
120
+ * Change DJJob::configure to take an options array
121
+ * Eliminated Propel dependency by switching to PDO
lib/DJJob/examples/HelloWorldJob.php ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
/**
 * Minimal example job: prints a greeting when performed.
 */
class HelloWorldJob {

    // Declared explicitly instead of being created on the fly in the
    // constructor; public, as the dynamically created property was.
    public $name;

    /**
     * @param mixed $name value interpolated into the greeting
     */
    public function __construct($name) {
        $this->name = $name;
    }

    /**
     * Echoes "Hello <name>!" followed by a newline.
     */
    public function perform() {
        echo "Hello {$this->name}!\n";
    }

}
lib/DJJob/jobs.sql ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ CREATE TABLE `jobs` (
2
+ `id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
3
+ `handler` TEXT NOT NULL,
4
+ `queue` VARCHAR(255) NOT NULL DEFAULT 'default',
5
+ `attempts` INT UNSIGNED NOT NULL DEFAULT 0,
6
+ `run_at` DATETIME NULL,
7
+ `locked_at` DATETIME NULL,
8
+ `locked_by` VARCHAR(255) NULL,
9
+ `failed_at` DATETIME NULL,
10
+ `error` TEXT NULL,
11
+ `created_at` DATETIME NOT NULL
12
+ ) ENGINE = INNODB;
lib/DJJob/test/database.php ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?php
2
+
3
+ date_default_timezone_set('America/New_York');
4
+
5
+ require dirname(__FILE__) . "/../DJJob.php";
6
+
7
+ DJJob::configure("mysql:host=127.0.0.1;dbname=djjob", array(
8
+ "mysql_user" => "root",
9
+ "mysql_pass" => "",
10
+ ));
11
+
12
// Recreate the jobs table from scratch. The two DDL statements are sent
// separately through runUpdate(): neither returns rows, and a single
// prepared statement holding both only works when the driver emulates
// prepares and allows multiple statements per call.
DJJob::runUpdate("DROP TABLE IF EXISTS `jobs`");
DJJob::runUpdate("
    CREATE TABLE `jobs` (
        `id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
        `handler` VARCHAR(255) NOT NULL,
        `queue` VARCHAR(255) NOT NULL DEFAULT 'default',
        `attempts` INT UNSIGNED NOT NULL DEFAULT 0,
        `run_at` DATETIME NULL,
        `locked_at` DATETIME NULL,
        `locked_by` VARCHAR(255) NULL,
        `failed_at` DATETIME NULL,
        `error` VARCHAR(255) NULL,
        `created_at` DATETIME NOT NULL
    ) ENGINE = MEMORY
");
27
+
28
/**
 * Test job that prints a greeting and then pauses for a second.
 */
class HelloWorldJob {
    // Declared explicitly instead of being created on the fly in the
    // constructor; public, as the dynamically created property was.
    public $name;

    /**
     * @param mixed $name value interpolated into the greeting
     */
    public function __construct($name) {
        $this->name = $name;
    }

    /**
     * Echoes "Hello <name>!" followed by a newline, then sleeps one second.
     */
    public function perform() {
        echo "Hello {$this->name}!\n";
        sleep(1);
    }
}
37
+
38
/**
 * Test job whose perform() never succeeds.
 */
class FailingJob {
    /**
     * Sleeps one second, then throws.
     *
     * @throws Exception always, with the message "Uh oh"
     */
    public function perform() {
        sleep(1);
        $failure = new Exception("Uh oh");
        throw $failure;
    }
}
44
+
45
// queue counts before anything is enqueued
var_dump(DJJob::status());

// one job on its own ...
$single = new HelloWorldJob("delayed_job");
DJJob::enqueue($single);

// ... two more through the multi-row insert ...
$batch = array(
    new HelloWorldJob("shopify"),
    new HelloWorldJob("github"),
);
DJJob::bulkEnqueue($batch);

// ... and one whose perform() throws
DJJob::enqueue(new FailingJob());

// five polling iterations, giving up on a job after two attempts
$workerOptions = array("count" => 5, "max_attempts" => 2, "sleep" => 10);
$worker = new DJWorker($workerOptions);
$worker->start();

// queue counts after the worker has returned
var_dump(DJJob::status());
package.xml ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <?xml version="1.0"?>
2
+ <package>
3
+ <name>Jowens_JobQueue</name>
4
+ <version>0.1.0</version>
5
+ <stability>stable</stability>
6
+ <license uri="http://opensource.org/licenses/MIT">MIT</license>
7
+ <channel>community</channel>
8
+ <extends/>
9
+ <summary>Asynchronous job queue for Magento</summary>
10
+ <description>JobQueue allows jobs to be queued in the database to be processed asynchronously.</description>
11
+ <notes>First release.</notes>
12
+ <authors><author><name>Jordan Owens</name><user>jkowens</user><email>jkowens@gmail.com</email></author></authors>
13
+ <date>2013-01-06</date>
14
+ <time>03:11:58</time>
15
+ <contents><target name="magecommunity"><dir name="Jowens"><dir name="JobQueue"><dir name="Block"><dir name="Adminhtml"><dir name="Job"><file name="View.php" hash="3e79a5273a917b9062472a1229821421"/></dir><dir name="Queue"><file name="Grid.php" hash="9618ecc67272f750c88e7e19680aaa44"/></dir><file name="Queue.php" hash="719a95c4bde9c4c109b159d0667de2da"/></dir></dir><dir name="Helper"><file name="Data.php" hash="8c8b3a2b79a1546e8b6600dde974c049"/></dir><dir name="Model"><dir name="Job"><file name="Abstract.php" hash="922d6f9eba68200a173bfc1e245991cb"/><file name="Order.php" hash="7499bc4148f3c9ea581147ed28416ae7"/></dir><file name="Job.php" hash="f0b9928c1063dc3d90087fb69de16fa4"/><dir name="Resource"><dir name="Job"><file name="Collection.php" hash="a175198f2c3d252dadb817d108920b3f"/></dir><file name="Job.php" hash="30843d577f463d43ed4e3807187e8248"/><file name="Setup.php" hash="911413029124da3964d9822926fb44de"/></dir><file name="Worker.php" hash="ca270c525862c1d86c86e4ca4b8901c6"/></dir><dir name="controllers"><dir name="Adminhtml"><file name="QueueController.php" hash="a7c9f70edc34f41b416415dca2e8b44c"/></dir><file name="IndexController.php" hash="d9c6df399193307b2e2ed053e8bedd6b"/></dir><dir name="etc"><file name="adminhtml.xml" hash="885171a59394683e2a8ec1d5701c0817"/><file name="config.xml" hash="160c25da367ccd4489bb179e0b5d8fdc"/><file name="system.xml" hash="3a8426d2f3c9a3f29d4adff6733d3ea9"/></dir><dir name="sql"><dir name="jobqueue_setup"><file name="mysql4-install-0.1.0.php" hash="19592c4c921f2e1c64e85c1a776edda2"/></dir></dir></dir></dir></target><target name="mageetc"><dir name="modules"><file name="Jowens_JobQueue.xml" hash="272f42382ccc1b0226c7e25c078d54ae"/></dir></target><target name="magedesign"><dir name="adminhtml"><dir name="default"><dir name="default"><dir name="layout"><dir name="jowens"><file name="jobqueue.xml" hash="491d99d8da67cc879386dda4ef90f285"/></dir></dir><dir name="template"><dir name="jowens"><dir name="jobqueue"><file 
name="job.phtml" hash="1511cf6f2b85b62f77d6ce741004875e"/></dir></dir></dir></dir></dir></dir></target><target name="magelib"><dir name="DJJob"><file name="DJJob.php" hash="2cb422e394a1adc800406c371244a089"/><file name="README.textile" hash="ae3feeccf3476b207a05894aabf4afaf"/><dir name="examples"><file name="HelloWorldJob.php" hash="3b7a9e4b1f912fb48acf5399f5fe33b9"/></dir><file name="jobs.sql" hash="d73a8213feedadf9dc9eb719fe33b935"/><dir name="test"><file name="database.php" hash="1500daa862ebe81e488590321520518d"/></dir><file name=".git" hash="646286b169bfd57392b2070fed759fe0"/></dir></target></contents>
16
+ <compatible/>
17
+ <dependencies><required><php><min>5.1.0</min><max>6.0.0</max></php></required></dependencies>
18
+ </package>