Skip to content

Latest commit

 

History

History
257 lines (255 loc) · 8.96 KB

RECRUITER.org

File metadata and controls

257 lines (255 loc) · 8.96 KB

Recruiter

What

It’s an implementation of a job queue with the following goals

  • Reliability: a job must never be lost, a job must never be done twice unless we want it
  • Introspectability: we must always know the history of what happend and be able to recover
  • Horizontal Scale: we must be able to add workers linearly
  • Retry Policies: we must be able to define complex custom logic to know if and when to retry to do a failed job

History

First implementation was really simple, many Workers contending all the jobs

> c
recruiter                     	2806/14MB
recruiter_history             	1969359/1GB

One collection recruiter containing all the jobs

  {
	"_id" : ObjectId("58a575cc6240a6e9458b6827"),
	"active" : true,
	"done" : false,
	"created_at" : ISODate("2017-02-16T09:50:04.830Z"),
	"attempts" : NumberLong(4),
	"locked" : false,
	"tags" : [
		"notification"
	],
	"method" : "runCommand",
	"class" : "Subeng\\Notification\\NotificationJob",
	"parameters" : {
		"name" : "abort",
		"subscription_id" : "58a180ef6140a65c5eed3870"
	},
	"last_execution" : {
		"scheduled_at" : ISODate("2017-02-16T09:57:05Z"),
		"started_at" : ISODate("2017-02-16T09:57:06.364Z"),
		"ended_at" : ISODate("2017-02-16T09:57:06.540Z"),
		"class" : "LogicException",
		"message" : "404 Not Found-{\"error\":\"onebip.subscription.subscription-not-found\",\"message\":\"exceptions.onebip.subscription.subscription-not-found\"}",
		"trace" : "404 Not Found-{\"error\":\"onebip.subscription.subscription-not-found\",\"message\":\"exceptions.onebip.subscription.subscription-not-found\"}",
		"pid" : NumberLong(18519),
		"hostname" : "production-ws-0998bcbbe44e931e8"
	},
	"shell" : "Onebip\\Recruiter\\Shell",
	"scheduled_at" : ISODate("2017-02-16T10:05:06Z")
  }

Every worker contending the jobs

$this->collection->findAndModify(
    [
        'scheduled_at' => ['$lt' => new \MongoDate()],
        'tags' => ['$all' => $workOnTags],
        'locked' => false,
    ],
    [
        '$set' => ['locked' => true]
    ],
    [
        'sort' => ['scheduled_at' => 1],
    ]
);

Here’s a locked job with the associated worker informations

  {
	"_id" : ObjectId("553a077e5b1861650d8b9076"),
	"active" : true,
	"done" : false,
	"created_at" : ISODate("2015-04-24T09:06:06.036Z"),
	"attempts" : NumberLong(1),
	"locked" : true,
	"tags" : [
		"generic"
	],
	"scheduled_at" : ISODate("2015-04-24T09:06:06.036Z"),
	"method" : "runCommand",
	"class" : "Subeng\\Operation\\Task\\TaskExecutionJob",
	"parameters" : {
		"name" : "Retry",
		"argument" : "53a00281529af2d1718bc008"
	},
	"worker" : {
		"started_at" : ISODate("2015-04-24T09:06:18.043Z"),
		"pid" : NumberLong(3424),
		"hostname" : "production-subengws-1417517940"
	},
	"shell" : "Onebip\\Recruiter\\Shell"
  }

There were a few problems, can you spot them?

Solution (Version 2.0)

A few more collections

> c
archived                      	0/20KB
metadata                      	1/68KB
roster                        	10/104KB
scheduled                     	0/28KB

Every worker registers itself as available to work when it’s born

  > db.roster.findOne()
  {
	"_id" : ObjectId("58a57442419e3e7a4d8b4567"),
	"work_on" : "*",
	"available" : true,
	"available_since" : ISODate("2017-02-16T09:43:30.678Z"),
	"last_seen_at" : ISODate("2017-02-16T09:44:08.917Z"),
	"created_at" : ISODate("2017-02-16T09:43:30.678Z"),
	"working" : false,
	"pid" : NumberLong(19834)
  }

When there’s something todo a job is created in the scheduled collection

  > db.scheduled.last().pretty()
  {
	"_id" : ObjectId("58a57c2f419e3e1d328b4ee9"),
	"done" : false,
	"created_at" : ISODate("2017-02-16T10:17:19.669Z"),
	"locked" : false,
	"attempts" : NumberLong(0),
	"group" : "generic",
	"workable" : {
		"class" : "Recruiter\\Workable\\LazyBones",
		"parameters" : {
			"us_to_sleep" : NumberLong(200000),
			"us_of_delta" : NumberLong(100000)
		},
		"method" : "execute"
	},
	"scheduled_at" : ISODate("2017-02-16T10:17:19.669Z"),
	"retry_policy" : {
		"class" : "Recruiter\\RetryPolicy\\DoNotDoItAgain",
		"parameters" : [ ]
	}
  }

Now we have two special processes

  • Recruiter: for every available worker picks a job for it and we assigns the job to it
  • Cleaner: removes documents that represents dead processes and unlocks jobs that were locked by dead workers

This is how it looks a locked job

  > db.scheduled.find({locked: true}).limit(1).pretty()
  {
	"_id" : ObjectId("58a57cb4419e3e1d328b5b4a"),
	"done" : false,
	"created_at" : ISODate("2017-02-16T10:19:32.969Z"),
	"locked" : true,
	"attempts" : NumberLong(1),
	"group" : "generic",
	"workable" : {
		"class" : "Recruiter\\Workable\\LazyBones",
		"parameters" : {
			"us_to_sleep" : NumberLong(200000),
			"us_of_delta" : NumberLong(100000)
		},
		"method" : "execute"
	},
	"scheduled_at" : ISODate("2017-02-16T10:19:32.969Z"),
	"retry_policy" : {
		"class" : "Recruiter\\RetryPolicy\\DoNotDoItAgain",
		"parameters" : [ ]
	},
	"last_execution" : {
		"scheduled_at" : ISODate("2017-02-16T10:19:32.969Z"),
		"started_at" : ISODate("2017-02-16T10:20:56.188Z")
	}
  }

Note that there isn’t any reference to the assigned worker

This is how looks a worker assigned to a job

  {
	"_id" : ObjectId("58a57442419e3e894d8b4567"),
	"work_on" : "*",
	"available" : false,
	"available_since" : ISODate("2017-02-16T10:22:25.472Z"),
	"last_seen_at" : ISODate("2017-02-16T10:22:25.675Z"),
	"created_at" : ISODate("2017-02-16T09:43:30.827Z"),
	"working" : true,
	"pid" : NumberLong(19849),
	"assigned_to" : {
		"58a57442419e3e7a4d8b4567" : ObjectId("58a57d0d419e3e1d328b63a7"),
		"58a57442419e3e7c4d8b4567" : ObjectId("58a57d0d419e3e1d328b63a8"),
		"58a57442419e3e764d8b4567" : ObjectId("58a57d0e419e3e1d328b63a9"),
		"58a57442419e3e7e4d8b4567" : ObjectId("58a57d0e419e3e1d328b63aa"),
		"58a57442419e3e784d8b4567" : ObjectId("58a57d0e419e3e1d328b63ab"),
		"58a57442419e3e844d8b4567" : ObjectId("58a57d0e419e3e1d328b63ac"),
		"58a57442419e3e874d8b4567" : ObjectId("58a57d0e419e3e1d328b63ad"),
		"58a57442419e3e894d8b4567" : ObjectId("58a57d0e419e3e1d328b63ae")
	},
	"assigned_since" : ISODate("2017-02-16T10:22:25.560Z"),
	"working_on" : ObjectId("58a57d0e419e3e1d328b63ae"),
	"working_since" : ISODate("2017-02-16T10:22:25.675Z")
  }

Note that it looks like it has been assigned to many jobs but it’s not, assigned_to it’s a map from worker ids to job ids, every worker will search in this map its id (in this case “58a57442419e3e894d8b4567”, hint: the last one) to find the real job id it has been assigned to (in this case ObjectId(“58a57d0e419e3e1d328b63ae”), it’s a performance trick to be able to bulk update all workers in one database operation.

This is how it looks a job that it has been successfully executed

  > db.archived.last().pretty()
  {
	"_id" : ObjectId("58a57fdc419e3e1d328ba6e3"),
	"done" : true,
	"created_at" : ISODate("2017-02-16T10:33:00.295Z"),
	"locked" : false,
	"attempts" : NumberLong(1),
	"group" : "generic",
	"workable" : {
		"class" : "Recruiter\\Workable\\LazyBones",
		"parameters" : {
			"us_to_sleep" : NumberLong(200000),
			"us_of_delta" : NumberLong(100000)
		},
		"method" : "execute"
	},
	"retry_policy" : {
		"class" : "Recruiter\\RetryPolicy\\DoNotDoItAgain",
		"parameters" : [ ]
	},
	"why" : "done",
	"last_execution" : {
		"scheduled_at" : ISODate("2017-02-16T10:33:00.295Z"),
		"started_at" : ISODate("2017-02-16T10:34:22.687Z"),
		"ended_at" : ISODate("2017-02-16T10:34:22.789Z")
	}
  }

Explanation

This is much better because

  • We don’t have contention when picking jobs because there’s only one recruiter process so it doesn’t have to lock anything, when a worker it’s available it can’t suddenly become unavailable because the only one who can assign jobs is the recruiter. What can happen is that a worker can become available in the meantime but who cares, it will be considered in the next round of jobs assignment
  • Workers don’t have contention, they can freely modify their own document (in roster collection) because they are the only one who are supposed to do that, they can freely modify their own assigned job document beucase they are the only one who are supposed to do that after the assignment from the recruiter
  • Bulk updates are faster
    $roster->update(
        $where = ['_id' => ['$in' => array_values($workers)]],
        $update = ['$set' => [
            'available' => false,
            'assigned_to' => $assignments,
            'assigned_since' => T\MongoDate::now()
        ]],
        ['multiple' => true]
    );