{"id":2032,"date":"2020-05-15T19:16:46","date_gmt":"2020-05-15T11:16:46","guid":{"rendered":"https:\/\/www.techcoil.com\/blog\/?p=2032"},"modified":"2020-05-15T14:11:08","modified_gmt":"2020-05-15T06:11:08","slug":"how-to-use-threading-condition-to-wait-for-several-flask-apscheduler-one-off-jobs-to-complete-execution-in-your-python-3-application","status":"publish","type":"post","link":"https:\/\/www.techcoil.com\/blog\/how-to-use-threading-condition-to-wait-for-several-flask-apscheduler-one-off-jobs-to-complete-execution-in-your-python-3-application\/","title":{"rendered":"How to use threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution in your Python 3 application"},"content":{"rendered":"<p>Previously, I discussed <a href=\"https:\/\/www.techcoil.com\/blog\/how-to-use-flask-apscheduler-in-your-python-3-flask-application-to-run-multiple-tasks-in-parallel-from-a-single-http-request\/\" rel=\"noopener\" target=\"_blank\">how to use Flask-APScheduler in your Python 3 Flask application to run multiple tasks in parallel, from a single HTTP request<\/a>.<\/p>\n<p>When we run jobs as discussed in that post, jobs are run once by the underlying <a href=\"https:\/\/apscheduler.readthedocs.io\/en\/latest\/\" rel=\"noopener\" target=\"_blank\">APScheduler<\/a> instance. In addition, our <a href=\"https:\/\/pypi.org\/project\/Flask\/\" rel=\"noopener\" target=\"_blank\">Flask<\/a> endpoint returns the <a href=\"https:\/\/www.techcoil.com\/glossary\/http-response\/\" rel=\"noopener\" target=\"_blank\">HTTP response<\/a> back to the <a href=\"https:\/\/www.techcoil.com\/glossary\/http-client\" rel=\"noopener\" target=\"_blank\">HTTP client<\/a> as soon as the jobs are scheduled. <\/p>\n<p>If the HTTP client does not need to know the outcome of the jobs within that HTTP call, then we are good. But what if we want to include any errors that the jobs encounter in the same HTTP response? 
<\/p>\n<p>In such a situation, we will need a mechanism to wait for the one-off jobs to complete execution <strong>before<\/strong> returning that response.<\/p>\n<p>With that in mind, this post shows how we can use <code>threading.Condition<\/code> to wait for several <code>Flask-APScheduler<\/code> one-off jobs to complete execution.<\/p>\n<h2>What is <code>threading.Condition<\/code> used for?<\/h2>\n<p>In case you are wondering, an instance of <a href=\"https:\/\/docs.python.org\/3\/library\/threading.html#threading.Condition\" rel=\"noopener\" target=\"_blank\"><code>threading.Condition<\/code><\/a> can help us synchronize activities between different threads. Once we have an instance of <code>threading.Condition<\/code>, we can use its <a href=\"https:\/\/docs.python.org\/3\/library\/threading.html#threading.Condition.wait\" rel=\"noopener\" target=\"_blank\"><code>wait<\/code><\/a>, <a href=\"https:\/\/docs.python.org\/3\/library\/threading.html#threading.Condition.notify\" rel=\"noopener\" target=\"_blank\"><code>notify<\/code><\/a> and <a href=\"https:\/\/docs.python.org\/3\/library\/threading.html#threading.Condition.notify_all\" rel=\"noopener\" target=\"_blank\"><code>notify_all<\/code><\/a> methods to coordinate those threads. <\/p>\n<p>If we want the thread holding the lock to release the lock and suspend further execution, we call the <code>wait<\/code> method. When we want a waiting thread to resume operation, we call the <code>notify<\/code> method. If we want all waiting threads to resume operation, we call the <code>notify_all<\/code> method.<\/p>\n<h2>Creating the <code>JobsSynchronizer<\/code> class to help the Flask endpoint wait for jobs to complete execution before returning an HTTP response<\/h2>\n<p>Since <code>threading.Condition<\/code> gives us the ability to synchronize activities between different threads, we can build a helper class to serve our needs:<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport threading\r\n\r\nclass JobsSynchronizer:\r\n\r\n    def __init__(self, num_tasks_to_complete):\r\n        self.condition = threading.Condition()\r\n        self.current_completed = 0\r\n        self.status_list = &#x5B;]\r\n        self.num_tasks_to_complete = num_tasks_to_complete\r\n\r\n    def notify_task_completion(self, status_to_report=None):\r\n        with self.condition:\r\n            self.current_completed += 1\r\n            if status_to_report is not None:\r\n                self.status_list.append(status_to_report)\r\n            # Notify the waiting thread once the last job reports in\r\n            if self.current_completed == self.num_tasks_to_complete:\r\n                self.condition.notify()\r\n\r\n    def wait_for_tasks_to_be_completed(self):\r\n        with self.condition:\r\n            # Re-check the count in a loop so that a notification sent\r\n            # before we start waiting is not lost\r\n            while self.current_completed &lt; self.num_tasks_to_complete:\r\n                self.condition.wait()\r\n\r\n    def get_status_list(self):\r\n        return self.status_list\r\n<\/pre>\n<p>Before we define the <code>JobsSynchronizer<\/code> class, we import the <a href=\"https:\/\/docs.python.org\/3\/library\/threading.html\" rel=\"noopener\" target=\"_blank\"><code>threading<\/code><\/a> module into our script so that we can create an instance of <code>threading.Condition<\/code> later.<\/p>\n<h3><code>__init__<\/code> method<\/h3>\n<p>Next, we define a constructor for the <code>JobsSynchronizer<\/code> class which takes the total number of jobs to wait for, <code>num_tasks_to_complete<\/code>, as input. 
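<\/p>
<p>As a brief aside, the wait\/notify handshake described in the section above can be seen in isolation. The following is a minimal sketch of ours, not code from the original application; the names <code>cond<\/code>, <code>done<\/code> and <code>worker<\/code> are illustrative:<\/p>

```python
import threading

cond = threading.Condition()
done = False

def worker():
    global done
    with cond:           # acquire the lock
        done = True
        cond.notify()    # wake up one waiting thread

t = threading.Thread(target=worker)
with cond:
    t.start()
    # wait() releases the lock and blocks until notify() is called;
    # the while loop guards against spurious wake-ups
    while not done:
        cond.wait()
t.join()
print(done)
```

<p>Calling <code>notify_all<\/code> instead of <code>notify<\/code> would wake every waiting thread rather than just one.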
<\/p>\n<p>When an instance of <code>JobsSynchronizer<\/code> is created, we perform the following actions inside the constructor:<\/p>\n<ul>\n<li>create an instance of <code>threading.Condition<\/code>,<\/li>\n<li>initialize a counter of completed jobs,<\/li>\n<li>create a list to keep statuses from the jobs,<\/li>\n<li>remember the total number of jobs to wait for.<\/li>\n<\/ul>\n<h3><code>notify_task_completion<\/code> method<\/h3>\n<p>After we define the constructor, we define the <code>notify_task_completion<\/code> method for jobs to call when they have finished their work. When we wrap <code>self.condition<\/code> with the <code>with<\/code> keyword, the running thread acquires the lock at the start of the block and releases it at the end. After acquiring the lock, we update the counter of completed jobs. If there is anything to include in <code>status_list<\/code>, we append <code>status_to_report<\/code> to <code>status_list<\/code>. When the current job is the last one being waited for, we call <code>self.condition.notify()<\/code> to wake up the waiting thread.<\/p>\n<h3><code>wait_for_tasks_to_be_completed<\/code> method<\/h3>\n<p>When the <code>wait_for_tasks_to_be_completed<\/code> method is called, the executing thread blocks on <code>self.condition.wait()<\/code> until it is notified.<\/p>\n<h3><code>get_status_list<\/code> method<\/h3>\n<p>Once the jobs are completed, the awakened thread needs a way to get inputs from the jobs. Therefore, the <code>get_status_list<\/code> method returns the <code>status_list<\/code> back to the caller. 
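<\/p>
<p>Before wiring the class into Flask, we can exercise it with plain <code>threading.Thread<\/code> workers. The sketch below is ours, not part of the original post: it reuses the <code>JobsSynchronizer<\/code> idea with a counting wait loop, and the worker function <code>job<\/code> and the thread count of 3 are arbitrary choices for illustration:<\/p>

```python
import threading

class JobsSynchronizer:
    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed += 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            # Wake the waiting thread once the last job reports in
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            # Re-check the count in a loop so a notification sent before
            # we start waiting is not lost
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list

def job(synchronizer, task_id):
    # Stand-in for a real one-off job
    synchronizer.notify_task_completion('Task {} done'.format(task_id))

synchronizer = JobsSynchronizer(3)
threads = [threading.Thread(target=job, args=(synchronizer, i)) for i in range(3)]
for t in threads:
    t.start()
synchronizer.wait_for_tasks_to_be_completed()
for t in threads:
    t.join()
print(len(synchronizer.get_status_list()))  # 3
```

<p>Because the waiter checks the completion count before blocking, it does not matter whether the jobs finish before or after <code>wait_for_tasks_to_be_completed<\/code> is called. Note that <code>Condition.wait<\/code> also accepts a <code>timeout<\/code> argument, which is worth considering so that a request thread cannot block forever if a job fails before reporting completion.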
<\/p>\n<h2>Example Python 3 Flask application that uses <code>threading.Condition<\/code> to wait for several Flask-APScheduler one-off jobs to complete execution<\/h2>\n<p>After looking at how the <code>JobsSynchronizer<\/code> class works, let's look at an example Python 3 Flask application:<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom flask import Flask\r\nfrom flask_apscheduler import APScheduler\r\n\r\nimport json, threading, time\r\n\r\nclass JobsSynchronizer:\r\n\r\n    def __init__(self, num_tasks_to_complete):\r\n        self.condition = threading.Condition()\r\n        self.current_completed = 0\r\n        self.status_list = &#x5B;]\r\n        self.num_tasks_to_complete = num_tasks_to_complete\r\n\r\n    def notify_task_completion(self, status_to_report=None):\r\n        with self.condition:\r\n            self.current_completed += 1\r\n            if status_to_report is not None:\r\n                self.status_list.append(status_to_report)\r\n            # Notify the waiting thread once the last job reports in\r\n            if self.current_completed == self.num_tasks_to_complete:\r\n                self.condition.notify()\r\n\r\n    def wait_for_tasks_to_be_completed(self):\r\n        with self.condition:\r\n            # Re-check the count in a loop so that a notification sent\r\n            # before we start waiting is not lost\r\n            while self.current_completed &lt; self.num_tasks_to_complete:\r\n                self.condition.wait()\r\n\r\n    def get_status_list(self):\r\n        return self.status_list\r\n\r\napp = Flask(__name__)\r\nscheduler = APScheduler()\r\nscheduler.init_app(app)\r\nscheduler.start()\r\n\r\n\r\n@app.route('\/')\r\ndef welcome():\r\n    return 'Welcome to flask_apscheduler demo', 200\r\n\r\n@app.route('\/run-tasks')\r\ndef run_tasks():\r\n\r\n    job_synchronizer = JobsSynchronizer(10)\r\n\r\n    for i in range(10):\r\n        app.apscheduler.add_job(func=scheduled_task, trigger='date', args=&#x5B;job_synchronizer, i], id='j' + str(i))\r\n\r\n    job_synchronizer.wait_for_tasks_to_be_completed()\r\n\r\n    return json.dumps(job_synchronizer.get_status_list()), 200\r\n\r\ndef scheduled_task(job_synchronizer, task_id):\r\n    for i in range(10):\r\n        time.sleep(1)\r\n        print('Task {} running iteration {}'.format(task_id, i))\r\n\r\n    job_synchronizer.notify_task_completion('Task {} completed execution'.format(task_id))\r\n\r\napp.run(host='0.0.0.0', port=12345)\r\n<\/pre>\n<p>After importing the needed dependencies, we include the <code>JobsSynchronizer<\/code> class that we discussed earlier. <\/p>\n<h3>Initializing Flask and APScheduler<\/h3>\n<p>Once we have defined our helper class, we create a <code>Flask<\/code> object and an <code>APScheduler<\/code> object. After these two objects are created, we use <code>scheduler.init_app(app)<\/code> to associate our <code>APScheduler<\/code> object with our <code>Flask<\/code> object.<\/p>\n<h3>Starting the <code>APScheduler<\/code> object<\/h3>\n<p>Once the <code>APScheduler<\/code> object is associated with our <code>Flask<\/code> object, we start the <code>APScheduler<\/code> object running in the background. We can then add jobs to the <code>APScheduler<\/code> object to run our tasks later.<\/p>\n<h3>Scheduling jobs for <code>APScheduler<\/code> inside the run_tasks function<\/h3>\n<p>Next, we define two functions and decorate them with <code>@app.route<\/code>. This gives us an HTTP server serving HTTP GET requests at the <strong>\/<\/strong> and <strong>\/run-tasks<\/strong> endpoints.<\/p>\n<p>When an HTTP request is received at <strong>\/run-tasks<\/strong>, <code>run_tasks<\/code> will be run. 
<\/p>\n<p>At this point, we create an instance of <code>JobsSynchronizer<\/code> that will help us wait for 10 jobs to complete.<\/p>\n<p>After that, we add 10 jobs that will run <code>scheduled_task<\/code> via <code>app.apscheduler.add_job<\/code> and the following keyword arguments:<\/p>\n<ul>\n<li><code>func=scheduled_task<\/code>: the function to run is <code>scheduled_task<\/code>.<\/li>\n<li><code>trigger='date'<\/code>: an indication that we want to run the task immediately, since we did not supply an input for <code>run_date<\/code>.<\/li>\n<li><code>args=[job_synchronizer, i]<\/code>: a list of arguments to pass to <code>scheduled_task<\/code> when <code>APScheduler<\/code> runs it. This is how we pass the same <code>job_synchronizer<\/code> object to all the jobs.<\/li>\n<li><code>id='j'+str(i)<\/code>: an identifier for the job. When another job with the same identifier is added, it is ignored by default. Therefore, we give each job a unique id to make sure all the jobs get to run.<\/li>\n<\/ul>\n<p>After we have added the 10 jobs, we call <code>job_synchronizer.wait_for_tasks_to_be_completed()<\/code>. When we do so, the executing thread waits until it is notified at a later point in time. <\/p>\n<p>Once it gets to run again, we return the status list as an HTTP response back to the HTTP client. <\/p>\n<h3>Simulating long running tasks in <code>scheduled_task<\/code><\/h3>\n<p>When <code>scheduled_task<\/code> is run by APScheduler, the <code>JobsSynchronizer<\/code> instance that was created earlier will be available to it. <\/p>\n<p>After printing 10 statements that are spaced with 1 second delays, we call <code>job_synchronizer.notify_task_completion<\/code>. As mentioned earlier, the last job to complete will trigger a notification to wake up the thread that ran <code>run_tasks<\/code>.<\/p>\n<h3>Starting the web server<\/h3>\n<p>Finally, at the end of the script, we start our web server through <code>app.run(host='0.0.0.0', port=12345)<\/code>.<\/p>\n<h2>Observations from running the Python 3 Flask application<\/h2>\n<p>When you run the Python script, a web server will listen on port <strong>12345<\/strong>. After that, you can run the following command to initiate an HTTP request that runs the 10 tasks:<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\ncurl localhost:12345\/run-tasks\r\n<\/pre>\n<p>When you run the command, you will notice that the call does not return immediately. After waiting for around 10 seconds, you will find a JSON list of status strings in the HTTP response.<\/p>\n    ","protected":false},"excerpt":{"rendered":"<p>Previously, I discussed <a href=\"https:\/\/www.techcoil.com\/blog\/how-to-use-flask-apscheduler-in-your-python-3-flask-application-to-run-multiple-tasks-in-parallel-from-a-single-http-request\/\" rel=\"noopener\" target=\"_blank\">how to use Flask-APScheduler in your Python 3 Flask application to run multiple tasks in parallel, from a single HTTP request<\/a>.<\/p>\n<p>When we run jobs as discussed in that post, jobs are run once by the underlying <a href=\"https:\/\/apscheduler.readthedocs.io\/en\/latest\/\" rel=\"noopener\" target=\"_blank\">APScheduler<\/a> instance. In addition, our <a href=\"https:\/\/pypi.org\/project\/Flask\/\" rel=\"noopener\" target=\"_blank\">Flask<\/a> endpoint returns the <a href=\"https:\/\/www.techcoil.com\/glossary\/http-response\/\" rel=\"noopener\" target=\"_blank\">HTTP response<\/a> back to the <a href=\"https:\/\/www.techcoil.com\/glossary\/http-client\" rel=\"noopener\" target=\"_blank\">HTTP client<\/a> as soon as the jobs are scheduled. <\/p>\n<p>If the HTTP client does not need to know the outcome of the jobs within that HTTP call, then we are good. But what if we want to include any errors that the jobs encounter in the same HTTP response? 
<\/p>\n<p>In such a situation, we will need a mechanism to wait for the one-off jobs to complete execution <strong>before<\/strong> returning that response.<\/p>\n<p>With that in mind, this post shows how we can use <code>threading.Condition<\/code> to wait for several <code>Flask-APScheduler<\/code> one-off jobs to complete execution.<\/p>\n","protected":false},"author":1,"featured_media":1244,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"advanced_seo_description":"","jetpack_seo_html_title":"","jetpack_seo_noindex":false,"jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"footnotes":""},"categories":[375],"tags":[583,736,231,226,233,735],"jetpack_featured_media_url":"https:\/\/www.techcoil.com\/blog\/wp-content\/uploads\/Python-Logo.gif","jetpack_shortlink":"https:\/\/wp.me\/p245TQ-wM","jetpack-related-posts":[],"jetpack_likes_enabled":true,"jetpack_sharing_enabled":true,"_links":{"self":[{"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/posts\/2032"}],"collection":[{"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/comments?post=2032"}],"version-history":[{"count":0,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/posts\/2032\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/media\/1244"}],"wp:attachment":[{"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/media?parent=2032"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/categories?post=2032"},{"taxonomy":"post_tag","emb
eddable":true,"href":"https:\/\/www.techcoil.com\/blog\/wp-json\/wp\/v2\/tags?post=2032"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}