Revision fd4ccbdd
Added by Marc Dequènes about 14 years ago
- ID fd4ccbddaa0989a06add8eac41dba03823d784b2
bin/test_client | ||
---|---|---|
on_success do
|
||
pp "Hop!"
|
||
pp reply
|
||
send_notification 'meetpoint', { :topic => "MYNOTIF", :msg => "plop" }
|
||
end
|
||
end
|
||
end
|
||
... | ... | |
on_success do
|
||
puts "Librarian GoGoGo!"
|
||
pp reply
|
||
wait_notification 'meetpoint', { :topic => "MYNOTIF" }
|
||
on_success do
|
||
puts "NOTIF!"
|
||
pp reply
|
||
end
|
||
end
|
||
end
|
||
end
|
lib/cyborghood/cyborg/botnet_dsl.rb | ||
---|---|---|
@success_cb = nil
|
||
@calls = {}
|
||
@calls_reply = {}
|
||
@notifications = {}
|
||
@notifications_awaiting = 0
|
||
@notifications_received = 0
|
||
|
||
@final_reply = {
|
||
:results => {},
|
||
:errors => {}
|
||
:errors => {},
|
||
:notifications => {}
|
||
}
|
||
|
||
@notification_name = "task/#{@name}"
|
||
end
|
||
|
||
def on_error(&callback)
|
||
... | ... | |
}
|
||
end
|
||
|
||
def send_notification(name, data)
|
||
name = @notification_name if name == :thread
|
||
Logger.instance.debug "Sending notification to '#{name}'"
|
||
@conversation.bot.get_channel(name) << data
|
||
end
|
||
|
||
# TODO: implement settable timeout
|
||
def wait_notification(name, criterias = {})
|
||
name = @notification_name if name == :thread
|
||
@notifications[name] ||= []
|
||
@notifications[name] << criterias
|
||
@notifications_awaiting += 1
|
||
end
|
||
|
||
protected
|
||
|
||
def _start_dsl
|
||
return if @calls.empty? and @notifications.empty?
|
||
|
||
@conversation.thread(@name) do |conv_thread|
|
||
conv_thread.lock(@name)
|
||
|
||
@calls.each_pair do |key, data|
|
||
conv_thread.call(data[:cmd], *data[:args]) do |reply|
|
||
@calls_reply[key] = reply[:status]
|
||
... | ... | |
@final_reply[:errors][key] = reply[:exception]
|
||
end
|
||
|
||
check_finished
|
||
_check_finished(conv_thread)
|
||
end
|
||
end
|
||
|
||
@notifications.each_pair do |name, list|
|
||
subcription_id = @conversation.bot.get_channel(name).subscribe do |msg|
|
||
old_notifications_received = @notifications_received
|
||
list.each do |criterias|
|
||
if _notification_criterias_match(msg, criterias)
|
||
@final_reply[:notifications][name] ||= []
|
||
@final_reply[:notifications][name] << msg
|
||
@notifications_received += 1
|
||
# do not break, the same message may satisfy multiple criterias
|
||
end
|
||
end
|
||
if @final_reply[:notifications][name].size == @notifications[name].size
|
||
@conversation.bot.get_channel(name).unsubscribe(subcription_id)
|
||
end
|
||
_check_finished(conv_thread) if @notifications_received > old_notifications_received
|
||
end
|
||
end
|
||
end
|
||
end
|
||
|
||
def check_finished
|
||
return unless @calls.size == @calls_reply.size
|
||
def _notification_criterias_match(msg, criterias)
|
||
criterias.each_pair do |key, expected_val|
|
||
val = msg[key]
|
||
if expected_val.is_a? Regexp
|
||
return false if val !~ expected_val
|
||
else
|
||
return false if val != expected_val
|
||
end
|
||
end
|
||
true
|
||
end
|
||
|
||
def _check_finished(conv_thread)
|
||
return unless @calls.size == @calls_reply.size and
|
||
@notifications_received == @notifications_awaiting
|
||
|
||
conv_thread.unlock(@name)
|
||
|
||
# process reply in the same thread, but with a new DSL Thread context
|
||
cb = @calls_reply.values.index(:error) ? @error_cb : @success_cb
|
lib/cyborghood/cyborg/conversation.rb | ||
---|---|---|
# no need for session for system thread
|
||
@session = Session.new unless name == 'system'
|
||
@next_action_id = 0
|
||
|
||
@callbacks = {}
|
||
@locks = Set.new
|
||
end
|
||
|
||
def new_message(action_code, parameters = nil, action_id = nil)
|
||
... | ... | |
def call(*args, &callback)
|
||
@conversation.protocol.send_request_call(self, *args, &callback)
|
||
end
|
||
|
||
def register_callback(message, callback)
|
||
@callbacks[message.action_id] = callback
|
||
end
|
||
|
||
def pop_callback(message)
|
||
cb = @callbacks[message.action_id]
|
||
yield cb if block_given?
|
||
@callbacks.delete(message.action_id)
|
||
check_idle
|
||
cb
|
||
end
|
||
|
||
def lock(name)
|
||
@locks << name
|
||
end
|
||
|
||
def unlock(name)
|
||
@locks.delete(name)
|
||
check_idle
|
||
end
|
||
|
||
def locked?
|
||
not @locks.empty?
|
||
end
|
||
|
||
def idle?
|
||
@callbacks.empty? and @locks.empty?
|
||
end
|
||
|
||
protected
|
||
|
||
def check_idle
|
||
return unless idle?
|
||
|
||
@conversation.bot.get_channel("peer/#{@conversation.identifier}/system") << {
|
||
:topic => 'THREAD IDLE',
|
||
:thread => @name
|
||
}
|
||
end
|
||
end
|
||
|
||
class Message
|
||
... | ... | |
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot reply to a newly created message") if self.new?
|
||
self.class.new(@conv_thread, action_code, parameters, @action_id)
|
||
end
|
||
|
||
# convenience method
|
||
def pop_callback(&block)
|
||
@conv_thread.pop_callback(self, &block)
|
||
end
|
||
|
||
# convenience method
|
||
def register_callback(callback)
|
||
@conv_thread.register_callback(self, callback)
|
||
end
|
||
end
|
||
|
||
class Conversation < EventMachine::Protocols::LineAndTextProtocol
|
lib/cyborghood/cyborg/protocol.rb | ||
---|---|---|
@negociation_received = false
|
||
@negociation_sent = false
|
||
@negociation_ok = false
|
||
|
||
@action_followup = {}
|
||
end
|
||
|
||
def negociation_ok?
|
||
... | ... | |
method = "receive_" + message.action_code.downcase.tr(" ", "_")
|
||
if respond_to? method
|
||
send(method, message)
|
||
check_thread_idle(message.conv_thread)
|
||
else
|
||
send_error_protocol "unknown action"
|
||
end
|
||
... | ... | |
end
|
||
|
||
def receive_error_action(message)
|
||
cb = pop_callback(message)
|
||
if cb
|
||
error = message.action_parameters[:error]
|
||
exception = CyberError.new(error[:severity], error[:category], error[:message])
|
||
cb.call({:status => :error, :exception => exception})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
message.pop_callback do |cb|
|
||
if cb
|
||
error = message.action_parameters[:error]
|
||
exception = CyberError.new(error[:severity], error[:category], error[:message])
|
||
cb.call({:status => :error, :exception => exception})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
end
|
||
end
|
||
end
|
||
|
||
... | ... | |
end
|
||
|
||
def receive_reply_decline(message)
|
||
cb = pop_callback(message)
|
||
if cb
|
||
cb.call({:status => :decline, :reason => message.action_parameters[:reason]})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
message.pop_callback do |cb|
|
||
if cb
|
||
cb.call({:status => :decline, :reason => message.action_parameters[:reason]})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
end
|
||
end
|
||
end
|
||
|
||
def receive_reply_result(message)
|
||
cb = pop_callback(message)
|
||
if cb
|
||
cb.call({:status => :ok, :result => message.action_parameters[:result]})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
message.pop_callback do |cb|
|
||
if cb
|
||
cb.call({:status => :ok, :result => message.action_parameters[:result]})
|
||
else
|
||
send_error_protocol("received reply for unknown action")
|
||
end
|
||
end
|
||
end
|
||
|
||
... | ... | |
|
||
def send_request_call(conv_thread, node, *parameters, &callback)
|
||
message = conv_thread.new_message("REQUEST CALL", { :node => node, :parameters => parameters }).send
|
||
register_callback(message, callback)
|
||
message.register_callback(callback)
|
||
end
|
||
|
||
def send_request_exists(conv_thread, node)
|
||
message = conv_thread.new_message("REQUEST EXISTS", { :node => node }).send
|
||
register_callback(message, callback)
|
||
message.register_callback(callback)
|
||
end
|
||
|
||
def send_request_describe(conv_thread, node)
|
||
message = conv_thread.new_message("REQUEST DESCRIBE", { :node => node }).send
|
||
register_callback(message, callback)
|
||
message.register_callback(callback)
|
||
end
|
||
|
||
def send_error_protocol(error, fatal = false)
|
||
... | ... | |
@conversation.thread('system').new_message("QUIT LEAVING").send
|
||
end
|
||
end
|
||
|
||
protected
|
||
|
||
def register_callback(message, callback)
|
||
@action_followup[message.conv_thread.id] ||= {}
|
||
@action_followup[message.conv_thread.id][message.action_id] = callback
|
||
end
|
||
|
||
def pop_callback(message)
|
||
return nil unless @action_followup.has_key? message.conv_thread.id
|
||
return nil unless @action_followup[message.conv_thread.id].has_key? message.action_id
|
||
cb = @action_followup[message.conv_thread.id][message.action_id]
|
||
|
||
@action_followup[message.conv_thread.id].delete message.action_id
|
||
@action_followup.delete message.conv_thread.id if @action_followup[message.conv_thread.id].empty?
|
||
|
||
cb
|
||
end
|
||
|
||
def check_thread_idle(conv_thread)
|
||
return if @action_followup.has_key? conv_thread.id
|
||
@conversation.bot.get_channel("peer/#{@conversation.identifier}/system") << {
|
||
:topic => 'THREAD IDLE',
|
||
:thread => conv_thread.name
|
||
}
|
||
end
|
||
end
|
||
end
|
Also available in: Unified diff
[evol] preliminary work for Conversation synchronization in DSL (action callbacks had to be moved from Protocol to ConversationThread to be able to check idle with the new locks, needed to process notifications without falling idle)