It took me a while to finish this, and it ended up being about 300 lines. This is the first multi-threaded application I've written, other than just using Threads:-Map or Threads:-Seq. But it's done: a fully functional multi-threaded combinations filter. Suggestions for improvement are most welcome. In particular, it would be great if the processor utilization could be increased.
ComboFilter:= module()
(*________________________________________________________________________
This is a multi-threaded combinations filter. It generates all k-subsets
of an n-set, applies a user-specified filter to them, and writes the
remaining combinations to a file using a user-specified format. It is
intended to be used in situations where it is not feasible to represent all
combinations in memory at once (the same situations where one would use
combinat:-nextcomb). The generation, filtering, and writing are handled
in separate threads. The memory usage is about 7 times (very rough
estimate) the memory needed for a list of combinations of length 'blocksize'
(see the Parameters).
Parameters: See comment at the ModuleApply.
Returns: NULL: The output goes to a file.
````````````````````````````````````````````````````````````````````````````*)
uses
M= Threads:-Mutex,
LOCK= Threads:-Mutex:-Lock,
UNLOCK= Threads:-Mutex:-Unlock,
CV= Threads:-ConditionVariable,
WAIT= Threads:-ConditionVariable:-Wait,
SIGNAL= Threads:-ConditionVariable:-Signal
;
local
GenerateSignal,
FilterBuffer, FilterBufferLock, FilterSignal,
WriteBuffer, WriteBufferLock, WriteSignal,
Combo,
# Some macros to shorten the ubiquitous userinfo statements
_:= userinfo,
C:= (5, ComboFilter),
q:= (C, "Acquired."),
w:= s-> (C, sprintf("Waiting for %s.", s)),
u:= s-> (C, sprintf("Unlocked %s lock.", s)),
s:= s-> (C, sprintf("%s signalled.", s)),
longbuffer:= B-> `if`(nops(B) > 9, [B[1..9][], `...`], B),
(*_________________________________________________________________________
Generate is the first of the three main threads. It uses combinat:-nextcomb
to generate combinations and then places them in FilterBuffer, a
list shared with thread Filter and access-controlled by
FilterBufferLock. It sends FilterSignal to Filter and waits on
GenerateSignal from Filter.
Parameters: n::posint, blocksize::posint, preciseorder::truefalse (see
comment at ModuleApply).
Returns: NULL
```````````````````````````````````````````````````````````````````````````*)
(*
#**************************************************************************#
In each of the three main procedures, code in a comment box (like this) #
is the main algorithm; everything else is for thread synchronization. #
#**************************************************************************#*)
Generate:= proc(n::posint, blocksize::posint, preciseorder::truefalse)
local buffer, more:= true, i, j;
while more do
#******************************************************#
buffer:= table(); #
if preciseorder then #
for j to blocksize do #
buffer[j]:= Combo; #
Combo:= combinat:-nextcomb(Combo, n); #
if Combo = FAIL then #End #
more:= false; #
break #
end if #
end do; #
buffer:= [seq](buffer[i], i= 1..min(j, blocksize))#
else #
to blocksize do #
buffer[Combo]:= (); #
Combo:= combinat:-nextcomb(Combo, n); #
if Combo = FAIL then #End #
more:= false; #
break #
end if #
end do; #
buffer:= [indices](buffer, ':-nolist') #
end if; #
#******************************************************#
_(w("FB lock"));
LOCK (FilterBufferLock);_(q);
# Wait for Filter to empty the buffer.
while FilterBuffer <> () do
SIGNAL (FilterSignal);_(s("Filter"));
_(w("GenerateSignal"));
WAIT (GenerateSignal, FilterBufferLock);_(q)
end do;
#************************#
FilterBuffer:= buffer; #
#************************#
_(C, longbuffer(FilterBuffer));
SIGNAL (FilterSignal);_(s("Filter"));
UNLOCK (FilterBufferLock);_(u("FB"))
end do;
#Send final signal to Filter thread, so that it can end.
_(w("final lock"));
LOCK (FilterBufferLock);_(q);
# Wait for Filter to empty the buffer.
while FilterBuffer <> () do
SIGNAL (FilterSignal);_(s("Filter"));
_(w("GenerateSignal"));
WAIT (GenerateSignal, FilterBufferLock);_(q)
end do;
FilterBuffer:= [];
SIGNAL (FilterSignal);_(s("Filter"));
UNLOCK (FilterBufferLock);_(u("FB"));
[][]
end proc,
(*_____________________________________________________________________
Filter is the second of the three main threads. It uses Threads:-Map
to apply a user-specified filter to the combinations in FilterBuffer
and then places the remaining combinations in WriteBuffer, a list
shared with Write and access-controlled by WriteBufferLock. It sends
GenerateSignal to Generate and waits on FilterSignal from Generate; it
sends WriteSignal to Write and waits on FilterSignal from Write.
Parameter: filter::procedure (see comment at ModuleApply)
Returns: NULL
```````````````````````````````````````````````````````````````````````*)
Filter:= proc(filter::procedure)
local buffer, more:= true;
while more do
_(w("FB lock"));
LOCK (FilterBufferLock);_(q);
while FilterBuffer = () do
SIGNAL (GenerateSignal);_(s("Generate"));
_(w("FilterSignal"));
WAIT (FilterSignal, FilterBufferLock);_(q)
end do;
#******************************************************#
if FilterBuffer = [] then #
more:= false; #
buffer:= "empty" #Signal Write process to end. #
else #
buffer:= FilterBuffer #
end if; #
#******************************************************#
FilterBuffer:= (); #Signal to Generate
UNLOCK (FilterBufferLock);_(u("FB"));
#******************************************#
if buffer <> "empty" then #
buffer:= Threads:-Map(filter, buffer) #
end if; #
#******************************************#
_(w("WB lock"));
LOCK (WriteBufferLock);_(q);
# Wait for WriteBuffer to empty.
while WriteBuffer <> () do
SIGNAL (WriteSignal);_(s("Write"));
_(w("FilterSignal"));
WAIT (FilterSignal, WriteBufferLock);_(q);
end do;
_(C, longbuffer(buffer));
#************************#
WriteBuffer:= buffer; #
#************************#
SIGNAL (WriteSignal);_(s("Write"));
UNLOCK (WriteBufferLock);_(u("WB"))
end do;
[][]
end proc,
(*__________________________________________________________________________
Write is the third of the three main threads. It writes the contents of
WriteBuffer to a file using a user-specified formatting procedure. It waits
on WriteSignal from Filter and sends FilterSignal to Filter.
Parameter: format::procedure (see comment at ModuleApply). The output
file name is taken from the module export Filename.
Returns: NULL: Output goes to a file.
`````````````````````````````````````````````````````````````````````````````*)
Write:= proc(format::procedure)
local
buffer,
OutFile:= FileTools:-Text:-Open(Filename)
;
do
_(w("WB lock"));
LOCK (WriteBufferLock);_(q);
while WriteBuffer = () do
SIGNAL (FilterSignal);_(s("Filter"));
_(w("WriteSignal"));
WAIT (WriteSignal, WriteBufferLock);_(q);
end do;
if WriteBuffer = "empty" then #End
UNLOCK (WriteBufferLock);
break
end if;
#************************#
buffer:= WriteBuffer; #
#************************#
WriteBuffer:= (); #Signal to Filter to fill it.
UNLOCK (WriteBufferLock);_(u("WB"));
#***********************#
format(OutFile, buffer) #
#***********************#
end do;
[][]
end proc,
(*________________________________________________________________________
ModuleApply: Generates first combination, initiates all the Threads, and
waits.
Parameters:
n::posint: The size of the set {1, ..., n} that the combinations are drawn from.
k::posint: The size of a combination.
Keyword parameters (default values are specified with :=):
blocksize::posint:= 1:
The number of combinations to be treated as one "block". It does not need to
be a divisor of binomial(n,k); if necessary, a shorter final block will be
used.
filter::procedure:= C-> C:
The procedure used to filter the combinations. It should be a procedure
such that if L is a list of combinations, then map(filter, L) yields the
combinations that are to be kept. This is achieved by the procedure
returning either its input or NULL. The default filter keeps everything.
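Filename (a module export, not a keyword parameter; its default value is
cat(currentdir(), kernelopts(dirsep), "Combos.m")):
The name of the output file. Change it by assigning to ComboFilter:-Filename.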
format::procedure:= (F,B)-> fprintf(F, "%m", B):
The procedure used to write one block of output to the file. The procedure
takes two arguments: F, the file pointer, and B, a list (possibly empty!)
of combinations. The default uses Maple's ".m" format, which is very
quick to read and write but is not human readable while it's in the file.
preciseorder::truefalse:= false:
Boolean option that specifies that the combinations be placed in the file
in their canonical order. This is slightly less efficient, so the default
is false.
`````````````````````````````````````````````````````````````````````````*)
ModuleApply:= proc(
n::posint, k::posint,
{
blocksize::posint:= 1,
filter::procedure:= (C-> C),
format::procedure:= ((F,B)-> fprintf(F, "%m", B)),
preciseorder::truefalse:= false
}
)
GenerateSignal:= CV:-Create();
FilterBuffer:= [][];
FilterBufferLock:= M:-Create(); FilterSignal:= CV:-Create();
WriteBuffer:= [][];
WriteBufferLock:= M:-Create(); WriteSignal:= CV:-Create();
Combo:= combinat:-firstcomb(n, k);
try
Threads:-Wait(
map(
Threads:-Create,
[
'Generate(n, blocksize, preciseorder)',
'Filter(filter)',
'Write(format)'
]
)[]
)
catch: error
finally
FileTools:-Text:-Close(Filename);
CV:-Destroy(GenerateSignal, WriteSignal, FilterSignal);
M:-Destroy(WriteBufferLock, FilterBufferLock);
end try;
[][]
end proc
;
export
# The default name of the output file. Change it by assigning to
# the module export: ComboFilter:-Filename:= ...;
Filename:= cat(currentdir(), kernelopts(':-dirsep'), "Combos.m"),
(*__________________________________________________________________________
Reader is a little submodule for reading the output files assuming that the
default format was used.
````````````````````````````````````````````````````````````````````````````*)
Reader:= module()
local
File,
(*____________________________________
The ModuleApply just opens the file.
```````````````````````````````````````*)
ModuleApply:= proc()
File:= FileTools:-Text:-Open(Filename);
[][]
end proc
;
export
(*____________________________________________________________________
Read() returns the contents of one block, or NULL if at end-of-file.
``````````````````````````````````````````````````````````````````````*)
Read:= proc()
if FileTools:-AtEndOfFile(File) then
_(1, ComboFilter, "At end of file.");
FileTools:-Text:-Close(File);
[][]
else
fscanf(File, "%m")[]
end if
end proc
;
end module
;
end module;
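Here is a minimal usage sketch based on the parameter descriptions in the ModuleApply comment. The filter EvenSum, the file name "EvenCombos.m", and the particular values of n, k, and blocksize are made up for illustration; they are not part of the module. It assumes that Reader:-Read behaves as described in its comment (one block per call, NULL at end of file).

```maple
# Hypothetical filter: keep a combination only if its elements sum to an
# even number. A filter returns its input to keep it, or NULL to drop it.
EvenSum:= proc(C)
local x;
     `if`(type(add(x, x in C), even), C, NULL)
end proc:

# Assumed output location; any writable path should do.
ComboFilter:-Filename:= cat(currentdir(), kernelopts(':-dirsep'), "EvenCombos.m"):

# Optional: show the thread-synchronization messages, which the module
# issues via userinfo at level 5.
# infolevel[ComboFilter]:= 5:

# Filter the 3-subsets of {1, ..., 8} in blocks of 10, keeping canonical order.
ComboFilter(8, 3, blocksize= 10, filter= EvenSum, preciseorder= true):

# Read the blocks back. Each call to Read() returns one block (a list,
# possibly empty) or NULL at end of file, so wrapping the result in a list
# distinguishes an empty block, [[]], from end of file, [].
ComboFilter:-Reader():
Kept:= []:
do
     Block:= [ComboFilter:-Reader:-Read()];
     if Block = [] then break end if;
     Kept:= [Kept[], Block[1][]]
end do:
nops(Kept);
```

Because Filter applies the filter with Threads:-Map, the filter procedure should be thread-safe: avoid assigning to globals or calling commands that are not thread-safe.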