- Methods: All Methods Documented Methods Hide Methods
- Source: Display Source Hide Source
- Variables: Show Variables Hide Variables
Class Relations
Methods (to be applied on the object)
create_spooler (scripted)
nr_running (forward, public)
bgdelivery nr_running
Interface to the background delivery thread to query the number of currently running deliveries.
- Returns:
- number of currently running background deliveries
- Partial Call Graph (max 5 caller/called nodes):
- Testcases:
- No testcase defined.
returnfile (scripted, public)
bgdelivery returnfile [ -client_data client_data ] [ -delete delete ] \ [ -content_disposition content_disposition ] status_code mime_type \ filename
Deliver the given file to the requester in the background. This proc uses the background delivery thread to send the file in an event-driven manner without blocking a request thread. This is especially important when large files are requested over slow connections. With NaviServer, this function is mostly obsolete, at least, when writer threads are configured. The writer threads have as well the advantage, that these can be used with https, while the bgdelivery thread works directly on the socket. One remaining purpose of this function is h264 streaming delivery (when the module is in use).
- Switches:
- -client_data (optional)
- -delete (defaults to
- -content_disposition (optional)
"false"
) (optional)- Parameters:
-
status_code
mime_type
filename
- Partial Call Graph (max 5 caller/called nodes):
- Testcases:
- No testcase defined.
running (forward, public)
bgdelivery running
Interface to the background delivery thread to query the currently running deliveries.
- Returns:
- list of key value pairs of all currently running background processes
- Partial Call Graph (max 5 caller/called nodes):
- Testcases:
- No testcase defined.
send_to_subscriber (scripted)
spooler_add_request (scripted)
spooler_release (scripted)
subscribe (scripted)
write_headers (forward)
Variables
::bgdelivery set exithandler {ns_log notice "EXITHANDLER of slave thread SELF 1179460"} ::bgdelivery set initcmd { package req XOTcl namespace import -force ::xotcl::* ns_thread name ::bgdelivery ::xotcl::Object setExitHandler \ {ns_log notice "EXITHANDLER of slave thread ::bgdelivery 1179460"} set ::xotcl::currentScript /var/www/openacs.org/packages/xotcl-core/tcl/bgdelivery-procs.tcl set ::xotcl::currentThread ::bgdelivery############### # FileSpooler ############### # Class FileSpooler makes it easier to overload the # per-object methods of the concrete file spoolers # (such has fileSpooler or h264Spooler) Class create FileSpooler############### # File delivery ############### set ::delivery_count 0 FileSpooler create fileSpooler fileSpooler set tick_interval 60000 ;# 1 min fileSpooler proc deliver_ranges {ranges client_data filename fd channel} { set first_range [lindex $ranges 0] set remaining_ranges [lrange $ranges 1 end] lassign $first_range from to size if {$remaining_ranges eq ""} {# A single delivery, which is as well the last; when finished # with this chunk, terminate delivery set cmd [list [self] end-delivery -client_data $client_data $filename $fd $channel] } else {# # For handling multiple ranges, HTTP/1.1 requires multipart # messages (multipart media type: multipart/byteranges); # currently these are not implemented (missing test cases). The # code handling the range tag switches currently to full # delivery, when multiple ranges are requested. # set cmd [list [self] deliver_ranges $remaining_ranges $client_data $filename $fd $channel] } seek $fd $from#ns_log notice "Range seek $from $filename // $first_range" fcopy $fd $channel -size $size -command $cmd } fileSpooler proc spool {{-ranges ""} {-delete false} -channel -filename -context \ {-client_data ""}} { set fd [open $filename] fconfigure $fd -translation binary fconfigure $channel -translation binary set k ::runningBgJob([lindex $context 0]) if {[info exists $k]} { set value [set $k] ns_log notice "resubmit: canceling currently running request $context // closing $value" lassign $value fd0 channel0 client_data0 filename0 :end-delivery -client_data $client_data0 $filename0 $fd0 $channel0 -1 } set $k [list $fd $channel $client_data $filename] if {$ranges eq ""} { ns_log notice "no Range spool for $filename" fcopy $fd $channel -command [list [self] end-delivery -client_data $client_data $filename $fd \ $channel] } else { :deliver_ranges $ranges $client_data $filename $fd $channel }#ns_log notice "--- start of delivery of $filename (running:[array size ::running])" set key $channel,$fd,$filename set ::running($key) $context if {$delete} {set ::delete_file($key) 1} incr ::delivery_count } fileSpooler proc end-delivery {{-client_data ""} filename fd channel bytes args} {#ns_log notice "--- end of delivery of $filename, $bytes bytes written $args" if {[catch {fconfigure $channel -blocking false} e]} \ {ns_log notice "bgdelivery, fconfigure for channel, error: $e"} if {[catch {close $channel} e]} \ {ns_log notice "bgdelivery, closing channel for $filename, error: $e"} if {[catch {close $fd} e]} {ns_log notice "bgdelivery, closing file $filename, error: $e"} set key $channel,$fd,$filename unset -nocomplain ::runningBgJob([lindex $::running($key) 0]) unset ::running($key) if {[info exists ::delete_file($key)]} { file delete -- $filename unset ::delete_file($key) } } fileSpooler proc cleanup {} {# This method should not be necessary. However, under unclear conditions, # some fcopies seem to go into a stasis. After 2000 seconds, we will kill it. foreach {index entry} [array get ::running] { lassign $entry key elapsed set t [ns_time diff [ns_time get] $elapsed] if {[ns_time seconds $t] > 2000} { if {[regexp {^([^,]+),([^,]+),(.+)$} $index _ channel fd filename]} { ns_log notice "bgdelivery, fileSpooler cleanup after [ns_time seconds $t] seconds, $key" :end-delivery $filename $fd $channel -1 } } } } fileSpooler proc tick {} { if {[catch {:cleanup} errorMsg]} {ns_log error "Error during filespooler cleanup: $errorMsg"} set :to [after ${:tick_interval} [list [self] tick]] } fileSpooler tick############### # h264Spooler ############### # # A first draft of a h264 pseudo streaming spooler. # Like for the fileSpooler, we create a single spooler object # that handles spooling for all active streams. The per-stream context # is passed via argument lists. # FileSpooler create h264Spooler h264Spooler set blockCount 0 h264Spooler set byteCount 0 h264Spooler proc spool {{-delete false} -channel -filename -context {-client_data ""} -query} {#ns_log notice "h264 SPOOL gets filename '$filename'" if {[catch { set handle [h264open $filename $query] } errorMsg]} { ns_log error "h264: error opening h264 channel for $filename $query: $errorMsg" if {[catch {close $channel} e]} \ {ns_log notice "bgdelivery, closing h264 for $filename, error: $e"} return }# set up book-keeping info incr ::delivery_count set key $channel,$handle,$filename set ::bytes($key) 0 set ::running($key) $context if {$delete} {set ::delete_file($key) 1}# # h264open is quite expensive; in order to output the HTTP headers # in the connection thread, we would have to use h264open in the # connection thread as well to determine the proper size. To avoid # this overhead, we don't write the headers in the connection # thread and write it here instead (note that this is different to # the fileSpooler above). # if {[catch { set length [h264length $handle] puts $channel "HTTP/1.0 200 OK\nContent-Type: video/mp4\nContent-Length: $length\n" flush $channel } errorMsg]} { ns_log notice "h264: error writing headers in h264 channel for $filename $query: $errorMsg" :end-delivery -client_data $client_data $filename $handle $channel 0 }# setup async delivery fconfigure $channel -translation binary -blocking false fileevent $channel writable [list [self] writeBlock $client_data $filename $handle $channel] } h264Spooler proc writeBlock {client_data filename handle channel} { h264Spooler incr blockCount set bytesVar ::bytes($channel,$handle,$filename)#ns_log notice "h264 WRITE BLOCK $channel $handle" if {[eof $channel] || [h264eof $handle]} { :end-delivery -client_data $client_data $filename $handle $channel [set $bytesVar] } else { set block [h264read $handle]# one should not use "bytelength" on binary data: https://wiki.tcl-lang.org/8455 set len [string length $block] incr $bytesVar $len h264Spooler incr byteCount $len if {[catch {puts -nonewline $channel $block} errorMsg]} { ns_log notice "h264: error on writing to channel $channel: $errorMsg" :end-delivery -client_data $client_data $filename $handle $channel [set $bytesVar] } } } h264Spooler proc end-delivery {{-client_data ""} filename handle channel bytes args} { ns_log notice "h264 FINISH $channel $handle" if {[catch {close $channel} e]} \ {ns_log notice "bgdelivery, closing h264 for $filename, error: $e"} if {[catch {h264close $handle} e]} \ {ns_log notice "bgdelivery, closing h264 $filename, error: $e"} set key $channel,$handle,$filename unset ::running($key) unset ::bytes($key) if {[info exists ::delete_file($key)]} { file delete -- $filename unset ::delete_file($key) } }################# # AsyncDiskWriter ################# ::xotcl::Class create ::AsyncDiskWriter -parameter { {blocksize 4096} {autoflush false} {verbose false} } ::AsyncDiskWriter instproc log {msg} { if {[:verbose]} {ns_log notice "[self] --- $msg"} } ::AsyncDiskWriter instproc open {-filename {-mode w}} { set :channel [open $filename $mode] set :content "" set :filename $filename fconfigure ${:channel} -translation binary -blocking false :log "open ${:filename}" } ::AsyncDiskWriter instproc close {{-sync false}} { if {$sync || ${:content} eq ""} { :log "close sync" if {${:content} ne ""} { fconfigure ${:channel} -translation binary -blocking true puts -nonewline ${:channel} ${:content} } close ${:channel} :destroy } else { :log "close async" set :finishWhenDone 1 } } ::AsyncDiskWriter instproc async_write {block} { append :content $block fileevent ${:channel} writable [list [self] writeBlock] } ::AsyncDiskWriter instproc writeBlock {} { if {[string length ${:content}] < ${:blocksize}} { puts -nonewline ${:channel} ${:content} :log "write [string length ${:content}] bytes" fileevent ${:channel} writable "" set :content "" if {[:autoflush]} {flush ${:channel}} if {[info exists :finishWhenDone]} { :close -sync true } } else { set chunk [string range ${:content} 0 ${:blocksize}-1] set :content [string range ${:content} ${:blocksize} end] puts -nonewline ${:channel} $chunk :log "write [string length $chunk] bytes ([string length ${:content}] buffered)" } }############### # Subscriptions ############### set ::subscription_count 0 set ::message_count 0 ::xotcl::Class create Subscriber -parameter {key channel user_id mode {start_of_page ""}} Subscriber proc current {-key } { set result [list] if {[info exists key]} { if {[info exists :subscriptions($key)]} { return [list $key [set :subscriptions($key)]] } } elseif {[info exists :subscriptions]} { foreach key [array names :subscriptions] { lappend result $key [set :subscriptions($key)] } } } Subscriber instproc close {} { set channel [:channel]# # It is important to make the channel nonblocking for the close, # since otherwise the close operation might block and bring all of # bgdelivery to a halt. # catch {fconfigure $channel -blocking false} catch {close $channel} } Subscriber instproc sweep {args} {# # when the sweeper raises an error the caller (foreachSubscriber) # destroys the instance. In this step the peer connection is close # as well. # set channel [:channel] if {[catch {set eof [eof $channel]}]} {set eof 1} :log "sweep [:channel] EOF $eof" if {$eof} { throw {AD_CLIENTDISCONNECT} "connection $channel closed by peer" }# make an IO attempt to trigger EOF if {[catch { set blocking [fconfigure $channel -blocking] fconfigure $channel -blocking false set x [read $channel] fconfigure $channel -blocking $blocking } errorMsg]} { throw {AD_CLIENTDISCONNECT} "connection $channel closed due to IO error" } } Subscriber instproc send {msg} {#ns_log notice "SEND <$msg> [:mode]" :log "" ::sec_handler_reset set smsg [::xo::mr::bgdelivery encode_message [:mode] $msg]#:log "-- sending to subscriber for [:key] $smsg ch=[:channel] # mode=[:mode], user_id \ ${:user_id}" try { puts -nonewline [:channel] $smsg flush [:channel] } on error {errorMsg} {# Broken pipe and connection reset are to be expected if the # clients (e.g. browser in a chat) leaves the page. It is not a # real error. set ok_errors { "POSIX EPIPE {broken pipe}" "POSIX ECONNRESET {connection reset by peer}" } if {$::errorCode in $ok_errors} { throw {AD_CLIENTDISCONNECT} {client disconnected} } else { throw $::errorCode $errorMsg } } } Subscriber proc foreachSubscriber {key method {argument ""}} { :msg "$key $method '$argument'" if {[info exists :subscriptions($key)]} { set subs1 [list] foreach s [set :subscriptions($key)] { try { $s $method $argument } trap {AD_CLIENTDISCONNECT} {errMsg} { ns_log warning "$method to subscriber $s (key=$key): $errMsg" $s destroy } on error {errMsg} { ns_log error "error in $method to subscriber $s (key=$key): $errMsg\n$::errorInfo" $s destroy } on ok {result} { lappend subs1 $s } } set :subscriptions($key) $subs1 } } Subscriber proc broadcast {key msg} { :foreachSubscriber $key send $msg incr ::message_count } Subscriber proc sweep {key} { :foreachSubscriber $key sweep } Subscriber instproc destroy {} { :close next } Subscriber instproc init {} { [:info class] instvar subscriptions lappend subscriptions([:key]) [self] incr ::subscription_count#:log "-- cl=[:info class], subscriptions([:key]) = $subscriptions([:key])" fconfigure [:channel] -translation binary fconfigure [:channel] -encoding utf-8 puts -nonewline [:channel] ${:start_of_page} flush [:channel] }############### # HttpSpooler ############### Class create ::HttpSpooler -parameter {channel {timeout 10000} {counter 0}} ::HttpSpooler instproc init {} { set :running 0 set :release 0 set :spooling 0 set :queue [list] } ::HttpSpooler instproc all_done {} { catch {close [:channel]} :log "" :destroy } ::HttpSpooler instproc release {} {# release indicates the when running becomes 0, the spooler is finished set :release 1 if {${:running} == 0} {:all_done} } ::HttpSpooler instproc done {reason request} { incr :running -1 :log "--running ${:running}" $request destroy if {${:running} == 0 && ${:release}} {:all_done} } ::HttpSpooler instproc deliver {data request {encoding binary}} { :log "-- spooling ${:spooling}" if {${:spooling}} { :log "--enqueue" lappend :queue $data $request $encoding } else {#:log "--send" set :spooling 1# puts -nonewline [:channel] $data # :done set fd [file tempfile spool_filename [ad_tmpdir]/nsd-spool-XXXXXX] fconfigure $fd -translation binary -encoding $encoding puts -nonewline $fd $data close $fd set fd [open $spool_filename] fconfigure $fd -translation binary -encoding $encoding fconfigure [:channel] -translation binary -encoding $encoding fcopy $fd [:channel] -command [list [self] end-delivery $spool_filename $fd [:channel] \ $request] } } ::HttpSpooler instproc end-delivery {filename fd ch request bytes args} { :log "--- end of delivery of $filename, $bytes bytes written $args" if {[catch {close $fd} e]} {ns_log notice "httpspool, closing file $filename, error: $e"} set :spooling 0 if {[llength ${:queue}]>0} { :log "--dequeue" lassign ${:queue} data req enc set :queue [lreplace ${:queue} 0 2] :deliver $data $req $enc } :done delivered $request } ::HttpSpooler instproc add {-request {-post_data ""}} { if {[regexp {http://([^/]*)(/.*)} $request _ host path]} { set port 80 regexp {^([^:]+):(.*)$} $host _ host port incr :running xo::AsyncHttpRequest [self]::[incr :counter] -host $host -port $port -path $path -timeout \ [:timeout] -post_data $post_data -request_manager [self] } }# # Add an exit handler to close all AsyncDiskWriter, when this thread goes # down. # ::xotcl::Object setExitHandler { ns_log notice "--- exit handler" foreach writer [::AsyncDiskWriter info instances -closure] { ns_log notice "close AsyncDiskWriter $writer" $writer close } } } ::bgdelivery set lightweight 0 ::bgdelivery set mutex ns_mutex::bgdelivery ::bgdelivery set persistent 1
- Methods: All Methods Documented Methods Hide Methods
- Source: Display Source Hide Source
- Variables: Show Variables Hide Variables