Project

General

Profile

Download (8.42 KB) Statistics
| Branch: | Tag: | Revision:
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

require 'active_support/basic_object'
require 'set'


module CyborgHood
module DSL
class BaseDSL < ActiveSupport::BasicObject
def initialize(&block)
self.instance_eval(&block)
_start_dsl
end

reveal :class
reveal :logger
end

class Task < BaseDSL
attr_reader :bot, :name, :errors, :results

@@task_wip = 0

def self.idle?
@@task_wip == 0
end

def initialize(bot, name = nil, previous_step_data = {}, &block)
@bot = bot
@name = name

@@task_wip += 1

@errors = (previous_step_data[:errors] || {}).freeze
@results = (previous_step_data[:results] || {}).freeze

@dsl_runing = false
@subtasks = []
@cancel_on_start_error = false
@cancel_run = false

@error_cb = nil
@success_cb = nil

@notification_name = "task/#{@name}"

begin
super(&block)
rescue
logger.error "Task '#{@name}': error in DSL: " + $!.to_s
logger.debug $!.backtrace.join("\n")
@cancel_run = true
end
end

# may return a Hash of results
def schedule(&job)
_add_subtask("job/#{job.hash}") do |subtask|
logger.debug "Task '#{@name}': Scheduling job"
cb = Proc.new do
subtask.finish
end
job_wrapper = Proc.new do
job.call subtask
end
@bot.schedule_task(cb, job_wrapper)
end
end

def send_notification(name, data)
name = @notification_name if name == :task

_add_subtask("notification/#{name}/out") do |subtask|
logger.debug "Task '#{@name}': Sending notification to '#{name}'"
chan = @bot.get_channel(name)
chan << data
subtask.finish
end
end

def wait_notification(name, criterias = {}, timeout = nil, &cb)
name = @notification_name if name == :task

_add_subtask("notification/#{name}/in") do |subtask|
logger.debug "Task '#{@name}': Waiting notification on '#{name}'"
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
cb.call(subtask, msg)
@bot.get_channel(name).unsubscribe(subcription_id) if subtask.finished?
end
end

if timeout
EventMachine.add_timer(timeout) do
@bot.get_channel(name).unsubscribe(subcription_id)
subtask.finish
end
end
end
end

def wait_timer(timeout, repeat = false, &cb)
_add_subtask("timer/#{timeout}/#{cb.hash}") do |subtask|
periodic_timer_signature = nil

timer_cb = Proc.new do
if repeat
cb.call(subtask)
EventMachine.cancel_timer(periodic_timer_signature) if subtask.finished?
else
cb.call(subtask)
subtask.finish
end
end

if repeat
periodic_timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
else
EventMachine.add_timer(timeout, timer_cb)
end
end
end

def task(name = nil, &block)
_add_subtask("task/#{name}") do |subtask|
self.class.new(@bot, name || @name, &block)
subtask.finish
end
end

def meet(task_list, meetpoint, retry_time = 2)
task_list = [task_list] unless task_list.is_a? Array
task_list = Set.new(task_list)

notification_name = "meeting_point/#{meetpoint}"
notifications_received = Set.new
meet_ok = false

wait_notification(notification_name, {:topic => "MEET"}) do |subtask, msg|
notifications_received << msg[:from] if task_list.include?(msg[:from])
if task_list == notifications_received
meet_ok = true
subtask.finish
end
end
wait_timer(retry_time, true) do |subtask|
# in this order: send a last message before leaving
# (so we are sure the peer received a reply to its message)
send_notification notification_name, {:topic => "MEET", :from => @name}
subtask.finish if meet_ok
end
end

# error when adding subtasks, as we cannot check callbacks
# only applies to subsequently added tasks (wanted behavior)
def cancel_on_start_error
@cancel_on_start_error = true
end

def on_error(&cb)
@error_cb = cb
end

def on_success(&cb)
@success_cb = cb
end

def stop_bot(condition)
_add_subtask('stop') do |subtask|
case condition
when :when_finished
# try to stop gracefully
@bot.try_stop
when :at_once
# it won't wait for anything to finish…
@bot.stop
end

subtask.finish
end
end

protected

class Subtask
attr_reader :name
attr_accessor :results, :errors

def initialize(task, name, &block)
@task = task
@name = name
@block = block

@finished = false
@results = {}
@errors = []
end

def do_it
@block.call self
rescue
msg = "Task '#{@task.name}': error: " + $!.to_s
logger.error msg
@errors << CyberError.new(:unrecoverable, "botnet/client/dsl", msg)
logger.debug $!.backtrace.join("\n")
finish
end

def finish
if @finished
raise CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@task.name}': subtask should have ended, but it lied")
end
@finished = true
logger.debug "Task '#{@task.name}': subtask '#{@name}' finished"
@task.__send__(:_check_finished)
end

def finished?
@finished
end
end

def _add_subtask(name, cb = nil, &block)
subtask = Subtask.new(self, name, &block)
@subtasks << subtask
subtask.do_it if @dsl_runing
end

def _subtasks_finished?
@subtasks.each do |subtask|
return false unless subtask.finished?
end
return true
end

def _check_finished
return unless _subtasks_finished?

# avoid race: no subtask will be run in this task now
@dsl_runing = false

next_step_data = {
:errors => {},
:results => {}
}

@subtasks.each do |subtask|
next_step_data[:errors][subtask.name] = subtask.errors unless subtask.errors.empty?
next_step_data[:results].merge!(subtask.results)
end

cb = next_step_data[:errors].empty? ? @success_cb : @error_cb
if cb
self.class.new(@bot, @name, next_step_data, &cb)
else
logger.debug "Task '#{@name}': finished"
end

@@task_wip -= 1
end

def _notification_criterias_match(msg, criterias)
criterias.each_pair do |key, expected_val|
val = msg[key]
if expected_val.is_a? Regexp
return false if val !~ expected_val
else
return false if val != expected_val
end
end
true
end

def _start_dsl
return false if @cancel_run

logger.debug "Task '#{@name}': begining step"

if @subtasks.empty?
_check_finished
else
@dsl_runing = true
@subtasks.each do |subtask|
# skip broken or canceled subtasks
next if subtask.finished?
subtask.do_it
end
end

true
end
end
end
end
(2-2/2)