forked from chrisboulton/php-resque-scheduler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathResqueScheduler.php
271 lines (243 loc) · 7.72 KB
/
ResqueScheduler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
<?php
/**
* ResqueScheduler core class to handle scheduling of jobs in the future.
*
* @package ResqueScheduler
* @author Chris Boulton <[email protected]>
* @copyright (c) 2012 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class ResqueScheduler
{
    const VERSION = "0.1";

    /**
     * Enqueue a job in a given number of seconds from now.
     *
     * Identical to Resque::enqueue, however the first argument is the number
     * of seconds before the job should be executed. Delegates to enqueueAt()
     * with time() + $in as the target timestamp.
     *
     * @param int $in Number of seconds from now when the job should be executed.
     * @param string $queue The name of the queue to place the job in.
     * @param string $class The name of the class that contains the code to execute the job.
     * @param array $args Any optional arguments that should be passed when the job is executed.
     */
    public static function enqueueIn($in, $queue, $class, array $args = array())
    {
        self::enqueueAt(time() + $in, $queue, $class, $args);
    }

    /**
     * Enqueue a job for execution at a given timestamp.
     *
     * Identical to Resque::enqueue, however the first argument is a timestamp
     * (either UNIX timestamp in integer format or an instance of the DateTime
     * class in PHP).
     *
     * Triggers the 'afterSchedule' event once the job has been pushed, passing
     * the original (unconverted) $at value along with queue, class and args.
     *
     * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
     * @param string $queue The name of the queue to place the job in.
     * @param string $class The name of the class that contains the code to execute the job.
     * @param array $args Any optional arguments that should be passed when the job is executed.
     * @throws Resque_Exception If $class or $queue is empty.
     * @throws ResqueScheduler_InvalidTimestampException If $at cannot be converted to an integer.
     */
    public static function enqueueAt($at, $queue, $class, $args = array())
    {
        self::validateJob($class, $queue);

        $job = self::jobToHash($queue, $class, $args);
        self::delayedPush($at, $job);

        Resque_Event::trigger('afterSchedule', array(
            'at'    => $at,
            'queue' => $queue,
            'class' => $class,
            'args'  => $args,
        ));
    }

    /**
     * Directly append an item to the delayed queue schedule.
     *
     * The JSON-encoded item is appended to the list 'delayed:<timestamp>', and
     * the timestamp is added to the 'delayed_queue_schedule' sorted set, using
     * the timestamp as both score and member.
     *
     * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
     * @param array $item Hash of item to be pushed to schedule.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp cannot be converted to an integer.
     */
    public static function delayedPush($timestamp, $item)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();
        $redis->rpush('delayed:' . $timestamp, json_encode($item));
        $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
    }

    /**
     * Get the number of entries in the delayed schedule.
     *
     * This is the cardinality of the 'delayed_queue_schedule' sorted set,
     * which holds one member per scheduled timestamp.
     *
     * @return int Size of the delayed schedule.
     */
    public static function getDelayedQueueScheduleSize()
    {
        return (int)Resque::redis()->zcard('delayed_queue_schedule');
    }

    /**
     * Get the number of jobs for a given timestamp in the delayed schedule.
     *
     * @param DateTime|int $timestamp Timestamp
     * @return int Number of scheduled jobs.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp cannot be converted to an integer.
     */
    public static function getDelayedTimestampSize($timestamp)
    {
        // Normalise through the same converter every other method uses so that
        // DateTime instances and integer timestamps address the same key.
        $timestamp = self::getTimestamp($timestamp);

        // LLEN takes the key only; cast so the documented int return holds
        // regardless of what the Redis client hands back.
        return (int)Resque::redis()->llen('delayed:' . $timestamp);
    }

    /**
     * Remove a delayed job from the queue.
     *
     * Note: you must specify exactly the same queue, class and arguments that
     * you used when you added the job to the delayed queue.
     *
     * Also, this is an expensive operation because all delayed keys have to be
     * searched.
     *
     * NOTE(review): unlike removeDelayedJobFromTimestamp(), this method does
     * not call cleanupTimestamp(), so timestamps whose lists become empty are
     * not removed from 'delayed_queue_schedule' here.
     *
     * @param string $queue Name of the queue the job was scheduled on.
     * @param string $class Name of the job class.
     * @param array $args Job arguments, exactly as passed when scheduling.
     * @return int number of jobs that were removed
     */
    public static function removeDelayed($queue, $class, $args)
    {
        $destroyed = 0;
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();

        foreach ($redis->keys('delayed:*') as $key) {
            $key = $redis->removePrefix($key);
            // A count of 0 asks LREM to remove every occurrence of $item.
            $destroyed += $redis->lrem($key, 0, $item);
        }

        return $destroyed;
    }

    /**
     * Remove a delayed job queued for a specific timestamp.
     *
     * Note: you must specify exactly the same queue, class and arguments that
     * you used when you added the job to the delayed queue.
     *
     * @param DateTime|int $timestamp Timestamp the job was scheduled for.
     * @param string $queue Name of the queue the job was scheduled on.
     * @param string $class Name of the job class.
     * @param array $args Job arguments, exactly as passed when scheduling.
     * @return mixed Result of the LREM call (number of removed entries).
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp cannot be converted to an integer.
     */
    public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
    {
        $key = 'delayed:' . self::getTimestamp($timestamp);
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();
        $count = $redis->lrem($key, 0, $item);

        // Drop the timestamp from the schedule if that was its last job.
        self::cleanupTimestamp($key, $timestamp);

        return $count;
    }

    /**
     * Generate hash of all job properties to be saved in the scheduled queue.
     *
     * @param string $queue Name of the queue the job will be placed on.
     * @param string $class Name of the job class.
     * @param array $args Array of job arguments.
     * @return array Hash with 'class', 'args' and 'queue' keys; $args is
     *               wrapped in a single-element array.
     */
    private static function jobToHash($queue, $class, $args)
    {
        return array(
            'class' => $class,
            'args'  => array($args),
            'queue' => $queue,
        );
    }

    /**
     * If there are no jobs for a given key/timestamp, delete references to it.
     *
     * Used internally to remove empty delayed: items in Redis when there are
     * no more jobs left to run at that timestamp.
     *
     * @param string $key Key to count number of items at.
     * @param DateTime|int $timestamp Matching timestamp for $key.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp cannot be converted to an integer.
     */
    private static function cleanupTimestamp($key, $timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();

        if ($redis->llen($key) == 0) {
            $redis->del($key);
            $redis->zrem('delayed_queue_schedule', $timestamp);
        }
    }

    /**
     * Convert a timestamp in some format in to a unix timestamp as an integer.
     *
     * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
     * @return int Timestamp
     * @throws ResqueScheduler_InvalidTimestampException
     */
    private static function getTimestamp($timestamp)
    {
        if ($timestamp instanceof DateTime) {
            $timestamp = $timestamp->getTimestamp();
        }

        // Loose comparison on purpose: accepts integers and integer-like
        // values, rejects anything that changes when cast to int.
        if ((int)$timestamp != $timestamp) {
            throw new ResqueScheduler_InvalidTimestampException(
                'The supplied timestamp value could not be converted to an integer.'
            );
        }

        return (int)$timestamp;
    }

    /**
     * Find the first timestamp in the delayed schedule before/including the timestamp.
     *
     * Will find and return the first timestamp up to and including the given
     * timestamp. This is the heart of the ResqueScheduler that will make sure
     * that any jobs scheduled for the past when the worker wasn't running are
     * also queued up.
     *
     * @param DateTime|int|null $at Instance of DateTime or UNIX timestamp.
     *                              Defaults to now.
     * @return int|false UNIX timestamp (as returned by the Redis client, not
     *                   cast here), or false if nothing to run.
     * @throws ResqueScheduler_InvalidTimestampException If $at cannot be converted to an integer.
     */
    public static function nextDelayedTimestamp($at = null)
    {
        if ($at === null) {
            $at = time();
        } else {
            $at = self::getTimestamp($at);
        }

        // Lowest-scored schedule entry with score <= $at, at most one result.
        $items = Resque::redis()->zrangebyscore(
            'delayed_queue_schedule',
            '-inf',
            $at,
            array('limit' => array(0, 1))
        );
        if (!empty($items)) {
            return $items[0];
        }

        return false;
    }

    /**
     * Pop a job off the delayed queue for a given timestamp.
     *
     * After popping, the timestamp is removed from the schedule if its list
     * is now empty.
     *
     * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
     * @return array Matching job at timestamp (the JSON-decoded list entry).
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp cannot be converted to an integer.
     */
    public static function nextItemForTimestamp($timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $key = 'delayed:' . $timestamp;

        $item = json_decode(Resque::redis()->lpop($key), true);

        self::cleanupTimestamp($key, $timestamp);

        return $item;
    }

    /**
     * Ensure that supplied job class/queue is valid.
     *
     * @param string $class Name of job class.
     * @param string $queue Name of queue.
     * @return bool Always true when no exception is thrown.
     * @throws Resque_Exception
     */
    private static function validateJob($class, $queue)
    {
        if (empty($class)) {
            throw new Resque_Exception('Jobs must be given a class.');
        }

        if (empty($queue)) {
            throw new Resque_Exception('Jobs must be put in a queue.');
        }

        return true;
    }
}