Thread Pool Engine, and Work-Stealing scheduling algorithm
On a multicore system, your goal is to spread the work efficiently among many cores so that it does executes simultaneously. And performance gain should be directly related to how many cores you have. So, a quad core system should be able to get the work done 4 times faster than a single core system. A 16-core platform should be 4-times faster than a quad-core system, and 16-times faster than a single core...
That's where my Threadpool is usefull , it spreads the work efficiently among many cores. Threadpool (and Threadpool with priority) consist of lock-free thread safe/concurrent enabled local FIFO queues of work items, so when you call ThreadPool.execute() , your work item get queued in the local lock-free queues. The worker threads pick them out in a First In First Out order (i.e., FIFO order), and execute them. .
The following have been added to Threadpool:
See lock-free ParallelQueue: http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm
Work-Stealing scheduling concepts:
But what is the Work-Stealing Queue? As I said every worker thread will have its own Work-Stealing Queue. WSQ is data structure designed to be effective. WSQ data structure concepts:
Work-Stealing scheduling algorithm offer many feature over the ordinary scheduling algorithm:
My Threadpool allows load balancing, and also minimize contention.
Threadpool is very easy to use, let's look now at an example in Object Pascal...
program test;
uses
{$IFDEF Delphi}
cmem,
{$ENDIF}
ThreadPool,sysutils,syncobjs;
{$I defines.inc}
type
TMyThread = class (TThreadPoolThread)
//procedure ProcessRequest(obj: Pointer); override;
procedure MyProc1(obj: Pointer);
procedure MyProc2(obj: Pointer);
end;
var
myobj:TMyThread;
TP: TThreadPool;
obj:pointer;
cs:TCriticalSection;
procedure TMyThread.MyProc1(obj: Pointer);
begin
cs.enter;
writeln('This is MyProc1 with parameter: ',integer(obj));
cs.leave;
end;
procedure TMyThread.MyProc2(obj: Pointer);
begin
cs.enter;
writeln('This is MyProc2 with parameter: ',integer(obj));
cs.leave;
end;
begin
myobj:=TMyThread.create;
cs:=TCriticalSection.create;
TP := TThreadPool.Create(4, 20, TMyThread); // 4 workers threads and 2^20 items for each queue.
obj:=pointer(1);
TP.execute(myobj.myproc1,pointer(obj));
obj:=pointer(2);
TP.execute(myobj.myproc2,pointer(obj));
readln;
TP.Terminate;
TP.Free;
end.
Let us look at the first line...
uses
{$IFDEF Delphi}
cmem,
{$ENDIF}
ThreadPool,sysutils,syncobjs;
cmem is required for Delphi to use TBB memory manager (from Intel) , this will allow delphi memory manager to scale linearely...
Note: FPC doesn't need cmem, cause it scales linearely with threads...
ThreadPool: is our threadpool unit ..
syncobjs: contains all the sychronizations stuff like CriticalSections, Events etc..
After that we have the following lines:
type
TMyThread = class (TThreadPoolThread)
//procedure ProcessRequest(obj: Pointer); override;
procedure MyProc1(obj: Pointer);
procedure MyProc2(obj: Pointer);
end;
We declare a TMyThread that ineherit from TThreadPoolThread, and we declare our two methods MyProc1 and MyProc2 that we want to be executed by our threadpool's worker threads. Each method has an obj as a paramater.
In the main body we create a TMyThread object like this:
myobj:=TMyThread.create;
and after that we create a TThreadPool object with 4 workers threads and lock-free FIFO queues and 2^20 items for each lock-free queue like this:
TP := TThreadPool.Create(4, 20, TMyThread); // 4 workers threads and 2^20 items for each queue.
After that we distribute to our worker threads the methods to be executed , we do it by calling the Threadpool's execute() method and we pass it myobj.myproc1 and myobj.myproc2 with there parameters:.
TP.execute(myobj.myproc1,pointer(obj));
TP.execute(myobj.myproc2,pointer(obj));
As you see, Threadpool (and threadpool with priority) is very easy to use...
Let's look now at an example of a Threadpool with priority:.
program test;
uses
{$IFDEF Delphi}
cmem,
{$ENDIF}
PThreadPool,sysutils,syncobjs;
{$I defines.inc}
type
TMyThread = class (TPThreadPoolThread)
//procedure ProcessRequest(obj: Pointer); override;
procedure MyProc1(obj: Pointer);
procedure MyProc2(obj: Pointer);
end;
var
myobj:TMyThread;
TP: TPThreadPool;
obj:pointer;
cs:TCriticalSection;
procedure TMyThread.MyProc1(obj: Pointer);
begin
cs.enter;
writeln('This is MyProc1 with parameter: ',integer(obj));
cs.leave;
end;
procedure TMyThread.MyProc2(obj: Pointer);
begin
cs.enter;
writeln('This is MyProc2 with parameter: ',integer(obj));
cs.leave;
end;
begin
myobj:=TMyThread.create;
cs:=TCriticalSection.create;
TP := TPThreadPool.Create(4, 20, TMyThread); // 4 workers threads and 2^20 items for each queue.
obj:=pointer(1);
TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY);
obj:=pointer(2);
TP.execute(myobj.myproc2,pointer(obj),NORMAL_PRIORITY);
readln;
TP.Terminate;
TP.Free;
end.
As you have noticed, this is almost the same as threadpool..
You use PThreadPool - P for priority - rather than Threadpool
TPThreadPoolThread rather that TThreadPoolThread
TPThreadPool.Create rather than TThreadPool.Create
and as you have noticed in TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using priorities.
You can give the following priorities to jobs:
LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY
That's all.
You can download threadpool (and threadpool with priority) from:
http://pages.videotron.com/aminer/
Sincerely,
Amine Moulay Ramdane.