Code archives/Miscellaneous/WorkingQueue

This code has been declared by its author to be Public Domain code.

Download source code

WorkingQueue by Kurator2008
[EDIT] Reworked vor 1.32rc2 compaibility
[EDIT] 2010-Jan-15, Reworked for 1.37, Added functionality for async. Queue processing

A Threadool creates a defined number of Threads. These Threads do not die after their work is done, so there is no syscall overhead when they are needed again. They rest in a pool, used simply when they are needed and some work has to be done.

The work is organised in so called "Tasks"

You can create 1000's of different Tasks, and add them to an Workingqueue - The queue is simply First in - First out.

The queue simply looks for unused threads in its threadspool, and assigns them some work.
SuperStrict

' *** THREADPOOL CLASSES START ***

Type TWorkQueue

	Field numThreads:Int
	Field workerThreads:TThreadPoolThread[]
	Field workQueue:TList
	Field mutexWorkQueue:TMutex
	
	Field _asyncThread:TThread
	
	Function Create:TWorkQueue(numThreads:Int = 10)
		Local t:TWorkQueue = New TWorkQueue
		t.numThreads= numThreads
		t.workerThreads= New TThreadPoolThread[numThreads]
		t.workQueue= New TList
		t.mutexWorkQueue= CreateMutex()
		
		For Local i:Int = 0 To numThreads-1
			t.workerThreads[i] = TThreadPoolThread.Create( QueueWrapper, Object(t) )
		Next
		
		Return t
	EndFunction
	
	Method ProcessQueueAsync()
		If _asyncThread = Null 
			_asyncThread = CreateThread(_AsyncWrapper, Object(Self))
			'Print "Created AsyncThread"
		EndIf
	EndMethod
	
	Method WaitForEmptyQueue()
		WaitThread(_asyncThread)
	EndMethod
	
	Function _AsyncWrapper:Object(data:Object)
		Local q:TWorkQueue = TWorkQueue(data)
		If data 
			While q.DoWork()
				Delay(0)
			Wend
		EndIf
		Return data
	EndFunction
	
	Function QueueWrapper:Object( data:Object)
		
		Local myTask:TWorkQueue = TWorkQueue(data)
		myTask.Execute()
	
	EndFunction
	
	Method AddTask(obj:TTask)
		LockMutex(mutexWorkQueue)
		  workQueue.AddLast(obj)
		UnlockMutex(mutexWorkQueue)		
	EndMethod
	
	Method DoWork:Int()
		If Not workQueue.IsEmpty()
			LockMutex(mutexWorkQueue)
			  Local t:TTask = TTask(workQueue.First())
			UnlockMutex(mutexWorkQueue)
			
			Local i:Int = 0

			While i < numThreads
				If workerThreads[i].GetStatus() = False
					workerThreads[i].SetFunction(t.func, t.data)
					LockMutex(mutexWorkQueue)
					  workQueue.RemoveFirst
					UnlockMutex(mutexWorkQueue)
					workerThreads[i].Start()
					Exit
				Else
					i :+ 1
				EndIf
			Wend

			Return True
		Else
			Return False
		EndIf
	EndMethod
	
	Method Execute()
	
		' Dummy
		
	EndMethod

EndType


Type TTask

	Field func:Object( data:Object )
	Field data:Object
	
	Method Run()
		If func Then func(data)
	EndMethod
	
	Function Create:TTask(func:Object( data:Object ), data:Object )
		Local t:TTask = New TTask
		t.func = func
		t.data = data
		Return t

	EndFunction
	
EndType

Type TThreadPoolThread
	
	Field id:TThread
	Field mutex:TMutex
	
	Field func:Object( data:Object )
	Field data:Object
	
	Field working:Int

	
	Function Create:TThreadPoolThread(func:Object( data:Object ), data:Object)
		Local t:TThreadPoolThread= New TThreadPoolThread
		t.id    = CreateThread( ThreadWrapper, Object(t) )
		t.mutex = CreateMutex()
		t.func = func
		t.data = data
		t.working = False
		
		LockMutex(t.mutex)

		Return t

	EndFunction
	
	Function ThreadWrapper:Object( data:Object)
		
		Local myTask:TThreadPoolThread= TThreadPoolThread(data)
		myTask.Execute()
	
	EndFunction
	
	Method SetFunction(func:Object( data:Object ), data:Object)
		Self.func = func
		Self.data = data
		Self.working = False
	EndMethod

	Method GetStatus:Int()
		Return working
	EndMethod
	
	
	Method Destroy()
		DetachThread(id)
		id = Null
		CloseMutex(mutex)
		mutex = Null
		working = False
		func = Null
		data = Null
	EndMethod
	
	Method Execute()

		Repeat
			
			If Self.working = True 
				working = Self.DoWork()
			EndIf
			
			If Self.working = False
				LockMutex(mutex)
			EndIf
	
		Forever
	
	EndMethod
	
	Method DoWork:Int()
		If func Then func(data)
		Return False
	EndMethod
	
	Method Start()
		UnlockMutex(mutex)
		Self.working = True
	EndMethod
	
	Method Suspend()
		Self.working = False
	EndMethod
	
EndType

' *** THREADPOOL CLASSES END ***


' ***** EXAMPLE START ****
' Demo Functions, just generate some heavy Workload :)


Function fTest1:Object(data:Object)
	Local superCalcResult:Double
	For Local x:Int = 0 To 10000
		superCalcResult = x*Cos(x)+Tan(x)*x/x
	Next
EndFunction

Function fTest2:Object(data:Object)
	Local superCalcResult:Double
	For Local x:Int = 0 To 10000
		superCalcResult = x*Cos(x)+Tan(x)*x/x
	Next
EndFunction


'Create a WorkQueue with 8 preinitalised Threads waiting for work
Local q:TWorkQueue = TWorkQueue.Create(8)


'Create a LOT of Tasks to be done

Local test1:TTask[] = New TTask[200]
Local test2:TTask[] = New TTask[200]


'Assign the Functions you want to be done to the Tasks
'I'm pretty lazy - using, just the same functions :)
For Local i:Int = 0 To 199
	test1[i] =  TTask.Create(fTest1, String(i))
	test2[i] =  TTask.Create(fTest2, String(i))
Next

Print "Initialized..."
' To prove that ther is no CPU time wasted with 8 Threads waiting for work (look at the Taskmanager)
Delay 1000
Print "Starting..."


'Assign the 400 Tasks to the WorkQeue

For Local i:Int = 0 To 199
	q.AddTask(test1[i])
	q.AddTask(test2[i])
Next

Local starttime:Int = MilliSecs()

'While Not KeyHit(KEY_ESCAPE)
	'Update the WorkQueue
'	If Not q.DoWork() Exit
'	Delay(100)
	'Print q.queue.Count()
	'it should also be possible to but this loop in an extra thread, maybe in future :)
'Wend

Print "Processing Queue..."
q.ProcessQueueAsync()
Print "Waiting for empty Queue..."
q.WaitForEmptyQueue()

Print "Duration "+String(MilliSecs()-starttime)+" ms"

Comments

Kurator2009
Updated for BMAX 1.32


N2009
This is pretty nice work. I'll have to have a go at mucking about with it.


Kurator2010
Updated for BMAX 1.37
Added async functionality

TODO:
- Adding small local Queues for TWorkQueueThreads (that means less synchronisation)
- Maybe it is possible to implement some kind of workstealing from local Queues if one TWorkQueueThread runs out of work, and other ones have plenty of it
- If a TWorkQueueThread creates new Tasks it should add it to its own local Queue at the beginning (improves CacheHits)
- If a Thread is stealing work from another Thread, it should steal it from the end of its Queue
- Need to count QueueElements by its own, because TLists.Count is pretty lame


Code Archives Forum