Project

General

Profile

Download (14.8 KB) Statistics
| Branch: | Tag: | Revision:
b7f7d214 Marc Dequènes (Duck)
#--
# CyborgHood, a distributed system management software.
# Copyright (c) 2009-2010 Marc Dequènes (Duck) <Duck@DuckCorp.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#++

cdd6154d Marc Dequènes (Duck)
require 'yaml'
b7f7d214 Marc Dequènes (Duck)
require 'cyborghood/cyborg/session'
7e9520c0 Marc Dequènes (Duck)
require 'cyborghood/cyborg/protocol'
c501ebc2 Marc Dequènes (Duck)
require 'set'
b7f7d214 Marc Dequènes (Duck)

module CyborgHood
e26c015f Marc Dequènes (Duck)
class ConversationThread
attr_reader :conversation, :name, :id, :session

3d653143 Marc Dequènes (Duck)
def initialize(conversation, id, name)
e26c015f Marc Dequènes (Duck)
@conversation = conversation
@name = name
688a21d7 Marc Dequènes (Duck)
@id = id
e26c015f Marc Dequènes (Duck)
4afb2001 Marc Dequènes (Duck)
# no need for session for system thread
@session = Session.new unless name == 'system'
e26c015f Marc Dequènes (Duck)
@next_action_id = 0
fd4ccbdd Marc Dequènes (Duck)
@callbacks = {}
@locks = Set.new
e26c015f Marc Dequènes (Duck)
end

def new_message(action_code, parameters = nil, action_id = nil)
45333094 Marc Dequènes (Duck)
Message.new(self, action_code, parameters, action_id)
e26c015f Marc Dequènes (Duck)
end

def next_action_id
id = @next_action_id
@next_action_id +=1
id
end

4afb2001 Marc Dequènes (Duck)
def close(notify = true)
# the system thread cannot be closed
return if name == 'system'
@conversation.protocol.send_notify_thread_closed(self) if notify
@conversation.delete_thread(self)
e26c015f Marc Dequènes (Duck)
@session.clear
end
f8c25bdc Marc Dequènes (Duck)
# convenience method
def call(*args, &callback)
@conversation.protocol.send_request_call(self, *args, &callback)
end
fd4ccbdd Marc Dequènes (Duck)
eea1abf0 Marc Dequènes (Duck)
# convenience method
def notify(event_name, event_info)
@conversation.protocol.send_notify_event(self, event_name, event_info)
end

fd4ccbdd Marc Dequènes (Duck)
def register_callback(message, callback)
@callbacks[message.action_id] = callback
end

def pop_callback(message)
cb = @callbacks[message.action_id]
yield cb if block_given?
@callbacks.delete(message.action_id)
cb
end

def lock(name)
@locks << name
end

def unlock(name)
@locks.delete(name)
end

def locked?
not @locks.empty?
end

cb79cd54 Marc Dequènes (Duck)
# check for known actions only, conversation API messages and
# attempts in error are not taken into account, that's why it is
# not sufficent to deduce idleness at connection level
fd4ccbdd Marc Dequènes (Duck)
def idle?
@callbacks.empty? and @locks.empty?
end
e26c015f Marc Dequènes (Duck)
end

class Message
3cd21861 Marc Dequènes (Duck)
attr_reader :conv_thread, :action_code, :action_parameters, :action_id
e26c015f Marc Dequènes (Duck)
3d653143 Marc Dequènes (Duck)
def initialize(conv_thread, action_code, action_parameters = nil, action_id = nil)
e26c015f Marc Dequènes (Duck)
@conv_thread = conv_thread
@action_code = action_code
@action_parameters = action_parameters
3cd21861 Marc Dequènes (Duck)
# reply with the matching action id
# (the namespace for requests is on our side, the namespace for replies is on peer side,
# and we may end up using the same action id is a server acts as client)
@action_id = action_id
45333094 Marc Dequènes (Duck)
3cd21861 Marc Dequènes (Duck)
@sent = false
e26c015f Marc Dequènes (Duck)
end

def new?
3cd21861 Marc Dequènes (Duck)
@action_id.nil?
45333094 Marc Dequènes (Duck)
end

def sent?
3cd21861 Marc Dequènes (Duck)
@sent
e26c015f Marc Dequènes (Duck)
end

def send
3cd21861 Marc Dequènes (Duck)
raise CyberError.new(:unrecoverable, "bot/conversation", "Not sending twice the same message") if self.sent?
@action_id = @conv_thread.next_action_id if @action_id.nil?
688a21d7 Marc Dequènes (Duck)
@conv_thread.conversation.send_message(self)
3cd21861 Marc Dequènes (Duck)
@sent = true
21a7eaf9 Marc Dequènes (Duck)
# return message (convenience)
self
e26c015f Marc Dequènes (Duck)
end

45333094 Marc Dequènes (Duck)
def create_reply(action_code, parameters = nil)
e26c015f Marc Dequènes (Duck)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot reply to a newly created message") if self.new?
45333094 Marc Dequènes (Duck)
self.class.new(@conv_thread, action_code, parameters, @action_id)
cdd6154d Marc Dequènes (Duck)
end
fd4ccbdd Marc Dequènes (Duck)
# convenience method
def pop_callback(&block)
@conv_thread.pop_callback(self, &block)
end

# convenience method
def register_callback(callback)
@conv_thread.register_callback(self, callback)
end
cdd6154d Marc Dequènes (Duck)
end

b7f7d214 Marc Dequènes (Duck)
class Conversation < EventMachine::Protocols::LineAndTextProtocol
# don't rely on EventMachine's default, it may change one day
MaxLineLength = 16*1024

EOD = "\033[0J"
45333094 Marc Dequènes (Duck)
BOT_NAME_PATTERN = "[A-Z][a-zA-Z0-9]+"
e26c015f Marc Dequènes (Duck)
ACTION_WORD_PATTERN = "[a-zA-Z0-9]+"
45333094 Marc Dequènes (Duck)
ACTION_PATTERN = "^(#{BOT_NAME_PATTERN})-([0-9]{4})-([0-9]{4})([+]?) (#{ACTION_WORD_PATTERN}( #{ACTION_WORD_PATTERN})*)$"
cc684758 Marc Dequènes (Duck)
MAXIMUM_ERROR_COUNT = 3
cdd6154d Marc Dequènes (Duck)
MAXIMUM_LINES = 1024

d9694ad7 Marc Dequènes (Duck)
attr_reader :bot, :peer_name, :peer_capabilities, :protocol, :auto_close_threads
b7f7d214 Marc Dequènes (Duck)
45333094 Marc Dequènes (Duck)
def initialize(bot, block = nil)
3d653143 Marc Dequènes (Duck)
@bot = bot
d57fd602 Marc Dequènes (Duck)
@comm_logic_block = block
b7f7d214 Marc Dequènes (Duck)
super

@config = Config.instance
e26c015f Marc Dequènes (Duck)
45333094 Marc Dequènes (Duck)
@error_count = 0
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
8f6f45f7 Marc Dequènes (Duck)
@comm_stop = false
e26c015f Marc Dequènes (Duck)
# associated conversation threads
d9694ad7 Marc Dequènes (Duck)
@auto_close_threads = 60 # max idle time before closing (nil => never close threads)
e26c015f Marc Dequènes (Duck)
@conv_threads = {}
45333094 Marc Dequènes (Duck)
@conv_threads_index = {}
d9694ad7 Marc Dequènes (Duck)
@conv_threads_timers = {}
@conv_threads_closing = []
e26c015f Marc Dequènes (Duck)
# thread 0 is reserved
688a21d7 Marc Dequènes (Duck)
@next_thread_id = 0
e26c015f Marc Dequènes (Duck)
@system_thread = self.thread('system')

# post-negociation peer info
45333094 Marc Dequènes (Duck)
@peer_name = nil
cdd6154d Marc Dequènes (Duck)
@peer_capabilities = []
688a21d7 Marc Dequènes (Duck)
@protocol = BotProtocol.new(self)
8c18bc9a Marc Dequènes (Duck)
06f77931 Marc Dequènes (Duck)
# we don't know the peer name yet
@system_notification_name = "peer/#{identifier}/system"
@system_notification = @bot.get_channel(@system_notification_name)
8f6f45f7 Marc Dequènes (Duck)
@system_notification_processing = @system_notification.subscribe do |msg|
8c18bc9a Marc Dequènes (Duck)
process_system_notification(msg)
end
e26c015f Marc Dequènes (Duck)
end
cdd6154d Marc Dequènes (Duck)
b7f7d214 Marc Dequènes (Duck)
def post_init
45333094 Marc Dequènes (Duck)
logger.info "New conversation with #{identifier}"
@protocol.send_announce_helo unless @comm_logic_block.nil?
rescue
logger.error "Conversation post_init: " + $!
connection_close
b7f7d214 Marc Dequènes (Duck)
end

def receive_line(data)
45333094 Marc Dequènes (Duck)
return if @comm_stop or data.empty?
b7f7d214 Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
clear_receive_info
cdd6154d Marc Dequènes (Duck)
b7f7d214 Marc Dequènes (Duck)
if data == EOD
logger.debug "Received EOD [#{identifier}]"
exit_split_mode
else
if @split_data_mode
logger.debug "Received data (split mode) [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if @split_data.size > MAXIMUM_LINES
reply_fatal_error "overflow"
else
@split_data << data
end
b7f7d214 Marc Dequènes (Duck)
else
logger.debug "Received data [#{identifier}]: #{data}"

cdd6154d Marc Dequènes (Duck)
if data =~ Regexp.new(ACTION_PATTERN)
45333094 Marc Dequènes (Duck)
peer_name = $1
conv_thread_id = $2.to_i
action_id = $3.to_i
flags = $4 || ""
action_code = $5
cc684758 Marc Dequènes (Duck)
e26c015f Marc Dequènes (Duck)
conv_thread = self.thread_by_id(conv_thread_id)
message = conv_thread.new_message(action_code, nil, action_id)

if flags.index '+'
enter_split_mode(message)
cc684758 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
receive_message(message)
cc684758 Marc Dequènes (Duck)
end
e26c015f Marc Dequènes (Duck)
else
reply_syntax_error "bad action format"
b7f7d214 Marc Dequènes (Duck)
end
end
end
cdd6154d Marc Dequènes (Duck)
fccfdfb9 Marc Dequènes (Duck)
check_errors
end

e26c015f Marc Dequènes (Duck)
def receive_message(message)
45333094 Marc Dequènes (Duck)
logger.debug "Received message '#{message.action_code}' [#{identifier}]"
cdd6154d Marc Dequènes (Duck)
d9694ad7 Marc Dequènes (Duck)
reset_idle_thread_check(message.conv_thread)

688a21d7 Marc Dequènes (Duck)
@protocol.process_received_message(message)
d9694ad7 Marc Dequènes (Duck)
check_idle_thread(message.conv_thread)
b7f7d214 Marc Dequènes (Duck)
end

def receive_error(msg)
logger.error "Error [#{identifier}]: #{msg}"
fccfdfb9 Marc Dequènes (Duck)
@error_count += 1
b7f7d214 Marc Dequènes (Duck)
end

def unbind
45333094 Marc Dequènes (Duck)
logger.info "Conversation finished with #{identifier} (#{@peer_name})"
@bot.unregister_communication @peer_name unless @peer_name.nil?
06f77931 Marc Dequènes (Duck)
@bot.drop_channel(@system_notification_name)
4afb2001 Marc Dequènes (Duck)
@conv_threads.each_value {|s| s.close(false) }
@conv_threads = {}
@conv_threads_index = {}
45333094 Marc Dequènes (Duck)
@comm_logic_block.call false unless @comm_logic_block.nil? or @protocol.negociation_ok?
cdd6154d Marc Dequènes (Duck)
end

3d653143 Marc Dequènes (Duck)
def bye
45333094 Marc Dequènes (Duck)
@protocol.send_quit_leaving
3d653143 Marc Dequènes (Duck)
end

45333094 Marc Dequènes (Duck)
def set_peer_info(name, capabilities)
@peer_name = name
cdd6154d Marc Dequènes (Duck)
@peer_capabilities = capabilities || []
45333094 Marc Dequènes (Duck)
logger.info "Peer name for #{identifier}: #{@peer_name}"
logger.info "Peer capabilities for #{identifier}: #{@peer_capabilities.join(", ")}"
cdd6154d Marc Dequènes (Duck)
end

d57fd602 Marc Dequènes (Duck)
def set_comm_ready
45333094 Marc Dequènes (Duck)
logger.info "Protocol negociation with '#{@peer_name}' on #{identifier} succeeded"
379a5cfb Marc Dequènes (Duck)
@bot.register_communication @peer_name, self
45333094 Marc Dequènes (Duck)
@comm_logic_block.call self unless @comm_logic_block.nil?
end

f292535b Marc Dequènes (Duck)
def set_comm_stop(peer_left = false)
45333094 Marc Dequènes (Duck)
@comm_stop = true
8f6f45f7 Marc Dequènes (Duck)
@system_notification.unsubscribe(@system_notification_processing)
45333094 Marc Dequènes (Duck)
yield if block_given?
f292535b Marc Dequènes (Duck)
peer_left ? close_connection : close_connection_after_writing
d57fd602 Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def set_error_status(fatal = false)
# fatal status is conservative, it cannot be canceled
@reveive_fatal_error = @reveive_fatal_error || fatal
@receive_error = true
end

cb79cd54 Marc Dequènes (Duck)
# threads opened using the block variant are locked
# don't forget to unlock it when it is not needed anymore
e26c015f Marc Dequènes (Duck)
def thread(name = 'default')
4afb2001 Marc Dequènes (Duck)
th = @conv_threads[name] || new_thread(name)
if block_given?
cb79cd54 Marc Dequènes (Duck)
th.lock
8c18bc9a Marc Dequènes (Duck)
cb79cd54 Marc Dequènes (Duck)
yield th
4afb2001 Marc Dequènes (Duck)
else
th
end
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def thread_by_id(id)
name = @conv_threads_index[id]
688a21d7 Marc Dequènes (Duck)
name.nil? ? new_thread("noname/#{id}", id) : @conv_threads[name]
cdd6154d Marc Dequènes (Duck)
end

4afb2001 Marc Dequènes (Duck)
def close_thread(name)
return if name == 'system'
# ignore mistakes
return unless @conv_threads.has_key? name

d9694ad7 Marc Dequènes (Duck)
@conv_threads_closing << name

4afb2001 Marc Dequènes (Duck)
@conv_threads[name].close
7d493229 Marc Dequènes (Duck)
# if only the system thread remains, notify idleness
if @conv_threads.size == 1
06f77931 Marc Dequènes (Duck)
@bot.get_channel('global/system') << {
:topic => 'CONVERSATION IDLE',
:peer => peer_name
}
7d493229 Marc Dequènes (Duck)
end
4afb2001 Marc Dequènes (Duck)
end

def delete_thread(th)
@conv_threads_index.delete(th.id)
@conv_threads.delete(th.name)
d9694ad7 Marc Dequènes (Duck)
@conv_threads_closing.delete(th.name)
4afb2001 Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def send_message(message)
raise CyberError.new(:unrecoverable, "bot/conversation", "Cannot send message without action id") if message.action_id.nil?

d9694ad7 Marc Dequènes (Duck)
reset_idle_thread_check(message.conv_thread)

45333094 Marc Dequènes (Duck)
flags = ""
flags += "+" unless message.action_parameters.nil?

send_line sprintf("%s-%04d-%04d%s %s", @bot.name, message.conv_thread.id, message.action_id, flags, message.action_code)
e26c015f Marc Dequènes (Duck)
unless message.action_parameters.nil?
45333094 Marc Dequènes (Duck)
message.action_parameters.to_yaml.each_line {|l| send_line l }
e26c015f Marc Dequènes (Duck)
send_line EOD
end
d9694ad7 Marc Dequènes (Duck)
check_idle_thread(message.conv_thread)
b7f7d214 Marc Dequènes (Duck)
end

06f77931 Marc Dequènes (Duck)
def identifier
"#{@bot.identifier_prefix}/#{@signature}"
end

b7f7d214 Marc Dequènes (Duck)
protected

69a12fdb Marc Dequènes (Duck)
def clear_receive_info
@receive_error = false
@receive_fatal_error = false
end

def send_line(msg)
d57fd602 Marc Dequènes (Duck)
return if error?

69a12fdb Marc Dequènes (Duck)
logger.debug "Sending data [#{identifier}]: #{msg}"
send_data "#{msg}\n"
end

def check_errors
@error_count += 1 if @receive_error

msg_quit = nil
if @error_count >= MAXIMUM_ERROR_COUNT
msg_quit = "too much errors, terminating"
elsif @fatal_error
msg_quit = "previous fatal error"
end

45333094 Marc Dequènes (Duck)
@protocol.send_quit_decline msg_quit unless msg_quit.nil?
69a12fdb Marc Dequènes (Duck)
end

688a21d7 Marc Dequènes (Duck)
def new_thread(name, id = nil)
id ||= @next_thread_id
th = ConversationThread.new(self, id, name)
@next_thread_id = [@next_thread_id, id + 1].max

@conv_threads[th.name] = th
# allow searching by id too
@conv_threads_index[th.id] = name
th
end

cdd6154d Marc Dequènes (Duck)
def reply_syntax_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: syntax error (#{msg})"

cdd6154d Marc Dequènes (Duck)
msg = "syntax error" + (msg ? ": " + msg : "")
@protocol.send_error_protocol(msg)
end

def reply_fatal_error(msg = nil)
e26c015f Marc Dequènes (Duck)
logger.error "Protocol error [#{identifier}]: fatal error (#{msg})"

cdd6154d Marc Dequènes (Duck)
msg = "fatal error" + (msg ? ": " + msg : "")
fccfdfb9 Marc Dequènes (Duck)
@protocol.send_error_protocol(msg, true)
cdd6154d Marc Dequènes (Duck)
end

e26c015f Marc Dequènes (Duck)
def enter_split_mode(message)
b7f7d214 Marc Dequènes (Duck)
if @split_data_mode
e26c015f Marc Dequènes (Duck)
reply_fatal_error "already in split mode"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: entered split mode for action '#{message.action_id}'"
b7f7d214 Marc Dequènes (Duck)
@split_data_mode = true
e26c015f Marc Dequènes (Duck)
@split_data_message = message
b7f7d214 Marc Dequènes (Duck)
end
@split_data = []
end

def exit_split_mode
if @split_data_mode
e26c015f Marc Dequènes (Duck)
logger.debug "Protocol info [#{identifier}]: quit split mode for action '#{@split_data_message.action_id}'"

45333094 Marc Dequènes (Duck)
parameters = YAML.load(@split_data.join("\n"))
e26c015f Marc Dequènes (Duck)
reply_syntax_error("bad parameters format") if parameters.nil?

3cd21861 Marc Dequènes (Duck)
message = @split_data_message.conv_thread.new_message(@split_data_message.action_code, parameters, @split_data_message.action_id)
e26c015f Marc Dequènes (Duck)
receive_message(message)
b7f7d214 Marc Dequènes (Duck)
else
e26c015f Marc Dequènes (Duck)
reply_fatal_error "not in split mode"
b7f7d214 Marc Dequènes (Duck)
end
@split_data_mode = false
e26c015f Marc Dequènes (Duck)
@split_data_message = nil
b7f7d214 Marc Dequènes (Duck)
@split_data = []
end

8c18bc9a Marc Dequènes (Duck)
def process_system_notification(msg)
cb79cd54 Marc Dequènes (Duck)
# TODO: process other things, like remote notifications
8c18bc9a Marc Dequènes (Duck)
end
d9694ad7 Marc Dequènes (Duck)
def reset_idle_thread_check(conv_thread)
return if @auto_close_threads.nil?
return if @conv_threads_closing.include? conv_thread.name

# ignore system thread
return if conv_thread.name == "system"

logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is working, canceling idle check"

# cancel time if exists
timer = @conv_threads_timers[conv_thread.id]
EventMachine.cancel_timer(timer) unless timer.nil?
@conv_threads_timers[conv_thread.id] = nil
end

cb79cd54 Marc Dequènes (Duck)
# check for idleness at connection level: not only actions done in a thread,
# but any communication through the thread (errors, protocol stuff, ...)
# that's why it is checked here and not in the ConversationThread class
d9694ad7 Marc Dequènes (Duck)
def check_idle_thread(conv_thread)
return if @auto_close_threads.nil?
return if @conv_threads_closing.include? conv_thread.name

# ignore system thread
return if conv_thread.name == "system"

# if a timer is running, do nothing
return unless @conv_threads_timers[conv_thread.id].nil?

# test for idleness
return unless conv_thread.idle?
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is currently idle, doing another check in #{@auto_close_threads}s"

cb79cd54 Marc Dequènes (Duck)
# send notification
@bot.get_channel(@system_notification_name) << {
:topic => 'THREAD IDLE',
:thread => conv_thread.name
}

d9694ad7 Marc Dequènes (Duck)
# set timer for closing
@conv_threads_timers[conv_thread.id] = EventMachine.add_timer(@auto_close_threads) do
@conv_threads_timers[conv_thread.id] = nil
# if it is not idle, then do nothing, wait for another call to check_idle_thread
if conv_thread.idle?
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is still idle, closing"
close_thread(conv_thread.name)
else
logger.debug "Thread '#{conv_thread.name}@#{@peer_name}' is no more idle"
end
end
end
b7f7d214 Marc Dequènes (Duck)
end
end