Project

General

Profile

« Previous | Next » 

Revision 9f407d69

Added by Marc Dequènes about 13 years ago

  • ID 9f407d69de35c34fa55382f4a137b1cec6e739af

[evol] Botnet/Task: added 'ask :self' support (useful for Clerk batch processing)

View differences:

lib/cyborghood/cyborg/botnet/task.rb
def ask(peer, key, cmd, *args)
action_name = ['ask', cmd, *args].hash
_add_subtask_using_peer(action_name, peer) do |subtask, conv_thread, env|
cmd = cmd.call if cmd.is_a? Proc
args.collect!{|a| a.is_a?(Proc) ? a.call : a }
conv_thread.call(env, cmd, *args) do |reply|
case reply[:status]
when :ok
subtask.results = {key => reply[:result]}
when :decline
# TODO: remove this case ???
when :error
subtask.errors += reply[:exceptions]
end
subtask.finish
_add_subtask_using_peer(action_name, key, peer, cmd, args) do |conv_thread, env, cmd, args, reply_cb|
if peer == :self
@bot.interface._call(nil, env, cmd, args, &reply_cb)
else
conv_thread.call(env, cmd, *args, &reply_cb)
end
end
end
......
def know?(peer, key, cmd)
action_name = ['know', cmd].hash
_add_subtask_using_peer(action_name, peer) do |subtask, conv_thread, env|
cmd = cmd.call if cmd.is_a? Proc
conv_thread.exists?(env, cmd) do |reply|
case reply[:status]
when :ok
subtask.results = {key => reply[:result]}
when :decline
# TODO: remove this case ???
when :error
subtask.errors += reply[:exceptions]
end
subtask.finish
_add_subtask_using_peer(action_name, key, peer, cmd, args) do |conv_thread, env, cmd, args, reply_cb|
if peer == :self
@bot.interface._is_node?(nil, env, cmd, &reply_cb)
else
conv_thread.exists?(env, cmd, &reply_cb)
end
end
end
def _add_subtask_using_peer(action_name, peer)
def _add_subtask_using_peer(action_name, key, peer, cmd, args = nil, &cmd_cb)
subtask_name = "botnet/peer/#{peer}/#{action_name}/out"
_add_subtask(subtask_name) do |subtask|
logger.debug "Task '#{@name}': Trying to contact peer '#{peer}'"
if peer == :self
logger.debug "Task '#{@name}': Talking to oneself"
else
logger.debug "Task '#{@name}': Trying to contact peer '#{peer}'"
end
# callback to end peer action and subtask
# (used when shooting the task)
......
registered_resources[subtask_name] = defuse_peer_action_cb
end
cmd = cmd.call if cmd.is_a? Proc
args.collect!{|a| a.is_a?(Proc) ? a.call : a }
# catch current environment to transmit it
env = {
:preferred_locales => @preferred_locales,
:user => @user
}
@bot.contact_peer(peer) do |conv|
if conv
logger.debug "Task '#{@name}': subtask '#{subtask.name}': peer '#{peer}' contacted, starting conversation"
reply_cb = Proc.new do |reply|
case reply[:status]
when :ok
subtask.results = {key => reply[:result]}
when :decline
# TODO: remove this case ???
when :error
subtask.errors += reply[:exceptions]
end
@peer_contacted << peer
subtask.finish
end
# don't use the block call to leave the conversation thread open
conv_thread = conv.thread(@notification_name)
if peer == :self
_run_subtask_using_peer_local(subtask, env, cmd, args, cmd_cb, reply_cb)
else
_run_subtask_using_peer_remote(subtask, env, peer, cmd, args, cmd_cb, reply_cb)
end
end
end
yield(subtask, conv_thread, env)
else
logger.debug "Task '#{@name}': Could not contact peer '#{peer}'"
subtask.errors << CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@name}': could not contact peer '#{peer}'")
subtask.finish
end
def _run_subtask_using_peer_remote(subtask, env, peer, cmd, args, cmd_cb, reply_cb)
@bot.contact_peer(peer) do |conv|
if conv
logger.debug "Task '#{@name}': subtask '#{subtask.name}': peer '#{peer}' contacted, starting conversation"
@peer_contacted << peer
# don't use the block call to leave the conversation thread open
conv_thread = conv.thread(@notification_name)
cmd_cb.call(conv_thread, env, cmd, args, reply_cb)
else
logger.debug "Task '#{@name}': Could not contact peer '#{peer}'"
subtask.errors << CyberError.new(:unrecoverable, "botnet/client/dsl", "Task '#{@name}': could not contact peer '#{peer}'")
subtask.finish
end
end
end
def _run_subtask_using_peer_local(subtask, env, cmd, args, cmd_cb, reply_cb)
process_result_cb = Proc.new do |reply|
# reformat reply
new_reply = {
:status => :ok,
:result => reply[:results]
}
reply_cb.call new_reply
end
begin
cmd_cb.call(nil, env, cmd, args, process_result_cb)
rescue CyberError => e
subtask.errors << e
subtask.finish
rescue
subtask.errors << CyberError.new(:unrecoverable, 'unknown', $!.message)
subtask.finish
end
end
def _setup
super

Also available in: Unified diff