Revision 861ef12d
Added by Marc Dequènes almost 16 years ago
- ID 861ef12d8f80e90bc5cf7cefd994e7a9c787c28a
TODO | ||
---|---|---|
- ban keys from unknow users flooding -> counter, reseted when key added in DB
|
||
- protect against intercepted mail with falsified headers (From/Reply-To/... could be tampered to get replies, reply tampered too, and then resent to avoid being detected)
|
||
- check "protocol" field in "Content-Type" for received signed/encrypted mails
|
lib/cyborghood/mail.rb | ||
---|---|---|
require 'cyborghood/base'
|
||
require 'delegate'
|
||
require 'tmail'
|
||
require 'tmail_extra'
|
||
require 'tmail_gpg'
|
||
require 'action_mailer/quoting'
|
||
require 'action_mailer/utils'
|
||
require 'net/smtp'
|
lib/tmail_extra.rb | ||
---|---|---|
require 'gpgme' # >= 1.0.2 needed for :always_trust sign option
|
||
|
||
# Attempt to handle PGP/GPG features in a RFC3156-compliant way
|
||
#
|
||
# notes: These methods have been designed to be able to sign, crypt,
|
||
# or sign+crypt with RFC 1847 Encapsulation. There is no
|
||
# support (yet?) for the combined method described in chapter
|
||
# 6.2 of RFC3156.
|
||
module TMail
|
||
class Mail
|
||
def is_pgp_signed?
|
||
content_type == "multipart/signed" and parts.size == 2 and
|
||
parts[1].content_type == "application/pgp-signature"
|
||
end
|
||
|
||
def pgp_signature
|
||
return nil unless is_pgp_signed?
|
||
parts[1].decoded
|
||
end
|
||
|
||
def pgp_signed_part
|
||
return nil unless is_pgp_signed?
|
||
parts[0]
|
||
end
|
||
|
||
def verify_pgp_signature
|
||
return nil unless is_pgp_signed?
|
||
|
||
content = parts[0].to_rfc3156
|
||
sig = pgp_signature()
|
||
|
||
sigs_check = nil
|
||
GPGME.verify(sig, content) do |signature|
|
||
sigs_check ||= []
|
||
sigs_check << signature
|
||
end
|
||
|
||
sigs_check
|
||
end
|
||
|
||
def is_pgp_encrypted?
|
||
content_type == "multipart/encrypted" and parts.size == 2 and
|
||
parts[0].content_type == "application/pgp-encrypted" and
|
||
parts[1].content_type == "application/octet-stream"
|
||
end
|
||
|
||
def pgp_crypt_info
|
||
return nil unless is_pgp_encrypted?
|
||
a = parts[0].body.split("\n").collect{|l| l.chomp.split(": ") if l =~ /: / }.compact.flatten
|
||
Hash[*a]
|
||
end
|
||
|
||
def pgp_encrypted_part
|
||
return nil unless is_pgp_encrypted?
|
||
parts[1].body
|
||
end
|
||
|
||
def pgp_decrypt(&passphrase_callback)
|
||
return nil unless is_pgp_encrypted?
|
||
protocol_version = pgp_crypt_info()["Version"].to_i
|
||
raise NotImplementedError, "pgp-encrypted protocol version #{protocol_version} is not implemented" unless protocol_version == 1
|
||
|
||
encrypted_data = pgp_encrypted_part()
|
||
GPGME.decrypt(encrypted_data, {:passphrase_callback => method(:gpg_passphrase_callback_wrapper),
|
||
:passphrase_callback_value => passphrase_callback, :textmode => true})
|
||
end
|
||
|
||
def pgp_crypt(crypters_id)
|
||
crypters_id = [crypters_id] unless crypters_id.is_a? Array
|
||
crypters = crypters_id.collect{|key_id| gpg_key(key_id, false) }
|
||
GPGME.encrypt(crypters, self.to_s, {:armor => true, :always_trust => true})
|
||
end
|
||
|
||
def pgp_sign(signers_id, &passphrase_callback)
|
||
signers_id = [signers_id] unless signers_id.is_a? Array
|
||
signers = signers_id.collect{|key_id| gpg_key(key_id, true) }
|
||
# we don't use GPGME.sign(), because we need to get operation information to get the hash_algo and compute the micalg parameter
|
||
gpg = GPGME::Ctx.new({:signers => signers, :passphrase_callback => method(:gpg_passphrase_callback_wrapper),
|
||
:passphrase_callback_value => passphrase_callback, :armor => true})
|
||
gpg.add_signer(*signers)
|
||
sig_data = GPGME::Data.new
|
||
gpg.sign(GPGME::Data.new(self.to_rfc3156), sig_data, GPGME::SIG_MODE_NORMAL)
|
||
hash_algo = GPGME.gpgme_op_sign_result(gpg).signatures.first.hash_algo
|
||
micalg = "pgp-" + GPGME.gpgme_hash_algo_name(hash_algo).downcase
|
||
sig_data.seek(0, IO::SEEK_SET)
|
||
{:signature => sig_data.read, :micalg => micalg}
|
||
end
|
||
|
||
def to_rfc3156
|
||
# using RAW part, without any decoding
|
||
# remove last EOL due to MIME protocol and properly convert all EOL to CRLF
|
||
raw.chomp.gsub(/\r?\n/, "\r\n")
|
||
end
|
||
|
||
def create_encrypted(crypters_id)
|
||
clear_data = build_intermediate_mail()
|
||
encrypted_data = clear_data.pgp_crypt(crypters_id)
|
||
|
||
# build properly encrypted mail
|
||
# (preserving headers from original mail)
|
||
mail = TMail::Mail.new
|
||
mail.copy_headers_from(self)
|
||
mail.set_content_type("multipart", "encrypted", {'boundary' => TMail.new_boundary, "protocol" => "application/pgp-encrypted"})
|
||
mail.transfer_encoding = "7bit"
|
||
mail['content-disposition'] = nil
|
||
mail.body = "This mail is a RFC3156 encrypted message.\n"
|
||
mail.parts.clear
|
||
|
||
# cryptographic info
|
||
p_pgp = TMail::Mail.new
|
||
p_pgp.set_content_type("application", "pgp-encrypted")
|
||
p_pgp.transfer_encoding = "7bit"
|
||
p_pgp.content_disposition = "inline"
|
||
p_pgp.body = "Version: 1\n"
|
||
mail.parts << p_pgp
|
||
|
||
# encrypted message
|
||
p_encrypted = TMail::Mail.new
|
||
p_encrypted.set_content_type("application", "octet-stream")
|
||
p_encrypted.transfer_encoding = "7bit"
|
||
p_encrypted.content_disposition = "inline"
|
||
p_encrypted.body = encrypted_data
|
||
mail.parts << p_encrypted
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
mail.write_back
|
||
|
||
mail
|
||
end
|
||
|
||
def create_signed(signers_id)
|
||
data = build_intermediate_mail()
|
||
sign_data = data.pgp_sign(signers_id) do |uid_hint, passphrase_info, prev_was_bad|
|
||
yield(uid_hint, passphrase_info, prev_was_bad)
|
||
end
|
||
|
||
# build properly signed mail
|
||
# (preserving headers from original mail)
|
||
mail = TMail::Mail.new
|
||
mail.copy_headers_from(self)
|
||
mail.set_content_type("multipart", "signed", {'boundary' => TMail.new_boundary, 'protocol' => "application/pgp-signature", 'micalg' => sign_data[:micalg]})
|
||
mail.transfer_encoding = "7bit"
|
||
mail['content-disposition'] = nil
|
||
mail.body = "This mail is a RFC3156 signed message.\n"
|
||
mail.parts.clear
|
||
|
||
# signed message
|
||
p_signed = data
|
||
mail.parts << p_signed
|
||
|
||
# signature
|
||
p_signature = TMail::Mail.new
|
||
p_signature.set_content_type("application", "pgp-signature")
|
||
p_signature.transfer_encoding = "7bit"
|
||
p_signature.content_disposition = "inline"
|
||
p_signature.body = sign_data[:signature]
|
||
mail.parts << p_signature
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
mail.write_back
|
||
|
||
mail
|
||
end
|
||
|
||
def create_signed_and_encrypted(signers_id, crypters_id)
|
||
create_signed(signers_id) do |uid_hint, passphrase_info, prev_was_bad|
|
||
yield(uid_hint, passphrase_info, prev_was_bad)
|
||
end.create_encrypted(crypters_id)
|
||
end
|
||
|
||
def copy_headers_from(mail)
|
||
mail.header.keys.each do |h|
|
||
self[h] = mail[h].to_s
|
||
end
|
||
end
|
||
|
||
protected
|
||
|
||
def build_intermediate_mail
|
||
# build a fake mail to get the generated content to be crypted/signed
|
||
fake_mail = TMail::Mail.new
|
||
fake_mail['content-type'] = self['content-type'].to_s
|
||
fake_mail.transfer_encoding = self.transfer_encoding if self.transfer_encoding
|
||
fake_mail.content_disposition = self.content_disposition if self.content_disposition
|
||
if self.multipart?
|
||
self.each_part {|p| fake_mail.parts << p }
|
||
else
|
||
fake_mail.body = self.body
|
||
end
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
fake_mail.write_back
|
||
|
||
fake_mail
|
||
end
|
||
|
||
def gpg_key(fingerprint, secret = false)
|
||
gpg = GPGME::Ctx.new
|
||
gpg.get_key(fingerprint, secret)
|
||
end
|
||
|
||
def gpg_passphrase_callback_wrapper(hook, uid_hint, passphrase_info, prev_was_bad, fd)
|
||
io = IO.for_fd(fd, 'w')
|
||
io.puts hook.call(uid_hint, passphrase_info, prev_was_bad)
|
||
io.flush
|
||
end
|
||
|
||
def raw
|
||
@port.read_all
|
||
end
|
||
end
|
||
end
|
lib/tmail_gpg.rb | ||
---|---|---|
require 'gpgme' # >= 1.0.2 needed for :always_trust sign option
|
||
|
||
# Attempt to handle PGP/GPG features in a RFC3156-compliant way
|
||
#
|
||
# notes: These methods have been designed to be able to sign, crypt,
|
||
# or sign+crypt with RFC 1847 Encapsulation. There is no
|
||
# support (yet?) for the combined method described in chapter
|
||
# 6.2 of RFC3156.
|
||
module TMail
|
||
class Mail
|
||
def is_pgp_signed?
|
||
content_type == "multipart/signed" and
|
||
header['content-type'].params['protocol'] == "application/pgp-signature" and
|
||
parts.size == 2 and
|
||
parts[1].content_type == "application/pgp-signature"
|
||
end
|
||
|
||
def pgp_signature
|
||
return nil unless is_pgp_signed?
|
||
parts[1].decoded
|
||
end
|
||
|
||
def pgp_signed_part
|
||
return nil unless is_pgp_signed?
|
||
parts[0]
|
||
end
|
||
|
||
def verify_pgp_signature
|
||
return nil unless is_pgp_signed?
|
||
|
||
content = parts[0].to_rfc3156
|
||
sig = pgp_signature()
|
||
|
||
sigs_check = nil
|
||
GPGME.verify(sig, content) do |signature|
|
||
sigs_check ||= []
|
||
sigs_check << signature
|
||
end
|
||
|
||
sigs_check
|
||
end
|
||
|
||
def is_pgp_encrypted?
|
||
content_type == "multipart/encrypted" and
|
||
header['content-type'].params['protocol'] == "application/pgp-encrypted" and
|
||
parts.size == 2 and
|
||
parts[0].content_type == "application/pgp-encrypted" and
|
||
parts[1].content_type == "application/octet-stream"
|
||
end
|
||
|
||
def pgp_crypt_info
|
||
return nil unless is_pgp_encrypted?
|
||
a = parts[0].body.split("\n").collect{|l| l.chomp.split(": ") if l =~ /: / }.compact.flatten
|
||
Hash[*a]
|
||
end
|
||
|
||
def pgp_encrypted_part
|
||
return nil unless is_pgp_encrypted?
|
||
parts[1].body
|
||
end
|
||
|
||
def pgp_decrypt(&passphrase_callback)
|
||
return nil unless is_pgp_encrypted?
|
||
protocol_version = pgp_crypt_info()["Version"].to_i
|
||
raise NotImplementedError, "pgp-encrypted protocol version #{protocol_version} is not implemented" unless protocol_version == 1
|
||
|
||
encrypted_data = pgp_encrypted_part()
|
||
GPGME.decrypt(encrypted_data, {:passphrase_callback => method(:gpg_passphrase_callback_wrapper),
|
||
:passphrase_callback_value => passphrase_callback, :textmode => true})
|
||
end
|
||
|
||
def pgp_crypt(crypters_id)
|
||
crypters_id = [crypters_id] unless crypters_id.is_a? Array
|
||
crypters = crypters_id.collect{|key_id| gpg_key(key_id, false) }
|
||
GPGME.encrypt(crypters, self.to_s, {:armor => true, :always_trust => true})
|
||
end
|
||
|
||
def pgp_sign(signers_id, &passphrase_callback)
|
||
signers_id = [signers_id] unless signers_id.is_a? Array
|
||
signers = signers_id.collect{|key_id| gpg_key(key_id, true) }
|
||
# we don't use GPGME.sign(), because we need to get operation information to get the hash_algo and compute the micalg parameter
|
||
gpg = GPGME::Ctx.new({:signers => signers, :passphrase_callback => method(:gpg_passphrase_callback_wrapper),
|
||
:passphrase_callback_value => passphrase_callback, :armor => true})
|
||
gpg.add_signer(*signers)
|
||
sig_data = GPGME::Data.new
|
||
gpg.sign(GPGME::Data.new(self.to_rfc3156), sig_data, GPGME::SIG_MODE_NORMAL)
|
||
hash_algo = GPGME.gpgme_op_sign_result(gpg).signatures.first.hash_algo
|
||
micalg = "pgp-" + GPGME.gpgme_hash_algo_name(hash_algo).downcase
|
||
sig_data.seek(0, IO::SEEK_SET)
|
||
{:signature => sig_data.read, :micalg => micalg}
|
||
end
|
||
|
||
def to_rfc3156
|
||
# using RAW part, without any decoding
|
||
# remove last EOL due to MIME protocol and properly convert all EOL to CRLF
|
||
raw.chomp.gsub(/\r?\n/, "\r\n")
|
||
end
|
||
|
||
def create_encrypted(crypters_id)
|
||
clear_data = build_intermediate_mail()
|
||
encrypted_data = clear_data.pgp_crypt(crypters_id)
|
||
|
||
# build properly encrypted mail
|
||
# (preserving headers from original mail)
|
||
mail = TMail::Mail.new
|
||
mail.copy_headers_from(self)
|
||
mail.set_content_type("multipart", "encrypted", {'boundary' => TMail.new_boundary, "protocol" => "application/pgp-encrypted"})
|
||
mail.transfer_encoding = "7bit"
|
||
mail['content-disposition'] = nil
|
||
mail.body = "This mail is a RFC3156 encrypted message.\n"
|
||
mail.parts.clear
|
||
|
||
# cryptographic info
|
||
p_pgp = TMail::Mail.new
|
||
p_pgp.set_content_type("application", "pgp-encrypted")
|
||
p_pgp.transfer_encoding = "7bit"
|
||
p_pgp.content_disposition = "inline"
|
||
p_pgp.body = "Version: 1\n"
|
||
mail.parts << p_pgp
|
||
|
||
# encrypted message
|
||
p_encrypted = TMail::Mail.new
|
||
p_encrypted.set_content_type("application", "octet-stream")
|
||
p_encrypted.transfer_encoding = "7bit"
|
||
p_encrypted.content_disposition = "inline"
|
||
p_encrypted.body = encrypted_data
|
||
mail.parts << p_encrypted
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
mail.write_back
|
||
|
||
mail
|
||
end
|
||
|
||
def create_signed(signers_id)
|
||
data = build_intermediate_mail()
|
||
sign_data = data.pgp_sign(signers_id) do |uid_hint, passphrase_info, prev_was_bad|
|
||
yield(uid_hint, passphrase_info, prev_was_bad)
|
||
end
|
||
|
||
# build properly signed mail
|
||
# (preserving headers from original mail)
|
||
mail = TMail::Mail.new
|
||
mail.copy_headers_from(self)
|
||
mail.set_content_type("multipart", "signed", {'boundary' => TMail.new_boundary, 'protocol' => "application/pgp-signature", 'micalg' => sign_data[:micalg]})
|
||
mail.transfer_encoding = "7bit"
|
||
mail['content-disposition'] = nil
|
||
mail.body = "This mail is a RFC3156 signed message.\n"
|
||
mail.parts.clear
|
||
|
||
# signed message
|
||
p_signed = data
|
||
mail.parts << p_signed
|
||
|
||
# signature
|
||
p_signature = TMail::Mail.new
|
||
p_signature.set_content_type("application", "pgp-signature")
|
||
p_signature.transfer_encoding = "7bit"
|
||
p_signature.content_disposition = "inline"
|
||
p_signature.body = sign_data[:signature]
|
||
mail.parts << p_signature
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
mail.write_back
|
||
|
||
mail
|
||
end
|
||
|
||
def create_signed_and_encrypted(signers_id, crypters_id)
|
||
create_signed(signers_id) do |uid_hint, passphrase_info, prev_was_bad|
|
||
yield(uid_hint, passphrase_info, prev_was_bad)
|
||
end.create_encrypted(crypters_id)
|
||
end
|
||
|
||
def copy_headers_from(mail)
|
||
mail.header.keys.each do |h|
|
||
self[h] = mail[h].to_s
|
||
end
|
||
end
|
||
|
||
protected
|
||
|
||
def build_intermediate_mail
|
||
# build a fake mail to get the generated content to be crypted/signed
|
||
fake_mail = TMail::Mail.new
|
||
fake_mail['content-type'] = self['content-type'].to_s
|
||
fake_mail.transfer_encoding = self.transfer_encoding if self.transfer_encoding
|
||
fake_mail.content_disposition = self.content_disposition if self.content_disposition
|
||
if self.multipart?
|
||
self.each_part {|p| fake_mail.parts << p }
|
||
else
|
||
fake_mail.body = self.body
|
||
end
|
||
|
||
# store the calculated content, to be able to use parsing methods
|
||
fake_mail.write_back
|
||
|
||
fake_mail
|
||
end
|
||
|
||
def gpg_key(fingerprint, secret = false)
|
||
gpg = GPGME::Ctx.new
|
||
gpg.get_key(fingerprint, secret)
|
||
end
|
||
|
||
def gpg_passphrase_callback_wrapper(hook, uid_hint, passphrase_info, prev_was_bad, fd)
|
||
io = IO.for_fd(fd, 'w')
|
||
io.puts hook.call(uid_hint, passphrase_info, prev_was_bad)
|
||
io.flush
|
||
end
|
||
|
||
def raw
|
||
@port.read_all
|
||
end
|
||
end
|
||
end
|
Also available in: Unified diff
[evol] check content-type protocol parameter when validating pgp mail content, and renamed tmail_extra library into tmail_gpg (as it is dedicated to gpg mails only)