Project

General

Profile

« Previous | Next » 

Revision 429ebb9c

Added by Marc Dequènes almost 14 years ago

  • ID 429ebb9c1a44108e2da28316170ad4345617a58d

[evol] BotnetTask DSL: now that we have one Task per task, close conversation thread as soon as the task is finished

View differences:

lib/cyborghood/cyborg/botnet.rb
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++
require 'cyborghood/cyborg'
require 'cyborghood/cyborg/botnet/interface'
require 'cyborghood/cyborg/botnet/conversation'
require 'cyborghood/cyborg/botnet/dsl'
......
end
end
def contact_peer(peer, block)
if @comm_list.has_key? peer
block.call @comm_list[peer]
def contact_peer(peer, block, dont_open_new_connection = false)
return block.call @comm_list[peer] if @comm_list.has_key? peer
return if dont_open_new_connection
if @comm_list_attempt.has_key? peer
@comm_list_attempt[peer] << block
else
if @comm_list_attempt.has_key? peer
@comm_list_attempt[peer] << block
else
@comm_list_attempt[peer] = [block]
# demultiplex callbacks
callback = proc do |conversation|
block_list = @comm_list_attempt[peer]
# purge list at once to avoid races
@comm_list_attempt.delete(peer)
block_list.each do |block|
block.call conversation
end
end
begin
yield(callback)
rescue
# TODO: retry (wait_timer + recursive call + counter)
block.call false
@comm_list_attempt[peer] = [block]
# demultiplex callbacks
callback = proc do |conversation|
block_list = @comm_list_attempt[peer]
# purge list at once to avoid races
@comm_list_attempt.delete(peer)
block_list.each do |block|
block.call conversation
end
end
begin
yield(callback)
rescue
# TODO: retry (wait_timer + recursive call + counter)
block.call false
end
end
end
lib/cyborghood/cyborg/botnet/backend/unix.rb
File.join(Config::RUN_DIR, peer.downcase + ".sock")
end
def contact_peer(peer, &block)
super(peer, block) do |callback|
def contact_peer(peer, dont_open_new_connection = false, &block)
super(peer, block, dont_open_new_connection) do |callback|
EventMachine.connect_unix_domain(peer_socket(peer), Conversation, self, callback)
end
end
lib/cyborghood/cyborg/botnet/conversation.rb
# don't forget to unlock it when it is not needed anymore
def thread(name = 'default')
th = @conv_threads[name] || new_thread(name)
if block_given?
th.lock
lib/cyborghood/cyborg/botnet/dsl.rb
if conv
logger.debug "Task '#{@name}': Peer '#{peer}' contacted, starting conversation"
@peer_contacted << peer
# don't use the block call to leave the conversation thread open
conv_thread = conv.thread(@notification_name)
conv_thread.call(cmd, *args) do |reply|
......
end
end
end
end
Task.class_eval do
include BotnetTask
def _setup
super
@peer_contacted = Set.new
end
def _finished
super
cb = Proc.new do |conv|
conv.close_thread(@notification_name)
end
@peer_contacted.each do |peer|
@bot.contact_peer(peer, true, &cb)
end
end
end
Task.class_eval("include BotnetTask")
end
end
lib/cyborghood/cyborg/dsl.rb
reveal :logger
end
class Task < BaseDSL
class TaskBase < BaseDSL
attr_reader :bot, :name, :errors, :results
@@task_wip = 0
......
@notification_name = "task/#{@name}"
_setup
_start_dsl &block
end
def run_dsl
end
# may return a Hash of results
def schedule(&job)
_add_subtask("job/#{job.hash}") do |subtask|
......
if cb
_start_dsl(&cb)
else
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
_finished
end
end
......
true
end
def _setup
logger.debug "Task '#{@name}': created"
end
def _start_dsl(&block)
@dsl_runing = false
@subtasks = []
......
true
end
def _finished
@@task_wip -= 1
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
end
end
# this empty class is used as a trick to be able to inject features
# as module.prepend does not exist yet
class Task < TaskBase
end
end
end

Also available in: Unified diff