Reactor pattern is an event handling pattern for operating service requests delivered concurrently to a service handler by one or more inputs. All the ready-to-use ruby implementations (like EventMachine and family,) although are very smart built and quite helpful, hide all the details. I decided to write down a short example of how the task may be accomplished in pure ruby.
Let’s say we are interested in running an echo-service-like application. We don’t need any servers,
just pure “wait-for-input ⇒ reply” app. The CLI wrapper might be a good example. All we want is to have
a running instance somewhere that will accept internal calls like @inst.cmd("ls")
and reply with
a result of the command run.
We decide to have two independent threads, which are to be synchronized in the following manner: just
after the prior
function is executed, the posterior
function wakes up, does something and falls sleep
back until the next call to prior
. Here we go:
module SeqExec
class Seqs
attr_reader :init
def synch_prior mx, cv
Thread.new {
mx.synchronize {
@init[:prior] = true
loop do
cv.wait mx
yield if block_given?
cv.broadcast
end
}
}
end
def synch_posterior mx, cv
Thread.new {
mx.synchronize {
@init[:posterior] = true
loop do
cv.wait mx
yield if block_given?
cv.broadcast
end
}
}
end
def synch λ1, λ2
@init = {}
mx = Mutex.new
cv = ConditionVariable.new
synch_prior(mx, cv, &λ1) # prior function
Thread.pass until {:prior=>true} == @init
synch_posterior(mx, cv, &λ2) # posterior function
Thread.pass until {:prior=>true,:posterior=>true} == @init
cv.signal # we are ready to start
end
end
end
Here we produce two threads, waiting one for another until the yield
clause
(which may be blocking, if necessary) occurs to initiate the ping-pong mechanism.
Let’s now add some syntactic sugar:
module SeqExec
Thread.abort_on_exception = true
def pre &cb
@prior = cb
end
def post &cb
@posterior = cb
end
def run λ1 = nil, λ2 = nil
pre &λ1 if λ1
post &λ2 if λ2
raise ArgumentError.new "Cannot run sequential execution, lambdas are not set" \
unless (@prior && @posterior)
Seqs.new.synch @prior, @posterior
end
end
Now it’s time to play with:
include SeqExec
@i=0
@stack = []
pre { sleep 0.3; print "-#{@i += 1}-"; @stack.push(@i) }
post { print "|#{@stack.pop}|" }
run
10.times { print "#"; sleep 0.1 }
sleep 3
The prior
function pushes the incremented integer to the stack, the posterior
reacts
by printing it to the terminal:
# ⇒ ####-1-|1|###-2-|2|###-3-|3|-4-|4|-5-|5|-6-|6|-7-|7|-8-|8|-9-|9|-10-|10|-11-|11|-12-|12|-13-|13|