lib/monitor.rb


DEFINITIONS

This source file includes following functions.


   1  =begin
   2  
   3  = monitor.rb
   4  
   5  Copyright (C) 2001  Shugo Maeda <shugo@ruby-lang.org>
   6  
   7  This library is distributed under the terms of the Ruby license.
   8  You can freely distribute/modify this library.
   9  
  10  == example
  11  
  12  This is a simple example.
  13  
  14    require 'monitor.rb'
  15    
  16    buf = []
  17    buf.extend(MonitorMixin)
  18    empty_cond = buf.new_cond
  19    
  20    # consumer
  21    Thread.start do
  22      loop do
  23        buf.synchronize do
  24          empty_cond.wait_while { buf.empty? }
  25          print buf.shift
  26        end
  27      end
  28    end
  29    
  30    # producer
  31    while line = ARGF.gets
  32      buf.synchronize do
  33        buf.push(line)
  34        empty_cond.signal
  35      end
  36    end
  37  
  38  The consumer thread waits for the producer thread to push a line
  39  to buf while buf.empty?, and the producer thread (main thread)
  40  reads a line from ARGF and push it to buf, then call
  41  empty_cond.signal.
  42  
  43  =end
  44    
  45  module MonitorMixin
  46    module Accessible
  47    protected
  48      attr_accessor :mon_owner, :mon_count
  49      attr_reader :mon_entering_queue, :mon_waiting_queue
  50    end
  51    
  52    module Initializable
  53    protected
  54      def mon_initialize
  55        @mon_owner = nil
  56        @mon_count = 0
  57        @mon_entering_queue = []
  58        @mon_waiting_queue = []
  59      end
  60    end
  61    
  62    class ConditionVariable
  63      class Timeout < Exception; end
  64      
  65      include Accessible
  66      
  67      def wait(timeout = nil)
  68        if @monitor.mon_owner != Thread.current
  69          raise ThreadError, "current thread not owner"
  70        end
  71        
  72        if timeout
  73          ct = Thread.current
  74          timeout_thread = Thread.start {
  75            Thread.pass
  76            sleep(timeout)
  77            ct.raise(Timeout.new)
  78          }
  79        end
  80  
  81        Thread.critical = true
  82        count = @monitor.mon_count
  83        @monitor.mon_count = 0
  84        @monitor.mon_owner = nil
  85        if @monitor.mon_waiting_queue.empty?
  86          t = @monitor.mon_entering_queue.shift
  87        else
  88          t = @monitor.mon_waiting_queue.shift
  89        end
  90        t.wakeup if t
  91        @waiters.push(Thread.current)
  92  
  93        begin
  94          Thread.stop
  95        rescue Timeout
  96        ensure
  97          Thread.critical = true
  98          if timeout && timeout_thread.alive?
  99            Thread.kill(timeout_thread)
 100          end
 101          if @waiters.include?(Thread.current)  # interrupted?
 102            @waiters.delete(Thread.current)
 103          end
 104          while @monitor.mon_owner &&
 105              @monitor.mon_owner != Thread.current
 106            @monitor.mon_waiting_queue.push(Thread.current)
 107            Thread.stop
 108            Thread.critical = true
 109          end
 110          @monitor.mon_owner = Thread.current
 111          @monitor.mon_count = count
 112          Thread.critical = false
 113        end
 114      end
 115      
 116      def wait_while
 117        while yield
 118          wait
 119        end
 120      end
 121      
 122      def wait_until
 123        until yield
 124          wait
 125        end
 126      end
 127      
 128      def signal
 129        if @monitor.mon_owner != Thread.current
 130          raise ThreadError, "current thread not owner"
 131        end
 132        Thread.critical = true
 133        t = @waiters.shift
 134        t.wakeup if t
 135        Thread.critical = false
 136        Thread.pass
 137      end
 138      
 139      def broadcast
 140        if @monitor.mon_owner != Thread.current
 141          raise ThreadError, "current thread not owner"
 142        end
 143        Thread.critical = true
 144        for t in @waiters
 145          t.wakeup
 146        end
 147        @waiters.clear
 148        Thread.critical = false
 149        Thread.pass
 150      end
 151      
 152      def count_waiters
 153        return @waiters.length
 154      end
 155      
 156    private
 157      def initialize(monitor)
 158        @monitor = monitor
 159        @waiters = []
 160      end
 161    end
 162    
 163    include Accessible
 164    include Initializable
 165    extend Initializable
 166    
 167    def self.extend_object(obj)
 168      super(obj)
 169      obj.mon_initialize
 170    end
 171    
 172    def try_mon_enter
 173      result = false
 174      Thread.critical = true
 175      if mon_owner.nil?
 176        self.mon_owner = Thread.current
 177      end
 178      if mon_owner == Thread.current
 179        self.mon_count += 1
 180        result = true
 181      end
 182      Thread.critical = false
 183      return result
 184    end
 185  
 186    def mon_enter
 187      Thread.critical = true
 188      while mon_owner != nil && mon_owner != Thread.current
 189        mon_entering_queue.push(Thread.current)
 190        Thread.stop
 191        Thread.critical = true
 192      end
 193      self.mon_owner = Thread.current
 194      self.mon_count += 1
 195      Thread.critical = false
 196    end
 197    
 198    def mon_exit
 199      if mon_owner != Thread.current
 200        raise ThreadError, "current thread not owner"
 201      end
 202      Thread.critical = true
 203      self.mon_count -= 1
 204      if mon_count == 0
 205        self.mon_owner = nil
 206        if mon_waiting_queue.empty?
 207          t = mon_entering_queue.shift
 208        else
 209          t = mon_waiting_queue.shift
 210        end
 211      end
 212      t.wakeup if t
 213      Thread.critical = false
 214      Thread.pass
 215    end
 216  
 217    def mon_synchronize
 218      mon_enter
 219      begin
 220        yield
 221      ensure
 222        mon_exit
 223      end
 224    end
 225    alias synchronize mon_synchronize
 226    
 227    def new_cond
 228      return ConditionVariable.new(self)
 229    end
 230    
 231  private
 232    def initialize(*args)
 233      super
 234      mon_initialize
 235    end
 236  end
 237  
 238  class Monitor
 239    include MonitorMixin
 240    alias try_enter try_mon_enter
 241    alias enter mon_enter
 242    alias exit mon_exit
 243    alias owner mon_owner
 244  end
 245  
 246  # Local variables:
 247  # mode: Ruby
 248  # tab-width: 8
 249  # End: