Project

General

Profile

Download (10.8 KB) Statistics
| Branch: | Tag: | Revision:
b7f7d214 Marc Dequènes (Duck)
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

cdd6154d Marc Dequènes (Duck)
require 'yaml'
b7f7d214 Marc Dequènes (Duck)
require 'cyborghood/cyborg/session'

e26c015f Marc Dequènes (Duck)
# Note: Event machine core is mono-thread, so this code is not threadsafe.
# Only long tasks will be run in threads, which do not care about all this.
b7f7d214 Marc Dequènes (Duck)
module CyborgHood
cdd6154d Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
module BotProtocol
cdd6154d Marc Dequènes (Duck)
VERSION = "0.1"

688a21d7 Marc Dequènes (Duck)
# TODO:
# - check for request/reply couples (reply to wrong of non-existent request)
# - check for negociation wip/done
e26c015f Marc Dequènes (Duck)
688a21d7 Marc Dequènes (Duck)
def initialize(conversation)
@conversation = conversation

@negociation_received = false
@negociation_sent = false
@negociation_ok = false
end

def negociation_ok?
@negociation_ok
e26c015f Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def process_received_message(message)
method = "receive_" + message.action_code.downcase.tr(" ", "_")
e26c015f Marc Dequènes (Duck)
if respond_to? method
688a21d7 Marc Dequènes (Duck)
send(method, message)
e26c015f Marc Dequènes (Duck)
else
send_error_protocol("unknown action")
end
cdd6154d Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def receive_announce_helo(message)
unless message.conv_thread.id == 0
return send_quit_decline "bad negociation"
cdd6154d Marc Dequènes (Duck)
end
688a21d7 Marc Dequènes (Duck)
unless message.parameters[:bot_name] =~ Conversation::BOT_ID_PATTERN
return send_quit_decline "bad bot name"
cdd6154d Marc Dequènes (Duck)
end
688a21d7 Marc Dequènes (Duck)
unless message.parameters[:protocol_version] == VERSION
return send_quit_decline "protocol version does not match"
end
@negociation_received = true
@conversation.set_peer_info(message.parameters[:bot_name], message.parameters[:capabilities])

if @negociation_sent
send_announce_ok(message)
@negociation_ok = true
else
send_announce_helo(message)
@negociation_sent = true
end
end

def receive_announce_ok(message)
unless @negociation_sent and @negociation_received
send_quit_decline "bad negociation"
end
@negociation_ok = true
end

def send_announce_helo(recv_message = nil)
action_code = "ANNOUNCE HELO"
message = (recv_message.nil? ? @conversation.thread('system').new_message(action_code) :
recv_message.create_reply(action_code))
message.send
cdd6154d Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def send_announce_ok(recv_message)
recv_message.create_reply("ANNOUNCE OK").send
cdd6154d Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def send_error_protocol(message = nil)
# message can be nil if nothing could be parsed (or we may close the conversation, dunno)
# TODO
# @conversation.set_error_status
# @conversation.send_peer("ERROR PROTO", { :error => msg || "" })
e26c015f Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def send_reply_ack(recv_message)
recv_message.create_reply("REPLY ACK").send
end

def send_quit_decline(recv_message, reason)
recv_message.create_reply("QUIT LEAVING", { :reason => reason }).send
end

def send_quit_leaving
conversation.thread('system').new_message("QUIT LEAVING").send
e26c015f Marc Dequènes (Duck)
end
end

class ConversationThread
attr_reader :conversation, :name, :id, :session

688a21d7 Marc Dequènes (Duck)
def initialized(conversation, id, name)
e26c015f Marc Dequènes (Duck)
@conversation = conversation
@name = name
688a21d7 Marc Dequènes (Duck)
@id = id
e26c015f Marc Dequènes (Duck)
@session = Session.new
@next_action_id = 0
end

def new_message(action_code, parameters = nil, action_id = nil)
688a21d7 Marc Dequènes (Duck)
Message.new(self, action_code, parameters)
e26c015f Marc Dequènes (Duck)
end

def next_action_id
id = @next_action_id
@next_action_id +=1
id
end

def close
@session.clear
# TODO: enforce destruction ???
end
end

class Message
private_class_method :new

attr_reader :conv_thread, :action_code, :action_parameters, :action_id

def initialized(conv_thread, action_code, action_parameters = nil, action_id = nil)
@conv_thread = conv_thread
@action_code = action_code
@action_parameters = action_parameters
@action_id = action_id
end

def new?
@action_id.nil?
end

def send
raise CyberError.new(:unrecoverable, "bot/conversation", "Not sending twice the same message") unless self.new?
@action_id = @conv_thread.next_action_id
688a21d7 Marc Dequènes (Duck)
@conv_thread.conversation.send_message(self)
e26c015f Marc Dequènes (Duck)
end

def create_reply(action_code, parameters)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot reply to a newly created message") if self.new?
688a21d7 Marc Dequènes (Duck)
new(self, action_code, parameters)
cdd6154d Marc Dequènes (Duck)
end
end

b7f7d214 Marc Dequènes (Duck)
class Conversation < EventMachine::Protocols::LineAndTextProtocol
private_class_method :new

# don't rely on EventMachine's default, it may change one day
MaxLineLength = 16*1024

EOD = "\033[0J"
cdd6154d Marc Dequènes (Duck)
BOT_ID_PATTERN = "[a-zA-Z0-9]+"
e26c015f Marc Dequènes (Duck)
ACTION_WORD_PATTERN = "[a-zA-Z0-9]+"
ACTION_PATTERN = "^(#{BOT_ID_PATTERN})(\d{4})-(\d{4})([+]?) (#{ACTION_WORD_PATTERN}( #{ACTION_WORD_PATTERN})*)$"
cc684758 Marc Dequènes (Duck)
MAXIMUM_ERROR_COUNT = 3
cdd6154d Marc Dequènes (Duck)
MAXIMUM_LINES = 1024

attr_reader :interface
b7f7d214 Marc Dequènes (Duck)
def initialize(interface)
@interface = interface

super

@config = Config.instance
e26c015f Marc Dequènes (Duck)
@receive_error_count = 0
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
e26c015f Marc Dequènes (Duck)
# associated conversation threads
@conv_threads = {}
# thread 0 is reserved
688a21d7 Marc Dequènes (Duck)
@next_thread_id = 0
e26c015f Marc Dequènes (Duck)
@system_thread = self.thread('system')

# post-negociation peer info
cdd6154d Marc Dequènes (Duck)
@peer_id = nil
@peer_capabilities = []
688a21d7 Marc Dequènes (Duck)
@protocol = BotProtocol.new(self)
e26c015f Marc Dequènes (Duck)
end
cdd6154d Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
def clear_receive_info
@receive_error = false
@receive_fatal_error = false
b7f7d214 Marc Dequènes (Duck)
end

def send_line(msg)
send_data "#{msg}\n"
logger.debug "Sent data [#{identifier}]: #{msg}"
end

def post_init
logger.debug "New conversation with #{identifier}"
end

def receive_line(data)
return if data.empty?

e26c015f Marc Dequènes (Duck)
clear_receive_info
cdd6154d Marc Dequènes (Duck)
b7f7d214 Marc Dequènes (Duck)
if data == EOD
logger.debug "Received EOD [#{identifier}]"
exit_split_mode
else
if @split_data_mode
logger.debug "Received data (split mode) [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if @split_data.size > MAXIMUM_LINES
reply_fatal_error "overflow"
else
@split_data << data
end
b7f7d214 Marc Dequènes (Duck)
else
logger.debug "Received data [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if data =~ Regexp.new(ACTION_PATTERN)
e26c015f Marc Dequènes (Duck)
conv_thread_id = $1
action_id = $2
cdd6154d Marc Dequènes (Duck)
flags = $3 || ""
e26c015f Marc Dequènes (Duck)
action_code = $4
cc684758 Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
conv_thread = self.thread_by_id(conv_thread_id)
message = conv_thread.new_message(action_code, nil, action_id)

if flags.index '+'
enter_split_mode(message)
cc684758 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
receive_message(message)
cc684758 Marc Dequènes (Duck)
end
e26c015f Marc Dequènes (Duck)
else
reply_syntax_error "bad action format"
b7f7d214 Marc Dequènes (Duck)
end
end
end
cdd6154d Marc Dequènes (Duck)
send_reply
e26c015f Marc Dequènes (Duck)
# TODO: properly QUIT
@error_count += 1 if @receive_error
if @error_count >= MAXIMUM_ERROR_COUNT
reply_fatal_error "too much errors, terminating"
end
cdd6154d Marc Dequènes (Duck)
close_connection_after_writing if @fatal_error
b7f7d214 Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def receive_message(message)
logger.debug "Received message '#{action}' [#{identifier}]"
cdd6154d Marc Dequènes (Duck)
688a21d7 Marc Dequènes (Duck)
@protocol.process_received_message(message)
b7f7d214 Marc Dequènes (Duck)
end

def receive_error(msg)
logger.error "Error [#{identifier}]: #{msg}"
end

def unbind
logger.debug "Conversation finished with #{identifier}"
e26c015f Marc Dequènes (Duck)
@conv_threads.each_values {|s| s.close }
cdd6154d Marc Dequènes (Duck)
end

def set_peer_info(id, capabilities)
@peer_id = id
@peer_capabilities = capabilities || []
end

e26c015f Marc Dequènes (Duck)
def set_error_status(fatal = false)
# fatal status is conservative, it cannot be canceled
@reveive_fatal_error = @reveive_fatal_error || fatal
@receive_error = true
end

def thread(name = 'default')
688a21d7 Marc Dequènes (Duck)
@conv_threads[name] || new_thread(name)
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def thread_by_id(id)
name = @conv_threads_index[id]
688a21d7 Marc Dequènes (Duck)
name.nil? ? new_thread("noname/#{id}", id) : @conv_threads[name]
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def send_message(message)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot send message without action id") if message.action_id.nil?

send_line "#{@config.bot_id}-#{message.conv_thread.id}-#{message.action_id}" + (message.action_parameters.nil? : "" : "+") + " #{message.action_code}"
unless message.action_parameters.nil?
message.action_parameters.to_yaml.split.each {|l| send_line l }
send_line EOD
end

action_id
b7f7d214 Marc Dequènes (Duck)
end

protected

688a21d7 Marc Dequènes (Duck)
def new_thread(name, id = nil)
id ||= @next_thread_id
th = ConversationThread.new(self, id, name)
@next_thread_id = [@next_thread_id, id + 1].max

@conv_threads[th.name] = th
# allow searching by id too
@conv_threads_index[th.id] = name
th
end

cdd6154d Marc Dequènes (Duck)
def reply_syntax_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: syntax error (#{msg})"
set_error_status

cdd6154d Marc Dequènes (Duck)
msg = "syntax error" + (msg ? ": " + msg : "")
@protocol.send_error_protocol(msg)
end

def reply_fatal_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: fatal error (#{msg})"
set_error_status(true)

cdd6154d Marc Dequènes (Duck)
msg = "fatal error" + (msg ? ": " + msg : "")
@protocol.send_error_protocol(msg)
end

e26c015f Marc Dequènes (Duck)
def enter_split_mode(message)
b7f7d214 Marc Dequènes (Duck)
if @split_data_mode
e26c015f Marc Dequènes (Duck)
reply_fatal_error "already in split mode"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: entered split mode for action '#{message.action_id}'"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = true
e26c015f Marc Dequènes (Duck)
@split_data_message = message
b7f7d214 Marc Dequènes (Duck)
end
@split_data = []
end

def exit_split_mode
if @split_data_mode
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: quit split mode for action '#{@split_data_message.action_id}'"

parameters = YAML.parse(@split_data.join("\n"))
reply_syntax_error("bad parameters format") if parameters.nil?

message = @split_data_message.conv_thread.new_message(@split_data_message.action_code, parameters, @split_data_message.action_id)
receive_message(message)
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
reply_fatal_error "not in split mode"
b7f7d214 Marc Dequènes (Duck)
end
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
end
end

class ConversationUNIXSocket < Conversation
public_class_method :new

def identifier
"unix_socket/#{@signature}"
end
end
end