Project

General

Profile

Download (13.6 KB) Statistics
| Branch: | Tag: | Revision:
7bc7e62a Marc Dequènes (Duck)
#--
# CyborgHood, a distributed system management software.
364e4a96 Marc Dequènes (Duck)
# Copyright (c) 2009-2011 Marc Dequènes (Duck) <Duck@DuckCorp.org>
7bc7e62a Marc Dequènes (Duck)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

require 'active_support/basic_object'
4c969e21 Marc Dequènes (Duck)
require 'http_headers'
f6fd4b56 Marc Dequènes (Duck)
require 'set'
7bc7e62a Marc Dequènes (Duck)

module CyborgHood
4d9f2962 Marc Dequènes (Duck)
module DSL
cf95e788 Marc Dequènes (Duck)
class BaseDSL
include I18nTranslation
7bc7e62a Marc Dequènes (Duck)
end

429ebb9c Marc Dequènes (Duck)
class TaskBase < BaseDSL
af81fa28 Marc Dequènes (Duck)
attr_reader :bot, :name, :errors, :results, :preferred_locales, :locale, :user
7bc7e62a Marc Dequènes (Duck)
a089227c Marc Dequènes (Duck)
# mutex used for all class variables in this paragraph
@@tasks_info_mutex = Mutex.new
4d9f2962 Marc Dequènes (Duck)
@@task_wip = 0
a089227c Marc Dequènes (Duck)
@@registered_resources = {}

@@stop_all_tasks = false
4d9f2962 Marc Dequènes (Duck)
def self.idle?
a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip == 0
end
end

def self.stop_all
@@stop_all_tasks = true

# call defuse callback out of the sync block
# (which may need to synchronize later to the
# same mutex in usual conditions)
resources_to_free = true
while not resources_to_free.nil?
@@tasks_info_mutex.synchronize do
resources_to_free = @@registered_resources.shift
end
resources_to_free[1].call unless resources_to_free.nil?
end
4d9f2962 Marc Dequènes (Duck)
end
7bc7e62a Marc Dequènes (Duck)
3b29bf46 Marc Dequènes (Duck)
# the name MUST be unique
def initialize(bot, name, &block)
7bc7e62a Marc Dequènes (Duck)
@bot = bot
@name = name

a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip += 1
end
7bc7e62a Marc Dequènes (Duck)
5da1960c Marc Dequènes (Duck)
@errors = {}.freeze
@results = {}.freeze
7bc7e62a Marc Dequènes (Duck)
@notification_name = "task/#{@name}"
4c969e21 Marc Dequènes (Duck)
@preferred_locales = nil
@locale = nil
af81fa28 Marc Dequènes (Duck)
@user = nil
7bc7e62a Marc Dequènes (Duck)
429ebb9c Marc Dequènes (Duck)
_setup
5da1960c Marc Dequènes (Duck)
_start_dsl &block
end

4c969e21 Marc Dequènes (Duck)
def set_preferred_locales(prefs)
_add_subtask("setting/preferred_locales") do |subtask|
logger.debug "Task '#{@name}': setting preferred locales"
lang_chooser = HTTPHeaders::AcceptLanguage.parse(prefs)
if lang_chooser
@preferred_locales = prefs

ordered_list = lang_chooser.reduce(I18nController.instance.available_locales)
@locale = ordered_list.empty? ? I18nController::DEFAULT_LOCALE : ordered_list.first.range
logger.debug "Task '#{@name}': locale chosen: #{@locale}"
else
@errors << CyberError.new(:unrecoverable, "botnet/client/dsl", _("unparsable preferred locales"))
end
subtask.finish
end
end

af81fa28 Marc Dequènes (Duck)
# temporary setting until Guard is created
def set_user(user)
_add_subtask("setting/user") do |subtask|
logger.debug "Task '#{@name}': setting user to: #{user}"
@user = user

subtask.finish
end
end

7bc7e62a Marc Dequènes (Duck)
# may return a Hash of results
def schedule(&job)
612572f9 Marc Dequènes (Duck)
_add_subtask("job/#{job.hash}") do |subtask|
logger.debug "Task '#{@name}': Scheduling job"
7bc7e62a Marc Dequènes (Duck)
cb = Proc.new do
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
d9ef7de9 Marc Dequènes (Duck)
job_wrapper = Proc.new do
job.call subtask
end
@bot.schedule_task(cb, job_wrapper)
7bc7e62a Marc Dequènes (Duck)
end
end

def send_notification(name, data)
947e1d12 Marc Dequènes (Duck)
name = @notification_name if name == :task
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
_add_subtask("notification/#{name}/out") do |subtask|
logger.debug "Task '#{@name}': Sending notification to '#{name}'"
7bc7e62a Marc Dequènes (Duck)
chan = @bot.get_channel(name)
chan << data
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

def wait_notification(name, criterias = {}, timeout = nil, &cb)
947e1d12 Marc Dequènes (Duck)
name = @notification_name if name == :task
7bc7e62a Marc Dequènes (Duck)
a089227c Marc Dequènes (Duck)
subtask_name = "notification/#{name}/#{criterias.hash}/in"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': subtask '#{subtask.name}': waiting notification on '#{name}'"
subcription_id = nil
subcription_id_mutex = Mutex.new

# callback to end notification and subtask
# (used when shooting the task)
defuse_notification_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"

subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

subtask.finish unless subtask.finished?

# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register notification in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_notification_cb
end

7bc7e62a Marc Dequènes (Duck)
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
612572f9 Marc Dequènes (Duck)
cb.call(subtask, msg)
a089227c Marc Dequènes (Duck)
if subtask.finished?
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end
end
7bc7e62a Marc Dequènes (Duck)
end
end

if timeout
EventMachine.add_timer(timeout) do
a089227c Marc Dequènes (Duck)
subcription_id_mutex.synchronize do
@bot.get_channel(name).unsubscribe(subcription_id) unless subcription_id.nil?
subcription_id = nil
end

# unregister the notification
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end

612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end
end
end

def wait_timer(timeout, repeat = false, &cb)
a089227c Marc Dequènes (Duck)
subtask_name = "timer/#{timeout}/#{cb.hash}"
_add_subtask(subtask_name) do |subtask|
timer_signature = nil
timer_signature_mutex = Mutex.new

# callback to end timer and subtask
# (used when shooting the task)
defuse_timer_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"

timer_signature_mutex.synchronize do
EventMachine.cancel_timer(timer_signature) unless timer_signature.nil?
timer_signature = nil
end

subtask.finish unless subtask.finished?

# we already own the mutex here
@@registered_resources.delete(subtask_name)
end
# register timer in the task
@@tasks_info_mutex.synchronize do
@@registered_resources[subtask_name] = defuse_timer_cb
end
7bc7e62a Marc Dequènes (Duck)
timer_cb = Proc.new do
if repeat
612572f9 Marc Dequènes (Duck)
cb.call(subtask)
a089227c Marc Dequènes (Duck)
timer_signature_mutex.synchronize do
if subtask.finished? and not timer_signature.nil?
EventMachine.cancel_timer(timer_signature)
timer_signature = nil
end
end
7bc7e62a Marc Dequènes (Duck)
else
612572f9 Marc Dequènes (Duck)
cb.call(subtask)
a089227c Marc Dequènes (Duck)
# unregister the timer
@@tasks_info_mutex.synchronize do
@@registered_resources.delete(subtask_name)
end

subtask.finish unless subtask.finished?
7bc7e62a Marc Dequènes (Duck)
end
end

if repeat
a089227c Marc Dequènes (Duck)
timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
7bc7e62a Marc Dequènes (Duck)
else
a089227c Marc Dequènes (Duck)
timer_signature = EventMachine.add_timer(timeout, timer_cb)
7bc7e62a Marc Dequènes (Duck)
end
end
end

3b29bf46 Marc Dequènes (Duck)
# the name MUST be unique
def task(name, &block)
612572f9 Marc Dequènes (Duck)
_add_subtask("task/#{name}") do |subtask|
3b29bf46 Marc Dequènes (Duck)
self.class.new(@bot, name, &block)
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

f6fd4b56 Marc Dequènes (Duck)
def meet(task_list, meetpoint, retry_time = 2)
task_list = [task_list] unless task_list.is_a? Array
task_list = Set.new(task_list)

notification_name = "meeting_point/#{meetpoint}"
notifications_received = Set.new
meet_ok = false

wait_notification(notification_name, {:topic => "MEET"}) do |subtask, msg|
notifications_received << msg[:from] if task_list.include?(msg[:from])
if task_list == notifications_received
meet_ok = true
subtask.finish
end
end
wait_timer(retry_time, true) do |subtask|
# in this order: send a last message before leaving
# (so we are sure the peer received a reply to its message)
send_notification notification_name, {:topic => "MEET", :from => @name}
a089227c Marc Dequènes (Duck)
subtask.finish if meet_ok or @@stop_all_tasks
f6fd4b56 Marc Dequènes (Duck)
end
end

7bc7e62a Marc Dequènes (Duck)
# error when adding subtasks, as we cannot check callbacks
# only applies to subsequently added tasks (wanted behavior)
def cancel_on_start_error
@cancel_on_start_error = true
end

def on_error(&cb)
@error_cb = cb
end

def on_success(&cb)
@success_cb = cb
end

def stop_bot(condition)
612572f9 Marc Dequènes (Duck)
_add_subtask('stop') do |subtask|
a089227c Marc Dequènes (Duck)
@bot.stop(condition)
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
subtask.finish
7bc7e62a Marc Dequènes (Duck)
end
end

protected

612572f9 Marc Dequènes (Duck)
class Subtask
attr_reader :name
attr_accessor :results, :errors
7bc7e62a Marc Dequènes (Duck)
4d9f2962 Marc Dequènes (Duck)
def initialize(task, name, &block)
@task = task
612572f9 Marc Dequènes (Duck)
@name = name
@block = block
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
@finished = false
@results = {}
@errors = []
7bc7e62a Marc Dequènes (Duck)
end

612572f9 Marc Dequènes (Duck)
def do_it
@block.call self
rescue
4d9f2962 Marc Dequènes (Duck)
msg = "Task '#{@task.name}': error: " + $!.to_s
612572f9 Marc Dequènes (Duck)
logger.error msg
@errors << CyberError.new(:unrecoverable, "botnet/client/dsl", msg)
4d9f2962 Marc Dequènes (Duck)
logger.debug $!.backtrace.join("\n")
612572f9 Marc Dequènes (Duck)
finish
end

def finish
if @finished
a089227c Marc Dequènes (Duck)
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask '#{@name}' should have ended, but it lied")
7bc7e62a Marc Dequènes (Duck)
end
612572f9 Marc Dequènes (Duck)
@finished = true
4d9f2962 Marc Dequènes (Duck)
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
@task.__send__(:_check_finished)
7bc7e62a Marc Dequènes (Duck)
end

612572f9 Marc Dequènes (Duck)
def finished?
@finished
end
end
7bc7e62a Marc Dequènes (Duck)
612572f9 Marc Dequènes (Duck)
def _add_subtask(name, cb = nil, &block)
f6fd4b56 Marc Dequènes (Duck)
subtask = Subtask.new(self, name, &block)
@subtasks << subtask
subtask.do_it if @dsl_runing
7bc7e62a Marc Dequènes (Duck)
end

def _subtasks_finished?
654cf341 Marc Dequènes (Duck)
subtasks_running = []
7bc7e62a Marc Dequènes (Duck)
@subtasks.each do |subtask|
654cf341 Marc Dequènes (Duck)
subtasks_running << subtask.name unless subtask.finished?
7bc7e62a Marc Dequènes (Duck)
end
654cf341 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': the following subtasks are still running: " +
subtasks_running.join(', ') unless subtasks_running.empty?

subtasks_running.empty?
7bc7e62a Marc Dequènes (Duck)
end

def _check_finished
return unless _subtasks_finished?

f6fd4b56 Marc Dequènes (Duck)
# avoid race: no subtask will be run in this task now
@dsl_runing = false

654cf341 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': step finished"

a089227c Marc Dequènes (Duck)
if @@stop_all_tasks
_finished

# running no more steps will end the task
return
end

5da1960c Marc Dequènes (Duck)
# compute step result
@errors = {}
@results = {}
7bc7e62a Marc Dequènes (Duck)
@subtasks.each do |subtask|
5da1960c Marc Dequènes (Duck)
@errors[subtask.name] = subtask.errors unless subtask.errors.empty?
@results.merge!(subtask.results)
7bc7e62a Marc Dequènes (Duck)
end
5da1960c Marc Dequènes (Duck)
@errors.freeze
@result.freeze
7619e2d8 Marc Dequènes (Duck)
5da1960c Marc Dequènes (Duck)
# next step
cb = @errors.empty? ? @success_cb : @error_cb
4d9f2962 Marc Dequènes (Duck)
if cb
5da1960c Marc Dequènes (Duck)
_start_dsl(&cb)
4d9f2962 Marc Dequènes (Duck)
else
429ebb9c Marc Dequènes (Duck)
_finished
4d9f2962 Marc Dequènes (Duck)
end
7bc7e62a Marc Dequènes (Duck)
end

def _notification_criterias_match(msg, criterias)
criterias.each_pair do |key, expected_val|
val = msg[key]
if expected_val.is_a? Regexp
return false if val !~ expected_val
else
return false if val != expected_val
end
end
true
end

429ebb9c Marc Dequènes (Duck)
def _setup
logger.debug "Task '#{@name}': created"
end

5da1960c Marc Dequènes (Duck)
def _start_dsl(&block)
@dsl_runing = false
@subtasks = []
@cancel_on_start_error = false
@error_cb = nil
@success_cb = nil

begin
instance_eval(&block)
rescue
logger.error "Task '#{@name}': error in DSL: " + $!.to_s
logger.debug $!.backtrace.join("\n")
return
end
7bc7e62a Marc Dequènes (Duck)
4d9f2962 Marc Dequènes (Duck)
logger.debug "Task '#{@name}': begining step"

if @subtasks.empty?
_check_finished
else
f6fd4b56 Marc Dequènes (Duck)
@dsl_runing = true
4d9f2962 Marc Dequènes (Duck)
@subtasks.each do |subtask|
# skip broken or canceled subtasks
next if subtask.finished?
subtask.do_it
end
7bc7e62a Marc Dequènes (Duck)
end

true
end
429ebb9c Marc Dequènes (Duck)
def _finished
a089227c Marc Dequènes (Duck)
@@tasks_info_mutex.synchronize do
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} tasks remaining)"
end
end

# needed for mixin
# TODO: find a cleaner solution
def tasks_info_mutex
@@tasks_info_mutex
end

# needed for mixin
# TODO: find a cleaner solution
def registered_resources
@@registered_resources
429ebb9c Marc Dequènes (Duck)
end
end

# this empty class is used as a trick to be able to inject features
# as module.prepend does not exist yet
class Task < TaskBase
7bc7e62a Marc Dequènes (Duck)
end
end
end