My project involves processing images for clients en masse. Clients send zipped batches of image files, which fire off ImageMagick command-line scripts per image. The problem I am trying to solve is that if these commands are queued in the order they are received, a client who needs to process 10k images will hog all resources for hours. My solution is to round-robin each client's queue, so that everyone slows everyone else down equally. I have created this class to implement this:
class QueueBalancer:
    def __init__(self, cycle_list=None):
        # A mutable default argument would be shared between instances,
        # so default to None and create a fresh list here.
        self.cycle_list = cycle_list if cycle_list is not None else []
        self.update_status()

    def cmd_gen(self):
        index = -1
        while True:
            try:
                if self.cycle_list:
                    self.processing = True
                    index += 1
                    commands = self.cycle_list[index]["commands"]
                    if commands:
                        command = commands.pop(0)
                        if len(commands) == 0:
                            # This user's queue is drained: drop them and
                            # step back so the next user is not skipped.
                            del self.cycle_list[index]
                            index -= 1
                            self.update_status()
                        yield command
                else:
                    yield None
            except IndexError:
                # Ran off the end of the list; wrap around.
                index = -1

    def get_user(self, user):
        return next((item for item in self.cycle_list
                     if item["user"] == user), None)

    def create_or_append(self, user, commands):
        existing_user = self.get_user(user)
        if existing_user:
            existing_user["commands"] += commands
        else:
            self.cycle_list.append({
                "user": user,
                "commands": commands
            })

    def update_status(self):
        self.processing = any(item["commands"] for item in self.cycle_list)

    def status(self):
        return self.processing
As you can see from the else clause of create_or_append(), the cycle_list is a list of dictionaries like this:
{"user": "test1", "commands": ["command1", "command2"]},
{"user": "test2", "commands": ["x", "y", "z"]},
{"user": "test3", "commands": ["a", "b", "c"]}
(real commands removed, sample strings used)
A single instance of cmd_gen() will be used to feed commands into my shell, and I will use create_or_append() to add users and commands on the fly, while commands in the queue are still being processed. This seems to work great so far in my initial tests, but is it theoretically thread-safe? If not, what would I need to do to make sure it is?
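To make the intended use concrete, here is a stripped-down sketch of the consumer side (the worker function and the subprocess.run call are stand-ins for my real shell integration):

import subprocess
import threading

balancer = QueueBalancer()
commands = balancer.cmd_gen()

def worker():
    while True:
        command = next(commands)
        if command is None:
            break  # nothing queued at the moment
        # Placeholder: the real code runs an ImageMagick command line.
        subprocess.run(command, shell=True)

balancer.create_or_append("test1", ["command1", "command2"])
threading.Thread(target=worker).start()
# Meanwhile, the main thread keeps calling create_or_append() while the
# worker is still draining the generator -- this concurrent access is
# what the thread-safety question is about.
balancer.create_or_append("test2", ["x", "y", "z"])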
I thought I'd have a shot at creating a generic balanced queue like you described - here's the result. I think there are still some pathological cases where a user could have many jobs processed sequentially, but they would require other users' jobs to be added at specific times and in specific orders, so I don't think it would happen in the real world and couldn't be exploited unless multiple users colluded.
from threading import Lock


class UserBalancedJobQueue:
    def __init__(self):
        self._user_jobs = {}   # user -> list of pending jobs
        self._user_list = []   # round-robin order of users
        self._user_index = 0   # next user to serve
        self._lock = Lock()

    def pop_user_job(self):
        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")
            if self._user_index >= len(self._user_list):
                self._user_index = 0
            user = self._user_list[self._user_index]
            jobs = self._user_jobs[user]
            job = jobs.pop(0)
            if not jobs:
                # Removing the current user shifts the next user into this
                # slot, so only advance the index when the user is kept.
                self._delete_current_user()
            else:
                self._user_index += 1
            return user, job

    def _delete_current_user(self):
        user = self._user_list.pop(self._user_index)
        del self._user_jobs[user]

    def add_user_job(self, user, job):
        with self._lock:
            if user not in self._user_jobs:
                self._user_list.append(user)
                self._user_jobs[user] = []
            self._user_jobs[user].append(job)


if __name__ == "__main__":
    q = UserBalancedJobQueue()
    q.add_user_job("tom", "job1")
    q.add_user_job("tom", "job2")
    q.add_user_job("tom", "job3")
    q.add_user_job("fred", "job4")
    q.add_user_job("fred", "job5")

    for i in range(3):
        print(q.pop_user_job())

    print("Adding more jobs")
    q.add_user_job("dave", "job6")
    q.add_user_job("dave", "job7")
    q.add_user_job("dave", "job8")
    q.add_user_job("dave", "job9")

    try:
        while True:
            print(q.pop_user_job())
    except ValueError:
        pass
Thinking about it more, an alternative implementation would be to remember, for each user, when their last job was run, and then choose the next user based on whose last job is oldest. It would probably be more 'correct', but it would have the (probably negligible) extra memory overhead of remembering the last job time for every user.
Edit: So it's a slow day - here's that other approach. I think I prefer it to the above, though it's slower due to the O(N) search for the user with the oldest previous job.
from collections import defaultdict
from threading import Lock
import time


class UserBalancedJobQueue:
    def __init__(self):
        self._user_jobs = defaultdict(list)
        # Users never seen before get timestamp 0.0, so they sort first.
        self._user_last_run = defaultdict(lambda: 0.0)
        self._lock = Lock()

    def pop_user_job(self):
        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")
            # O(N) scan for the user whose last job ran longest ago.
            user = min(
                self._user_jobs.keys(),
                key=lambda u: self._user_last_run[u]
            )
            self._user_last_run[user] = time.time()
            jobs = self._user_jobs[user]
            job = jobs.pop(0)
            if not jobs:
                del self._user_jobs[user]
            return user, job

    def add_user_job(self, user, job):
        with self._lock:
            self._user_jobs[user].append(job)
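For completeness, a short driver for this version along the lines of the earlier demo (made-up users and job names again):

if __name__ == "__main__":
    q = UserBalancedJobQueue()
    for user, job in [("tom", "job1"), ("tom", "job2"),
                      ("fred", "job3"), ("fred", "job4")]:
        q.add_user_job(user, job)
    try:
        while True:
            # Alternates between tom and fred: each pop stamps the served
            # user with time.time(), pushing them behind the other user.
            print(q.pop_user_job())
    except ValueError:
        pass  # queue drained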