Agenda

We will discuss a strategy for distributing tasks across different queues and workers. This post is specific to Python RQ, though the lessons can be applied to Celery or any other task queue.

Often we have to make the following decisions when writing tasks:

  • Should all my tasks be on the same queue?
  • How many queues should my application have?
  • How should I determine the queue responsible for a task?
  • How should I determine the number of workers for a queue?
  • Should a worker consume from multiple queues?

This post attempts to answer such questions.

Setup

We will use Python RQ in this post. It is a simple and lightweight alternative to Celery. Quoting the rq homepage:

It is backed by Redis and it is designed to have a low barrier to entry.

Install rq in your virtual environment.

pip install rq

We will use the awesome requests library too.

pip install requests

rq uses Redis as its message queue. Start Redis.

$ redis-server

Tasks

Let's assume our application has 4 tasks. 2 of the tasks are network bound tasks while the other two are CPU bound. Check this SO link to know what network bound and CPU bound mean.

Network bound tasks

Let's define the first task named check_status.

Tasks are nothing but normal functions. Depending on how they are invoked, they execute either synchronously as a function or asynchronously as a task.

The code for check_status is:

import redis
import requests
from requests import ConnectTimeout, ReadTimeout, ConnectionError
from rq.decorators import job

connection = redis.Redis()

@job('network', connection=connection)
def check_status(url):
    """
    Given a url checks whether the service is up or down.
    """
    try:
        # The timeout (10 seconds, an arbitrary choice) keeps the worker from hanging.
        _ = requests.get(url, timeout=10)
        return True
    except (ConnectTimeout, ReadTimeout, ConnectionError):
        return False

Explanation for check_status

If you have used rq before and are comfortable with the code, skip to the next section.

We defined our imports, then created a Redis connection. The connection is needed because invoking a task pushes it onto Redis, and a worker later consumes it from Redis and processes it.

We defined a simple function called check_status. It takes a url and figures out whether the url is reachable.

We then decorated this function with a decorator called job, which is provided by rq. The decorator gives the function the ability to be invoked asynchronously.

Since it can be invoked asynchronously, we will call it a task hereafter.

We also want this task to be queued on a queue named network. Had we wanted to queue it on a queue named integrations, we would have used @job('integrations', connection=connection).

rq supports multiple queues, and a task can be placed on any of them. We will soon see the advantages of having multiple queues and how to decide which queue a task should be placed on.
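
To make the idea of a job-style decorator concrete, here is a toy sketch that uses an in-memory dictionary in place of Redis. It is not rq's implementation; toy_job, QUEUES, run_one and shout are names made up for illustration. It only shows the shape of the idea: the decorated function still works as a plain function, and it gains a delay attribute that records the call so a worker can run it later.

```python
from collections import deque

# Hypothetical in-memory stand-in for Redis: queue name -> pending calls.
QUEUES = {}


def toy_job(queue_name):
    """Toy job-style decorator (an illustration, NOT rq's implementation)."""
    def decorator(func):
        def delay(*args, **kwargs):
            # Record the call instead of running it; a worker runs it later.
            # (rq returns a Job object here; this toy returns None.)
            QUEUES.setdefault(queue_name, deque()).append((func, args, kwargs))
        func.delay = delay
        return func
    return decorator


@toy_job("network")
def shout(text):
    return text.upper()


def run_one(queue_name):
    """Toy worker step: pop the oldest pending call and execute it."""
    func, args, kwargs = QUEUES[queue_name].popleft()
    return func(*args, **kwargs)
```

Calling shout("hi") runs immediately and returns "HI". Calling shout.delay("hi") only records the call; nothing executes until run_one("network") plays the part of the worker.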

Let's try the synchronous invocation before we get into asynchronous invocation.

In [1]: from tasks import check_status

In [2]: is_url_reachable = check_status('https://www.urbanpiper.com/')

You would have noticed a slight delay between issuing the statement and getting the response. This happened because our task is network bound.

Let's exploit the ability added by the @job decorator and invoke the task asynchronously.

In [3]: is_url_reachable = check_status.delay('https://www.urbanpiper.com/')

You would notice that there is no delay between issuing the statement and getting the response. However, the task hasn't actually executed yet; that can only happen once a worker is running.

Also, is_url_reachable is now an instance of class Job rather than a boolean. Calling delay on the function hands control to rq, which enqueues the task and returns a Job instead of the function's return value.

To get the actual return value of the task, use the result attribute on the job instance.

In [4]: print(is_url_reachable.result)
None

The output is None because the task hasn't executed yet.

For the task to execute, we have to start an rq worker. Run rq worker network in another shell:

$ rq worker network

rq worker is the command that starts the worker. We passed network as an argument because we want the worker to consume tasks from the queue named network.

Check is_url_reachable.result again and it will tell you whether the url is reachable.

In [5]: print(is_url_reachable.result)
True
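
Checking the result by hand works in a shell, but in a script you may want to wait until the worker is done. The following is a minimal sketch of a polling helper. wait_for_result is a hypothetical name, and the helper assumes the job object exposes is_finished and is_failed alongside the result attribute used above; the timeout and polling interval are arbitrary defaults.

```python
import time


def wait_for_result(job, timeout=30.0, poll_interval=0.5):
    """Poll a job until it finishes, then return its result.

    Assumes `job` exposes `is_finished`, `is_failed` and `result`.
    Raises RuntimeError if the job failed and TimeoutError if the job
    is still pending after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        if job.is_finished:
            return job.result
        if job.is_failed:
            raise RuntimeError("job failed")
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish in time")
        time.sleep(poll_interval)
```

With a worker running, wait_for_result(check_status.delay('https://www.urbanpiper.com/')) would block until the worker has processed the task and then return the boolean.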

Another task

Let's add another network bound task called count_words.

@job('network', connection=connection)
def count_words(url):
    """
    Given a url, returns the number of words in the response.
    """
    try:
        response = requests.get(url)
        # split() with no argument splits on any run of whitespace.
        words_count = len(response.text.split())
    except Exception:
        words_count = -1
    return words_count

This task is also placed on queue network.
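
How the response text is split affects the number that count_words reports. Here is a small sketch with a made-up string (sample is not taken from any real page) that compares splitting on a single space with splitting on runs of whitespace:

```python
sample = "one  two \n three"

# Splitting on a single space keeps empty strings and treats "\n" as a word.
by_space = sample.split(' ')

# Splitting with no argument treats any run of whitespace as one separator.
by_whitespace = sample.split()

print(len(by_space), by_space)            # 5 ['one', '', 'two', '\n', 'three']
print(len(by_whitespace), by_whitespace)  # 3 ['one', 'two', 'three']
```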

It's possible that a particular network call takes 5 seconds. Let's mimic a time consuming network call by putting a time.sleep(5) in check_status.

@job('network', connection=connection)
def check_status(url):
    """
    Given a url checks whether the service is up or down.
    """
    time.sleep(5)
    try:
        # The timeout (10 seconds, an arbitrary choice) keeps the worker from hanging.
        _ = requests.get(url, timeout=10)
        return True
    except (ConnectTimeout, ReadTimeout, ConnectionError):
        return False

You also need to add the corresponding import statement.

import time

Restart the worker so that the modified code takes effect.

Restart the Python shell and invoke check_status and count_words one after the other.

In [1]: from tasks import check_status, count_words

In [2]: check_status.delay('https://www.urbanpiper.com/')

In [3]: count_words.delay('https://www.urbanpiper.com/')

On the worker console you would notice that count_words gets a chance to execute only after check_status has completed. check_status took more than 5 seconds, and for most of that time the CPU was idle because check_status is a network bound task. Still, count_words had to wait those 5 seconds even though it could have run as soon as the CPU was free.

It would be better to run multiple worker instances so that tasks aren't blocked behind another long running task. It would also lead to better utilization of the CPU.

Start another instance of rq worker network in a separate shell. Now we have two worker instances running.

Let's again invoke check_status and count_words in quick succession.

In [1]: from tasks import check_status, count_words

In [2]: check_status.delay('https://www.urbanpiper.com/')

In [3]: count_words.delay('https://www.urbanpiper.com/')

You would notice that the first worker starts executing check_status and the second worker starts executing count_words. count_words no longer has to wait 5 seconds for check_status to complete.

If your queue is going to have network bound tasks, then it makes sense to create multiple workers to consume from this queue.
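
The same effect can be reproduced without Redis. The sketch below is a simulation under stated assumptions: threads stand in for rq worker processes, and time.sleep stands in for the time spent waiting on the network. fake_network_call and run_with_workers are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_network_call(seconds):
    """Stand-in for a network bound task: it waits without using the CPU."""
    time.sleep(seconds)
    return seconds


def run_with_workers(num_workers, durations):
    """Run the fake calls on `num_workers` threads; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(fake_network_call, durations))
    return time.perf_counter() - start


if __name__ == "__main__":
    durations = [0.5, 0.5]
    print("1 worker :", round(run_with_workers(1, durations), 2))
    print("2 workers:", round(run_with_workers(2, durations), 2))
```

With one worker the two calls run back to back, so the elapsed time is roughly the sum of both waits. With two workers the waits overlap and the elapsed time is close to a single wait.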

I would probably create 3 workers per core to process network bound tasks. For example, if I have two cores, I might create six workers to consume network bound tasks.
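
As a sketch, this rule of thumb can be written as a small helper. network_worker_count is a hypothetical name, and the default of 3 workers per core is simply the figure quoted above, not a measured optimum.

```python
import os


def network_worker_count(cores=None, workers_per_core=3):
    """Suggest how many workers to run for a network bound queue.

    If `cores` is not given, the core count of this machine is used.
    """
    if cores is None:
        cores = os.cpu_count() or 1  # cpu_count() can return None
    return cores * workers_per_core


print(network_worker_count(cores=2))  # 6
```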

However, creating multiple workers per core isn't advisable for CPU bound tasks. Let's see that in detail in the following section.

CPU bound task

Let's assume you have a CPU bound task. To mimic such a task, let's create a task which loops 100 million times.

@job('cruncher', connection=connection)
def loop_n():
    num_times = 100000000
    for _ in range(num_times):
        pass
    return "processed"

We want to place this task on queue cruncher. The task takes approximately 2 seconds to complete on a 2GHz processor. It might take a little less or more depending on your processor.

As is evident, this task is not I/O or network bound. Instead it is CPU bound.

Let's assume you have a single core machine. In such cases, you should have a single worker instance to process this CPU bound task. Otherwise the CPU time would be sliced between multiple workers and the latency of each task would increase. Let's see that in practice.

Create a single instance of rq worker and see how long it takes for loop_n to complete.

$ rq worker cruncher

Start a python shell and queue loop_n.

In [1]: from tasks import loop_n

In [2]: loop_n.delay()
Out[2]: Job('b1194d26-d8c3-48f4-aa9c-40e9dfbbd9f3', enqueued_at=datetime.datetime(2020, 10, 24, 13, 52, 8, 639240))

Notice the output on the worker shell.

19:22:08 cruncher: tasks.loop_n() (b1194d26-d8c3-48f4-aa9c-40e9dfbbd9f3)
19:22:10 cruncher: Job OK (b1194d26-d8c3-48f4-aa9c-40e9dfbbd9f3)

The worker picked up the task for execution at 19:22:08. Execution was completed at 19:22:10. The task took 2 seconds.

Start another instance of rq worker cruncher on another shell. Start two loop_n tasks in quick succession.

In [3]: loop_n.delay()
Out[3]: Job('b1194d26-d8c3-48f4-aa9c-40e9dfbbd9f3', enqueued_at=datetime.datetime(2020, 10, 24, 13, 53, 8, 639240))
In [4]: loop_n.delay()
Out[4]: Job('b1194d26-48f4-d8c3-aa9c-40e9dfbbd9f3', enqueued_at=datetime.datetime(2020, 10, 24, 13, 53, 8, 639240))

Watch the output on both worker shells. On a single core, both tasks complete at almost the same time, but each takes approximately 4 seconds, i.e. 2x the initial time. This happens because the CPU is shared between the two tasks, so the latency of each task doubles.

However, if your system has multiple cores, both tasks would complete within 2 seconds since they would be processed concurrently on different cores.
To see the latency rise on such a machine, you have to run more tasks concurrently than there are cores. If your machine has 4 cores, you need to start at least 5 tasks (with as many workers) to notice the increased latency.
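
The arithmetic behind these observations can be sketched with a small model. It is an idealization, not a measurement: it assumes the operating system shares the cores fairly between the workers and that nothing else is running. estimated_latency is a hypothetical name.

```python
def estimated_latency(task_seconds, concurrent_tasks, cores):
    """Estimate per-task latency for CPU bound tasks started together.

    Idealized model (an assumption): with at most one task per core
    nothing slows down; beyond that, each task gets only
    cores / concurrent_tasks of a core.
    """
    if concurrent_tasks <= cores:
        return float(task_seconds)
    return task_seconds * concurrent_tasks / cores


# Scenarios from the text, with loop_n taking about 2 seconds on its own.
print(estimated_latency(2, 1, 1))  # 2.0: one worker, one core
print(estimated_latency(2, 2, 1))  # 4.0: two workers share one core
print(estimated_latency(2, 2, 2))  # 2.0: one core per worker
```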

Learning

So for network bound tasks, increasing the number of workers increased the throughput without affecting latency. For CPU bound tasks on a single core, increasing the number of workers increased the latency and kept the throughput the same.

So increasing the number of workers beyond the number of cores can only help when network bound tasks are involved.

Mix of network bound and CPU bound tasks

An application would rarely have tasks that are all network bound or all CPU bound. Some tasks would be network bound while others would be CPU bound. In fact, there would be scenarios where it's hard to categorize a task as network or CPU bound.

Let's consider the simple case where every task can be categorized as network bound or CPU bound. Our application has 2 network bound tasks and 1 CPU bound task. Let's create one more CPU bound task.

@job('cruncher', connection=connection)
def add_n():
    num_times = 100000000
    return sum(i for i in range(num_times))

Restart the worker so that it becomes aware of this task.

$ rq worker cruncher

Start a Python shell and enqueue this task. Notice how long it takes for the worker to execute it.

15:44:21 cruncher: tasks.add_n() (28563726-35a5-47c5-969a-6226198c2d30)
15:44:26 cruncher: Job OK (28563726-35a5-47c5-969a-6226198c2d30)

It takes the worker almost 5 seconds to execute this.

How many workers to start

The number of cores largely determines the ideal number of workers to start.

On a 2 core machine, I would probably have 2 worker instances consuming from queue network and 2 worker instances consuming from queue cruncher.
That way each cruncher worker can use one core when CPU bound tasks are executing concurrently. And since network tasks need very few CPU cycles, they can get by on a time slice from either core.

On a single core machine, I would start a single cruncher worker. With two cruncher workers, the tasks running on them would contend for the CPU, which would increase the latency of both. In most cases you would rather have the first task complete and then the next task execute.
I would still keep the number of network workers at 2 since our network tasks don't need much CPU.

On a 3 core machine, I would probably have 3 worker instances for queue cruncher and 6 worker instances for queue network.

Should workers consume from multiple queues

A worker can be configured to consume from multiple queues in the following way:

$ rq worker network cruncher

The downside is that if a task is added to queue network, this worker might pick it up. If another task is queued on cruncher at that moment, the worker is busy with the network task and can't process the cruncher task it was meant to handle.
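
A worker started this way checks its queues in the order they were listed and runs one task at a time. The following sketch simulates that behaviour with in-memory queues to show how long the cruncher task has to wait. It is a simplified model: drain is a hypothetical name, every task is assumed to be queued at time 0, and the durations are the illustrative ones used in this post.

```python
from collections import deque


def drain(queues):
    """Simulate one worker listening on several queues.

    `queues` is a list of (name, deque of (task_name, seconds)) pairs in
    the order they were passed to the worker. Returns a list of
    (task_name, start_time, end_time) tuples.
    """
    clock = 0
    timeline = []
    while any(pending for _, pending in queues):
        for _, pending in queues:
            if pending:
                task_name, seconds = pending.popleft()
                timeline.append((task_name, clock, clock + seconds))
                clock += seconds
                break  # go back to the first queue before picking again
    return timeline


network = deque([("check_status", 5)])
cruncher = deque([("loop_n", 2)])
for entry in drain([("network", network), ("cruncher", cruncher)]):
    print(entry)
# ('check_status', 0, 5)
# ('loop_n', 5, 7)
```

In this model loop_n starts only at second 5, even though the CPU was mostly idle while check_status waited on the network.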

That's why it makes sense to run separate worker instances for network bound tasks and for CPU bound tasks.

So you can instead create two or more instances of rq worker network and separate instances of rq worker cruncher. The number of rq worker cruncher instances should be decided based on the number of cores on the machine.
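
One way to keep such a layout explicit is to describe it as data and derive the commands from it. The sketch below only builds the command lines and does not start anything. worker_commands is a hypothetical helper, and the plan shown is the 2 core layout suggested earlier.

```python
def worker_commands(plan):
    """Build one `rq worker <queue>` command line per planned worker.

    `plan` maps a queue name to the number of workers wanted for it.
    Nothing is executed; the commands are returned as argument lists.
    """
    commands = []
    for queue_name, count in plan.items():
        for _ in range(count):
            commands.append(["rq", "worker", queue_name])
    return commands


# 2 core layout: 2 network workers and 2 cruncher workers.
for command in worker_commands({"network": 2, "cruncher": 2}):
    print(" ".join(command))
```

Each argument list could then be handed to whatever you use to run processes, for example a shell script or a process manager.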

High priority tasks

It's possible that your application has a few tasks which should be executed as soon as they are queued. In such cases, you should create a separate queue for them instead of using a regular queue.

Let's create a high priority task called factorial and set its queue to high.

@job('high', connection=connection)
def factorial(n):
    product = 1
    while n > 1:  # unlike n != 0, this also terminates for negative n
        product = product * n
        n = n - 1
    return product

We will have a worker listening on this queue.

$ rq worker high

It's a CPU bound task. If it were put on queue cruncher, there could already be several add_n or loop_n tasks queued ahead of it. They would block factorial, which isn't desirable.

That's why we should use a separate queue for factorial: as soon as it's queued, it gets a chance to execute.

Even if there is a single CPU and other tasks are executing, a dedicated worker is available for factorial, so the worker picks it up right away and the task gets its share of CPU time.
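
To put a number on the blocking, here is a sketch of a simplified FIFO model. It assumes every task ahead is already queued, all workers start idle, and each task runs at full speed once started. wait_before_start is a hypothetical name, and the durations are the approximate ones measured earlier for loop_n (2 seconds) and add_n (5 seconds).

```python
import heapq


def wait_before_start(durations_ahead, workers=1):
    """Seconds a newly queued task waits before a worker picks it up."""
    free_at = [0.0] * workers  # time at which each worker becomes free
    heapq.heapify(free_at)
    for seconds in durations_ahead:
        start = heapq.heappop(free_at)  # earliest available worker
        heapq.heappush(free_at, start + seconds)
    return free_at[0]  # earliest moment any worker is free again


# factorial queued on cruncher behind two loop_n tasks and one add_n task:
print(wait_before_start([2, 2, 5], workers=1))  # 9.0
# factorial on its own queue high with nothing ahead of it:
print(wait_before_start([], workers=1))  # 0.0
```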

Takeaway

It's important to be clear about whether tasks are network bound or CPU bound. They should be properly categorized and kept on corresponding queues.

If you have several network bound tasks, it makes sense to keep them on the same queue so they are logically grouped together. And you should have multiple workers to consume from this queue.

If you have several CPU bound tasks, it makes sense to use the same queue for them. And depending on their CPU need, probably keep one worker per core for CPU bound tasks.

Keep a separate queue and worker for the highest priority tasks so that, irrespective of which tasks are being executed, these tasks get some CPU share instead of being blocked in the queue behind other time consuming tasks.
