Class Relations

::xotcl::THREAD create ::bgdelivery

Methods (to be applied on the object)

Variables

::bgdelivery set exithandler {ns_log notice "EXITHANDLER of slave thread SELF 1444947"}
::bgdelivery set initcmd {
      package req XOTcl
      namespace import -force ::xotcl::*
    
    ns_thread name ::bgdelivery
  
    ::xotcl::Object setExitHandler \
       {ns_log notice "EXITHANDLER of slave thread ::bgdelivery 1444947"}
  
set ::xotcl::currentScript /var/www/openacs.org/packages/xotcl-core/tcl/bgdelivery-procs.tcl
set ::xotcl::currentThread ::bgdelivery

  ###############
  # FileSpooler
  ###############
  # Class FileSpooler makes it easier to overload the
  # per-object methods of the concrete file spoolers
  # (such has fileSpooler or h264Spooler)

  Class create FileSpooler

  ###############
  # File delivery
  ###############
  set ::delivery_count 0

  FileSpooler create fileSpooler
  fileSpooler set tick_interval 60000 ;# 1 min
  fileSpooler proc deliver_ranges {ranges client_data filename fd channel} {
    set first_range [lindex $ranges 0]
    set remaining_ranges [lrange $ranges 1 end]
    lassign $first_range from to size
    if {$remaining_ranges eq ""} {
      # A single delivery, which is as well the last; when finished
      # with this chunk, terminate delivery
      set cmd [list [self] end-delivery -client_data $client_data $filename $fd $channel]
    } else {
      #
      # For handling multiple ranges, HTTP/1.1 requires multipart
      # messages (multipart media type: multipart/byteranges);
      # currently these are not implemented (missing test cases). The
      # code handling the range tag switches currently to full
      # delivery, when multiple ranges are requested.
      #
      set cmd [list [self] deliver_ranges $remaining_ranges $client_data $filename $fd $channel]
    }
    seek $fd $from
    #ns_log notice "Range seek $from $filename // $first_range"
    fcopy $fd $channel -size $size -command $cmd
  }
  fileSpooler proc spool {{-ranges ""} {-delete false} -channel -filename -context \
       {-client_data ""}} {
    set fd [open $filename]
    fconfigure $fd -translation binary
    fconfigure $channel -translation binary

    set k ::runningBgJob([lindex $context 0])
    if {[info exists $k]} {
      set value [set $k]
      ns_log notice "resubmit: canceling currently running request $context  // closing $value"
      lassign $value fd0 channel0 client_data0 filename0
      :end-delivery -client_data $client_data0 $filename0 $fd0 $channel0 -1
    }
    set $k [list $fd $channel $client_data $filename]

    if {$ranges eq ""} {
      ns_log notice "no Range spool for $filename"
      fcopy $fd $channel -command [list [self] end-delivery -client_data $client_data $filename $fd \
       $channel]
    } else {
      :deliver_ranges $ranges $client_data $filename $fd $channel
    }
    #ns_log notice "--- start of delivery of $filename (running:[array size ::running])"
    set key $channel,$fd,$filename
    set ::running($key) $context
    if {$delete} {set ::delete_file($key) 1}
    incr ::delivery_count
  }
  fileSpooler proc end-delivery {{-client_data ""} filename fd channel bytes args} {
    #ns_log notice "--- end of delivery of $filename, $bytes bytes written $args"
    if {[catch {fconfigure $channel -blocking false} e]} \
       {ns_log notice "bgdelivery, fconfigure for channel, error: $e"}
    if {[catch {close $channel} e]} \
       {ns_log notice "bgdelivery, closing channel for $filename, error: $e"}
    if {[catch {close $fd} e]} {ns_log notice "bgdelivery, closing file $filename, error: $e"}
    set key $channel,$fd,$filename
    unset -nocomplain ::runningBgJob([lindex $::running($key) 0])
    unset ::running($key)
    if {[info exists ::delete_file($key)]} {
      file delete -- $filename
      unset ::delete_file($key)
    }
  }

  fileSpooler proc cleanup {} {
    # This method should not be necessary. However, under unclear conditions,
    # some fcopies seem to go into a stasis. After 2000 seconds, we will kill it.
    foreach {index entry} [array get ::running] {
      lassign $entry key elapsed
      set t [ns_time diff [ns_time get] $elapsed]
      if {[ns_time seconds $t] > 2000} {
        if {[regexp {^([^,]+),([^,]+),(.+)$} $index _ channel fd filename]} {
          ns_log notice "bgdelivery, fileSpooler cleanup after [ns_time seconds $t] seconds, $key"
          :end-delivery $filename $fd $channel -1
        }
      }
    }
  }
  fileSpooler proc tick {} {
    if {[catch {:cleanup} errorMsg]} {ns_log error "Error during filespooler cleanup: $errorMsg"}
    set :to [after ${:tick_interval} [list [self] tick]]
  }
  fileSpooler tick


  ###############
  # h264Spooler
  ###############
  #
  # A first draft of a h264 pseudo streaming spooler.
  # Like for the fileSpooler, we create a single spooler object
  # that handles spooling for all active streams. The per-stream context
  # is passed via argument lists.
  #

  FileSpooler create h264Spooler
  h264Spooler set blockCount 0
  h264Spooler set byteCount 0
  h264Spooler proc spool {{-delete false} -channel -filename -context {-client_data ""} -query} {
    #ns_log notice "h264 SPOOL gets filename '$filename'"
    if {[catch {
      set handle [h264open $filename $query]
    } errorMsg]} {
      ns_log error "h264: error opening h264 channel for $filename $query: $errorMsg"
      if {[catch {close $channel} e]} \
       {ns_log notice "bgdelivery, closing h264 for $filename, error: $e"}
      return
    }
    # set up book-keeping info
    incr ::delivery_count
    set key $channel,$handle,$filename
    set ::bytes($key) 0
    set ::running($key) $context
    if {$delete} {set ::delete_file($key) 1}
    #
    # h264open is quite expensive; in order to output the HTTP headers
    # in the connection thread, we would have to use h264open in the
    # connection thread as well to determine the proper size. To avoid
    # this overhead, we don't write the headers in the connection
    # thread and write it here instead (note that this is different to
    # the fileSpooler above).
    #
    if {[catch {
      set length [h264length $handle]
      puts $channel "HTTP/1.0 200 OK\nContent-Type: video/mp4\nContent-Length: $length\n"
      flush $channel
    } errorMsg]} {
      ns_log notice "h264: error writing headers in h264 channel for $filename $query: $errorMsg"
      :end-delivery -client_data $client_data $filename $handle $channel 0
    }
    # setup async delivery
    fconfigure $channel -translation binary -blocking false
    fileevent $channel writable [list [self] writeBlock $client_data $filename $handle $channel]
  }
  h264Spooler proc writeBlock {client_data filename handle channel} {
    h264Spooler incr blockCount
    set bytesVar ::bytes($channel,$handle,$filename)
    #ns_log notice "h264 WRITE BLOCK $channel $handle"
    if {[eof $channel] || [h264eof $handle]} {
      :end-delivery -client_data $client_data $filename $handle $channel [set $bytesVar]
    } else {
      set block [h264read $handle]
      # one should not use "bytelength" on binary data: https://wiki.tcl-lang.org/8455
      set len [string length $block]
      incr $bytesVar $len
      h264Spooler incr byteCount $len
      if {[catch {puts -nonewline $channel $block} errorMsg]} {
        ns_log notice "h264: error on writing to channel $channel: $errorMsg"
        :end-delivery -client_data $client_data $filename $handle $channel [set $bytesVar]
      }
    }
  }
  h264Spooler proc end-delivery {{-client_data ""} filename handle channel bytes args} {
    ns_log notice "h264 FINISH $channel $handle"
    if {[catch {close $channel} e]} \
       {ns_log notice "bgdelivery, closing h264 for $filename, error: $e"}
    if {[catch {h264close $handle} e]} \
       {ns_log notice "bgdelivery, closing h264 $filename, error: $e"}
    set key $channel,$handle,$filename
    unset ::running($key)
    unset ::bytes($key)
    if {[info exists ::delete_file($key)]} {
      file delete -- $filename
      unset ::delete_file($key)
    }
  }

  #################
  # AsyncDiskWriter
  #################
  ::xotcl::Class create ::AsyncDiskWriter -parameter {
    {blocksize 4096}
    {autoflush false}
    {verbose false}
  }
  ::AsyncDiskWriter instproc log {msg} {
    if {[:verbose]} {ns_log notice "[self] --- $msg"}
  }
  ::AsyncDiskWriter instproc open {-filename {-mode w}} {
    set :channel [open $filename $mode]
    set :content ""
    set :filename $filename
    fconfigure ${:channel} -translation binary -blocking false
    :log "open ${:filename}"
  }

  ::AsyncDiskWriter instproc close {{-sync false}} {
    if {$sync || ${:content} eq ""} {
      :log "close sync"
      if {${:content} ne ""} {
        fconfigure ${:channel} -translation binary -blocking true
        puts -nonewline ${:channel} ${:content}
      }
      close ${:channel}
      :destroy
    } else {
      :log "close async"
      set :finishWhenDone 1
    }
  }
  ::AsyncDiskWriter instproc async_write {block} {
    append :content $block
    fileevent ${:channel} writable [list [self] writeBlock]
  }
  ::AsyncDiskWriter instproc writeBlock {} {
    if {[string length ${:content}] < ${:blocksize}} {
      puts -nonewline ${:channel} ${:content}
      :log "write [string length ${:content}] bytes"
      fileevent ${:channel} writable ""
      set :content ""
      if {[:autoflush]} {flush ${:channel}}
      if {[info exists :finishWhenDone]} {
        :close -sync true
      }
    } else {
      set chunk [string range ${:content} 0 ${:blocksize}-1]
      set :content [string range ${:content} ${:blocksize} end]
      puts -nonewline ${:channel} $chunk
      :log "write [string length $chunk] bytes ([string length ${:content}] buffered)"
    }
  }


  ###############
  # Subscriptions
  ###############
  set ::subscription_count 0
  set ::message_count 0

  ::xotcl::Class create Subscriber -parameter {key channel user_id mode {start_of_page ""}}
  Subscriber proc current {-key } {
    set result [list]
    if {[info exists key]} {
      if {[info exists :subscriptions($key)]} {
        return [list $key [set :subscriptions($key)]]
      }
    } elseif {[info exists :subscriptions]} {
      foreach key [array names :subscriptions] {
        lappend result $key [set :subscriptions($key)]
      }
    }
  }

  Subscriber instproc close {} {
    set channel [:channel]
    #
    # It is important to make the channel nonblocking for the close,
    # since otherwise the close operation might block and bring all of
    # bgdelivery to a halt.
    #
    catch {fconfigure $channel -blocking false}
    catch {close $channel}
  }

  Subscriber instproc sweep {args} {
    #
    # when the sweeper raises an error the caller (foreachSubscriber)
    # destroys the instance. In this step the peer connection is close
    # as well.
    #
    set channel [:channel]
    if {[catch {set eof [eof $channel]}]} {set eof 1}
    :log "sweep [:channel] EOF $eof"
    if {$eof} {
      throw {AD_CLIENTDISCONNECT} "connection $channel closed by peer"
    }
    # make an IO attempt to trigger EOF
    if {[catch {
      set blocking [fconfigure $channel -blocking]
      fconfigure $channel -blocking false
      set x [read $channel]
      fconfigure $channel -blocking $blocking
    } errorMsg]} {
      throw {AD_CLIENTDISCONNECT} "connection $channel closed due to IO error"
    }
  }

  Subscriber instproc send {msg} {
    #ns_log notice "SEND <$msg> [:mode]"
    :log ""
    ::sec_handler_reset
    set smsg [::xo::mr::bgdelivery encode_message [:mode] $msg]
    #:log "-- sending to subscriber for [:key] $smsg ch=[:channel]  #        mode=[:mode], user_id \
       ${:user_id}"
    try {
      puts -nonewline [:channel] $smsg
      flush [:channel]
    } on error {errorMsg} {
      # Broken pipe and connection reset are to be expected if the
      # clients (e.g. browser in a chat) leaves the page. It is not a
      # real error.
      set ok_errors {
        "POSIX EPIPE {broken pipe}"
        "POSIX ECONNRESET {connection reset by peer}"
      }
      if {$::errorCode in $ok_errors} {
        throw {AD_CLIENTDISCONNECT} {client disconnected}
      } else {
        throw $::errorCode $errorMsg
      }
    }
  }

  Subscriber proc foreachSubscriber {key method {argument ""}} {
    :msg "$key $method '$argument'"
    if {[info exists :subscriptions($key)]} {
      set subs1 [list]
      foreach s [set :subscriptions($key)] {
        try {
          $s $method $argument
        } trap {AD_CLIENTDISCONNECT} {errMsg} {
          ns_log warning "$method to subscriber $s (key=$key): $errMsg"
          $s destroy
        } on error {errMsg} {
          ns_log error "error in $method to subscriber $s (key=$key): $errMsg\n$::errorInfo"
          $s destroy
        } on ok {result} {
          lappend subs1 $s
        }
      }
      set :subscriptions($key) $subs1
    }
  }

  Subscriber proc broadcast {key msg} {
    :foreachSubscriber $key send $msg
    incr ::message_count
  }

  Subscriber proc sweep {key} {
    :foreachSubscriber $key sweep
  }

  Subscriber instproc destroy {} {
    :close
    next
  }

  Subscriber instproc init {} {
    [:info class] instvar subscriptions
    lappend subscriptions([:key]) [self]
    incr ::subscription_count
    #:log "-- cl=[:info class], subscriptions([:key]) = $subscriptions([:key])"
    fconfigure [:channel] -translation binary

    fconfigure [:channel] -encoding utf-8
    puts -nonewline [:channel] ${:start_of_page}
    flush [:channel]
  }


  ###############
  # HttpSpooler
  ###############

  Class create ::HttpSpooler -parameter {channel {timeout 10000} {counter 0}}
  ::HttpSpooler instproc init {} {
    set :running 0
    set :release 0
    set :spooling 0
    set :queue [list]
  }
  ::HttpSpooler instproc all_done {} {
    catch {close [:channel]}
    :log ""
    :destroy
  }
  ::HttpSpooler instproc release {} {
    # release indicates the when running becomes 0, the spooler is finished
    set :release 1
    if {${:running} == 0} {:all_done}
  }
  ::HttpSpooler instproc done {reason request} {
    incr :running -1
    :log "--running ${:running}"
    $request destroy
    if {${:running} == 0 && ${:release}} {:all_done}
  }
  ::HttpSpooler instproc deliver {data request {encoding binary}} {
    :log "-- spooling ${:spooling}"
    if {${:spooling}} {
      :log "--enqueue"
      lappend :queue $data $request $encoding
    } else {
      #:log "--send"
      set :spooling 1
      # puts -nonewline [:channel] $data
      # :done

      set fd [file tempfile spool_filename [ad_tmpdir]/nsd-spool-XXXXXX]
      fconfigure $fd -translation binary -encoding $encoding
      puts -nonewline $fd $data
      close $fd

      set fd [open $spool_filename]
      fconfigure $fd -translation binary -encoding $encoding
      fconfigure [:channel] -translation binary  -encoding $encoding
      fcopy $fd [:channel] -command  [list [self] end-delivery $spool_filename $fd [:channel] \
       $request]
    }
  }
  ::HttpSpooler instproc end-delivery {filename fd ch request bytes args} {
    :log "--- end of delivery of $filename, $bytes bytes written $args"
    if {[catch {close $fd} e]} {ns_log notice "httpspool, closing file $filename, error: $e"}
    set :spooling 0
    if {[llength ${:queue}]>0} {
      :log "--dequeue"
      lassign ${:queue} data req enc
      set :queue [lreplace ${:queue} 0 2]
      :deliver $data $req $enc
    }
    :done delivered $request
  }
  ::HttpSpooler instproc add {-request {-post_data ""}} {
    if {[regexp {http://([^/]*)(/.*)} $request _ host path]} {
      set port 80
      regexp {^([^:]+):(.*)$} $host _ host port
      incr :running
      xo::AsyncHttpRequest [self]::[incr :counter]  -host $host -port $port -path $path  -timeout \
       [:timeout] -post_data $post_data -request_manager [self]
    }
  }

  #
  # Add an exit handler to close all AsyncDiskWriter, when this thread goes
  # down.
  #
  ::xotcl::Object setExitHandler {
    ns_log notice "--- exit handler"
    foreach writer [::AsyncDiskWriter info instances -closure] {
      ns_log notice "close AsyncDiskWriter $writer"
      $writer close
    }
  }

}
::bgdelivery set lightweight 0
::bgdelivery set mutex ns_mutex::bgdelivery
::bgdelivery set persistent 1