Project

General

Profile

Download (9.54 KB) Statistics
| Branch: | Tag: | Revision:
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

# Note: Event machine core is mono-thread, so this code is not threadsafe.
# Only long tasks will be run in threads, which do not care about all this.


module CyborgHood
class BotProtocol
VERSION = "0.1"
CAPABILITIES = []

@@request_callback = proc do |result|
protocol = result[:reply_message].conv_thread.conversation.protocol
protocol.process_request_result(result)
end

def initialize(conversation)
@conversation = conversation

@negociation_received = false
@negociation_sent = false
@negociation_ok = false
end

def negociation_ok?
@negociation_ok
end

def process_received_message(message)
method = "receive_" + message.action_code.downcase.tr(" ", "_")
if respond_to? method
send(method, message)
else
send_error_protocol "unknown action"
end
end

def process_request_result(result)
if result[:error]
send_error_action(result[:reply_message], result[:error])
else
send_reply_result(result[:reply_message], result[:action_result])
end
end

def receive_announce_helo(message)
unless message.conv_thread.id == 0
return send_quit_decline "bad negociation"
end
if message.action_parameters.nil?
return send_quit_decline "missing parameters"
end
unless message.action_parameters[:bot_name] =~ Regexp.new(Conversation::BOT_NAME_PATTERN)
return send_quit_decline "bad bot name"
end
unless message.action_parameters[:protocol_version] == VERSION
return send_quit_decline "protocol version does not match"
end
@negociation_received = true
@conversation.set_peer_info(message.action_parameters[:bot_name], message.action_parameters[:capabilities])

if @negociation_sent
send_announce_ok(message)
@negociation_ok = true
@conversation.set_comm_ready
else
send_announce_helo(message)
end
end

def receive_announce_ok(message)
unless @negociation_sent and @negociation_received
send_quit_decline "bad negociation"
end
@negociation_ok = true
end

def receive_request_capabilities(message)
send_reply_result(message, @conversation.bot.capabilities + CAPABILITIES)
end

def receive_request_call(message)
if message.action_parameters.nil?
return send_error_action(message, "missing parameters")
end
unless @conversation.bot.interface.is_node? message.action_parameters[:node]
return send_error_action(message, "bad node")
end
send_reply_ack(message)
@conversation.bot.schedule_task(@@request_callback) do
result = {
:reply_message => message
}
begin
result[:action_result] = @conversation.bot.interface.call(message.conv_thread.session,
message.action_parameters[:node],
message.action_parameters[:parameters])
rescue CyberError => e
result[:error] = {
:category => e.category,
:severity => e.severity,
:message => e.message
}
rescue
result[:error] = {
:category => 'unknown',
:severity => :unrecoverable,
:message => $!.to_s
}
end
result
end
end

def receive_request_exists(message)
if message.action_parameters.nil?
return send_error_action(message, "missing parameters")
end
unless @conversation.bot.interface.is_node? message.action_parameters[:node]
return send_error_action(message, "bad node")
end
send_reply_ack(message)
@conversation.bot.schedule_task(@@request_callback) do
{
:reply_message => message,
:action_result => @conversation.bot.interface.has_node?(message.action_parameters[:node])
}
end
end

def receive_request_describe(message)
# TODO: implement when ready in the interface
send_reply_decline(message, "not implemented")
end

def receive_error_protocol(message)
logger.error "received protocol error notification from '#{@conversation.peer_name}': #{message.action_parameters[:error]}"
end

def receive_error_action(message)
message.pop_callback do |cb|
if cb
error = message.action_parameters[:error]
exception = CyberError.new(error[:severity], error[:category], error[:message])
cb.call({:status => :error, :exception => exception})
else
send_error_protocol("received reply for unknown action")
end
end
end

# TODO: what if the peer close a thread i have opened ?
# send error to all actions ?
def receive_notify_thread_closed(message)
message.conv_thread.close(false)
end

def receive_notify_event(message)
@conversation.bot.get_channel("peer/#{@conversation.peer_name}/incoming") << {
:from => message.conversation.peer_id,
:topic => message.action_parameters[:name],
:info => message.action_parameters[:info]
}
end

def receive_reply_ack(message)
# TODO: cancel timeout (which does not exist yet)
end

def receive_reply_decline(message)
message.pop_callback do |cb|
if cb
cb.call({:status => :decline, :reason => message.action_parameters[:reason]})
else
send_error_protocol("received reply for unknown action")
end
end
end

def receive_reply_result(message)
message.pop_callback do |cb|
if cb
cb.call({:status => :ok, :result => message.action_parameters[:result]})
else
send_error_protocol("received reply for unknown action")
end
end
end

def receive_quit_decline(message)
logger.warning "peer '#{@conversation.peer_name}' refused more conversation: #{message.action_parameters[:reason]}"
@conversation.set_comm_stop(true)
# TODO: notify client
end

def receive_quit_leaving(message)
logger.info "peer '#{@conversation.peer_name}' is leaving"
@conversation.set_comm_stop(true)
# TODO: notify client
end

def send_announce_helo(recv_message = nil)
action_code = "ANNOUNCE HELO"
action_parameters = {
:bot_name => @conversation.bot.name,
:protocol_version => VERSION
}

message = (recv_message.nil? ? @conversation.thread('system').new_message(action_code, action_parameters) :
recv_message.create_reply(action_code, action_parameters))
message.send
@negociation_sent = true
end

def send_announce_ok(recv_message)
recv_message.create_reply("ANNOUNCE OK").send
end

def send_request_capabilities
@conversation.thread('system').new_message("REQUEST EXISTS", { :node => node }).send
end

def send_request_call(conv_thread, node, *parameters, &callback)
message = conv_thread.new_message("REQUEST CALL", { :node => node, :parameters => parameters }).send
message.register_callback(callback)
end

def send_request_exists(conv_thread, node)
message = conv_thread.new_message("REQUEST EXISTS", { :node => node }).send
message.register_callback(callback)
end

def send_request_describe(conv_thread, node)
message = conv_thread.new_message("REQUEST DESCRIBE", { :node => node }).send
message.register_callback(callback)
end

def send_error_protocol(error, fatal = false)
@conversation.thread('system').new_message("ERROR PROTOCOL", { :error => error }).send
@conversation.set_error_status(fatal)
end

def send_error_action(recv_message, error)
recv_message.create_reply("ERROR ACTION", { :error => error }).send
end

def send_notify_thread_closed(conv_thread)
conv_thread.new_message("NOTIFY THREAD CLOSED").send
end

def send_notify_event(conv_thread, event_name, event_info)
conv_thread.new_message.create_reply("NOTIFY EVENT", { :name => event_name, :info => event_info }).send
end

def send_reply_ack(recv_message)
recv_message.create_reply("REPLY ACK").send
end

def send_reply_decline(recv_message, reason)
recv_message.create_reply("REPLY DECLINE", { :reason => reason }).send
end

def send_reply_result(recv_message, result)
recv_message.create_reply("REPLY RESULT", { :result => result }).send
end

def send_quit_decline(reason)
@conversation.set_comm_stop do
@conversation.thread('system').new_message("QUIT DECLINE", { :reason => reason }).send
end
end

def send_quit_leaving
@conversation.set_comm_stop do
@conversation.thread('system').new_message("QUIT LEAVING").send
end
end
end
end
(7-7/8)