How to use threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution in your Python 3 application

Previously, I discussed how to use Flask-APScheduler in your Python 3 Flask application to run multiple tasks in parallel, from a single HTTP request.

When we run jobs as discussed in that post, each job is run once by the underlying APScheduler instance. In addition, our Flask endpoint returns the HTTP response to the HTTP client as soon as the jobs are scheduled.

If the HTTP client does not need to know the outcome of the jobs within that HTTP call, then we are fine. But what if we want to include any errors that the jobs encounter in the same HTTP response?

In such a situation, we will need a mechanism to wait for the one-off jobs to complete execution before returning that response.

With that in mind, this post shows how we can use threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution.

What is threading.Condition used for?

In case you are wondering, an instance of threading.Condition helps us synchronize activities between different threads. Once we have an instance of threading.Condition, we can use its wait, notify and notify_all methods to coordinate those threads.

If we want the thread holding the lock to release it and pause further execution, we call the wait method. When we want a waiting thread to resume, we call the notify method. If we want all waiting threads to resume, we call the notify_all method.
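
As a quick illustration of these methods outside Flask, here is a minimal sketch; the worker function and the done flag are made up for this example:

import threading
import time

condition = threading.Condition()
done = False

def worker():
    global done
    time.sleep(1)                  # simulate some work
    with condition:                # acquire the underlying lock
        done = True
        condition.notify()         # wake up the thread that is waiting

threading.Thread(target=worker).start()

with condition:
    while not done:                # guard against spurious wake-ups
        condition.wait()           # release the lock and block until notified
print('worker is done')

The waiting thread releases the lock while it is blocked inside wait, which is what allows the worker thread to acquire the same lock and call notify.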

Creating the JobsSynchronizer class to help the Flask endpoint wait for jobs to complete execution before returning an HTTP response

Since threading.Condition provides us with the ability to synchronize activities between different threads, we can build a helper class to serve our needs:

import threading

class JobsSynchronizer:

    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed += 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            # Notify the waiting thread once the last job has reported completion
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            # The loop guards against spurious wake-ups and against the case
            # where every job finished before this method was called
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list

Before defining the JobsSynchronizer class, we import the threading module into our script so that we can create an instance of threading.Condition later.

__init__ method

Next, we define a constructor for the JobsSynchronizer class which takes the total number of jobs to wait for, num_tasks_to_complete, as input.

When an instance of JobsSynchronizer is created, we perform the following actions inside the constructor:

  • create an instance of threading.Condition,
  • maintain a counter of completed jobs,
  • create a list to keep statuses from the jobs, and
  • remember the total number of jobs to wait for.

notify_task_completion method

After we define the constructor, we define the notify_task_completion method for jobs to call when they have finished their work. When we wrap self.condition with the with keyword, the running thread acquires the underlying lock at the start of the block and releases it at the end. After acquiring the lock, we increment the counter of completed jobs. If there is anything to include in status_list, we append status_to_report to it. When the counter reaches the total number of jobs, meaning the current job is the last one to finish, we call self.condition.notify() to wake up the waiting thread.
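
If the with statement looks opaque, it is roughly equivalent to acquiring and releasing the condition's underlying lock manually, along these lines:

import threading

condition = threading.Condition()

# `with condition:` is roughly equivalent to the following
condition.acquire()
try:
    pass  # the body of the with block goes here
finally:
    condition.release()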

wait_for_tasks_to_be_completed method

When the wait_for_tasks_to_be_completed method is called, the executing thread acquires the lock and calls self.condition.wait() in a loop until the counter shows that every job has reported completion. Checking the counter before waiting guards against the case where all the jobs finish before the endpoint starts waiting, as well as against spurious wake-ups.

get_status_list method

Once the jobs are completed, the awakened thread needs a way to get the statuses reported by the jobs. Therefore, the get_status_list method returns status_list to the caller.

Example Python 3 Flask application that uses threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution

After looking at how the JobsSynchronizer class works, let's look at an example Python 3 Flask application:

from flask import Flask
from flask_apscheduler import APScheduler

import json, threading, time

class JobsSynchronizer:

    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed += 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            # Notify the waiting thread once the last job has reported completion
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            # The loop guards against spurious wake-ups and against the case
            # where every job finished before this method was called
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list

app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()


@app.route('/')
def welcome():
    return 'Welcome to flask_apscheduler demo', 200

@app.route('/run-tasks')
def run_tasks():

    job_synchronizer = JobsSynchronizer(10)

    for i in range(10):
        app.apscheduler.add_job(func=scheduled_task, trigger='date', args=[job_synchronizer, i], id='j' + str(i))

    job_synchronizer.wait_for_tasks_to_be_completed()

    return json.dumps(job_synchronizer.get_status_list()), 200

def scheduled_task(job_synchronizer, task_id):
    for i in range(10):
        time.sleep(1)
        print('Task {} running iteration {}'.format(task_id, i))

    job_synchronizer.notify_task_completion('Task {} completed execution'.format(task_id))

app.run(host='0.0.0.0', port=12345)

After importing the dependencies that are needed, we include the JobsSynchronizer class that we discussed earlier.

Initializing Flask and APScheduler

Once we have defined our helper class, we create a Flask object and an APScheduler object. After these two objects are created, we use scheduler.init_app(app) to associate the APScheduler object with our Flask object.

Starting the APScheduler object

When the APScheduler object is associated with our Flask object, we start the APScheduler object so that it runs in the background. With that in place, we can add jobs to the APScheduler object to run our tasks later.

Scheduling jobs for APScheduler inside the run_tasks function

Next, we define two functions and decorate them with @app.route. With these decorators in place, we will have an HTTP server serving HTTP GET requests at the / and /run-tasks endpoints.

When an HTTP request is received at /run-tasks, run_tasks is run.

At this point, we create an instance of JobsSynchronizer that will help us wait for 10 jobs to complete.

After that, we add 10 jobs that run scheduled_task via app.apscheduler.add_job with the following keyword arguments:

  • func=scheduled_task: the function to run afterwards is scheduled_task.
  • trigger='date': an indication that we want to run the job once; since we did not supply a run_date, APScheduler runs it as soon as possible (a sketch of a delayed run_date follows this list).
  • args=[job_synchronizer, i]: a list of arguments to pass to scheduled_task when APScheduler runs it. This is a way that we can pass in the same job_synchronizer object to all the jobs.
  • id='j' + str(i): an identifier for the job. By default, APScheduler refuses to add another job with an identifier that is already in the job store (it raises a ConflictingIdError), so we give each job a unique id to make sure all the jobs get to run.
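
As an aside, if we wanted to delay a job instead of running it right away, we could pass a run_date to the date trigger. A minimal sketch, assuming an arbitrary 5-second delay and a made-up job id and task id; it is purely illustrative and not counted by the JobsSynchronizer in this post:

from datetime import datetime, timedelta

# Hypothetical delayed job: run scheduled_task roughly 5 seconds from now
app.apscheduler.add_job(func=scheduled_task,
                        trigger='date',
                        run_date=datetime.now() + timedelta(seconds=5),
                        args=[job_synchronizer, 99],
                        id='delayed-job')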

After we have added the 10 jobs, we call job_synchronizer.wait_for_tasks_to_be_completed(). When we do so, the executing thread waits until every job has reported completion.

Once it gets to run again, we can return the status list as an HTTP response to the HTTP client.

Simulating long running tasks in scheduled_task

When scheduled_task is run by APScheduler, the JobsSynchronizer instance that was created earlier is available as the job_synchronizer argument.

After printing 10 statements that are spaced one second apart, we call job_synchronizer.notify_task_completion. As mentioned earlier, the last job to complete will trigger a notification to wake up the thread that ran run_tasks.

Starting the web server

Finally, at the end of the script, we start our web server through app.run(host='0.0.0.0', port=12345).

Observations from running the Python 3 Flask application

When you run the Python script, a web server will listen on port 12345. After that, you can run the following command to initiate an HTTP request that runs the 10 tasks:

curl localhost:12345/run-tasks

When you run the command, you will notice that the call does not return immediately. After waiting for around 10 seconds, you will find a JSON list of status strings in the HTTP response.
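
The exact ordering depends on when each job happens to finish, but the response body should look something like this (remaining entries omitted; the order varies between runs):

["Task 2 completed execution", "Task 0 completed execution", "Task 5 completed execution", ...]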
