lib/monitor.rb
DEFINITIONS
This source file includes following functions.
1 =begin
2
3 = monitor.rb
4
5 Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org>
6
7 This library is distributed under the terms of the Ruby license.
8 You can freely distribute/modify this library.
9
10 == example
11
12 This is a simple example.
13
14 require 'monitor.rb'
15
16 buf = []
17 buf.extend(MonitorMixin)
18 empty_cond = buf.new_cond
19
20 # consumer
21 Thread.start do
22 loop do
23 buf.synchronize do
24 empty_cond.wait_while { buf.empty? }
25 print buf.shift
26 end
27 end
28 end
29
30 # producer
31 while line = ARGF.gets
32 buf.synchronize do
33 buf.push(line)
34 empty_cond.signal
35 end
36 end
37
38 The consumer thread waits for the producer thread to push a line
39 to buf while buf.empty?, and the producer thread (main thread)
40 reads a line from ARGF and push it to buf, then call
41 empty_cond.signal.
42
43 =end
44
45 module MonitorMixin
46 module Accessible
47 protected
48 attr_accessor :mon_owner, :mon_count
49 attr_reader :mon_entering_queue, :mon_waiting_queue
50 end
51
52 module Initializable
53 protected
54 def mon_initialize
55 @mon_owner = nil
56 @mon_count = 0
57 @mon_entering_queue = []
58 @mon_waiting_queue = []
59 end
60 end
61
62 class ConditionVariable
63 class Timeout < Exception; end
64
65 include Accessible
66
67 def wait(timeout = nil)
68 if @monitor.mon_owner != Thread.current
69 raise ThreadError, "current thread not owner"
70 end
71
72 if timeout
73 ct = Thread.current
74 timeout_thread = Thread.start {
75 Thread.pass
76 sleep(timeout)
77 ct.raise(Timeout.new)
78 }
79 end
80
81 Thread.critical = true
82 count = @monitor.mon_count
83 @monitor.mon_count = 0
84 @monitor.mon_owner = nil
85 if @monitor.mon_waiting_queue.empty?
86 t = @monitor.mon_entering_queue.shift
87 else
88 t = @monitor.mon_waiting_queue.shift
89 end
90 t.wakeup if t
91 @waiters.push(Thread.current)
92
93 begin
94 Thread.stop
95 rescue Timeout
96 ensure
97 Thread.critical = true
98 if timeout && timeout_thread.alive?
99 Thread.kill(timeout_thread)
100 end
101 if @waiters.include?(Thread.current) # interrupted?
102 @waiters.delete(Thread.current)
103 end
104 while @monitor.mon_owner &&
105 @monitor.mon_owner != Thread.current
106 @monitor.mon_waiting_queue.push(Thread.current)
107 Thread.stop
108 Thread.critical = true
109 end
110 @monitor.mon_owner = Thread.current
111 @monitor.mon_count = count
112 Thread.critical = false
113 end
114 end
115
116 def wait_while
117 while yield
118 wait
119 end
120 end
121
122 def wait_until
123 until yield
124 wait
125 end
126 end
127
128 def signal
129 if @monitor.mon_owner != Thread.current
130 raise ThreadError, "current thread not owner"
131 end
132 Thread.critical = true
133 t = @waiters.shift
134 t.wakeup if t
135 Thread.critical = false
136 Thread.pass
137 end
138
139 def broadcast
140 if @monitor.mon_owner != Thread.current
141 raise ThreadError, "current thread not owner"
142 end
143 Thread.critical = true
144 for t in @waiters
145 t.wakeup
146 end
147 @waiters.clear
148 Thread.critical = false
149 Thread.pass
150 end
151
152 def count_waiters
153 return @waiters.length
154 end
155
156 private
157 def initialize(monitor)
158 @monitor = monitor
159 @waiters = []
160 end
161 end
162
163 include Accessible
164 include Initializable
165 extend Initializable
166
167 def self.extend_object(obj)
168 super(obj)
169 obj.mon_initialize
170 end
171
172 def try_mon_enter
173 result = false
174 Thread.critical = true
175 if mon_owner.nil?
176 self.mon_owner = Thread.current
177 end
178 if mon_owner == Thread.current
179 self.mon_count += 1
180 result = true
181 end
182 Thread.critical = false
183 return result
184 end
185
186 def mon_enter
187 Thread.critical = true
188 while mon_owner != nil && mon_owner != Thread.current
189 mon_entering_queue.push(Thread.current)
190 Thread.stop
191 Thread.critical = true
192 end
193 self.mon_owner = Thread.current
194 self.mon_count += 1
195 Thread.critical = false
196 end
197
198 def mon_exit
199 if mon_owner != Thread.current
200 raise ThreadError, "current thread not owner"
201 end
202 Thread.critical = true
203 self.mon_count -= 1
204 if mon_count == 0
205 self.mon_owner = nil
206 if mon_waiting_queue.empty?
207 t = mon_entering_queue.shift
208 else
209 t = mon_waiting_queue.shift
210 end
211 end
212 t.wakeup if t
213 Thread.critical = false
214 Thread.pass
215 end
216
217 def mon_synchronize
218 mon_enter
219 begin
220 yield
221 ensure
222 mon_exit
223 end
224 end
225 alias synchronize mon_synchronize
226
227 def new_cond
228 return ConditionVariable.new(self)
229 end
230
231 private
232 def initialize(*args)
233 super
234 mon_initialize
235 end
236 end
237
238 class Monitor
239 include MonitorMixin
240 alias try_enter try_mon_enter
241 alias enter mon_enter
242 alias exit mon_exit
243 alias owner mon_owner
244 end
245
246 # Local variables:
247 # mode: Ruby
248 # tab-width: 8
249 # End: