Project

General

Profile

Download (5.19 KB) Statistics
| Branch: | Tag: | Revision:
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2011 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++


module CyborgHood
module DSL
module BotnetTask
def ask(peer, key, cmd, *args)
action_name = ['ask', cmd, *args].hash

_add_subtask_using_peer(action_name, key, peer, cmd, args) do |conv_thread, env, cmd, args, reply_cb|
if peer == :self
@bot.interface._call(nil, env, cmd, args, &reply_cb)
else
conv_thread.call(env, cmd, *args, &reply_cb)
end
end
end

def know?(peer, key, cmd)
action_name = ['know', cmd].hash

_add_subtask_using_peer(action_name, key, peer, cmd, args) do |conv_thread, env, cmd, args, reply_cb|
if peer == :self
@bot.interface._is_node?(nil, env, cmd, &reply_cb)
else
conv_thread.exists?(env, cmd, &reply_cb)
end
end
end

def _add_subtask_using_peer(action_name, key, peer, cmd, args = nil, &cmd_cb)
subtask_name = "botnet/peer/#{peer}/#{action_name}/out"

_add_subtask(subtask_name) do |subtask|
if peer == :self
logger.debug "Task '#{@name}': Talking to oneself"
else
logger.debug "Task '#{@name}': Trying to contact peer '#{peer}'"
end

# callback to end peer action and subtask
# (used when shooting the task)
defuse_peer_action_cb = Proc.new do
logger.debug "Task '#{@name}': defusing subtask '#{subtask.name}'"

subtask.finish unless subtask.finished?

# we already own the mutex here
registered_resources.delete(subtask_name)
end
# register peer action in the task
tasks_info_mutex.synchronize do
registered_resources[subtask_name] = defuse_peer_action_cb
end

cmd = cmd.call if cmd.is_a? Proc
args.collect!{|a| a.is_a?(Proc) ? a.call : a }

# catch current environment to transmit it
env = {
:preferred_locales => @preferred_locales,
:user => @user
}

reply_cb = Proc.new do |reply|
case reply[:status]
when :ok
subtask.results = {key => reply[:result]}
when :decline
# TODO: remove this case ???
when :error
subtask.errors += reply[:exceptions]
end

subtask.finish
end

if peer == :self
_run_subtask_using_peer_local(subtask, env, cmd, args, cmd_cb, reply_cb)
else
_run_subtask_using_peer_remote(subtask, env, peer, cmd, args, cmd_cb, reply_cb)
end
end
end

def _run_subtask_using_peer_remote(subtask, env, peer, cmd, args, cmd_cb, reply_cb)
@bot.contact_peer(peer) do |conv|
if conv
logger.debug "Task '#{@name}': subtask '#{subtask.name}': peer '#{peer}' contacted, starting conversation"

@peer_contacted << peer

# don't use the block call to leave the conversation thread open
conv_thread = conv.thread(@notification_name)

cmd_cb.call(conv_thread, env, cmd, args, reply_cb)
else
logger.debug "Task '#{@name}': Could not contact peer '#{peer}'"
subtask.errors << CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@name}': could not contact peer '#{peer}'")
subtask.finish
end
end
end

def _run_subtask_using_peer_local(subtask, env, cmd, args, cmd_cb, reply_cb)
process_result_cb = Proc.new do |reply|
# reformat reply
new_reply = {
:status => :ok,
:result => reply[:results]
}

reply_cb.call new_reply
end

begin
cmd_cb.call(nil, env, cmd, args, process_result_cb)
rescue CyberError => e
subtask.errors << e
subtask.finish
rescue
subtask.errors << CyberError.new(:unrecoverable, 'unknown', $!.message)
subtask.finish
end
end

def _setup
super

@peer_contacted = Set.new
end

def _finished
super

# close opened thread
cb = Proc.new do |conv|
conv.close_thread(@notification_name)
end

# loop on all contacted peers to close the thread
@peer_contacted.each do |peer|
@bot.contact_peer(peer, true, &cb)
end
end
end

Task.class_eval("include BotnetTask")
end
end
(5-5/5)