Project

General

Profile

« Previous | Next » 

Revision fd4ccbdd

Added by Marc Dequènes over 13 years ago

  • ID fd4ccbddaa0989a06add8eac41dba03823d784b2

[evol] preliminary work for Conversation synchronization in DSL (action callbacks had to be moved from Protocol to ConversationThread to be able to check idle with the new locks, needed to process notifications without falling idle)

View differences:

bin/test_client
on_success do
pp "Hop!"
pp reply
send_notification 'meetpoint', { :topic => "MYNOTIF", :msg => "plop" }
end
end
end
......
on_success do
puts "Librarian GoGoGo!"
pp reply
wait_notification 'meetpoint', { :topic => "MYNOTIF" }
on_success do
puts "NOTIF!"
pp reply
end
end
end
end
lib/cyborghood/cyborg/botnet_dsl.rb
@success_cb = nil
@calls = {}
@calls_reply = {}
@notifications = {}
@notifications_awaiting = 0
@notifications_received = 0
@final_reply = {
:results => {},
:errors => {}
:errors => {},
:notifications => {}
}
@notification_name = "task/#{@name}"
end
def on_error(&callback)
......
}
end
def send_notification(name, data)
name = @notification_name if name == :thread
Logger.instance.debug "Sending notification to '#{name}'"
@conversation.bot.get_channel(name) << data
end
# TODO: implement settable timeout
def wait_notification(name, criterias = {})
name = @notification_name if name == :thread
@notifications[name] ||= []
@notifications[name] << criterias
@notifications_awaiting += 1
end
protected
def _start_dsl
return if @calls.empty? and @notifications.empty?
@conversation.thread(@name) do |conv_thread|
conv_thread.lock(@name)
@calls.each_pair do |key, data|
conv_thread.call(data[:cmd], *data[:args]) do |reply|
@calls_reply[key] = reply[:status]
......
@final_reply[:errors][key] = reply[:exception]
end
check_finished
_check_finished(conv_thread)
end
end
@notifications.each_pair do |name, list|
subcription_id = @conversation.bot.get_channel(name).subscribe do |msg|
old_notifications_received = @notifications_received
list.each do |criterias|
if _notification_criterias_match(msg, criterias)
@final_reply[:notifications][name] ||= []
@final_reply[:notifications][name] << msg
@notifications_received += 1
# do not break, the same message may satisfy multiple criterias
end
end
if @final_reply[:notifications][name].size == @notifications[name].size
@conversation.bot.get_channel(name).unsubscribe(subcription_id)
end
_check_finished(conv_thread) if @notifications_received > old_notifications_received
end
end
end
end
def check_finished
return unless @calls.size == @calls_reply.size
def _notification_criterias_match(msg, criterias)
criterias.each_pair do |key, expected_val|
val = msg[key]
if expected_val.is_a? Regexp
return false if val !~ expected_val
else
return false if val != expected_val
end
end
true
end
def _check_finished(conv_thread)
return unless @calls.size == @calls_reply.size and
@notifications_received == @notifications_awaiting
conv_thread.unlock(@name)
# process reply in the same thread, but with a new DSL Thread context
cb = @calls_reply.values.index(:error) ? @error_cb : @success_cb
lib/cyborghood/cyborg/conversation.rb
# no need for session for system thread
@session = Session.new unless name == 'system'
@next_action_id = 0
@callbacks = {}
@locks = Set.new
end
def new_message(action_code, parameters = nil, action_id = nil)
......
def call(*args, &callback)
@conversation.protocol.send_request_call(self, *args, &callback)
end
def register_callback(message, callback)
@callbacks[message.action_id] = callback
end
def pop_callback(message)
cb = @callbacks[message.action_id]
yield cb if block_given?
@callbacks.delete(message.action_id)
check_idle
cb
end
def lock(name)
@locks << name
end
def unlock(name)
@locks.delete(name)
check_idle
end
def locked?
not @locks.empty?
end
def idle?
@callbacks.empty? and @locks.empty?
end
protected
def check_idle
return unless idle?
@conversation.bot.get_channel("peer/#{@conversation.identifier}/system") << {
:topic => 'THREAD IDLE',
:thread => @name
}
end
end
class Message
......
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot reply to a newly created message") if self.new?
self.class.new(@conv_thread, action_code, parameters, @action_id)
end
# convenience method
def pop_callback(&block)
@conv_thread.pop_callback(self, &block)
end
# convenience method
def register_callback(callback)
@conv_thread.register_callback(self, callback)
end
end
class Conversation < EventMachine::Protocols::LineAndTextProtocol
lib/cyborghood/cyborg/protocol.rb
@negociation_received = false
@negociation_sent = false
@negociation_ok = false
@action_followup = {}
end
def negociation_ok?
......
method = "receive_" + message.action_code.downcase.tr(" ", "_")
if respond_to? method
send(method, message)
check_thread_idle(message.conv_thread)
else
send_error_protocol "unknown action"
end
......
end
def receive_error_action(message)
cb = pop_callback(message)
if cb
error = message.action_parameters[:error]
exception = CyberError.new(error[:severity], error[:category], error[:message])
cb.call({:status => :error, :exception => exception})
else
send_error_protocol("received reply for unknown action")
message.pop_callback do |cb|
if cb
error = message.action_parameters[:error]
exception = CyberError.new(error[:severity], error[:category], error[:message])
cb.call({:status => :error, :exception => exception})
else
send_error_protocol("received reply for unknown action")
end
end
end
......
end
def receive_reply_decline(message)
cb = pop_callback(message)
if cb
cb.call({:status => :decline, :reason => message.action_parameters[:reason]})
else
send_error_protocol("received reply for unknown action")
message.pop_callback do |cb|
if cb
cb.call({:status => :decline, :reason => message.action_parameters[:reason]})
else
send_error_protocol("received reply for unknown action")
end
end
end
def receive_reply_result(message)
cb = pop_callback(message)
if cb
cb.call({:status => :ok, :result => message.action_parameters[:result]})
else
send_error_protocol("received reply for unknown action")
message.pop_callback do |cb|
if cb
cb.call({:status => :ok, :result => message.action_parameters[:result]})
else
send_error_protocol("received reply for unknown action")
end
end
end
......
def send_request_call(conv_thread, node, *parameters, &callback)
message = conv_thread.new_message("REQUEST CALL", { :node => node, :parameters => parameters }).send
register_callback(message, callback)
message.register_callback(callback)
end
def send_request_exists(conv_thread, node)
message = conv_thread.new_message("REQUEST EXISTS", { :node => node }).send
register_callback(message, callback)
message.register_callback(callback)
end
def send_request_describe(conv_thread, node)
message = conv_thread.new_message("REQUEST DESCRIBE", { :node => node }).send
register_callback(message, callback)
message.register_callback(callback)
end
def send_error_protocol(error, fatal = false)
......
@conversation.thread('system').new_message("QUIT LEAVING").send
end
end
protected
def register_callback(message, callback)
@action_followup[message.conv_thread.id] ||= {}
@action_followup[message.conv_thread.id][message.action_id] = callback
end
def pop_callback(message)
return nil unless @action_followup.has_key? message.conv_thread.id
return nil unless @action_followup[message.conv_thread.id].has_key? message.action_id
cb = @action_followup[message.conv_thread.id][message.action_id]
@action_followup[message.conv_thread.id].delete message.action_id
@action_followup.delete message.conv_thread.id if @action_followup[message.conv_thread.id].empty?
cb
end
def check_thread_idle(conv_thread)
return if @action_followup.has_key? conv_thread.id
@conversation.bot.get_channel("peer/#{@conversation.identifier}/system") << {
:topic => 'THREAD IDLE',
:thread => conv_thread.name
}
end
end
end

Also available in: Unified diff