Project

General

Profile

Download (13.2 KB) Statistics
| Branch: | Tag: | Revision:
7bc7e62a Marc Dequènes (Duck)
#--
# CyborgHood, a distributed system management software.
364e4a96 Marc Dequènes (Duck)
# Copyright (c) 2009-2011 Marc Dequènes (Duck) <Duck@DuckCorp.org>
7bc7e62a Marc Dequènes (Duck)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++


module CyborgHood
a445b722 Marc Dequènes (Duck)
module TaskAspect
def task(name, &block)
the_bot = self.is_a?(Cyborg) ? self : self.bot
DSL::Task.new(the_bot, name, &block)
end

def schedule_task(callback = nil, &task)
EventMachine.defer(task, callback)
7bc7e62a Marc Dequènes (Duck)
end
a445b722 Marc Dequènes (Duck)
end
7bc7e62a Marc Dequènes (Duck)
a445b722 Marc Dequènes (Duck)
module DSL
class TaskBase
af81fa28 Marc Dequènes (Duck)
attr_reader :bot, :name, :errors, :results, :preferred_locales, :locale, :user
7bc7e62a Marc Dequènes (Duck)
a445b722 Marc Dequènes (Duck)
include I18nTranslation

a089227c Marc Dequènes (Duck)
# mutex used for all class variables in this paragraph
@@tasks_info_mutex = Mutex.new
4d9f2962 Marc Dequènes (Duck)
@@task_wip = 0
a089227c Marc Dequènes (Duck)
@@registered_resources = {}

@@stop_all_tasks = false
4d9f2962 Marc Dequènes (Duck)
def self.idle?
a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip == 0
end
end

def self.stop_all
@@stop_all_tasks = true

# call defuse callback out of the sync block
# (which may need to synchronize later to the
# same mutex in usual conditions)
resources_to_free = true
while not resources_to_free.nil?
@@tasks_info_mutex.synchronize do
resources_to_free = @@registered_resources.shift
end
resources_to_free[1].call unless resources_to_free.nil?
end
4d9f2962 Marc Dequènes (Duck)
end
7bc7e62a Marc Dequènes (Duck)
3b29bf46 Marc Dequènes (Duck)
# the name MUST be unique
def initialize(bot, name, &block)
7bc7e62a Marc Dequènes (Duck)
@bot = bot
@name = name

a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip += 1
end
7bc7e62a Marc Dequènes (Duck)
5da1960c Marc Dequènes (Duck)
@errors = {}.freeze
@results = {}.freeze
7bc7e62a Marc Dequènes (Duck)
@notification_name = "task/#{@name}"
4c969e21 Marc Dequènes (Duck)
@preferred_locales = nil
@locale = nil
af81fa28 Marc Dequènes (Duck)
@user = nil
7bc7e62a Marc Dequènes (Duck)
429ebb9c Marc Dequènes (Duck)
_setup
5da1960c Marc Dequènes (Duck)
_start_dsl &block
end

4c969e21 Marc Dequènes (Duck)
def set_preferred_locales(prefs)
_add_subtask("setting/preferred_locales") do |subtask|
51546fe5 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': setting preferred locales to: #{prefs}"
@preferred_locales = prefs

4c969e21 Marc Dequènes (Duck)
subtask.finish
end
end

af81fa28 Marc Dequènes (Duck)
# temporary setting until Guard is created
def set_user(user)
_add_subtask("setting/user") do |subtask|
logger.debug "Task '#{@name}': setting user to: #{user}"
@user = user

subtask.finish
end
end

7bc7e62a Marc Dequènes (Duck)
# may return a Hash of results
def schedule(&job)
612572f9 Marc Dequènes (Duck)
_add_subtask("job/#{job.hash}") do |subtask|
logger.debug "Task '#{@name}': Scheduling job"
7bc7e62a Marc Dequènes (Duck)
cb = Proc.new do
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
d9ef7de9 Marc Dequènes (Duck)
job_wrapper = Proc.new do
job.call subtask
end
@bot.schedule_task(cb, job_wrapper)
7bc7e62a Marc Dequènes (Duck)
end
end

def send_notification(name, data)
947e1d12 Marc Dequènes (Duck)
name = @notification_name if name == :task
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
_add_subtask("notification/#{name}/out") do |subtask|
logger.debug "Task '#{@name}': Sending notification to '#{name}'"
7bc7e62a Marc Dequènes (Duck)
chan = @bot.get_channel(name)
chan << data
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

def wait_notification(name, criterias = {}, timeout = nil, &cb)
947e1d12 Marc Dequènes (Duck)
name = @notification_name if name == :task
7bc7e62a Marc Dequènes (Duck)
a089227c Marc Dequènes (Duck)
subtask_name = "notification/#{name}/#{criterias.hash}/in"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': subtask '#{subtask.name}': waiting notification on '#{name}'"
subcription_id = nil
subcription_id_mutex = Mutex.new

# callback to end notification and subtask
# (used when shooting the task)
defuse_notification_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"

subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

subtask.finish unless subtask.finished?

# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register notification in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_notification_cb
end

7bc7e62a Marc Dequènes (Duck)
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
612572f9 Marc Dequènes (Duck)
cb.call(subtask, msg)
a089227c Marc Dequènes (Duck)
if subtask.finished?
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
end
7bc7e62a Marc Dequènes (Duck)
end
end

if timeout
EventMachine.add_timer(timeout) do
a089227c Marc Dequènes (Duck)
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end

612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end
end
end

def wait_timer(timeout, repeat = false, &cb)
a089227c Marc Dequènes (Duck)
subtask_name = "timer/#{timeout}/#{cb.hash}"
_add_subtask(subtask_name) do |subtask|
timer_signature = nil
timer_signature_mutex = Mutex.new

# callback to end timer and subtask
# (used when shooting the task)
defuse_timer_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"

timer_signature_mutex.synchronize do
EventMachine.cancel_timer(timer_signature) unless timer_signature.nil?
timer_signature = nil
end

subtask.finish unless subtask.finished?

# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register timer in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_timer_cb
end
7bc7e62a Marc Dequènes (Duck)
timer_cb = Proc.new do
if repeat
612572f9 Marc Dequènes (Duck)
cb.call(subtask)
a089227c Marc Dequènes (Duck)
timer_signature_mutex.synchronize do
if subtask.finished? and not timer_signature.nil?
EventMachine.cancel_timer(timer_signature)
timer_signature = nil
end
end
7bc7e62a Marc Dequènes (Duck)
else
612572f9 Marc Dequènes (Duck)
cb.call(subtask)
a089227c Marc Dequènes (Duck)
# unregister the timer
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end

subtask.finish unless subtask.finished?
7bc7e62a Marc Dequènes (Duck)
end
end

if repeat
a089227c Marc Dequènes (Duck)
timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
7bc7e62a Marc Dequènes (Duck)
else
a089227c Marc Dequènes (Duck)
timer_signature = EventMachine.add_timer(timeout, timer_cb)
7bc7e62a Marc Dequènes (Duck)
end
end
end

3b29bf46 Marc Dequènes (Duck)
# the name MUST be unique
def task(name, &block)
612572f9 Marc Dequènes (Duck)
_add_subtask("task/#{name}") do |subtask|
3b29bf46 Marc Dequènes (Duck)
self.class.new(@bot, name, &block)
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

f6fd4b56 Marc Dequènes (Duck)
def meet(task_list, meetpoint, retry_time = 2)
task_list = [task_list] unless task_list.is_a? Array
task_list = Set.new(task_list)

notification_name = "meeting_point/#{meetpoint}"
notifications_received = Set.new
meet_ok = false

wait_notification(notification_name, {:topic => "MEET"}) do |subtask, msg|
notifications_received << msg[:from] if task_list.include?(msg[:from])
if task_list == notifications_received
meet_ok = true
subtask.finish
end
end
wait_timer(retry_time, true) do |subtask|
# in this order: send a last message before leaving
# (so we are sure the peer received a reply to its message)
send_notification notification_name, {:topic => "MEET", :from => @name}
a089227c Marc Dequènes (Duck)
subtask.finish if meet_ok or @@stop_all_tasks
f6fd4b56 Marc Dequènes (Duck)
end
end

7bc7e62a Marc Dequènes (Duck)
# error when adding subtasks, as we cannot check callbacks
# only applies to subsequently added tasks (wanted behavior)
def cancel_on_start_error
@cancel_on_start_error = true
end

def on_error(&cb)
@error_cb = cb
end

def on_success(&cb)
@success_cb = cb
end

def stop_bot(condition)
612572f9 Marc Dequènes (Duck)
_add_subtask('stop') do |subtask|
a089227c Marc Dequènes (Duck)
@bot.stop(condition)
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

protected

612572f9 Marc Dequènes (Duck)
class Subtask
attr_reader :name
attr_accessor :results, :errors
7bc7e62a Marc Dequènes (Duck)
4d9f2962 Marc Dequènes (Duck)
def initialize(task, name, &block)
@task = task
612572f9 Marc Dequènes (Duck)
@name = name
@block = block
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
@finished = false
@results = {}
@errors = []
7bc7e62a Marc Dequènes (Duck)
end

612572f9 Marc Dequènes (Duck)
def do_it
@block.call self
rescue
4d9f2962 Marc Dequènes (Duck)
msg = "Task '#{@task.name}': error: " + $!.to_s
612572f9 Marc Dequènes (Duck)
logger.error msg
@errors << CyberError.new(:unrecoverable, "botnet/client/dsl", msg)
4d9f2962 Marc Dequènes (Duck)
logger.debug $!.backtrace.join("\n")
612572f9 Marc Dequènes (Duck)
finish
end

def finish
if @finished
a089227c Marc Dequènes (Duck)
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask '#{@name}' should have ended, but it lied")
7bc7e62a Marc Dequènes (Duck)
end
612572f9 Marc Dequènes (Duck)
@finished = true
4d9f2962 Marc Dequènes (Duck)
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
@task.__send__(:_check_finished)
7bc7e62a Marc Dequènes (Duck)
end

612572f9 Marc Dequènes (Duck)
def finished?
@finished
end
end
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
def _add_subtask(name, cb = nil, &block)
f6fd4b56 Marc Dequènes (Duck)
subtask = Subtask.new(self, name, &block)
@subtasks << subtask
subtask.do_it if @dsl_runing
7bc7e62a Marc Dequènes (Duck)
end

def _subtasks_finished?
654cf341 Marc Dequènes (Duck)
subtasks_running = []
7bc7e62a Marc Dequènes (Duck)
@subtasks.each do |subtask|
654cf341 Marc Dequènes (Duck)
subtasks_running << subtask.name unless subtask.finished?
7bc7e62a Marc Dequènes (Duck)
end
654cf341 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': the following subtasks are still running: " +
subtasks_running.join(', ') unless subtasks_running.empty?

subtasks_running.empty?
7bc7e62a Marc Dequènes (Duck)
end

def _check_finished
return unless _subtasks_finished?

f6fd4b56 Marc Dequènes (Duck)
# avoid race: no subtask will be run in this task now
@dsl_runing = false

654cf341 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': step finished"

a089227c Marc Dequènes (Duck)
if @@stop_all_tasks
_finished

# running no more steps will end the task
return
end

5da1960c Marc Dequènes (Duck)
# compute step result
@errors = {}
@results = {}
7bc7e62a Marc Dequènes (Duck)
@subtasks.each do |subtask|
5da1960c Marc Dequènes (Duck)
@errors[subtask.name] = subtask.errors unless subtask.errors.empty?
@results.merge!(subtask.results)
7bc7e62a Marc Dequènes (Duck)
end
5da1960c Marc Dequènes (Duck)
@errors.freeze
@result.freeze
7619e2d8 Marc Dequènes (Duck)
5da1960c Marc Dequènes (Duck)
# next step
cb = @errors.empty? ? @success_cb : @error_cb
4d9f2962 Marc Dequènes (Duck)
if cb
5da1960c Marc Dequènes (Duck)
_start_dsl(&cb)
4d9f2962 Marc Dequènes (Duck)
else
429ebb9c Marc Dequènes (Duck)
_finished
4d9f2962 Marc Dequènes (Duck)
end
7bc7e62a Marc Dequènes (Duck)
end

def _notification_criterias_match(msg, criterias)
criterias.each_pair do |key, expected_val|
val = msg[key]
if expected_val.is_a? Regexp
return false if val !~ expected_val
else
return false if val != expected_val
end
end
true
end

429ebb9c Marc Dequènes (Duck)
def _setup
logger.debug "Task '#{@name}': created"
end

5da1960c Marc Dequènes (Duck)
def _start_dsl(&block)
@dsl_runing = false
@subtasks = []
@cancel_on_start_error = false
@error_cb = nil
@success_cb = nil

begin
instance_eval(&block)
rescue
logger.error "Task '#{@name}': error in DSL: " + $!.to_s
logger.debug $!.backtrace.join("\n")
return
end
7bc7e62a Marc Dequènes (Duck)
4d9f2962 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': begining step"

if @subtasks.empty?
_check_finished
else
f6fd4b56 Marc Dequènes (Duck)
@dsl_runing = true
4d9f2962 Marc Dequènes (Duck)
@subtasks.each do |subtask|
# skip broken or canceled subtasks
next if subtask.finished?
subtask.do_it
end
7bc7e62a Marc Dequènes (Duck)
end

true
end
429ebb9c Marc Dequènes (Duck)
def _finished
a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} tasks remaining)"
end
end

# needed for mixin
# TODO: find a cleaner solution
def tasks_info_mutex
@@tasks_info_mutex
end

# needed for mixin
# TODO: find a cleaner solution
def registered_resources
@@registered_resources
429ebb9c Marc Dequènes (Duck)
end
end

# this empty class is used as a trick to be able to inject features
# as module.prepend does not exist yet
class Task < TaskBase
7bc7e62a Marc Dequènes (Duck)
end
end
end