# # tpool.tcl -- # # Tcl implementation of a threadpool paradigm in pure Tcl using # the Tcl threading extension 2.5 (or higher). # # This file is for example purposes only. The efficient C-level # threadpool implementation is already a part of the threading # extension starting with 2.5 version. Both implementations have # the same Tcl API so both can be used interchangeably. Goal of # this implementation is to serve as an example of using the Tcl # extension to implement some very common threading paradigms. # # Beware: with time, as improvements are made to the C-level # implementation, this Tcl one might lag behind. # Please consider this code as a working example only. # # # # Copyright (c) 2002 by Zoran Vasiljevic. # # See the file "license.terms" for information on usage and # redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES. # ----------------------------------------------------------------------------- package require Thread 2.5 set thisScript [info script] namespace eval tpool { variable afterevent "" ; # Idle timer event for worker threads variable result ; # Stores result from the worker thread variable waiter ; # Waits for an idle worker thread variable jobsdone ; # Accumulates results from worker threads # # Create shared array with a single element. # It is used for automatic pool handles creation. # set ns [namespace current] tsv::lock $ns { if {[tsv::exists $ns count] == 0} { tsv::set $ns count 0 } tsv::set $ns count -1 } variable thisScript [info script] } # # tpool::create -- # # Creates instance of a thread pool. # # Arguments: # args Variable number of key/value arguments, as follows: # # -minworkers minimum # of worker threads (def:0) # -maxworkers maximum # of worker threads (def:4) # -idletime # of sec worker is idle before exiting (def:0 = never) # -initcmd script used to initialize new worker thread # -exitcmd script run at worker thread exit # # Side Effects: # Might create many new threads if "-minworkers" option is > 0. # # Results: # The id of the newly created thread pool. This id must be used # in all other tpool::* commands. # proc tpool::create {args} { variable thisScript # # Get next threadpool handle and create the pool array. # set usage "wrong \# args: should be \"[lindex [info level 1] 0]\ ?-minworkers count? ?-maxworkers count?\ ?-initcmd script? ?-exitcmd script?\ ?-idletime seconds?\"" set ns [namespace current] set tpid [namespace tail $ns][tsv::incr $ns count] tsv::lock $tpid { tsv::set $tpid name $tpid } # # Setup default pool data. # tsv::array set $tpid { thrworkers "" thrwaiters "" jobcounter 0 refcounter 0 numworkers 0 -minworkers 0 -maxworkers 4 -idletime 0 -initcmd "" -exitcmd "" } tsv::set $tpid -initcmd "source $thisScript" # # Override with user-supplied data # if {[llength $args] % 2} { error $usage } foreach {arg val} $args { switch -- $arg { -minworkers - -maxworkers {tsv::set $tpid $arg $val} -idletime {tsv::set $tpid $arg [expr {$val*1000}]} -initcmd {tsv::append $tpid $arg \n $val} -exitcmd {tsv::append $tpid $arg \n $val} default { error $usage } } } # # Start initial (minimum) number of worker threads. # for {set ii 0} {$ii < [tsv::set $tpid -minworkers]} {incr ii} { Worker $tpid } return $tpid } # # tpool::names -- # # Returns list of currently created threadpools # # Arguments: # None. # # Side Effects: # None. # # Results # List of active threadpoool identifiers or empty if none found # # proc tpool::names {} { tsv::names [namespace tail [namespace current]]* } # # tpool::post -- # # Submits the new job to the thread pool. The caller might pass # the job in two modes: synchronous and asynchronous. # For the synchronous mode, the pool implementation will retain # the result of the passed script until the caller collects it # using the "thread::get" command. # For the asynchronous mode, the result of the script is ignored. # # Arguments: # args Variable # of arguments with the following syntax: # tpool::post ?-detached? tpid script # # -detached flag to turn the async operation (ignore result) # tpid the id of the thread pool # script script to pass to the worker thread for execution # # Side Effects: # Depends on the passed script. # # Results: # The id of the posted job. This id is used later on to collect # result of the job and set local variables accordingly. # For asynchronously posted jobs, the return result is ignored # and this function returns empty result. # proc tpool::post {args} { # # Parse command arguments. # set ns [namespace current] set usage "wrong \# args: should be \"[lindex [info level 1] 0]\ ?-detached? tpoolId script\"" if {[llength $args] == 2} { set detached 0 set tpid [lindex $args 0] set cmd [lindex $args 1] } elseif {[llength $args] == 3} { if {[lindex $args 0] != "-detached"} { error $usage } set detached 1 set tpid [lindex $args 1] set cmd [lindex $args 2] } else { error $usage } # # Find idle (or create new) worker thread. This is relatively # a complex issue, since we must honour the limits about number # of allowed worker threads imposed to us by the caller. # set tid "" while {$tid == ""} { tsv::lock $tpid { set tid [tsv::lpop $tpid thrworkers] if {$tid == "" || [catch {thread::preserve $tid}]} { set tid "" tsv::lpush $tpid thrwaiters [thread::id] end if {[tsv::set $tpid numworkers]<[tsv::set $tpid -maxworkers]} { Worker $tpid } } } if {$tid == ""} { vwait ${ns}::waiter } } # # Post the command to the worker thread # if {$detached} { set j "" thread::send -async $tid [list ${ns}::Run $tpid 0 $cmd] } else { set j [tsv::incr $tpid jobcounter] thread::send -async $tid [list ${ns}::Run $tpid $j $cmd] ${ns}::result } variable jobsdone set jobsdone($j) "" return $j } # # tpool::wait -- # # Waits for jobs sent with "thread::post" to finish. # # Arguments: # tpid Name of the pool shared array. # jobList List of job id's done. # jobLeft List of jobs still pending. # # Side Effects: # Might eventually enter the event loop while waiting # for the job result to arrive from the worker thread. # It ignores bogus job ids. # # Results: # Result of the job. If the job resulted in error, it sets # the global errorInfo and errorCode variables accordingly. # proc tpool::wait {tpid jobList {jobLeft ""}} { variable result variable jobsdone if {$jobLeft != ""} { upvar $jobLeft jobleft } set retlist "" set jobleft "" foreach j $jobList { if {[info exists jobsdone($j)] == 0} { continue ; # Ignore (skip) bogus job ids } if {$jobsdone($j) != ""} { lappend retlist $j } else { lappend jobleft $j } } if {[llength $retlist] == 0 && [llength $jobList]} { # # No jobs found; wait for the first one to get ready. # set jobleft $jobList while {1} { vwait [namespace current]::result set doneid [lindex $result 0] set jobsdone($doneid) $result if {[lsearch $jobList $doneid] >= 0} { lappend retlist $doneid set x [lsearch $jobleft $doneid] set jobleft [lreplace $jobleft $x $x] break } } } return $retlist } # # tpool::get -- # # Waits for a job sent with "thread::post" to finish. # # Arguments: # tpid Name of the pool shared array. # jobid Id of the previously posted job. # # Side Effects: # None. # # Results: # Result of the job. If the job resulted in error, it sets # the global errorInfo and errorCode variables accordingly. # proc tpool::get {tpid jobid} { variable jobsdone if {[lindex $jobsdone($jobid) 1] != 0} { eval error [lrange $jobsdone($jobid) 2 end] } return [lindex $jobsdone($jobid) 2] } # # tpool::preserve -- # # Increments the reference counter of the threadpool, reserving it # for the private usage.. # # Arguments: # tpid Name of the pool shared array. # # Side Effects: # None. # # Results: # Current number of threadpool reservations. # proc tpool::preserve {tpid} { tsv::incr $tpid refcounter } # # tpool::release -- # # Decrements the reference counter of the threadpool, eventually # tearing the pool down if this was the last reservation. # # Arguments: # tpid Name of the pool shared array. # # Side Effects: # If the number of reservations drops to zero or below # the threadpool is teared down. # # Results: # Current number of threadpool reservations. # proc tpool::release {tpid} { tsv::lock $tpid { if {[tsv::incr $tpid refcounter -1] <= 0} { # Release all workers threads foreach t [tsv::set $tpid thrworkers] { thread::release -wait $t } tsv::unset $tpid ; # This is not an error; it works! } } } # # Private procedures, not a part of the threadpool API. # # # tpool::Worker -- # # Creates new worker thread. This procedure must be executed # under the tsv lock. # # Arguments: # tpid Name of the pool shared array. # # Side Effects: # Depends on the thread initialization script. # # Results: # None. # proc tpool::Worker {tpid} { # # Create new worker thread # set tid [thread::create] thread::send $tid [tsv::set $tpid -initcmd] thread::preserve $tid tsv::incr $tpid numworkers tsv::lpush $tpid thrworkers $tid # # Signalize waiter threads if any # set waiter [tsv::lpop $tpid thrwaiters] if {$waiter != ""} { thread::send -async $waiter [subst { set [namespace current]::waiter 1 }] } } # # tpool::Timer -- # # This procedure should be executed within the worker thread only. # It registers the callback for terminating the idle thread. # # Arguments: # tpid Name of the pool shared array. # # Side Effects: # Thread may eventually exit. # # Results: # None. # proc tpool::Timer {tpid} { tsv::lock $tpid { if {[tsv::set $tpid numworkers] > [tsv::set $tpid -minworkers]} { # # We have more workers than needed, so kill this one. # We first splice ourselves from the list of active # workers, adjust the number of workers and release # this thread, which may exit eventually. # set x [tsv::lsearch $tpid thrworkers [thread::id]] if {$x >= 0} { tsv::lreplace $tpid thrworkers $x $x tsv::incr $tpid numworkers -1 set exitcmd [tsv::set $tpid -exitcmd] if {$exitcmd != ""} { catch {eval $exitcmd} } thread::release } } } } # # tpool::Run -- # # This procedure should be executed within the worker thread only. # It performs the actual command execution in the worker thread. # # Arguments: # tpid Name of the pool shared array. # jid The job id # cmd The command to execute # # Side Effects: # Many, depending of the passed command # # Results: # List for passing the evaluation result and status back. # proc tpool::Run {tpid jid cmd} { # # Cancel the idle timer callback, if any. # variable afterevent if {$afterevent != ""} { after cancel $afterevent } # # Evaluate passed command and build the result list. # set code [catch {uplevel \#0 $cmd} ret] if {$code == 0} { set res [list $jid 0 $ret] } else { set res [list $jid $code $ret $::errorInfo $::errorCode] } # # Check to see if any caller is waiting to be serviced. # If yes, kick it out of the waiting state. # set ns [namespace current] tsv::lock $tpid { tsv::lpush $tpid thrworkers [thread::id] set waiter [tsv::lpop $tpid thrwaiters] if {$waiter != ""} { thread::send -async $waiter [subst { set ${ns}::waiter 1 }] } } # # Release the thread. If this turns out to be # the last refcount held, don't bother to do # any more work, since thread will soon exit. # if {[thread::release] <= 0} { return $res } # # Register the idle timer again. # if {[set idle [tsv::set $tpid -idletime]]} { set afterevent [after $idle [subst { ${ns}::Timer $tpid }]] } return $res } # EOF $RCSfile: tpool.tcl,v $ # Emacs Setup Variables # Local Variables: # mode: Tcl # indent-tabs-mode: nil # tcl-basic-offset: 4 # End: