Project

General

Profile

Download (9.28 KB) Statistics
| Branch: | Tag: | Revision:
b7f7d214 Marc Dequènes (Duck)
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

cdd6154d Marc Dequènes (Duck)
require 'yaml'
b7f7d214 Marc Dequènes (Duck)
require 'cyborghood/cyborg/session'

e26c015f Marc Dequènes (Duck)
# Note: Event machine core is mono-thread, so this code is not threadsafe.
# Only long tasks will be run in threads, which do not care about all this.
b7f7d214 Marc Dequènes (Duck)
module CyborgHood
cdd6154d Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
module BotProtocol
cdd6154d Marc Dequènes (Duck)
VERSION = "0.1"

e26c015f Marc Dequènes (Duck)
module_function

def process_received_message(message)
end

def receive(action, parameters)
method = "receive_" + action.downcase.tr(" ", "_")
if respond_to? method
send(method, parameters)
else
send_error_protocol("unknown action")
end
cdd6154d Marc Dequènes (Duck)
end

def receive_announce_helo(parameters)
unless parameters[:bot_name] =~ Conversation::BOT_ID_PATTERN
return send_error_protocol "bad bot name"
end
unless parameters[:protocol_version] == VERSION
e26c015f Marc Dequènes (Duck)
# TODO: send QUIT DECLINE "protocol version does not match"
return
cdd6154d Marc Dequènes (Duck)
end
@conversation.set_peer_info(parameters[:bot_name], parameters[:capabilities])
send_reply_ok
end

def send_error_protocol(msg = nil)
e26c015f Marc Dequènes (Duck)
@conversation.set_error_status
@conversation.send_peer("ERROR PROTO", { :error => msg || "" })
cdd6154d Marc Dequènes (Duck)
end

def send_reply_ack
e26c015f Marc Dequènes (Duck)
@conversation.send_peer("REPLY ACK")
end

def send_quit(reason = nil)
action = "QUIT " + (reason.nil? ? "LEAVING" : "DECLINE")
parameters = (reason.nil? ? nil : { :reason => reason })
@conversation.send_peer(action, parameters)
end
end

class ConversationThread
attr_reader :conversation, :name, :id, :session

@@next_id = 0

def initialized(conversation, name)
@conversation = conversation
@name = name

@id = @@next_id
@@next_id += 1
@session = Session.new
@next_action_id = 0
end

def new_message(action_code, parameters = nil, action_id = nil)
Message.new(this, action_code, parameters)
end

def next_action_id
id = @next_action_id
@next_action_id +=1
id
end

def close
@session.clear
# TODO: enforce destruction ???
end
end

class Message
private_class_method :new

attr_reader :conv_thread, :action_code, :action_parameters, :action_id

def initialized(conv_thread, action_code, action_parameters = nil, action_id = nil)
@conv_thread = conv_thread
@action_code = action_code
@action_parameters = action_parameters
@action_id = action_id
end

def new?
@action_id.nil?
end

def send
raise CyberError.new(:unrecoverable, "bot/conversation", "Not sending twice the same message") unless self.new?
@action_id = @conv_thread.next_action_id
@conv_thread.conversation.send_message(this)
end

def create_reply(action_code, parameters)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot reply to a newly created message") if self.new?
new(this, action_code, parameters)
cdd6154d Marc Dequènes (Duck)
end
end

b7f7d214 Marc Dequènes (Duck)
class Conversation < EventMachine::Protocols::LineAndTextProtocol
private_class_method :new

# don't rely on EventMachine's default, it may change one day
MaxLineLength = 16*1024

EOD = "\033[0J"
cdd6154d Marc Dequènes (Duck)
BOT_ID_PATTERN = "[a-zA-Z0-9]+"
e26c015f Marc Dequènes (Duck)
ACTION_WORD_PATTERN = "[a-zA-Z0-9]+"
ACTION_PATTERN = "^(#{BOT_ID_PATTERN})(\d{4})-(\d{4})([+]?) (#{ACTION_WORD_PATTERN}( #{ACTION_WORD_PATTERN})*)$"
cc684758 Marc Dequènes (Duck)
MAXIMUM_ERROR_COUNT = 3
cdd6154d Marc Dequènes (Duck)
MAXIMUM_LINES = 1024

attr_reader :interface
b7f7d214 Marc Dequènes (Duck)
def initialize(interface)
@interface = interface

super

@config = Config.instance
e26c015f Marc Dequènes (Duck)
@receive_error_count = 0
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
e26c015f Marc Dequènes (Duck)
# associated conversation threads
@conv_threads = {}
# thread 0 is reserved
@system_thread = self.thread('system')

# post-negociation peer info
cdd6154d Marc Dequènes (Duck)
@peer_id = nil
@peer_capabilities = []
e26c015f Marc Dequènes (Duck)
end
cdd6154d Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
def clear_receive_info
@receive_error = false
@receive_fatal_error = false
b7f7d214 Marc Dequènes (Duck)
end

def send_line(msg)
send_data "#{msg}\n"
logger.debug "Sent data [#{identifier}]: #{msg}"
end

def post_init
logger.debug "New conversation with #{identifier}"
end

def receive_line(data)
return if data.empty?

e26c015f Marc Dequènes (Duck)
clear_receive_info
cdd6154d Marc Dequènes (Duck)
b7f7d214 Marc Dequènes (Duck)
if data == EOD
logger.debug "Received EOD [#{identifier}]"
exit_split_mode
else
if @split_data_mode
logger.debug "Received data (split mode) [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if @split_data.size > MAXIMUM_LINES
reply_fatal_error "overflow"
else
@split_data << data
end
b7f7d214 Marc Dequènes (Duck)
else
logger.debug "Received data [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if data =~ Regexp.new(ACTION_PATTERN)
e26c015f Marc Dequènes (Duck)
conv_thread_id = $1
action_id = $2
cdd6154d Marc Dequènes (Duck)
flags = $3 || ""
e26c015f Marc Dequènes (Duck)
action_code = $4
cc684758 Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
conv_thread = self.thread_by_id(conv_thread_id)
message = conv_thread.new_message(action_code, nil, action_id)

if flags.index '+'
enter_split_mode(message)
cc684758 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
receive_message(message)
cc684758 Marc Dequènes (Duck)
end
e26c015f Marc Dequènes (Duck)
else
reply_syntax_error "bad action format"
b7f7d214 Marc Dequènes (Duck)
end
end
end
cdd6154d Marc Dequènes (Duck)
send_reply
e26c015f Marc Dequènes (Duck)
# TODO: properly QUIT
@error_count += 1 if @receive_error
if @error_count >= MAXIMUM_ERROR_COUNT
reply_fatal_error "too much errors, terminating"
end
cdd6154d Marc Dequènes (Duck)
close_connection_after_writing if @fatal_error
b7f7d214 Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def receive_message(message)
logger.debug "Received message '#{action}' [#{identifier}]"
cdd6154d Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
BotProtocol.process_received_message(message)
b7f7d214 Marc Dequènes (Duck)
end

def receive_error(msg)
logger.error "Error [#{identifier}]: #{msg}"
end

def unbind
logger.debug "Conversation finished with #{identifier}"
e26c015f Marc Dequènes (Duck)
@conv_threads.each_values {|s| s.close }
cdd6154d Marc Dequènes (Duck)
end

def set_peer_info(id, capabilities)
@peer_id = id
@peer_capabilities = capabilities || []
end

e26c015f Marc Dequènes (Duck)
def set_error_status(fatal = false)
# fatal status is conservative, it cannot be canceled
@reveive_fatal_error = @reveive_fatal_error || fatal
@receive_error = true
end

def thread(name = 'default')
th = @conv_threads[name] || ConversationThread.new(this, name)
# allow searching by id too
@conv_threads_index[th.id] = name
th
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def thread_by_id(id)
name = @conv_threads_index[id]
name.nil? ? ConversationThread.new(this, "noname/#{index}") : @conv_threads[name]
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def send_message(message)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot send message without action id") if message.action_id.nil?

send_line "#{@config.bot_id}-#{message.conv_thread.id}-#{message.action_id}" + (message.action_parameters.nil? : "" : "+") + " #{message.action_code}"
unless message.action_parameters.nil?
message.action_parameters.to_yaml.split.each {|l| send_line l }
send_line EOD
end

action_id
b7f7d214 Marc Dequènes (Duck)
end

protected

cdd6154d Marc Dequènes (Duck)
def reply_syntax_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: syntax error (#{msg})"
set_error_status

cdd6154d Marc Dequènes (Duck)
msg = "syntax error" + (msg ? ": " + msg : "")
@protocol.send_error_protocol(msg)
end

def reply_fatal_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: fatal error (#{msg})"
set_error_status(true)

cdd6154d Marc Dequènes (Duck)
msg = "fatal error" + (msg ? ": " + msg : "")
@protocol.send_error_protocol(msg)
end

e26c015f Marc Dequènes (Duck)
def enter_split_mode(message)
b7f7d214 Marc Dequènes (Duck)
if @split_data_mode
e26c015f Marc Dequènes (Duck)
reply_fatal_error "already in split mode"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: entered split mode for action '#{message.action_id}'"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = true
e26c015f Marc Dequènes (Duck)
@split_data_message = message
b7f7d214 Marc Dequènes (Duck)
end
@split_data = []
end

def exit_split_mode
if @split_data_mode
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: quit split mode for action '#{@split_data_message.action_id}'"

parameters = YAML.parse(@split_data.join("\n"))
reply_syntax_error("bad parameters format") if parameters.nil?

message = @split_data_message.conv_thread.new_message(@split_data_message.action_code, parameters, @split_data_message.action_id)
receive_message(message)
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
reply_fatal_error "not in split mode"
b7f7d214 Marc Dequènes (Duck)
end
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
end
end

class ConversationUNIXSocket < Conversation
public_class_method :new

def identifier
"unix_socket/#{@signature}"
end
end
end