Revision a089227c
Added by Marc Dequènes almost 14 years ago
- ID a089227ca94121115f46e5998b497fa102787398
lib/cyborghood/cyborg/dsl.rb | ||
---|---|---|
class TaskBase < BaseDSL
|
||
attr_reader :bot, :name, :errors, :results, :preferred_locales, :locale, :user
|
||
|
||
# mutex used for all class variables in this paragraph
|
||
@@tasks_info_mutex = Mutex.new
|
||
@@task_wip = 0
|
||
@@registered_resources = {}
|
||
|
||
@@stop_all_tasks = false
|
||
|
||
def self.idle?
|
||
@@task_wip == 0
|
||
@@tasks_info_mutex.synchronize do
|
||
@@task_wip == 0
|
||
end
|
||
end
|
||
|
||
def self.stop_all
|
||
@@stop_all_tasks = true
|
||
|
||
# call defuse callback out of the sync block
|
||
# (which may need to synchronize later to the
|
||
# same mutex in usual conditions)
|
||
resources_to_free = true
|
||
while not resources_to_free.nil?
|
||
@@tasks_info_mutex.synchronize do
|
||
resources_to_free = @@registered_resources.shift
|
||
end
|
||
resources_to_free[1].call unless resources_to_free.nil?
|
||
end
|
||
end
|
||
|
||
# the name MUST be unique
|
||
... | ... | |
@bot = bot
|
||
@name = name
|
||
|
||
@@task_wip += 1
|
||
@@tasks_info_mutex.synchronize do
|
||
@@task_wip += 1
|
||
end
|
||
|
||
@errors = {}.freeze
|
||
@results = {}.freeze
|
||
... | ... | |
def wait_notification(name, criterias = {}, timeout = nil, &cb)
|
||
name = @notification_name if name == :task
|
||
|
||
_add_subtask("notification/#{name}/in") do |subtask|
|
||
logger.debug "Task '#{@name}': Waiting notification on '#{name}'"
|
||
subtask_name = "notification/#{name}/#{criterias.hash}/in"
|
||
_add_subtask(subtask_name) do |subtask|
|
||
logger.debug "Task '#{@name}': subtask '#{subtask.name}': waiting notification on '#{name}'"
|
||
subcription_id = nil
|
||
subcription_id_mutex = Mutex.new
|
||
|
||
# callback to end notification and subtask
|
||
# (used when shooting the task)
|
||
defuse_notification_cb = Proc.new do
|
||
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
|
||
|
||
subcription_id_mutex.synchronize do
|
||
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
|
||
subcription_id = nil
|
||
end
|
||
|
||
subtask.finish unless subtask.finished?
|
||
|
||
# we already own the mutex here
|
||
@@registered_resources.delete(subtask_name)
|
||
end
|
||
# register notification in the task
|
||
@@tasks_info_mutex.synchronize do
|
||
@@registered_resources[subtask_name] = defuse_notification_cb
|
||
end
|
||
|
||
subcription_id = @bot.get_channel(name).subscribe do |msg|
|
||
if _notification_criterias_match(msg, criterias)
|
||
cb.call(subtask, msg)
|
||
@bot.get_channel(name).unsubscribe(subcription_id) if subtask.finished?
|
||
|
||
if subtask.finished?
|
||
subcription_id_mutex.synchronize do
|
||
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
|
||
subcription_id = nil
|
||
end
|
||
|
||
# unregister the notification
|
||
@@tasks_info_mutex.synchronize do
|
||
@@registered_resources.delete(subtask_name)
|
||
end
|
||
end
|
||
end
|
||
end
|
||
|
||
if timeout
|
||
EventMachine.add_timer(timeout) do
|
||
@bot.get_channel(name).unsubscribe(subcription_id)
|
||
subcription_id_mutex.synchronize do
|
||
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
|
||
subcription_id = nil
|
||
end
|
||
|
||
# unregister the notification
|
||
@@tasks_info_mutex.synchronize do
|
||
@@registered_resources.delete(subtask_name)
|
||
end
|
||
|
||
subtask.finish
|
||
end
|
||
end
|
||
... | ... | |
end
|
||
|
||
def wait_timer(timeout, repeat = false, &cb)
|
||
_add_subtask("timer/#{timeout}/#{cb.hash}") do |subtask|
|
||
periodic_timer_signature = nil
|
||
subtask_name = "timer/#{timeout}/#{cb.hash}"
|
||
_add_subtask(subtask_name) do |subtask|
|
||
timer_signature = nil
|
||
timer_signature_mutex = Mutex.new
|
||
|
||
# callback to end timer and subtask
|
||
# (used when shooting the task)
|
||
defuse_timer_cb = Proc.new do
|
||
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
|
||
|
||
timer_signature_mutex.synchronize do
|
||
EventMachine.cancel_timer(timer_signature) unless timer_signature.nil?
|
||
timer_signature = nil
|
||
end
|
||
|
||
subtask.finish unless subtask.finished?
|
||
|
||
# we already own the mutex here
|
||
@@registered_resources.delete(subtask_name)
|
||
end
|
||
# register timer in the task
|
||
@@tasks_info_mutex.synchronize do
|
||
@@registered_resources[subtask_name] = defuse_timer_cb
|
||
end
|
||
|
||
timer_cb = Proc.new do
|
||
if repeat
|
||
cb.call(subtask)
|
||
EventMachine.cancel_timer(periodic_timer_signature) if subtask.finished?
|
||
timer_signature_mutex.synchronize do
|
||
if subtask.finished? and not timer_signature.nil?
|
||
EventMachine.cancel_timer(timer_signature)
|
||
timer_signature = nil
|
||
end
|
||
end
|
||
else
|
||
cb.call(subtask)
|
||
subtask.finish
|
||
|
||
# unregister the timer
|
||
@@tasks_info_mutex.synchronize do
|
||
@@registered_resources.delete(subtask_name)
|
||
end
|
||
|
||
subtask.finish unless subtask.finished?
|
||
end
|
||
end
|
||
|
||
if repeat
|
||
periodic_timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
|
||
timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
|
||
else
|
||
EventMachine.add_timer(timeout, timer_cb)
|
||
timer_signature = EventMachine.add_timer(timeout, timer_cb)
|
||
end
|
||
end
|
||
end
|
||
... | ... | |
# in this order: send a last message before leaving
|
||
# (so we are sure the peer received a reply to its message)
|
||
send_notification notification_name, {:topic => "MEET", :from => @name}
|
||
subtask.finish if meet_ok
|
||
|
||
subtask.finish if meet_ok or @@stop_all_tasks
|
||
end
|
||
end
|
||
|
||
... | ... | |
|
||
def stop_bot(condition)
|
||
_add_subtask('stop') do |subtask|
|
||
case condition
|
||
when :when_finished
|
||
# try to stop gracefully
|
||
@bot.try_stop
|
||
when :at_once
|
||
# it won't wait for anything to finish…
|
||
@bot.stop
|
||
end
|
||
@bot.stop(condition)
|
||
|
||
subtask.finish
|
||
end
|
||
... | ... | |
|
||
def finish
|
||
if @finished
|
||
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask should have ended, but it lied")
|
||
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask '#{@name}' should have ended, but it lied")
|
||
end
|
||
@finished = true
|
||
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
|
||
... | ... | |
|
||
logger.debug "Task '#{@name}': step finished"
|
||
|
||
if @@stop_all_tasks
|
||
_finished
|
||
|
||
# running no more steps will end the task
|
||
return
|
||
end
|
||
|
||
# compute step result
|
||
@errors = {}
|
||
@results = {}
|
||
... | ... | |
end
|
||
|
||
def _finished
|
||
@@task_wip -= 1
|
||
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
|
||
@@tasks_info_mutex.synchronize do
|
||
@@task_wip -= 1
|
||
logger.debug "Task '#{@name}': finished (#{@@task_wip} tasks remaining)"
|
||
end
|
||
end
|
||
|
||
# needed for mixin
|
||
# TODO: find a cleaner solution
|
||
def tasks_info_mutex
|
||
@@tasks_info_mutex
|
||
end
|
||
|
||
# needed for mixin
|
||
# TODO: find a cleaner solution
|
||
def registered_resources
|
||
@@registered_resources
|
||
end
|
||
end
|
||
|
Also available in: Unified diff
[fix/evol] work on better Interface/Task/Conversation/… sync and bot stop action in order to avoid races and locks