Project

General

Profile

« Previous | Next » 

Revision 7bc7e62a

Added by Marc Dequènes about 14 years ago

  • ID 7bc7e62a2e25c9c40bd6d3d92504a784ffe80e81

[evol] added preliminary work on new Task-based client DSL

View differences:

lib/cyborghood/cyborg/dsl.rb
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++
require 'active_support/basic_object'
module CyborgHood
module BotnetDSL
class BaseDSL < ActiveSupport::BasicObject
def initialize(&block)
self.instance_eval(&block)
_start_dsl
end
end
class Task < BaseDSL
attr_reader :bot
@@task_list = {}
def initialize(bot, name = nil, &block)
@bot = bot
@name = name
@@task_list[@name] = self unless @name.nil?
@subtasks = []
@cancel_on_start_error = false
@cancel_run = false
@error_cb = nil
@success_cb = nil
@logguer = Logger.instance
@notification_name = "task/#{@name}"
begin
super(&block)
rescue
@logger.error "Task '#{@name}': could not initialize DSL: " + $!.to_s
@logger.debug $!.backtrace.join("\n")
@cancel_run = true
end
end
# may return a Hash of results
def schedule(&job)
_add_subtask("job/#{job.hash}") do |subtask_cb|
@logger.debug "Task '#{@name}': Scheduling job"
cb = Proc.new do
subtask_cb true
end
@bot.schedule_task(cb, job)
end
end
def send_notification(name, data)
name = @notification_name if name == :thread
_add_subtask("notification/#{name}/out") do |subtask_cb|
@logger.debug "Task '#{@name}': Sending notification to '#{name}'"
chan = @bot.get_channel(name)
chan << data
subtask_cb.call(true)
end
end
def wait_notification(name, criterias = {}, timeout = nil, &cb)
name = @notification_name if name == :thread
full_results = {}
_add_subtask("notification/#{name}/in") do |subtask_cb|
@logger.debug "Task '#{@name}': Waiting notification on '#{name}'"
subcription_id = @bot.get_channel(name).subscribe do |msg|
if _notification_criterias_match(msg, criterias)
stop, results = cb.call(msg)
@bot.get_channel(name).unsubscribe(subcription_id) if stop
subtask_cb.call(stop, results)
end
end
if timeout
EventMachine.add_timer(timeout) do
@bot.get_channel(name).unsubscribe(subcription_id)
subtask_cb.call
end
end
end
end
def wait_timer(timeout, repeat = false, &cb)
_add_subtask("timer/#{timeout}/#{cb.hash}") do |subtask_cb|
periodic_timer_signature = nil
timer_cb = Proc.new do
if repeat
stop, results = cb.call
EventMachine.cancel_timer(periodic_timer_signature) if stop
else
stop = true
results = cb.call
end
subtask_cb.call(stop, results)
end
if repeat
periodic_timer_signature = EventMachine.add_periodic_timer(timeout, timer_cb)
else
EventMachine.add_timer(timeout, timer_cb)
end
end
end
def task(name = nil, &block)
_add_subtask("task/#{name}") do |subtask_cb|
self.class.new(name || @name, &block)
subtask_cb.call(true)
end
end
# error when adding subtasks, as we cannot check callbacks
# only applies to subsequently added tasks (wanted behavior)
def cancel_on_start_error
@cancel_on_start_error = true
end
def on_error(&cb)
@error_cb = cb
end
def on_success(&cb)
@success_cb = cb
end
def stop_bot(condition)
_add_subtask('stop') do |subtask_cb|
case condition
when :when_finished
# try to stop gracefully
@bot.try_stop
when :at_once
# it won't wait for anything to finish…
@bot.stop
end
subtask_cb.call(true)
end
end
protected
def _add_subtask(name, cb = nil, &block)
# TODO: create a Subtask object, with the following attributes, plus the subtask_cb
# and pass it to the block instead of the subtask_cb, to allow easier
# manipulation of errors/results/... and being more extensible
subtask = {
:name => name,
:done => false,
:results => {},
:errors => []
}
# can be called multiple times with stop=false to collect result
# and stop=true will consider it stopper
subtask_cb = Proc.new do |stop, results|
if subtask[:done]
subtask[:error] = CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@name}': subtask should have ended, but it lied")
else
if results.is_a? CyberError
subtask[:errors] << results
elsif results.is_a? Hash
subtask[:results].merge results
elsif not results.nil? # nil just mean no result
subtask[:errors] << CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@name}': unknown result for subtask")
end
if stop
subtask[:done] = true
_check_finished
end
end
end
subtask[:block] = Proc.new do
begin
block.call subtask_cb
rescue
msg = "Task '#{@name}': error: " + $!.to_s
logger.error msg
subtask[:errors] << CyberError.new(:unrecoverable, "botnet/client/dsl", msg)
subtask[:done] = true
end
end
@subtasks << subtask
nil
end
def _subtasks_finished?
@subtasks.each do |subtask|
return false unless subtask[:done]
end
return true
end
def _check_finished
return unless _subtasks_finished?
error = false
next_step = {
:errors = {},
:results = {}
}
@subtasks.each do |subtask|
# z
end
end
def _notification_criterias_match(msg, criterias)
criterias.each_pair do |key, expected_val|
val = msg[key]
if expected_val.is_a? Regexp
return false if val !~ expected_val
else
return false if val != expected_val
end
end
true
end
def _start_dsl
return false if @cancel_run
@subtasks.each do |subtask|
# skip broken or canceled subtasks
next if subtask[:done]
subtask[:block].call
end
true
end
end
end
end

Also available in: Unified diff