Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members

FXThreadPool.h

Go to the documentation of this file.
00001 /********************************************************************************
00002 *                                                                               *
00003 *                             T h r e a d   P o o l                             *
00004 *                                                                               *
00005 *********************************************************************************
00006 * Copyright (C) 2006,2009 by Jeroen van der Zijp.   All Rights Reserved.        *
00007 *********************************************************************************
00008 * This library is free software; you can redistribute it and/or modify          *
00009 * it under the terms of the GNU Lesser General Public License as published by   *
00010 * the Free Software Foundation; either version 3 of the License, or             *
00011 * (at your option) any later version.                                           *
00012 *                                                                               *
00013 * This library is distributed in the hope that it will be useful,               *
00014 * but WITHOUT ANY WARRANTY; without even the implied warranty of                *
00015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                 *
00016 * GNU Lesser General Public License for more details.                           *
00017 *                                                                               *
00018 * You should have received a copy of the GNU Lesser General Public License      *
00019 * along with this program.  If not, see <http://www.gnu.org/licenses/>          *
00020 *********************************************************************************
00021 * $Id: FXThreadPool.h,v 1.56 2009/01/06 13:07:28 fox Exp $                      *
00022 ********************************************************************************/
00023 #ifndef FXTHREADPOOL_H
00024 #define FXTHREADPOOL_H
00025 
00026 namespace FX {
00027 
00028 
00029 class FXWorker;
00030 class FXRunnable;
00031 class FXThreadPool;
00032 
00033 
00034 /// Worker thread in the pool; each worker keeps a backlink to the FXThreadPool it belongs to
00035 class FXAPI FXWorker : public FXThread {
00036   friend class FXThreadPool;
00037 private:
00038   FXCondition     condition;    // Condition: task available
00039   FXMutex         mutex;        // Task mutex (presumably paired with the condition above; confirm in implementation)
00040   FXThreadPool   *pool;         // Backlink to the owning pool
00041   FXWorker       *next;         // Next free worker (link in the pool's list of free workers)
00042   FXRunnable     *task;         // Task to perform
00043   volatile FXbool runs;         // Flag: thread runs
00044 private:
00045   FXWorker(const FXWorker&);              // Copy constructor is private; presumably left undefined to forbid copying
00046   FXWorker &operator=(const FXWorker&);   // Assignment is private as well
00047 public:
00048 
00049   /// Create worker belonging to the given pool ptr, optionally with an initial job (defaults to NULL)
00050   FXWorker(FXThreadPool* ptr,FXRunnable* job=NULL);
00051 
00052   /// Return whether the worker is running
00053   FXbool active();
00054 
00055   /// Return the thread pool this worker belongs to
00056   FXThreadPool* getPool() const;
00057 
00058   /// Get the worker's task
00059   FXRunnable* getTask();
00060 
00061   /// Wait for a task; returns a task pointer
00062   FXRunnable* waitTask();
00063 
00064   /// Set the task to perform to job
00065   void setTask(FXRunnable* job);
00066 
00067   /// Run worker (virtual; presumably the FXThread entry point, confirm against FXThread)
00068   virtual FXint run();
00069 
00070   /// Destructor (virtual)
00071   virtual ~FXWorker();
00072   };
00073 
00074 
00075 /**
00076 * A Thread Pool manages execution of jobs on a number of worker threads.
00077 * For compute-bound tasks, the amount of parallelism in a program is limited by the
00078 * number of physical processors available; however for I/O-bound tasks, it makes sense
00079 * to create more threads than the number of physical processors, in order to more fully
00080 * utilize available processors.
00081 * The thread pool executes incoming jobs in parallel, assigning each job to the first
00082 * available thread out of a fixed pool of worker threads.
00083 * Fluctuations in work-load can be accommodated by creating a few extra worker threads
00084 * during peak loads, while terminating superfluous worker threads during periods of
00085 * low activity, thus minimizing resources.
00086 * In order to prevent falling behind on incoming jobs, the calling thread can be made
00087 * to block scheduling the next job until a worker thread becomes available to handle
00088 * it.
00089 * When the thread pool is deleted, all worker threads are allowed to complete their
00090 * tasks prior to destroying the thread pool.
00091 * The jobs which are passed to the thread pool are derived from FXRunnable.  In order
00092 * to perform some useful function, a subclass of FXRunnable should override the run()
00093 * function.  Any exceptions thrown by this function are caught in FXWorker, thus the
00094 * worker thread will remain running despite exceptions thrown by the job object.
00095 */
00096 class FXAPI FXThreadPool {
00097   friend class FXWorker;
00098 private:
00099   FXCondition     condition;    // Waiting list condition
00100   FXMutex         mutex;        // Waiting list mutex
00101   FXWorker       *waiters;      // List of free workers (linked through FXWorker::next)
00102   volatile FXint  maximum;      // Maximum number of workers
00103   volatile FXint  minimum;      // Minimum number of workers
00104   volatile FXint  running;      // Number of running workers
00105   volatile FXint  waiting;      // Number of waiting workers
00106   volatile FXbool runs;         // Flag: thread pool is running
00107 protected:                      // Internal helpers; declared here, defined outside this header
00108   FXRunnable* getTask(FXWorker* wrk);
00109   void appendWorker(FXWorker* wrk);
00110   void removeWorker(FXWorker* wrk);
00111   FXWorker* startWorker(FXRunnable* job);
00112 private:
00113   FXThreadPool(const FXThreadPool&);              // Copy constructor is private; presumably left undefined to forbid copying
00114   FXThreadPool &operator=(const FXThreadPool&);   // Assignment is private as well
00115 public:
00116 
00117   /**
00118   * Construct an empty thread pool.
00119   */
00120   FXThreadPool();
00121 
00122   /**
00123   * Construct a thread pool and call start() to initiate
00124   * the thread pool run.
00125   */
00126   FXThreadPool(FXint min,FXint max=32,FXint run=0);
00127 
00128   /// Return whether the pool is running
00129   FXbool active();
00130 
00131   /// Return number of waiting threads
00132   FXint getWaitingThreads();
00133 
00134   /// Return number of worker threads
00135   FXint getRunningThreads();
00136 
00137   /// Change minimum number of worker threads to n
00138   void setMinimumThreads(FXint n);
00139 
00140   /// Return minimum number of worker threads
00141   FXint getMinimumThreads();
00142 
00143   /// Change maximum number of worker threads to n
00144   void setMaximumThreads(FXint n);
00145 
00146   /// Return maximum number of worker threads
00147   FXint getMaximumThreads();
00148 
00149   /**
00150   * Start the thread pool; the number of workers will
00151   * vary between min and max, depending on work-load.
00152   * A total of run workers will be started immediately;
00153   * additional workers will be started on an as-needed basis.
00154   * When the number of available workers exceeds min, any
00155   * additional workers which finish their assigned job will
00156   * terminate gracefully so as to minimize the number of
00157   * inactive threads.
00158   */
00159   FXint start(FXint min=1,FXint max=32,FXint run=0);
00160 
00161   /**
00162   * Wait until all jobs currently in progress have been finished.
00163   * One should not execute new jobs while waiting.
00164   */
00165   FXint wait();
00166 
00167   /**
00168   * Stop pool.
00169   * Wait until all workers have terminated gracefully, i.e. until
00170   * the last job has been completed.
00171   */
00172   FXint stop();
00173 
00174   /**
00175   * Execute job on the next available worker thread.
00176   * If no worker thread is available, check if the maximum number
00177   * of worker threads has been reached already.  If less than the maximum
00178   * number of workers is active, create a new worker and start it on the job.
00179   * Otherwise, if the flag block=true, wait until a worker finishes its job
00180   * and start it on the new job; if flag block=false, do not start the job at
00181   * this time.
00182   * Return the worker to whom the job was assigned, or NULL if the job could
00183   * not be started.
00184   */
00185   FXWorker* execute(FXRunnable *job,FXbool block=true);
00186 
00187   /**
00188   * Signal the running workers to terminate, and wait until
00189   * all jobs have finished.
00190   */
00191   virtual ~FXThreadPool();
00192   };
00193 
00194 }
00195 
00196 #endif
00197 

Copyright © 1997-2009 Jeroen van der Zijp