Project

General

Profile

« Previous | Next » 

Revision a089227c

Added by Marc Dequènes about 13 years ago

  • ID a089227ca94121115f46e5998b497fa102787398

[fix/evol] work on better Interface/Task/Conversation/… sync and bot stop action in order to avoid races and locks

View differences:

bin/librarian
bot = reg.bot
trap('INT') do
bot.ask_to_stop
bot.stop(:quickly)
end
trap('TERM') do
bot.ask_to_stop
bot.stop(:quickly)
end
bot.run
bin/mapmaker
bot = reg.bot
trap('INT') do
bot.ask_to_stop
bot.stop(:quickly)
end
trap('TERM') do
bot.ask_to_stop
bot.stop(:quickly)
end
bot.run
bin/postman
end
end
def ask_to_stop
def stop(condition)
super do
self.services.imap.stop_mail_check
end
end
protected
def ready_to_stop?
# TODO: also wait for messages being processed to be finished (or try to interrupt cleanly if possible in :quickly condition)
super and not self.services.imap.connected?
end
......
bot = reg.bot
trap('INT') do
bot.ask_to_stop
bot.stop(:quickly)
end
trap('TERM') do
bot.ask_to_stop
bot.stop(:quickly)
end
bot.run
bin/test_client
def start_work
task "compare stuff" do
ask "MapMaker", :info1, "/_cyborg_"
ask "Librarian", :info2, "/_cyborg_"
ask "Librarian", :library, "/Records"
ask "Librarian", :persons, "/Records/Persons"
#ask "MapMaker", :info1, "/_cyborg_"
#ask "Librarian", :info2, "/_cyborg_"
#ask "Librarian", :library, "/Records"
#ask "Librarian", :persons, "/Records/Persons"
#ask "Librarian", :dns_domains, "/Records/DnsDomains"
ask "Librarian", :gorou, "/Records/Persons/gorou"
ask "Librarian", :guihome_net, "/Records/DnsDomains/guihome.net"
ask "Librarian", :person_search, "/Records/Persons/?", {:uid => "g*"}
#ask "Librarian", :gorou, "/Records/Persons/gorou"
#ask "Librarian", :guihome_net, "/Records/DnsDomains/guihome.net"
#ask "Librarian", :person_search, "/Records/Persons/?", {:uid => "g*"}
#ask "Librarian", :person_search_null, "/Records/Persons/?"
#ask "MapMaker", :zones, "/Zones"
#ask "MapMaker", :wanted_failure, "/prout"
......
#ask "MapMaker", :zone_mp, "/Zones/milkypond.org"
#ask "MapMaker", :dns_check, "/Services/DNS/check_config"
#ask "MapMaker", :search, "/Zones/?"
ask "MapMaker", :search_master, "/Zones/?", {:is_master => true}
#ask "MapMaker", :search_master, "/Zones/?", {:is_master => true}
#ask "MapMaker", :search_slave, "/Zones/?", {:is_master => false}
#know? "MapMaker", :k1, "/Zones"
#know? "MapMaker", :k2, "/prout"
set_user "duck"
ask "Clerk", :dns_info, "/Commands/DNS/INFO"
on_error do
puts "PLOUF"
pp errors
pp results
stop_bot :at_once
stop_bot :quickly
end
on_success do
puts "OK"
pp errors
pp results
puts "Tadam: " + (results[:info1][:api_version] == results[:info2][:api_version] ? "same" : "different")
set_preferred_locales "fr;q=1,en-us,en"
#puts "Tadam: " + (results[:info1][:api_version] == results[:info2][:api_version] ? "same" : "different")
#set_preferred_locales "fr;q=1,en-us,en"
meet "waiter", :zzz
STDOUT.flush
on_success do
puts "OK compare stuff"
end
on_error do
stop_bot :at_once
stop_bot :quickly
end
end
end
......
bot = CyborgHood::TestClientHome::TestClientClient.new
trap('INT') do
bot.ask_to_stop
bot.stop(:quickly)
end
trap('TERM') do
bot.ask_to_stop
bot.stop(:quickly)
end
bot.run
lib/cyborghood-postman/imap.rb
#++
require 'net/imap'
require 'thread'
# IMAP IDLE support
class Net::IMAP
lib/cyborghood/cyborg.rb
require 'cyborghood'
require 'eventmachine'
require 'thread'
require 'cyborghood/cyborg/dsl'
......
# use "aspects" Modules to define behaviors
end
def ask_to_stop
def stop(condition)
logger.info "Bot was asked to stop..."
yield if block_given?
try_stop
case condition
when :when_finished
wait_until_ready_to_stop
when :quickly
# try to stop gracefully
DSL::Task.stop_all
wait_until_ready_to_stop
when :at_once
# it won't wait for anything to finish…
exit
end
end
def ready_to_stop?
DSL::Task.idle?
# core capabilities
def capabilities
[]
end
protected
def stop_gracefully
end
def stop
def try_stop
logger.info "Bot stopping"
EventMachine.next_tick { EventMachine.stop_event_loop }
end
def try_stop
def ready_to_stop?
DSL::Task.idle?
end
def wait_until_ready_to_stop
if ready_to_stop?
@system_notification.unsubscribe(@system_notification_processing)
@system_notification_processing = nil
drop_channel(@system_notification_name)
stop
try_stop
else
EventMachine.next_tick { try_stop }
EventMachine.next_tick { wait_until_ready_to_stop }
end
end
# core capabilities
def capabilities
[]
end
protected
def process_system_notification(msg)
end
end
lib/cyborghood/cyborg/botnet.rb
def contact_peer(peer, block, dont_open_new_connection = false)
if @comm_list.has_key? peer
logger.debug "Botnet: Reusing connection for peer #{peer}"
return block.call @comm_list[peer]
if @comm_list[peer].usuable?
logger.debug "Botnet: Reusing connection for peer #{peer}"
return block.call @comm_list[peer]
end
end
return if dont_open_new_connection
......
# used to quit properly and later to reuse communication channels
def register_communication(peer, conversation)
logger.debug "Botnet: conversation with '#{peer}' ready"
@comm_list[peer] = conversation
end
def unregister_communication(peer)
logger.debug "Botnet: conversation with '#{peer}' closed"
@comm_list.delete(peer)
@pending_conversation_close.delete(peer)
end
def ask_to_stop
@comm_list.values.each {|conv| conv.bye }
super
def stop(condition)
super do
@comm_list.each_value{|conv| conv.stop(condition) }
end
end
protected
lib/cyborghood/cyborg/botnet/conversation.rb
@sent
end
def send
def send(go_quiet = false)
raise CyberError.new(:unrecoverable, "bot/conversation", "Not sending twice the same message") if self.sent?
@action_id = @conv_thread.next_action_id if @action_id.nil?
@conv_thread.conversation.send_message(self)
@conv_thread.conversation.send_message(self, go_quiet)
@sent = true
# return message (convenience)
......
@split_data_message = nil
@split_data = []
@comm_stop = false
@comm_dead = false
# associated conversation threads
@auto_close_threads = 60 # max idle time before closing (nil => never close threads)
......
@next_thread_id = 0
@system_thread = self.thread('system')
@stop_when_finished = false
# post-negociation peer info
@peer_name = nil
@peer_capabilities = []
......
def unbind
logger.info "Conversation finished with #{identifier} (#{@peer_name})"
# ensure we go quiet
@message_send.synchronize do
@comm_stop = true
end
@comm_dead = true
@bot.unregister_communication @peer_name unless @peer_name.nil?
@bot.drop_channel(@system_notification_name)
@conv_threads.each_value {|s| s.close(false) }
@conv_threads = {}
@conv_threads_index = {}
@comm_logic_block.call false unless @comm_logic_block.nil? or @protocol.negociation_ok?
end
def usuable?
not (@comm_dead or @comm_stop)
end
def bye
@protocol.send_quit_leaving
stop :quickly
end
def stop(condition)
case condition
when :when_finished
@stop_when_finished = true
when :quickly
@protocol.send_quit_leaving
end
end
def set_peer_info(name, capabilities)
......
end
def set_comm_stop(peer_left = false)
@comm_stop = true
return if @comm_stop
logger.debug "Conversation with '#{@peer_name}' on #{identifier} is closing"
@system_notification.unsubscribe(@system_notification_processing)
# the block may use a <message>.send(true) to go quiet (set @comm_stop)
yield if block_given?
# ensure we go quiet
@message_send.synchronize do
@comm_stop = true
end
peer_left ? close_connection : close_connection_after_writing
end
......
@conv_threads_closing.delete(th.name)
end
def send_message(message)
def send_message(message, quiet_after = false)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot send message without action id") if message.action_id.nil?
reset_idle_thread_check(message.conv_thread)
......
flags += "+" unless message.action_parameters.nil?
@message_send.synchronize do
send_line sprintf("%s-%04d-%04d%s %s", @bot.name, message.conv_thread.id, message.action_id, flags, message.action_code)
unless message.action_parameters.nil?
message.action_parameters.to_yaml.each_line {|l| send_line l }
send_line EOD
unless @comm_stop
send_line sprintf("%s-%04d-%04d%s %s", @bot.name, message.conv_thread.id, message.action_id, flags, message.action_code)
unless message.action_parameters.nil?
message.action_parameters.to_yaml.each_line {|l| send_line l }
send_line EOD
end
@comm_stop = true if quiet_after
end
end
......
def send_line(msg)
return if error?
return if @comm_stop
logger.debug "Sending data [#{identifier}]: #{msg}"
send_data "#{msg}\n"
......
@conv_threads_timers_manage.synchronize do
# if a timer is running, do nothing
return unless @conv_threads_timers[conv_thread.id].nil?
end
# test for idleness
if conv_thread.idle?
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is currently idle, doing another check in #{@auto_close_threads}s"
# test for idleness
if conv_thread.idle?
if @stop_when_finished
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is currently idle, considered finished, closing"
close_thread(conv_thread.name)
return
end
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is currently idle, doing another check in #{@auto_close_threads}s"
# send notification
@bot.get_channel(@system_notification_name) << {
:topic => 'THREAD IDLE',
:thread => conv_thread.name
}
# send notification
@bot.get_channel(@system_notification_name) << {
:topic => 'THREAD IDLE',
:thread => conv_thread.name
}
@conv_threads_timers_manage.synchronize do
# set timer for closing
@conv_threads_timers[conv_thread.id] = EventMachine.add_timer(@auto_close_threads) do
@conv_threads_timers[conv_thread.id] = nil
lib/cyborghood/cyborg/botnet/dsl.rb
module DSL
module BotnetTask
def ask(peer, key, cmd, *args)
_add_subtask_using_peer(peer) do |subtask, conv_thread|
action_name = ['ask', cmd, *args].hash
_add_subtask_using_peer(action_name, peer) do |subtask, conv_thread|
conv_thread.call(cmd, *args) do |reply|
case reply[:status]
when :ok
......
end
def know?(peer, key, cmd)
_add_subtask_using_peer(peer) do |subtask, conv_thread|
action_name = ['know', cmd].hash
_add_subtask_using_peer(action_name, peer) do |subtask, conv_thread|
conv_thread.exists?(cmd) do |reply|
case reply[:status]
when :ok
......
end
end
def _add_subtask_using_peer(peer)
_add_subtask("botnet/peer/#{peer}/out") do |subtask|
def _add_subtask_using_peer(action_name, peer)
subtask_name = "botnet/peer/#{peer}/#{action_name}/out"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': Trying to contact peer '#{peer}'"
# callback to end peer action and subtask
# (used when shooting the task)
defuse_peer_action_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
subtask.finish unless subtask.finished?
# we already own the mutex here
registered_resources.delete(subtask_name)
end
# register peer action in the task
tasks_info_mutex.synchronize do
registered_resources[subtask_name] = defuse_peer_action_cb
end
@bot.contact_peer(peer) do |conv|
if conv
logger.debug "Task '#{@name}': Peer '#{peer}' contacted, starting conversation"
logger.debug "Task '#{@name}': subtask '#{subtask.name}': peer '#{peer}' contacted, starting conversation"
@peer_contacted << peer
......
def _finished
super
# close opened thread
cb = Proc.new do |conv|
conv.close_thread(@notification_name)
end
# loop on all contacted peers to close the thread
@peer_contacted.each do |peer|
@bot.contact_peer(peer, true, &cb)
end
lib/cyborghood/cyborg/botnet/protocol.rb
def send_quit_decline(reason)
@conversation.set_comm_stop do
@conversation.thread('system').new_message("QUIT DECLINE", { :reason => reason }).send
@conversation.thread('system').new_message("QUIT DECLINE", { :reason => reason }).send(true)
end
end
def send_quit_leaving
@conversation.set_comm_stop do
@conversation.thread('system').new_message("QUIT LEAVING").send
@conversation.thread('system').new_message("QUIT LEAVING").send(true)
end
end
end
lib/cyborghood/cyborg/dsl.rb
class TaskBase < BaseDSL
attr_reader :bot, :name, :errors, :results, :preferred_locales, :locale, :user
# mutex used for all class variables in this paragraph
@@tasks_info_mutex = Mutex.new
@@task_wip = 0
@@registered_resources = {}
@@stop_all_tasks = false
def self.idle?
@@task_wip == 0
@@tasks_info_mutex.synchronize do
@@task_wip == 0
end
end
def self.stop_all
@@stop_all_tasks = true
# call defuse callback out of the sync block
# (which may need to synchronize later to the
# same mutex in usual conditions)
resources_to_free = true
while not resources_to_free.nil?
@@tasks_info_mutex.synchronize do
resources_to_free = @@registered_resources.shift
end
resources_to_free[1].call unless resources_to_free.nil?
end
end
# the name MUST be unique
......
@bot = bot
@name = name
@@task_wip += 1
@@tasks_info_mutex.synchronize do
@@task_wip += 1
end
@errors = {}.freeze
@results = {}.freeze
......
def wait_notification(name, criterias = {}, timeout = nil, &cb)
name = @notification_name if name == :task
_add_subtask("notification/#{name}/in") do |subtask|
logger.debug "Task '#{@name}': Waiting notification on '#{name}'"
subtask_name = "notification/#{name}/#{criterias.hash}/in"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': subtask '#{subtask.name}': waiting notification on '#{name}'"
subcription_id = nil
subcription_id_mutex = Mutex.new
# callback to end notification and subtask
# (used when shooting the task)
defuse_notification_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
subtask.finish unless subtask.finished?
# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register notification in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_notification_cb
end
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
cb.call(subtask, msg)
@bot.get_channel(name).unsubscribe(subcription_id) if subtask.finished?
if subtask.finished?
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
end
end
end
if timeout
EventMachine.add_timer(timeout) do
@bot.get_channel(name).unsubscribe(subcription_id)
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end
# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
subtask.finish
end
end
......
end
def wait_timer(timeout, repeat = false, &cb)
_add_subtask("timer/#{timeout}/#{cb.hash}") do |subtask|
periodic_timer_signature = nil
subtask_name = "timer/#{timeout}/#{cb.hash}"
_add_subtask(subtask_name) do |subtask|
timer_signature = nil
timer_signature_mutex = Mutex.new
# callback to end timer and subtask
# (used when shooting the task)
defuse_timer_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"
timer_signature_mutex.synchronize do
EventMachine.cancel_timer(timer_signature) unless timer_signature.nil?
timer_signature = nil
end
subtask.finish unless subtask.finished?
# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register timer in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_timer_cb
end
timer_cb = Proc.new do
if repeat
cb.call(subtask)
EventMachine.cancel_timer(periodic_timer_signature) if subtask.finished?
timer_signature_mutex.synchronize do
if subtask.finished? and not timer_signature.nil?
EventMachine.cancel_timer(timer_signature)
timer_signature = nil
end
end
else
cb.call(subtask)
subtask.finish
# unregister the timer
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
subtask.finish unless subtask.finished?
end
end
if repeat
periodic_timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
else
EventMachine.add_timer(timeout, timer_cb)
timer_signature = EventMachine.add_timer(timeout, timer_cb)
end
end
end
......
# in this order: send a last message before leaving
# (so we are sure the peer received a reply to its message)
send_notification notification_name, {:topic => "MEET", :from => @name}
subtask.finish if meet_ok
subtask.finish if meet_ok or @@stop_all_tasks
end
end
......
def stop_bot(condition)
_add_subtask('stop') do |subtask|
case condition
when :when_finished
# try to stop gracefully
@bot.try_stop
when :at_once
# it won't wait for anything to finish…
@bot.stop
end
@bot.stop(condition)
subtask.finish
end
......
def finish
if @finished
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask should have ended, but it lied")
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask '#{@name}' should have ended, but it lied")
end
@finished = true
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
......
logger.debug "Task '#{@name}': step finished"
if @@stop_all_tasks
_finished
# running no more steps will end the task
return
end
# compute step result
@errors = {}
@results = {}
......
end
def _finished
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
@@tasks_info_mutex.synchronize do
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} tasks remaining)"
end
end
# needed for mixin
# TODO: find a cleaner solution
def tasks_info_mutex
@@tasks_info_mutex
end
# needed for mixin
# TODO: find a cleaner solution
def registered_resources
@@registered_resources
end
end

Also available in: Unified diff