Revision 429ebb9c
Added by Marc Dequènes about 14 years ago
- ID 429ebb9c1a44108e2da28316170ad4345617a58d
lib/cyborghood/cyborg/botnet.rb | ||
---|---|---|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
#++
|
||
|
||
require 'cyborghood/cyborg'
|
||
require 'cyborghood/cyborg/botnet/interface'
|
||
require 'cyborghood/cyborg/botnet/conversation'
|
||
require 'cyborghood/cyborg/botnet/dsl'
|
||
... | ... | |
end
|
||
end
|
||
|
||
def contact_peer(peer, block)
|
||
if @comm_list.has_key? peer
|
||
block.call @comm_list[peer]
|
||
def contact_peer(peer, block, dont_open_new_connection = false)
|
||
return block.call @comm_list[peer] if @comm_list.has_key? peer
|
||
|
||
return if dont_open_new_connection
|
||
|
||
if @comm_list_attempt.has_key? peer
|
||
@comm_list_attempt[peer] << block
|
||
else
|
||
if @comm_list_attempt.has_key? peer
|
||
@comm_list_attempt[peer] << block
|
||
else
|
||
@comm_list_attempt[peer] = [block]
|
||
|
||
# demultiplex callbacks
|
||
callback = proc do |conversation|
|
||
block_list = @comm_list_attempt[peer]
|
||
# purge list at once to avoid races
|
||
@comm_list_attempt.delete(peer)
|
||
|
||
block_list.each do |block|
|
||
block.call conversation
|
||
end
|
||
end
|
||
begin
|
||
yield(callback)
|
||
rescue
|
||
# TODO: retry (wait_timer + recursive call + counter)
|
||
block.call false
|
||
@comm_list_attempt[peer] = [block]
|
||
|
||
# demultiplex callbacks
|
||
callback = proc do |conversation|
|
||
block_list = @comm_list_attempt[peer]
|
||
# purge list at once to avoid races
|
||
@comm_list_attempt.delete(peer)
|
||
|
||
block_list.each do |block|
|
||
block.call conversation
|
||
end
|
||
end
|
||
begin
|
||
yield(callback)
|
||
rescue
|
||
# TODO: retry (wait_timer + recursive call + counter)
|
||
block.call false
|
||
end
|
||
end
|
||
end
|
||
|
lib/cyborghood/cyborg/botnet/backend/unix.rb | ||
---|---|---|
File.join(Config::RUN_DIR, peer.downcase + ".sock")
|
||
end
|
||
|
||
def contact_peer(peer, &block)
|
||
super(peer, block) do |callback|
|
||
def contact_peer(peer, dont_open_new_connection = false, &block)
|
||
super(peer, block, dont_open_new_connection) do |callback|
|
||
EventMachine.connect_unix_domain(peer_socket(peer), Conversation, self, callback)
|
||
end
|
||
end
|
lib/cyborghood/cyborg/botnet/conversation.rb | ||
---|---|---|
# don't forget to unlock it when it is not needed anymore
|
||
def thread(name = 'default')
|
||
th = @conv_threads[name] || new_thread(name)
|
||
|
||
if block_given?
|
||
th.lock
|
||
|
lib/cyborghood/cyborg/botnet/dsl.rb | ||
---|---|---|
if conv
|
||
logger.debug "Task '#{@name}': Peer '#{peer}' contacted, starting conversation"
|
||
|
||
@peer_contacted << peer
|
||
|
||
# don't use the block call to leave the conversation thread open
|
||
conv_thread = conv.thread(@notification_name)
|
||
conv_thread.call(cmd, *args) do |reply|
|
||
... | ... | |
end
|
||
end
|
||
end
|
||
end
|
||
|
||
Task.class_eval do
|
||
include BotnetTask
|
||
def _setup
|
||
super
|
||
|
||
@peer_contacted = Set.new
|
||
end
|
||
|
||
def _finished
|
||
super
|
||
|
||
cb = Proc.new do |conv|
|
||
conv.close_thread(@notification_name)
|
||
end
|
||
|
||
@peer_contacted.each do |peer|
|
||
@bot.contact_peer(peer, true, &cb)
|
||
end
|
||
end
|
||
end
|
||
|
||
Task.class_eval("include BotnetTask")
|
||
end
|
||
end
|
lib/cyborghood/cyborg/dsl.rb | ||
---|---|---|
reveal :logger
|
||
end
|
||
|
||
class Task < BaseDSL
|
||
class TaskBase < BaseDSL
|
||
attr_reader :bot, :name, :errors, :results
|
||
|
||
@@task_wip = 0
|
||
... | ... | |
|
||
@notification_name = "task/#{@name}"
|
||
|
||
_setup
|
||
_start_dsl &block
|
||
end
|
||
|
||
def run_dsl
|
||
end
|
||
|
||
# may return a Hash of results
|
||
def schedule(&job)
|
||
_add_subtask("job/#{job.hash}") do |subtask|
|
||
... | ... | |
if cb
|
||
_start_dsl(&cb)
|
||
else
|
||
@@task_wip -= 1
|
||
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
|
||
_finished
|
||
end
|
||
end
|
||
|
||
... | ... | |
true
|
||
end
|
||
|
||
def _setup
|
||
logger.debug "Task '#{@name}': created"
|
||
end
|
||
|
||
def _start_dsl(&block)
|
||
@dsl_runing = false
|
||
@subtasks = []
|
||
... | ... | |
|
||
true
|
||
end
|
||
|
||
def _finished
|
||
@@task_wip -= 1
|
||
logger.debug "Task '#{@name}': finished (#{@@task_wip} remaining)"
|
||
end
|
||
end
|
||
|
||
# this empty class is used as a trick to be able to inject features
|
||
# as module.prepend does not exist yet
|
||
class Task < TaskBase
|
||
end
|
||
end
|
||
end
|
Also available in: Unified diff
[evol] BotnetTask DSL: now that we have one Task per task, close conversation thread as soon as the task is finished