Project

General

Profile

« Previous | Next » 

Revision a089227c

Added by Marc Dequènes about 13 years ago

  • ID a089227ca94121115f46e5998b497fa102787398

[fix/evol] work on better Interface/Task/Conversation/… sync and bot stop action in order to avoid races and locks

View differences:

lib/cyborghood/cyborg/dsl.rb
class TaskBase < BaseDSL
attr_reader :bot, :name, :errors, :results, :preferred_locales, :locale, :user
# mutex used for all class variables in this paragraph
@@tasks_info_mutex = Mutex.new
@@task_wip = 0
@@registered_resources = {}
@@stop_all_tasks = false
def self.idle?
@@task_wip == 0
@@tasks_info_mutex.synchronize do
@@task_wip == 0
end
end
def self.stop_all
@@stop_all_tasks = true
# call defuse callback out of the sync block
# (which may need to synchronize later to the
# same mutex in usual conditions)
resources_to_free = true
while not resources_to_free.nil?
@@tasks_info_mutex.synchronize do
resources_to_free = @@registered_resources.shift
end
resources_to_free[1].call unless resources_to_free.nil?
end
end
# the name MUST be unique
......
@bot = bot
@name = name
@@task_wip += 1
@@tasks_info_mutex.synchronize do
@@task_wip += 1
end
@errors = {}.freeze
@results = {}.freeze
......
def wait_notification(name, criterias = {}, timeout = nil, &cb)
name = @notification_name if name == :task
_add_subtask("notification/#{name}/in") do |subtask|
logger.debug "Task '#{@name}': Waiting notification on '#{name}'"
subtask_name = "notification/#{name}/#{criterias.hash}/in"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': subtask '#{subtask.name}': waiting notification on '#{name}'"
subcription_id = nil
subcription_id_mutex = Mutex.new
# callback to end notification and subtask
# (used when shooting the task)
defuse_notification_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
subtask.finish unless subtask.finished?
# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register notification in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_notification_cb
end
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
cb.call(subtask, msg)
@bot.get_channel(name).unsubscribe(subcription_id) if subtask.finished?
if subtask.finished?
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
end
end
end
if timeout
EventMachine.add_timer(timeout) do
@bot.get_channel(name).unsubscribe(subcription_id)
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
subtask.finish
end
end
......
end
def wait_timer(timeout, repeat = false, &cb)
_add_subtask("timer/#{timeout}/#{cb.hash}") do |subtask|
periodic_timer_signature = nil
subtask_name = "timer/#{timeout}/#{cb.hash}"
_add_subtask(subtask_name) do |subtask|
timer_signature = nil
timer_signature_mutex = Mutex.new
# callback to end timer and subtask
# (used when shooting the task)
defuse_timer_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
timer_signature_mutex.synchronize do
EventMachine.cancel_timer(timer_signature) unless timer_signature.nil?
timer_signature = nil
end
subtask.finish unless subtask.finished?
# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register timer in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_timer_cb
end
timer_cb = Proc.new do
if repeat
cb.call(subtask)
EventMachine.cancel_timer(periodic_timer_signature) if subtask.finished?
timer_signature_mutex.synchronize do
if subtask.finished? and not timer_signature.nil?
EventMachine.cancel_timer(timer_signature)
timer_signature = nil
end
end
else
cb.call(subtask)
subtask.finish
# unregister the timer
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
subtask.finish unless subtask.finished?
end
end
if repeat
periodic_timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
else
EventMachine.add_timer(timeout, timer_cb)
timer_signature = EventMachine.add_timer(timeout, timer_cb)
end
end
end
......
# in this order: send a last message before leaving
# (so we are sure the peer received a reply to its message)
send_notification notification_name, {:topic => "MEET", :from => @name}
subtask.finish if meet_ok
subtask.finish if meet_ok or @@stop_all_tasks
end
end
......
def stop_bot(condition)
_add_subtask('stop') do |subtask|
case condition
when :when_finished
# try to stop gracefully
@bot.try_stop
when :at_once
# it won't wait for anything to finish…
@bot.stop
end
@bot.stop(condition)
subtask.finish
end
......
def finish
if @finished
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask should have ended, but it lied")
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask '#{@name}' should have ended, but it lied")
end
@finished = true
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
......
logger.debug "Task '#{@name}': step finished"
if @@stop_all_tasks
_finished
# running no more steps will end the task
return
end
# compute step result
@errors = {}
@results = {}
......
end
def _finished
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
@@tasks_info_mutex.synchronize do
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} tasks remaining)"
end
end
# needed for mixin
# TODO: find a cleaner solution
def tasks_info_mutex
@@tasks_info_mutex
end
# needed for mixin
# TODO: find a cleaner solution
def registered_resources
@@registered_resources
end
end

Also available in: Unified diff