# Serialized definition of the ::bgdelivery thread object. The block passed
# to the alloc call sets the instance variables of ::bgdelivery.
# NOTE(review): ::nsf::object::alloc appears to create the object as an
# instance of ::xotcl::THREAD -- confirm against the NSF documentation.
::nsf::object::alloc ::xotcl::THREAD ::bgdelivery {set :exithandler {ns_log notice "EXITHANDLER of slave thread SELF 712337"}
# Script text stored in :initcmd; it defines the spooler objects and classes
# used for background delivery. NOTE(review): presumably ::xotcl::THREAD
# evaluates this script inside the worker thread at start-up -- confirm.
set :initcmd {
# Load XOTcl and import its commands so that Class can be used unqualified.
package req XOTcl
namespace import -force ::xotcl::*
# Name the current thread after the thread object.
ns_thread name ::bgdelivery
# Register a script with the XOTcl exit handler; it only writes a log entry.
::xotcl::Object setExitHandler {ns_log notice "EXITHANDLER of slave thread ::bgdelivery 712337"}
# Bookkeeping variables naming the originating script and the thread object.
set ::xotcl::currentScript /var/www/openacs.org/packages/xotcl-core/tcl/bgdelivery-procs.tcl
set ::xotcl::currentThread ::bgdelivery
# FileSpooler is the class of the two spooler objects created in this script.
Class create FileSpooler
# Counter incremented by the spool methods for every accepted delivery.
set ::delivery_count 0
# fileSpooler copies plain files to client channels via fcopy.
FileSpooler create fileSpooler
# Delay in milliseconds passed to after between two runs of fileSpooler tick.
fileSpooler set tick_interval 60000 ;
# fileSpooler deliver_ranges --
#
#   Copy the first byte range of the list from the file to the client
#   channel and arrange for the remaining ranges to be copied afterwards.
#
#   ranges       list of range triples: from to size
#   client_data  opaque value handed through to end-delivery
#   filename     name of the file being delivered
#   fd           open channel of the file
#   channel      client channel receiving the data
#   args         byte count and optional error message appended by fcopy
#                when this method is re-entered as an fcopy callback;
#                empty on the initial call from spool
fileSpooler proc deliver_ranges {ranges client_data filename fd channel args} {
    # A second callback argument means that fcopy reported an error for the
    # previous range; continuing is pointless, so finish the delivery.
    if {[llength $args] > 1} {
        :end-delivery -client_data $client_data $filename $fd $channel {*}$args
        return
    }
    # lassign returns the elements it did not consume, i.e. the later ranges.
    set remaining_ranges [lassign $ranges first_range]
    lassign $first_range from to size
    if {[llength $remaining_ranges] == 0} {
        # Last range: terminate the delivery once this copy is done.
        set cmd [list [self] end-delivery -client_data $client_data $filename $fd $channel]
    } else {
        # More ranges: come back here with the rest of the list.
        set cmd [list [self] deliver_ranges $remaining_ranges $client_data $filename $fd $channel]
    }
    seek $fd $from
    fcopy $fd $channel -size $size -command $cmd
}
# fileSpooler spool --
#
#   Start the background copy of a file to a client channel. A delivery
#   that is still running for the same context id is terminated first.
#
#   -ranges       list of from/to/size triples, empty for the whole file
#   -delete       when true, the file is deleted at the end of delivery
#   -channel      client channel
#   -filename     file to be delivered
#   -context      list whose first element identifies the request
#   -client_data  opaque value handed through to end-delivery
fileSpooler proc spool {{-ranges ""} {-delete false} -channel -filename -context {-client_data ""}} {
    set src [open $filename]
    fconfigure $src -translation binary
    fconfigure $channel -translation binary

    # One job per context id: cancel the predecessor when it still exists.
    set jobVar ::runningBgJob([lindex $context 0])
    if {[info exists $jobVar]} {
        set previous [set $jobVar]
        ns_log notice "resubmit: canceling currently running request $context // closing $previous"
        lassign $previous fd0 channel0 client_data0 filename0
        :end-delivery -client_data $client_data0 $filename0 $fd0 $channel0 -1
    }
    set $jobVar [list $src $channel $client_data $filename]

    if {$ranges ne ""} {
        :deliver_ranges $ranges $client_data $filename $src $channel
    } else {
        ns_log notice "no Range spool for $filename"
        fcopy $src $channel -command [list [self] end-delivery -client_data $client_data $filename $src $channel]
    }

    # Register the delivery under the key that end-delivery and cleanup use.
    set key $channel,$src,$filename
    set ::running($key) $context
    if {$delete} {
        set ::delete_file($key) 1
    }
    incr ::delivery_count
}
# fileSpooler end-delivery --
#
#   Finish a delivery: close the client channel and the file, drop the
#   bookkeeping entries and delete the file when this was requested.
#   Used directly and as fcopy callback, which appends bytes and
#   optionally an error message.
fileSpooler proc end-delivery {{-client_data ""} filename fd channel bytes args} {
    # Every step is best effort; failures are only logged.
    set rc [catch {fconfigure $channel -blocking false} e]
    if {$rc} {
        ns_log notice "bgdelivery, fconfigure for channel, error: $e"
    }
    set rc [catch {close $channel} e]
    if {$rc} {
        ns_log notice "bgdelivery, closing channel for $filename, error: $e"
    }
    set rc [catch {close $fd} e]
    if {$rc} {
        ns_log notice "bgdelivery, closing file $filename, error: $e"
    }

    set key $channel,$fd,$filename
    # The first element of the stored context is the job id.
    set jobId [lindex $::running($key) 0]
    unset -nocomplain ::runningBgJob($jobId)
    unset ::running($key)

    if {![info exists ::delete_file($key)]} {
        return
    }
    file delete -- $filename
    unset ::delete_file($key)
}
# fileSpooler cleanup --
#
#   Terminate every registered delivery whose start time lies more than
#   2000 seconds in the past.
fileSpooler proc cleanup {} {
    foreach {index entry} [array get ::running] {
        # The entry is the context stored by spool: id and start time.
        lassign $entry key elapsed
        set t [ns_time diff [ns_time get] $elapsed]
        if {[ns_time seconds $t] <= 2000} {
            continue
        }
        # The array index has the form channel,fd,filename.
        if {![regexp {^([^,]+),([^,]+),(.+)$} $index _ channel fd filename]} {
            continue
        }
        ns_log notice "bgdelivery, fileSpooler cleanup after [ns_time seconds $t] seconds, $key"
        :end-delivery $filename $fd $channel -1
    }
}
# fileSpooler tick --
#
#   Run cleanup and reschedule itself after tick_interval milliseconds.
#   The id of the pending timer is kept in the instance variable to.
fileSpooler proc tick {} {
    set failed [catch {:cleanup} errorMsg]
    if {$failed} {
        ns_log error "Error during filespooler cleanup: $errorMsg"
    }
    set callback [list [self] tick]
    set :to [after [set :tick_interval] $callback]
}
# Start the periodic cleanup.
fileSpooler tick
# h264Spooler is a second FileSpooler instance; its methods deliver data
# obtained through the h264open / h264read commands.
FileSpooler create h264Spooler
# Running totals maintained by h264Spooler writeBlock: number of
# invocations and number of bytes read from h264 handles.
h264Spooler set blockCount 0
h264Spooler set byteCount 0
# h264Spooler spool --
#
#   Open an h264 handle for the file and query, write the HTTP response
#   header to the client channel and register writeBlock as writable
#   handler, which delivers the content block by block.
#
#   -delete       when true, the file is deleted at the end of delivery
#   -channel      client channel
#   -filename     video file
#   -context      value stored in ::running for this delivery
#   -client_data  opaque value handed through to end-delivery
#   -query        query string passed to h264open
h264Spooler proc spool {{-delete false} -channel -filename -context {-client_data ""} -query} {
    if {[catch {
        set handle [h264open $filename $query]
    } errorMsg]} {
        ns_log error "h264: error opening h264 channel for $filename $query: $errorMsg"
        if {[catch {close $channel} e]} {ns_log notice "bgdelivery, closing h264 for $filename, error: $e"}
        return
    }
    incr ::delivery_count
    set key $channel,$handle,$filename
    set ::bytes($key) 0
    set ::running($key) $context
    if {$delete} {set ::delete_file($key) 1}
    if {[catch {
        set length [h264length $handle]
        puts $channel "HTTP/1.0 200 OK\nContent-Type: video/mp4\nContent-Length: $length\n"
        flush $channel
    } errorMsg]} {
        ns_log notice "h264: error writing headers in h264 channel for $filename $query: $errorMsg"
        :end-delivery -client_data $client_data $filename $handle $channel 0
        # end-delivery has closed the channel and the handle; installing a
        # writable handler on the closed channel would raise an error.
        return
    }
    fconfigure $channel -translation binary -blocking false
    fileevent $channel writable [list [self] writeBlock $client_data $filename $handle $channel]
}
# h264Spooler writeBlock --
#
#   Writable handler of the client channel: send the next block read from
#   the h264 handle, or finish the delivery at end of input, when the
#   channel reports eof, or when writing fails.
h264Spooler proc writeBlock {client_data filename handle channel} {
    h264Spooler incr blockCount
    # Name of the per-delivery byte counter.
    set counter ::bytes($channel,$handle,$filename)
    if {![eof $channel] && ![h264eof $handle]} {
        set block [h264read $handle]
        set len [string length $block]
        incr $counter $len
        h264Spooler incr byteCount $len
        if {[catch {puts -nonewline $channel $block} errorMsg]} {
            ns_log notice "h264: error on writing to channel $channel: $errorMsg"
            :end-delivery -client_data $client_data $filename $handle $channel [set $counter]
        }
    } else {
        :end-delivery -client_data $client_data $filename $handle $channel [set $counter]
    }
}
# h264Spooler end-delivery --
#
#   Finish an h264 delivery: close the client channel and the h264 handle,
#   remove the bookkeeping entries and delete the file when requested.
h264Spooler proc end-delivery {{-client_data ""} filename handle channel bytes args} {
    ns_log notice "h264 FINISH $channel $handle"
    # Closing is best effort; failures are only logged.
    set rc [catch {close $channel} e]
    if {$rc} {
        ns_log notice "bgdelivery, closing h264 for $filename, error: $e"
    }
    set rc [catch {h264close $handle} e]
    if {$rc} {
        ns_log notice "bgdelivery, closing h264 $filename, error: $e"
    }

    set key $channel,$handle,$filename
    unset ::running($key)
    unset ::bytes($key)

    if {![info exists ::delete_file($key)]} {
        return
    }
    file delete -- $filename
    unset ::delete_file($key)
}
# AsyncDiskWriter buffers data in memory and writes it to a file from a
# writable file event handler. Parameters:
#   blocksize  maximum number of characters written per writable event
#   autoflush  when true, the channel is flushed after the buffer has been
#              written completely
#   verbose    when true, the log method writes to the server log
::xotcl::Class create ::AsyncDiskWriter -parameter {
{blocksize 4096}
{autoflush false}
{verbose false}
}
# Write msg to the server log, prefixed by the object name, but only when
# the verbose parameter is true.
::AsyncDiskWriter instproc log {msg} {
    if {![:verbose]} {
        return
    }
    ns_log notice "[self] --- $msg"
}
# Open the output file with the given access mode, start with an empty
# buffer and switch the channel to binary, non-blocking operation.
::AsyncDiskWriter instproc open {-filename {-mode w}} {
    set fh [open $filename $mode]
    set :channel $fh
    set :content ""
    set :filename $filename
    fconfigure $fh -translation binary -blocking false
    :log "open $filename"
}
# Close the writer. With buffered data and without -sync true the close is
# deferred: only finishWhenDone is set and writeBlock closes the writer
# once the buffer is empty. Otherwise buffered data is written in blocking
# mode, the channel is closed and the object is destroyed.
::AsyncDiskWriter instproc close {{-sync false}} {
    if {!$sync && ${:content} ne ""} {
        :log "close async"
        set :finishWhenDone 1
    } else {
        :log "close sync"
        set fh ${:channel}
        if {${:content} ne ""} {
            fconfigure $fh -translation binary -blocking true
            puts -nonewline $fh ${:content}
        }
        close $fh
        :destroy
    }
}
# Append block to the buffer and make sure the writable handler that
# drains the buffer is installed on the channel.
::AsyncDiskWriter instproc async_write {block} {
    append :content $block
    set handler [list [self] writeBlock]
    fileevent [set :channel] writable $handler
}
# Writable handler: write at most blocksize characters of the buffer.
# When the rest of the buffer fits into one block, it is written, the
# handler is removed and a deferred close is carried out.
::AsyncDiskWriter instproc writeBlock {} {
    set pending [string length ${:content}]
    if {$pending >= ${:blocksize}} {
        # Split off the leading block and keep the rest buffered.
        set last [expr {${:blocksize} - 1}]
        set chunk [string range ${:content} 0 $last]
        set :content [string range ${:content} ${:blocksize} end]
        puts -nonewline ${:channel} $chunk
        :log "write [string length $chunk] bytes ([string length ${:content}] buffered)"
    } else {
        puts -nonewline ${:channel} ${:content}
        :log "write $pending bytes"
        # Nothing left to write: stop listening for writable events.
        fileevent ${:channel} writable ""
        set :content ""
        if {[:autoflush]} {
            flush ${:channel}
        }
        if {[info exists :finishWhenDone]} {
            :close -sync true
        }
    }
}
# Counters: ::subscription_count is incremented by Subscriber init,
# ::message_count by Subscriber broadcast.
set ::subscription_count 0
set ::message_count 0
# A Subscriber represents one client channel registered under a key.
# The class object keeps the array subscriptions, indexed by key, holding
# the list of subscriber objects for that key.
::xotcl::Class create Subscriber -parameter {key channel user_id mode {start_of_page ""}}
# Subscriber current --
#
#   Return the current subscriptions as a flat list of key / subscriber
#   list pairs. With -key only the pair for this key is returned; the
#   result is empty when the key has no subscriptions.
Subscriber proc current {-key } {
    if {[info exists key]} {
        if {[info exists :subscriptions($key)]} {
            return [list $key [set :subscriptions($key)]]
        }
        return [list]
    }
    set result [list]
    if {[info exists :subscriptions]} {
        foreach k [array names :subscriptions] {
            lappend result $k [set :subscriptions($k)]
        }
    }
    # Without this return the collected pairs were silently discarded.
    return $result
}
# Close the channel of the subscriber; errors are ignored. The channel is
# switched to non-blocking mode before it is closed.
Subscriber instproc close {} {
    set ch [:channel]
    catch {fconfigure $ch -blocking false}
    catch {close $ch}
}
# Check whether the client is still connected. Raises AD_CLIENTDISCONNECT
# when the channel is at eof or when draining pending input fails.
Subscriber instproc sweep {args} {
    set ch [:channel]
    # A failing eof query is treated like end of file.
    if {[catch {eof $ch} atEof]} {
        set atEof 1
    }
    :log "sweep [:channel] EOF $atEof"
    if {$atEof} {
        throw {AD_CLIENTDISCONNECT} "connection $ch closed by peer"
    }
    # Read and discard pending input without blocking, then restore the
    # previous blocking mode.
    set failed [catch {
        set wasBlocking [fconfigure $ch -blocking]
        fconfigure $ch -blocking false
        set discarded [read $ch]
        fconfigure $ch -blocking $wasBlocking
    } errorMsg]
    if {$failed} {
        throw {AD_CLIENTDISCONNECT} "connection $ch closed due to IO error"
    }
}
# Encode msg for the mode of the subscriber and write it to the channel.
# A broken pipe or a connection reset is reported as AD_CLIENTDISCONNECT;
# every other error is re-thrown with its original error code.
Subscriber instproc send {msg} {
    :log ""
    ::sec_handler_reset
    set encoded [::xo::mr::bgdelivery encode_message [:mode] $msg]
    try {
        puts -nonewline [:channel] $encoded
        flush [:channel]
    } on error {errorMsg} {
        # Error codes that merely indicate that the client has gone away.
        set disconnectCodes [list "POSIX EPIPE {broken pipe}" "POSIX ECONNRESET {connection reset by peer}"]
        if {$::errorCode ni $disconnectCodes} {
            throw $::errorCode $errorMsg
        }
        throw {AD_CLIENTDISCONNECT} {client disconnected}
    }
}
# Invoke method with argument on every subscriber registered under key.
# Subscribers for which the call fails are destroyed; only the others
# remain in the subscription list of the key.
Subscriber proc foreachSubscriber {key method {argument ""}} {
    :msg "$key $method '$argument'"
    if {![info exists :subscriptions($key)]} {
        return
    }
    set alive [list]
    foreach sub [set :subscriptions($key)] {
        try {
            $sub $method $argument
        } on ok {result} {
            lappend alive $sub
        } trap {AD_CLIENTDISCONNECT} {errMsg} {
            ns_log warning "$method to subscriber $sub (key=$key): $errMsg"
            $sub destroy
        } on error {errMsg} {
            ns_log error "error in $method to subscriber $sub (key=$key): $errMsg\n$::errorInfo"
            $sub destroy
        }
    }
    set :subscriptions($key) $alive
}
# Send msg to all subscribers of key and count the message.
Subscriber proc broadcast {key msg} {
    set method send
    :foreachSubscriber $key $method $msg
    incr ::message_count 1
}
# Run the sweep check on all subscribers of key; foreachSubscriber removes
# the subscribers whose check fails.
Subscriber proc sweep {key} {
    set method sweep
    :foreachSubscriber $key $method
}
# Close the channel first, then continue with the inherited destroy.
Subscriber instproc destroy {} {
    [self] close
    next
}
# Register the new subscriber under its key in the subscriptions array of
# the class, configure the channel for binary utf-8 output and send the
# start_of_page text.
Subscriber instproc init {} {
    [:info class] instvar subscriptions
    lappend subscriptions([:key]) [self]
    incr ::subscription_count
    set ch [:channel]
    fconfigure $ch -translation binary -encoding utf-8
    puts -nonewline $ch ${:start_of_page}
    flush $ch
}
# HttpSpooler delivers the results of asynchronous HTTP requests over one
# client channel, one delivery at a time.
Class create ::HttpSpooler -parameter {channel {timeout 10000} {counter 0}}
# Initial state:
#   spooling  1 while a delivery to the channel is in progress
#   running   number of requests added and not yet done
#   release   1 once release has been called
#   queue     flat list of pending data / request / encoding triples
::HttpSpooler instproc init {} {
    set :spooling 0
    set :running 0
    set :release 0
    set :queue {}
}
# Close the client channel, ignoring errors, and destroy the spooler.
::HttpSpooler instproc all_done {} {
    set ch [:channel]
    catch {close $ch}
    :log ""
    :destroy
}
# Mark the spooler as released; it finishes immediately when no request
# is running, otherwise when the last running request is done.
::HttpSpooler instproc release {} {
    set :release 1
    if {${:running} == 0} {
        :all_done
    }
}
# Called when a request is finished: decrement the number of running
# requests, destroy the request object and finish the spooler when it was
# released and nothing is running anymore.
::HttpSpooler instproc done {reason request} {
    set remaining [incr :running -1]
    :log "--running $remaining"
    $request destroy
    if {${:running} == 0 && ${:release}} {
        :all_done
    }
}
# Deliver data of a finished request to the client channel. While another
# delivery is in progress, the data is queued. Otherwise the data is
# written to a temporary file, which is copied to the channel in the
# background; end-delivery is called when the copy is finished.
::HttpSpooler instproc deliver {data request {encoding binary}} {
    :log "-- spooling ${:spooling}"
    if {!${:spooling}} {
        set :spooling 1
        # Stage the data in a temporary file.
        set out [file tempfile spool_filename [ad_tmpdir]/nsd-spool-XXXXXX]
        fconfigure $out -translation binary -encoding $encoding
        puts -nonewline $out $data
        close $out
        # Reopen the file for reading and copy it to the client.
        set in [open $spool_filename]
        fconfigure $in -translation binary -encoding $encoding
        fconfigure [:channel] -translation binary -encoding $encoding
        set callback [list [self] end-delivery $spool_filename $in [:channel] $request]
        fcopy $in [:channel] -command $callback
    } else {
        :log "--enqueue"
        lappend :queue $data $request $encoding
    }
}
# ::HttpSpooler end-delivery --
#
#   fcopy callback invoked when one spool file has been copied to the
#   client channel. Closes and removes the spool file, starts the delivery
#   of the next queued entry and reports the request as done.
#
#   filename  name of the temporary spool file created by deliver
#   fd        open channel of the spool file
#   ch        client channel
#   request   request object the data belongs to
#   bytes     number of bytes copied, supplied by fcopy
#   args      optional error message supplied by fcopy
::HttpSpooler instproc end-delivery {filename fd ch request bytes args} {
    :log "--- end of delivery of $filename, $bytes bytes written $args"
    if {[catch {close $fd} e]} {ns_log notice "httpspool, closing file $filename, error: $e"}
    # The spool file is a private temporary file created by deliver; remove
    # it, otherwise every delivery leaves a file behind in the tmp directory.
    if {[catch {file delete -- $filename} e]} {ns_log notice "httpspool, deleting file $filename, error: $e"}
    set :spooling 0
    if {[llength ${:queue}] > 0} {
        :log "--dequeue"
        # lassign returns the queue without its first triple.
        set :queue [lassign ${:queue} data req enc]
        :deliver $data $req $enc
    }
    :done delivered $request
}
# Start an asynchronous HTTP request for the given http:// URL, managed by
# this spooler. Requests that do not match the URL pattern are ignored.
::HttpSpooler instproc add {-request {-post_data ""}} {
    if {![regexp {http://([^/]*)(/.*)} $request _ host path]} {
        return
    }
    # Default port, overridden when the host part has the form host:port.
    set port 80
    regexp {^([^:]+):(.*)$} $host _ host port
    incr :running
    set name [self]::[incr :counter]
    xo::AsyncHttpRequest $name -host $host -port $port -path $path -timeout [:timeout] -post_data $post_data -request_manager [self]
}
# Exit handler of the thread: close all AsyncDiskWriter instances,
# including instances of subclasses.
::xotcl::Object setExitHandler {
    ns_log notice "--- exit handler"
    foreach writer [::AsyncDiskWriter info instances -closure] {
        ns_log notice "close AsyncDiskWriter $writer"
        # Close synchronously: an asynchronous close only marks the writer
        # and relies on later writable events, which are not processed
        # anymore at exit, so buffered data would be lost. A failure of one
        # writer must not prevent closing the others.
        if {[catch {$writer close -sync true} errorMsg]} {
            ns_log notice "close AsyncDiskWriter $writer failed: $errorMsg"
        }
    }
}
}
# NOTE(review): the meaning of :lightweight is not visible in this file;
# presumably it is interpreted by ::xotcl::THREAD -- confirm.
set :lightweight 0
# Handle of the mutex that returnfile locks around thread::transfer.
set :mutex ns:mutex:tcl:6
# NOTE(review): the meaning of :persistent is not visible in this file;
# presumably it is interpreted by ::xotcl::THREAD -- confirm.
set :persistent 1}
# ::bgdelivery returnfile --
#
#   Deliver a file as response of the current connection. Depending on the
#   configuration the file is returned via ns_returnfile (secure
#   connection without writer threads), handed to ns_writer, or the
#   connection channel is transferred to the background delivery thread,
#   where fileSpooler or h264Spooler copies the data.
#
#   -client_data          opaque value passed to the spooler
#   -delete               when true, the spooler deletes the file afterwards
#   -content_disposition  filename for a Content-Disposition attachment header
#   status_code           HTTP status code; 200 becomes 206 for an accepted
#                         single byte range
#   mime_type             content type of the response
#   filename              file to be delivered
#
#   A Range request header with a single valid byte range is honored.
#   Malformed specifications, invalid ranges and multiple ranges are
#   ignored and the full file is delivered.
::bgdelivery proc returnfile {{-client_data ""} {-delete false} -content_disposition status_code mime_type filename} {
    # Make sure the connection context exists before it is queried.
    if {![nsf::is object ::xo::cc]} {
        ::xo::ConnectionContext require -url [ad_conn url]
    }
    set query [::xo::cc actual_query]
    set secure_conn_p [security::secure_conn_p]
    set use_h264 [expr {[string match "video/mp4*" $mime_type] && $query ne ""
                        && ([string match {*start=[1-9]*} $query] || [string match {*end=[1-9]*} $query])
                        && [info commands h264open] ne ""
                        && !$secure_conn_p }]
    if {[info commands ns_driversection] ne ""} {
        set use_writerThread [ns_config [ns_driversection] writerthreads 0]
    } else {
        set use_writerThread 0
    }
    if {[info exists content_disposition]} {
        set fn [xo::backslash_escape \" $content_disposition]
        ns_set put [ns_conn outputheaders] Content-Disposition "attachment;filename=\"$fn\""
    }
    if {$secure_conn_p && !$use_writerThread} {
        ns_returnfile $status_code $mime_type $filename
        return
    }
    set size [ad_file size $filename]
    if {$::xo::naviserver && !$use_writerThread} {
        ns_conn keepalive 0
    }

    #
    # Parse the Range request header into a list of from/to/size triples.
    #
    set ranges [list]
    set bytes $size
    set range [ns_set iget [ns_conn headers] range]
    if {[regexp {bytes=(.*)$} $range _ range]} {
        set bytes 0
        foreach r [split $range ,] {
            set r [string trim $r]
            if {![regexp {^(\d*)-(\d*)$} $r _ from to] || ($from eq "" && $to eq "")} {
                # Neither from nor to can be determined: ignore the header.
                ns_log notice "### ignore malformed range <$range>"
                set ranges [list]
                set bytes $size
                break
            }
            if {$from eq ""} {
                # Suffix range: the last $to bytes of the file.
                set from [expr {max(0, $size - $to)}]
                set to [expr {$size - 1}]
            } elseif {$to eq ""} {
                set to [expr {$size - 1}]
            }
            set rangeSize [expr {1 + $to - $from}]
            lappend ranges [list $from $to $rangeSize]
            incr bytes $rangeSize
        }
    }

    if {!$use_h264} {
        if {[llength $ranges] == 1 && $status_code == 200} {
            lassign [lindex $ranges 0] from to
            if {$from <= $to && $size > $to} {
                ns_set put [ns_conn outputheaders] Content-Range "bytes $from-$to/$size"
                set status_code 206
            } else {
                ns_log notice "### ignore invalid range <$range>, pos > size-1, Content-Range: bytes $from-$to/$size // $ranges"
                # Ignoring means full delivery: forget the parsed range.
                set ranges [list]
                set bytes $size
            }
        } elseif {[llength $ranges] > 1} {
            ns_log warning "Multiple ranges are currently not supported, ignoring range request"
            # Ignoring means full delivery: forget the parsed ranges.
            set ranges [list]
            set bytes $size
        }
        if {$::xo::naviserver && ![string match text/* $mime_type]} {
            :write_headers -binary -- $status_code $mime_type $bytes
        } else {
            :write_headers $status_code $mime_type $bytes
        }
    }
    if {$bytes == 0} {
        # Nothing to deliver; return an empty response right here.
        ns_set put [ns_conn outputheaders] "Content-Length" 0
        ns_return 200 $mime_type {}
        return
    }
    if {$use_writerThread && !$use_h264} {
        if {$status_code == 206} {
            ns_log notice "ns_writer submitfile -offset $from -size $bytes $filename"
            ns_writer submitfile -offset $from -size $bytes $filename
        } else {
            ns_log notice "ns_writer submitfile $filename"
            ns_writer submitfile $filename
        }
        return
    }

    #
    # Transfer the connection channel to the delivery thread. The transfer
    # is protected by the mutex; the lock is released in every case before
    # a failure is reported.
    #
    set errorMsg ""
    set tid [:get_tid]
    ns_mutex_lock ${:mutex}
    catch {
        set ch [ns_conn channel]
        if {[catch {thread::transfer $tid $ch} innerError]} {
            set channels_in_use "??"
            catch {set channels_in_use [bgdelivery do file channels]}
            ns_log error "thread transfer failed, channel=$ch, channels_in_use=$channels_in_use"
            error $innerError
        }
    } errorMsg
    ns_mutex_unlock ${:mutex}
    if {$errorMsg ne ""} {
        error ERROR=$errorMsg
    }

    # The context identifies the request and records its start time.
    set context [list [::xo::cc requester],[::xo::cc url],$query [ns_conn start]]
    if {$use_h264} {
        :do -async ::h264Spooler spool -delete $delete -channel $ch -filename $filename -context $context -query $query -client_data $client_data
    } else {
        :do -async ::fileSpooler spool -ranges $ranges -delete $delete -channel $ch -filename $filename -context $context -client_data $client_data
    }
    ns_conn contentsentlength $size
}
# Ask the given spooler in the delivery thread, asynchronously, to add
# an HTTP request with optional post data.
::bgdelivery proc spooler_add_request {spooler request {post_data {}}} {
    :log "-- do -async $spooler add -request $request"
    set cmd [list $spooler add -request $request -post_data $post_data]
    :do -async {*}$cmd
}
# Ask the given spooler in the delivery thread, asynchronously, to
# release itself.
::bgdelivery proc spooler_release {spooler} {
    set cmd [list $spooler release]
    :do -async {*}$cmd
}
# Transfer the channel of the current connection to the delivery thread
# and create a Subscriber for it under the given key. The parameter
# initmsg is accepted but not used. The method is marked as deprecated.
::bgdelivery proc subscribe {key {initmsg {}} {mode default}} {
    set sock [ns_conn channel]
    set tid [:get_tid]
    thread::transfer $tid $sock
    :do ::Subscriber new -channel $sock -key $key -user_id [ad_conn user_id] -mode $mode
}
::nsf::method::property ::bgdelivery -per-object subscribe deprecated true
# Broadcast msg asynchronously to all subscribers of key in the delivery
# thread. The method is marked as deprecated.
::bgdelivery proc send_to_subscriber {key msg} {
    set cmd [list ::Subscriber broadcast $key $msg]
    :do -async {*}$cmd
}
::nsf::method::property ::bgdelivery -per-object send_to_subscriber deprecated true
# Write the HTTP response header to the current connection, transfer its
# channel to the delivery thread and create an HttpSpooler for it there.
::bgdelivery proc create_spooler {{-content_type text/plain} {-timeout 10000}} {
    ns_write "HTTP/1.0 200 OK\r\nContent-type: $content_type\r\n\r\n"
    set sock [ns_conn channel]
    set tid [:get_tid]
    thread::transfer $tid $sock
    :do ::HttpSpooler new -channel $sock -timeout $timeout
}
# nr_running: number of entries of the array running in the delivery
# thread, obtained via do.
::bgdelivery forward nr_running %self do array size running
# running: content of the array running in the delivery thread as a
# key/value list.
::bgdelivery forward running %self do array get running
# write_headers: forwarded to ns_headers in the calling thread.
::bgdelivery forward write_headers ns_headers