Agenda

This post is for you if your application relies heavily on queues and workers.

We discuss a strategy to identify slow-running tasks, which can cause tasks to pile up in the queue and ultimately bring down the queue infrastructure.

We will use Redis and Python-RQ for this post, but the strategy applies equally well to any queue and worker combination. We have successfully deployed this strategy with Celery and RabbitMQ.

Problem statement

An application can have multiple tasks which are candidates for asynchronous processing. The application code enqueues the tasks into a queue, and workers dequeue from the queue to process them.

The rate of dequeue by the workers must keep up with the rate of enqueue; otherwise tasks start piling up in the queue. Once entries accumulate, the queue's capacity is soon breached and the queue system collapses.

Consider the following example: on average, 5 tasks are inserted into the queue per second, and 10 workers consume from the queue. For the workers to keep up, each worker must process at least one task every 2 seconds. If tasks take longer than 2 seconds on average, the workers dequeue more slowly than tasks are enqueued.
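To make the arithmetic concrete, here is a quick sanity check using the numbers from the example (the 2.5-second average task time is an assumed figure for illustration):

```python
# Back-of-the-envelope check: do the workers keep up?
enqueue_rate = 5       # tasks entering the queue per second
num_workers = 10
avg_task_time = 2.5    # seconds per task; anything above 2 breaks even

# Each worker completes 1/avg_task_time tasks per second.
dequeue_rate = num_workers / avg_task_time

# Tasks accumulating in the queue every second.
backlog_growth = enqueue_rate - dequeue_rate
print(backlog_growth)  # 1.0 task piles up every second
```

At a 2-second average the two rates balance exactly; every fraction of a second beyond that turns into a steadily growing backlog.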

We need to pinpoint the time-consuming tasks which keep the workers occupied for a long time, since they are essentially the cause of the accumulation of tasks in the queue.

Going through the task logic and trying to infer the slow tasks might work when you have only two or three tasks. However, even a slightly complex application can easily have more than 50 tasks. Manually finding the slow ones isn't an option then.

The solution lies in logging the time taken to execute each task, and then ordering the tasks by time taken.
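In its simplest form this is just a wall-clock measurement around the call. The sketch below shows the idea with a fast stand-in task; later we hook the same measurement into RQ itself so that no task code needs to change:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def timed(task, *args, **kwargs):
    # Measure wall-clock time around the task and log it.
    start = time.time()
    result = task(*args, **kwargs)
    elapsed = time.time() - start
    logger.info("Task %s took %.2f seconds", task.__name__, elapsed)
    return result


def send_mail(username):
    # Fast stand-in for the real task, which sleeps for 5 seconds.
    time.sleep(0.1)
    return "sent"


print(timed(send_mail, username="elon"))  # sent
```

Wrapping every task call site like this is cumbersome, which is exactly why we will push the measurement down into the queueing library instead.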

Setup

Create a virtual environment and install RQ.

pip install rq

We need to interface with Redis from Python code, hence install the redis package.

pip install redis

Ensure Redis is installed on your system, since we will use it as our queue, and that you are able to run

redis-server

Refer to our post on RQ if you want a primer.

Tasks

Let's write a few tasks in a module called jobs.py.

# jobs.py

import time


def send_mail(username):
    time.sleep(5)
    return "sent"

We defined a task; a task is nothing but a function. We want to mimic a scenario where tasks take significant time to execute, which is why we make the task sleep for 5 seconds.

Reproduce the issue

We want to replicate the issue where the rate of dequeue is lower than the rate of enqueue, leading to a pile-up of tasks and ultimately to the collapse of the queue infrastructure.

Once we have replicated the issue, we will fix it.

Let's start a redis server.

$ redis-server

Let's start two worker instances on two different shells.

# First shell
$ rq worker

# Second shell
$ rq worker

Assume storing a single instance of send_mail requires 300 bytes in Redis. With 5 tasks accumulating every second, it would take around 8 days for 1 GB to be breached. So replicating our scenario on a Redis instance with 1 GB of memory would take 8 days. We don't want to wait that long.
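The 8-day figure follows from simple arithmetic, assuming roughly 300 bytes per task and a net pile-up of 5 tasks per second:

```python
bytes_per_task = 300
backlog_rate = 5              # tasks accumulating per second
capacity = 1_000_000_000      # 1 GB in bytes

# Seconds until the backlog alone fills the available memory.
seconds_to_fill = capacity / (bytes_per_task * backlog_rate)
days_to_fill = seconds_to_fill / 86400
print(round(days_to_fill, 1))  # 7.7, i.e. about 8 days
```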

That's why we will constrain Redis's memory limit and start queueing tasks; we should see the limit breached within a few minutes.

Redis allows you to see its current memory usage. Start redis-cli and issue the info memory command.

You would notice a used_memory field that looks like the following.

used_memory:1077200

This indicates that Redis is currently using 1077200 bytes, i.e. approximately 1.03 MB. Let's add 250 KB to this value and set it as Redis's maxmemory. This ensures only about 250 KB more is available for the operation of RQ and for storing enqueued tasks.

config set maxmemory 1327200 # We have added 250KB i.e 250000 bytes

Out of the extra 250 KB we have allocated to Redis, approximately 120 to 130 KB is used by RQ overhead, leaving the actual tasks approximately 120 KB.

Each enqueued instance of send_mail consumes approximately 300 bytes, so it would take around 400 tasks to consume the available 120 KB.

Let's mimic enqueueing 10 tasks every second, so the tasks consume the available 120 KB within 40 to 50 seconds.

# enqueue_tasks.py

import time

import redis
from rq import Queue

from jobs import send_mail

q = Queue(connection=redis.StrictRedis())


def enqueue_ten_times():
    for _ in range(10):
        q.enqueue(send_mail, username='elon')


tasks_enqueued = 0
while True:
    enqueue_ten_times()
    tasks_enqueued += 10
    print("Total tasks enqueued {}".format(tasks_enqueued))
    time.sleep(1)

Execute the above code on a terminal.

After a while, you would notice the following error:

ResponseError: Command # 1 (SADD rq:queues rq:queue:default) of pipeline caused error: OOM command not allowed when used memory > 'maxmemory'.

This indicates that the maxmemory limit we specified has been breached, and hence the enqueueing call raises an error.

If your application were enqueueing tasks for asynchronous processing, at this point every enqueue call would raise an error and no task would make it into the queue. The application would start misbehaving.

If your application has only a few tasks, you can manually figure out which one is slow and preventing the workers from processing tasks at a faster rate. But how would you do that in an application with, say, 100 tasks?

Solution

Let's add another task to jobs.py.

import random  # add this import at the top of jobs.py


def compute():
    num_seconds = random.randint(0, 10)
    time.sleep(num_seconds)
    return "done"

Also, modify send_mail to sleep for a random number of seconds.

def send_mail(username):
    num_seconds = random.randint(0, 10)
    time.sleep(num_seconds)
    return "sent"

And add another simple task which returns immediately.

def simple_task():
    return "done"

We want to track the time spent processing different tasks. Touching every function/task to add a logging statement is cumbersome. Instead, we should find the RQ class which encapsulates a task and performs its actual execution.

For RQ, that class is rq.job.Job. Every enqueued task is encapsulated by an instance of rq.job.Job, and its perform() method does the actual invocation of the task.

We will inherit from this class and override the perform method. Let's do it in a module called custom_job.py.

# custom_job.py

import time
import logging

from rq.job import Job


# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('Time: %(asctime)s Task: %(task_name)s Task-time: %(task_time)s')
handler = logging.FileHandler('logs.txt')
handler.setFormatter(formatter)
logger.addHandler(handler)


class MyJob(Job):
    """
    This extends from RQ's default Job class.
    It computes the time spent in performing the actual work and logs it.
    """

    def perform(self):
        start = time.time()
        rv = super().perform()
        end = time.time()
        elapsed = (end - start)
        logger.info("Job completed", extra={'task_time': elapsed, 'task_name': self.func_name})
        return rv

We have set up logging so that we can log each task's name and the time it takes to execute, writing the log records to a file called logs.txt.

Refer to our post on logging for a deep dive into Python's logging module.

We have then overridden the perform() method, capturing the time before calling super() and after it returns. The difference tells us the actual time spent executing the task.

Stop and restart the two workers. We also need to set an environment variable pointing at the custom Job class to force the workers to use it; otherwise the default Job class would keep being used.

$ export RQ_JOB_CLASS=custom_job.MyJob

# First shell
$ rq worker default

# Second shell
$ rq worker default

Start a shell and enqueue the three tasks.

In [2]: import redis

In [3]: from rq import Queue

In [4]: q = Queue(connection=redis.StrictRedis())

In [5]: from jobs import send_mail, compute, simple_task

In [6]: def enqueue_tasks():
   ...:     q.enqueue(send_mail, username='')
   ...:     q.enqueue(compute)
   ...:     q.enqueue(simple_task)
   ...:

In [7]: for _ in range(10):
   ...:     enqueue_tasks()

You would notice logs like the following in logs.txt. The output has been trimmed to keep it readable.

Time: 2021-01-13 10:03:59,202 Task: jobs.send_mail Task-time: 1
Time: 2021-01-13 10:03:59,218 Task: jobs.simple_task Task-time: 0
Time: 2021-01-13 10:04:05,234 Task: jobs.send_mail Task-time: 6
Time: 2021-01-13 10:04:08,203 Task: jobs.compute Task-time: 2
Time: 2021-01-13 10:04:08,221 Task: jobs.simple_task Task-time: 0
Time: 2021-01-13 10:04:11,266 Task: jobs.compute Task-time: 9

As you can notice, simple_task executes immediately while the other two tasks are sometimes slow, depending on the random number of seconds they slept.

With this solution in place, we can be certain that the execution time of every task is logged. We can then set up a monitoring service which alerts when, say, 80% of the queue capacity is reached. When alerted, we can analyse the logs and find the slow-performing tasks.

The logs would point out that send_mail and compute are time-consuming, and the application logic can then be analyzed further to find the cause of the slowness.

Analysing these logs isn't straightforward because the entries are plain text; they have to be parsed into a structured format before they can be sorted by Task-time. That's where ELK comes into the picture. We can set up an ELK pipeline: Logstash parses the logs and ingests them into Elasticsearch, where they can be sorted by Task-time and the time-consuming tasks identified.
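Before reaching for ELK, a few lines of Python can already parse the format shown above and surface the slowest tasks. This is a sketch; it assumes the log lines follow exactly the Time/Task/Task-time shape produced by our formatter:

```python
import re
from collections import defaultdict

LINE_RE = re.compile(r"Task: (?P<name>\S+) Task-time: (?P<secs>[\d.]+)")


def slowest_tasks(lines):
    # Aggregate total time spent per task name and sort, slowest first.
    totals = defaultdict(float)
    for line in lines:
        match = LINE_RE.search(line)
        if match:
            totals[match.group("name")] += float(match.group("secs"))
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


sample = [
    "Time: 2021-01-13 10:03:59,202 Task: jobs.send_mail Task-time: 1",
    "Time: 2021-01-13 10:04:11,266 Task: jobs.compute Task-time: 9",
    "Time: 2021-01-13 10:03:59,218 Task: jobs.simple_task Task-time: 0",
]
print(slowest_tasks(sample))
# [('jobs.compute', 9.0), ('jobs.send_mail', 1.0), ('jobs.simple_task', 0.0)]
```

This works fine for ad-hoc analysis of a single log file; an ELK pipeline earns its keep once the logs span many workers and machines.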

We have deployed this strategy with great success in our products and hope it helps you keep your tasks and queue sane.

Stay tuned for the next post, which will discuss how to ingest these logs into Elasticsearch for easy analysis.