Thread Pool Engine and Work-Stealing Scheduling Algorithm

 

On a multicore system, your goal is to spread the work efficiently among many cores so that it executes simultaneously. Ideally, the performance gain is directly proportional to the number of cores you have: a quad-core system should get the work done 4 times faster than a single-core system, and a 16-core platform should be 4 times faster than a quad-core system and 16 times faster than a single core. In practice, serial sections and contention between threads keep real programs below this ideal, which is why the work has to be distributed carefully.

That's where my Threadpool is useful: it spreads the work efficiently among many cores. Threadpool (and Threadpool with priority) consists of lock-free, thread-safe local FIFO queues of work items, so when you call ThreadPool.execute(), your work item gets queued in the local lock-free queues. The worker threads pick the items out in First In First Out (FIFO) order and execute them.
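
To make the FIFO behaviour concrete, here is a minimal single-threaded sketch of a bounded FIFO queue of work items. It is not the lock-free queue that Threadpool uses: the names TWorkProc, TWorkItem, TFifoQueue, Push and Pop are hypothetical, there is no thread safety at all, and the power-of-two capacity is an assumption of the sketch, chosen so that indices can wrap around with a bit mask.

```pascal
program fifosketch;

{$mode objfpc}

type
  // Hypothetical work item: a plain procedure plus its argument.
  TWorkProc = procedure(arg: PtrInt);

  TWorkItem = record
    Proc: TWorkProc;
    Arg: PtrInt;
  end;

  // Bounded FIFO ring buffer with 2^SizeLog2 slots (single-threaded sketch).
  TFifoQueue = class
  private
    FItems: array of TWorkItem;
    FMask: Integer;   // capacity - 1, used to wrap indices
    FHead: Integer;   // index of the oldest item
    FCount: Integer;  // number of items currently stored
  public
    constructor Create(SizeLog2: Integer);
    function Push(const Item: TWorkItem): Boolean;
    function Pop(out Item: TWorkItem): Boolean;
  end;

constructor TFifoQueue.Create(SizeLog2: Integer);
begin
  inherited Create;
  SetLength(FItems, 1 shl SizeLog2);
  FMask := (1 shl SizeLog2) - 1;
end;

// Append at the tail; returns False when the queue is full.
function TFifoQueue.Push(const Item: TWorkItem): Boolean;
begin
  Result := FCount < Length(FItems);
  if Result then
  begin
    FItems[(FHead + FCount) and FMask] := Item;
    Inc(FCount);
  end;
end;

// Remove from the head; returns False when the queue is empty.
function TFifoQueue.Pop(out Item: TWorkItem): Boolean;
begin
  Result := FCount > 0;
  if Result then
  begin
    Item := FItems[FHead];
    FHead := (FHead + 1) and FMask;
    Dec(FCount);
  end;
end;

procedure PrintItem(arg: PtrInt);
begin
  WriteLn('executing work item ', arg);
end;

var
  Q: TFifoQueue;
  Item: TWorkItem;
  i: Integer;
begin
  Q := TFifoQueue.Create(4); // 2^4 = 16 slots
  try
    for i := 1 to 3 do
    begin
      Item.Proc := @PrintItem;
      Item.Arg := i;
      Q.Push(Item);
    end;
    // Items come back out in the order they were queued: 1, 2, 3.
    while Q.Pop(Item) do
      Item.Proc(Item.Arg);
  finally
    Q.Free;
  end;
end.
```

A real pool has several threads pushing and popping at once, so the queue itself must be made safe for concurrent use, which this sketch deliberately leaves out.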


The following have been added to Threadpool:

See lock-free ParallelQueue: http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

Work-Stealing scheduling algorithm:

Work-Stealing scheduling concepts:

But what is the Work-Stealing Queue (WSQ)? Every worker thread has its own Work-Stealing Queue. The WSQ is a data structure designed to be efficient. WSQ data structure concepts:

The Work-Stealing scheduling algorithm offers several advantages over an ordinary scheduling algorithm:

  1. Effective:
  2. Load Balancing:

 

My Threadpool provides load balancing and also minimizes contention.
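
The general idea of work stealing can be sketched as follows. This is only an illustration under simplifying assumptions, not the Threadpool implementation: each queue is protected by a critical section instead of being lock-free, thieves take from the same (oldest) end as the owner, all work is queued before the workers start, and the names TWorker, TakeFrom, Locks, Items and Head are hypothetical. Every item is deliberately placed in the queue of worker 0, so the other workers find their own queues empty and steal from it.

```pascal
program stealsketch;

{$mode objfpc}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SyncObjs;

const
  WorkerCount = 4;
  ItemCount = 1000;

type
  TWorker = class(TThread)
  private
    FIndex: Integer;
  protected
    procedure Execute; override;
  public
    Executed: Integer; // number of items this worker ran
    constructor Create(AIndex: Integer);
  end;

var
  // One queue per worker, each guarded by its own lock
  // (a simplification: the queues described in the text are lock-free).
  Locks: array[0..WorkerCount - 1] of TCriticalSection;
  Items: array[0..WorkerCount - 1] of array of Integer;
  Head: array[0..WorkerCount - 1] of Integer; // next index to take
  Workers: array[0..WorkerCount - 1] of TWorker;
  i, Total: Integer;

// Take the oldest item from queue Q; returns False if that queue is empty.
function TakeFrom(Q: Integer; out Value: Integer): Boolean;
begin
  Value := 0;
  Locks[Q].Enter;
  try
    Result := Head[Q] < Length(Items[Q]);
    if Result then
    begin
      Value := Items[Q][Head[Q]];
      Inc(Head[Q]);
    end;
  finally
    Locks[Q].Leave;
  end;
end;

constructor TWorker.Create(AIndex: Integer);
begin
  FIndex := AIndex;
  inherited Create(False); // not suspended: the thread runs right away
end;

procedure TWorker.Execute;
var
  Value, k: Integer;
  Found: Boolean;
begin
  repeat
    // 1. Prefer work from our own queue.
    Found := TakeFrom(FIndex, Value);
    // 2. Own queue empty: try to steal from every other worker in turn.
    k := 1;
    while (not Found) and (k < WorkerCount) do
    begin
      Found := TakeFrom((FIndex + k) mod WorkerCount, Value);
      Inc(k);
    end;
    if Found then
      Inc(Executed); // a real pool would run the work item here
  until not Found; // every queue is empty and no new work arrives
end;

begin
  for i := 0 to WorkerCount - 1 do
    Locks[i] := TCriticalSection.Create;
  // Deliberately unbalanced: every item goes to the queue of worker 0.
  SetLength(Items[0], ItemCount);
  for i := 0 to ItemCount - 1 do
    Items[0][i] := i + 1;
  for i := 0 to WorkerCount - 1 do
    Workers[i] := TWorker.Create(i);
  Total := 0;
  for i := 0 to WorkerCount - 1 do
  begin
    Workers[i].WaitFor;
    WriteLn('worker ', i, ' executed ', Workers[i].Executed, ' items');
    Inc(Total, Workers[i].Executed);
    Workers[i].Free;
  end;
  WriteLn('total executed: ', Total);
  for i := 0 to WorkerCount - 1 do
    Locks[i].Free;
end.
```

The per-worker counts vary from run to run, but the total always equals ItemCount because each item is taken exactly once. Contention stays low in the common case because a worker touches another worker's queue only when its own queue is empty.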

 

Threadpool is very easy to use. Let's look now at an example in Object Pascal:


program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  ThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TThreadPool;
  obj: pointer;
  cs: TCriticalSection;

// The critical section keeps the output of the two workers from interleaving.
procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  TP := TThreadPool.Create(4, TMyThread, 20); // 4 worker threads and 2^20 items for each queue.
  obj := pointer(1);
  TP.execute(myobj.myproc1, pointer(obj));
  obj := pointer(2);
  TP.execute(myobj.myproc2, pointer(obj));
  readln; // wait for Enter so the worker threads have time to run
  TP.Terminate;
  TP.Free;
  cs.Free;
end.


 

Let us look at the uses clause first:

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  ThreadPool, sysutils, syncobjs;

cmem is required for Delphi so that it uses the TBB memory manager (from Intel); this allows the Delphi memory manager to scale linearly.

Note: FPC doesn't need cmem, because its memory manager already scales linearly with threads.

ThreadPool: our threadpool unit.

syncobjs: contains the synchronization objects such as critical sections, events, etc.

After that we have the following lines:

type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

We declare a TMyThread class that inherits from TThreadPoolThread, and we declare our two methods, MyProc1 and MyProc2, that we want to be executed by our threadpool's worker threads. Each method takes obj as a parameter.

In the main body we create a TMyThread object like this:

myobj:=TMyThread.create;

and after that we create a TThreadPool object with 4 worker threads and lock-free FIFO queues that hold 2^20 items each (the third argument, 20, is the exponent), like this:

TP := TThreadPool.Create(4, TMyThread, 20); // 4 worker threads and 2^20 items for each queue.

After that we distribute to our worker threads the methods to be executed. We do it by calling the Threadpool's execute() method, passing it myobj.myproc1 and myobj.myproc2 with their parameters (obj is set to pointer(1) before the first call and to pointer(2) before the second):

TP.execute(myobj.myproc1,pointer(obj));

TP.execute(myobj.myproc2,pointer(obj));

As you see, Threadpool (and threadpool with priority) is very easy to use...

Let's look now at an example of a Threadpool with priority:


program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  PThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TPThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TPThreadPool;
  obj: pointer;
  cs: TCriticalSection;

// The critical section keeps the output of the two workers from interleaving.
procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  TP := TPThreadPool.Create(4, TMyThread, 20); // 4 worker threads and 2^20 items for each queue.
  obj := pointer(1);
  TP.execute(myobj.myproc1, pointer(obj), NORMAL_PRIORITY);
  obj := pointer(2);
  TP.execute(myobj.myproc2, pointer(obj), NORMAL_PRIORITY);
  readln; // wait for Enter so the worker threads have time to run
  TP.Terminate;
  TP.Free;
  cs.Free;
end.


 

As you have noticed, this is almost the same as the plain threadpool. The differences are:

You use the PThreadPool unit (P for priority) rather than ThreadPool,

TPThreadPoolThread rather than TThreadPoolThread,

TPThreadPool.Create rather than TThreadPool.Create,

and, as you can see in TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY), each job is given a priority as the third argument.

You can give the following priorities to jobs:

LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY
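
One plausible way to serve three priority levels is to keep one FIFO queue per level and always dequeue from the highest non-empty level. The sketch below shows only that idea. It is single-threaded, the class TPriorityQueues and its Enqueue and Dequeue methods are hypothetical, and the TPriority enumeration is declared locally for the sketch (the PThreadPool unit has its own definitions, and its real dispatch policy may differ).

```pascal
program prioritysketch;

{$mode objfpc}

type
  // Declared locally for this sketch; the real PThreadPool unit defines its own.
  TPriority = (LOW_PRIORITY, NORMAL_PRIORITY, HIGH_PRIORITY);

  // Hypothetical scheduler: one FIFO list of job ids per priority level.
  TPriorityQueues = class
  private
    FJobs: array[TPriority] of array of Integer;
    FHead: array[TPriority] of Integer; // next index to dequeue per level
  public
    procedure Enqueue(JobId: Integer; Prio: TPriority);
    function Dequeue(out JobId: Integer): Boolean;
  end;

procedure TPriorityQueues.Enqueue(JobId: Integer; Prio: TPriority);
begin
  SetLength(FJobs[Prio], Length(FJobs[Prio]) + 1);
  FJobs[Prio][High(FJobs[Prio])] := JobId;
end;

function TPriorityQueues.Dequeue(out JobId: Integer): Boolean;
var
  P: TPriority;
begin
  JobId := -1;
  Result := False;
  // Scan from the highest priority down; within a level, keep FIFO order.
  for P := High(TPriority) downto Low(TPriority) do
    if FHead[P] < Length(FJobs[P]) then
    begin
      JobId := FJobs[P][FHead[P]];
      Inc(FHead[P]);
      Result := True;
      Exit;
    end;
end;

var
  Q: TPriorityQueues;
  Id: Integer;
begin
  Q := TPriorityQueues.Create;
  try
    Q.Enqueue(1, LOW_PRIORITY);
    Q.Enqueue(2, NORMAL_PRIORITY);
    Q.Enqueue(3, HIGH_PRIORITY);
    Q.Enqueue(4, NORMAL_PRIORITY);
    // Dequeue order: 3 (high), then 2 and 4 (normal, FIFO), then 1 (low).
    while Q.Dequeue(Id) do
      WriteLn('running job ', Id);
  finally
    Q.Free;
  end;
end.
```

With a strict policy like this one, low-priority jobs wait for as long as higher-priority jobs keep arriving, so it suits workloads where high-priority jobs are comparatively rare.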

 

That's all.

You can download threadpool (and threadpool with priority) from:

http://pages.videotron.com/aminer/

 

Sincerely,
Amine Moulay Ramdane.