From 9c61c2882fd1e3617dc6c7617a2da896596a4371 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 6 Apr 2012 15:13:04 -0400 Subject: [PATCH] Add support for XEP-0027 --- LICENSE | 29 + sleekxmpp/plugins/__init__.py | 1 + sleekxmpp/plugins/xep_0027/__init__.py | 15 + sleekxmpp/plugins/xep_0027/gpg.py | 161 ++++ sleekxmpp/plugins/xep_0027/stanza.py | 56 ++ sleekxmpp/thirdparty/__init__.py | 5 + sleekxmpp/thirdparty/gnupg.py | 1017 ++++++++++++++++++++++++ 7 files changed, 1284 insertions(+) create mode 100644 sleekxmpp/plugins/xep_0027/__init__.py create mode 100644 sleekxmpp/plugins/xep_0027/gpg.py create mode 100644 sleekxmpp/plugins/xep_0027/stanza.py create mode 100644 sleekxmpp/thirdparty/gnupg.py diff --git a/LICENSE b/LICENSE index 612dd1d7..a868b337 100644 --- a/LICENSE +++ b/LICENSE @@ -138,3 +138,32 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +python-gnupg: A Python wrapper for the GNU Privacy Guard +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Copyright (c) 2008-2012 by Vinay Sajip. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + * The name(s) of the copyright holder(s) may not be used to endorse or + promote products derived from this software without specific prior + written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) "AS IS" AND ANY EXPRESS OR +IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +EVENT SHALL THE COPYRIGHT HOLDER(S) BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/sleekxmpp/plugins/__init__.py b/sleekxmpp/plugins/__init__.py index c374f27b..6913a49a 100644 --- a/sleekxmpp/plugins/__init__.py +++ b/sleekxmpp/plugins/__init__.py @@ -18,6 +18,7 @@ __all__ = [ 'xep_0004', # Data Forms 'xep_0009', # Jabber-RPC 'xep_0012', # Last Activity + 'xep_0027', # Current Jabber OpenPGP Usage 'xep_0030', # Service Discovery 'xep_0033', # Extended Stanza Addresses 'xep_0045', # Multi-User Chat (Client) diff --git a/sleekxmpp/plugins/xep_0027/__init__.py b/sleekxmpp/plugins/xep_0027/__init__.py new file mode 100644 index 00000000..b6ed9676 --- /dev/null +++ b/sleekxmpp/plugins/xep_0027/__init__.py @@ -0,0 +1,15 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0027.stanza import Signed, Encrypted +from sleekxmpp.plugins.xep_0027.gpg import XEP_0027 + + +register_plugin(XEP_0027) diff --git a/sleekxmpp/plugins/xep_0027/gpg.py b/sleekxmpp/plugins/xep_0027/gpg.py new file mode 100644 index 00000000..ccc7c400 --- /dev/null +++ b/sleekxmpp/plugins/xep_0027/gpg.py @@ -0,0 +1,161 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.thirdparty import GPG + +from sleekxmpp.stanza import Presence, Message +from sleekxmpp.plugins.base import BasePlugin, register_plugin +from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.xmlstream.matcher import StanzaPath +from sleekxmpp.plugins.xep_0027 import stanza, Signed, Encrypted + + +def _extract_data(data, kind): + stripped = [] + begin_headers = False + begin_data = False + for line in data.split('\n'): + if not begin_headers and 'BEGIN PGP %s' % kind in line: + begin_headers = True + continue + if begin_headers and line == '': + begin_data = True + continue + if 'END PGP %s' % kind in line: + return '\n'.join(stripped) + if begin_data: + stripped.append(line) + return '' + + +class XEP_0027(BasePlugin): + + name = 'xep_0027' + description = 'XEP-0027: Current Jabber OpenPGP Usage' + dependencies = set() + stanza = stanza + + def plugin_init(self): + self.gpg_binary = self.config.get('gpg_binary', 'gpg') + self.gpg_home = self.config.get('gpg_home', '') + self.use_agent = self.config.get('use_agent', True) + self.keyring = self.config.get('keyring', None) + self.key_server = self.config.get('key_server', 'pgp.mit.edu') + + self.gpg = GPG(gnupghome=self.gpg_home, + gpgbinary=self.gpg_binary, + use_agent=self.use_agent, + keyring=self.keyring) + + self.xmpp.add_filter('out', self._sign_presence) + + self._keyids = {} + + self.api.register(self._set_keyid, 'set_keyid', default=True) + self.api.register(self._get_keyid, 'get_keyid', default=True) + self.api.register(self._del_keyid, 'del_keyid', default=True) + self.api.register(self._get_keyids, 'get_keyids', default=True) + + register_stanza_plugin(Presence, Signed) + register_stanza_plugin(Message, Encrypted) + + self.xmpp.add_event_handler('unverified_signed_presence', + self._handle_unverified_signed_presence, + threaded=True) + + self.xmpp.register_handler( + Callback('Signed Presence', + StanzaPath('presence/signed'), + self._handle_signed_presence)) + + self.xmpp.register_handler( + Callback('Encrypted Message', + StanzaPath('message/encrypted'), + self._handle_encrypted_message)) + + def _sign_presence(self, stanza): + if isinstance(stanza, Presence): + if stanza['type'] == 'available' or stanza['type'] in Presence.showtypes: + stanza['signed'] = stanza['status'] + return stanza + + def sign(self, data, jid=None): + keyid = self.get_keyid(jid) + if keyid: + signed = self.gpg.sign(data, keyid=keyid) + return _extract_data(signed.data, 'SIGNATURE') + + def encrypt(self, data, jid=None): + keyid = self.get_keyid(jid) + if keyid: + enc = self.gpg.encrypt(data, keyid) + return _extract_data(enc.data, 'MESSAGE') + + def decrypt(self, data, jid=None): + template = '-----BEGIN PGP MESSAGE-----\n' + \ + '\n' + \ + '%s\n' + \ + '-----END PGP MESSAGE-----\n' + dec = self.gpg.decrypt(template % data) + return dec.data + + def verify(self, data, sig, jid=None): + template = '-----BEGIN PGP SIGNED MESSAGE-----\n' + \ + 'Hash: SHA1\n' + \ + '\n' + \ + '%s\n' + \ + '-----BEGIN PGP SIGNATURE-----\n' + \ + '\n' + \ + '%s\n' + \ + '-----END PGP SIGNATURE-----\n' + v = self.gpg.verify(template % (data, sig)) + return v + + def set_keyid(self, jid=None, keyid=None): + self.api['set_keyid'](jid, args=keyid) + + def get_keyid(self, jid=None): + return self.api['get_keyid'](jid) + + def del_keyid(self, jid=None): + self.api['del_keyid'](jid) + + def get_keyids(self): + return self.api['get_keyids']() + + def _handle_signed_presence(self, pres): + self.xmpp.event('unverified_signed_presence', pres) + + def _handle_unverified_signed_presence(self, pres): + verified = self.verify(pres['status'], pres['signed']) + if verified.key_id: + if not self.get_keyid(pres['from']): + known_keyids = [e['keyid'] for e in self.gpg.list_keys()] + if verified.key_id not in known_keyids: + self.gpg.recv_keys(self.key_server, verified.key_id) + self.set_keyid(jid=pres['from'], keyid=verified.key_id) + self.xmpp.event('signed_presence', pres) + + def _handle_encrypted_message(self, msg): + self.xmpp.event('encrypted_message', msg) + + # ================================================================= + + def _set_keyid(self, jid, node, ifrom, keyid): + self._keyids[jid] = keyid + + def _get_keyid(self, jid, node, ifrom, keyid): + return self._keyids.get(jid, None) + + def _del_keyid(self, jid, node, ifrom, keyid): + if jid in self._keyids: + del self._keyids[jid] + + def _get_keyids(self, jid, node, ifrom, data): + return self._keyids diff --git a/sleekxmpp/plugins/xep_0027/stanza.py b/sleekxmpp/plugins/xep_0027/stanza.py new file mode 100644 index 00000000..927693ad --- /dev/null +++ b/sleekxmpp/plugins/xep_0027/stanza.py @@ -0,0 +1,56 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream import ElementBase + + +class Signed(ElementBase): + name = 'x' + namespace = 'jabber:x:signed' + plugin_attrib = 'signed' + interfaces = set(['signed']) + is_extension = True + + def set_signed(self, value): + parent = self.parent() + xmpp = parent.stream + data = xmpp['xep_0027'].sign(value, parent['from']) + if data: + self.xml.text = data + else: + del parent['signed'] + + def get_signed(self): + return self.xml.text + + +class Encrypted(ElementBase): + name = 'x' + namespace = 'jabber:x:encrypted' + plugin_attrib = 'encrypted' + interfaces = set(['encrypted']) + is_extension = True + + def set_encrypted(self, value): + parent = self.parent() + xmpp = parent.stream + data = xmpp['xep_0027'].encrypt(value, parent['to'].bare) + if data: + self.xml.text = data + else: + del parent['encrypted'] + + def get_encrypted(self): + parent = self.parent() + xmpp = parent.stream + if self.xml.text: + return xmpp['xep_0027'].decrypt(self.xml.text, parent['to']) + return None + + + diff --git a/sleekxmpp/thirdparty/__init__.py b/sleekxmpp/thirdparty/__init__.py index 1c7bf651..b9c82a7f 100644 --- a/sleekxmpp/thirdparty/__init__.py +++ b/sleekxmpp/thirdparty/__init__.py @@ -3,5 +3,10 @@ try: except: from sleekxmpp.thirdparty.ordereddict import OrderedDict +try: + from gnupg import GPG +except: + from sleekxmpp.thirdparty.gnupg import GPG + from sleekxmpp.thirdparty import suelta from sleekxmpp.thirdparty.mini_dateutil import tzutc, tzoffset, parse_iso diff --git a/sleekxmpp/thirdparty/gnupg.py b/sleekxmpp/thirdparty/gnupg.py new file mode 100644 index 00000000..a89289fd --- /dev/null +++ b/sleekxmpp/thirdparty/gnupg.py @@ -0,0 +1,1017 @@ +""" A wrapper for the 'gpg' command:: + +Portions of this module are derived from A.M. Kuchling's well-designed +GPG.py, using Richard Jones' updated version 1.3, which can be found +in the pycrypto CVS repository on Sourceforge: + +http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py + +This module is *not* forward-compatible with amk's; some of the +old interface has changed. For instance, since I've added decrypt +functionality, I elected to initialize with a 'gnupghome' argument +instead of 'keyring', so that gpg can find both the public and secret +keyrings. I've also altered some of the returned objects in order for +the caller to not have to know as much about the internals of the +result classes. + +While the rest of ISconf is released under the GPL, I am releasing +this single file under the same terms that A.M. Kuchling used for +pycrypto. + +Steve Traugott, stevegt@terraluna.org +Thu Jun 23 21:27:20 PDT 2005 + +This version of the module has been modified from Steve Traugott's version +(see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by +Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() +and so does not work on Windows). Renamed to gnupg.py to avoid confusion with +the previous versions. + +Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved. + +A unittest harness (test_gnupg.py) has also been added. +""" +import locale + +__version__ = "0.2.9" +__author__ = "Vinay Sajip" +__date__ = "$29-Mar-2012 21:12:58$" + +try: + from io import StringIO +except ImportError: + from cStringIO import StringIO + +import codecs +import locale +import logging +import os +import socket +from subprocess import Popen +from subprocess import PIPE +import sys +import threading + +try: + import logging.NullHandler as NullHandler +except ImportError: + class NullHandler(logging.Handler): + def handle(self, record): + pass +try: + unicode + _py3k = False +except NameError: + _py3k = True + +logger = logging.getLogger(__name__) +if not logger.handlers: + logger.addHandler(NullHandler()) + +def _copy_data(instream, outstream): + # Copy one stream to another + sent = 0 + if hasattr(sys.stdin, 'encoding'): + enc = sys.stdin.encoding + else: + enc = 'ascii' + while True: + data = instream.read(1024) + if len(data) == 0: + break + sent += len(data) + logger.debug("sending chunk (%d): %r", sent, data[:256]) + try: + outstream.write(data) + except UnicodeError: + outstream.write(data.encode(enc)) + except: + # Can sometimes get 'broken pipe' errors even when the data has all + # been sent + logger.exception('Error sending data') + break + try: + outstream.close() + except IOError: + logger.warning('Exception occurred while closing: ignored', exc_info=1) + logger.debug("closed output, %d bytes sent", sent) + +def _threaded_copy_data(instream, outstream): + wr = threading.Thread(target=_copy_data, args=(instream, outstream)) + wr.setDaemon(True) + logger.debug('data copier: %r, %r, %r', wr, instream, outstream) + wr.start() + return wr + +def _write_passphrase(stream, passphrase, encoding): + passphrase = '%s\n' % passphrase + passphrase = passphrase.encode(encoding) + stream.write(passphrase) + logger.debug("Wrote passphrase: %r", passphrase) + +def _is_sequence(instance): + return isinstance(instance,list) or isinstance(instance,tuple) + +def _make_binary_stream(s, encoding): + try: + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) + from io import BytesIO + rv = BytesIO(s) + except ImportError: + rv = StringIO(s) + return rv + +class Verify(object): + "Handle status messages for --verify" + + def __init__(self, gpg): + self.gpg = gpg + self.valid = False + self.fingerprint = self.creation_date = self.timestamp = None + self.signature_id = self.key_id = None + self.username = None + + def __nonzero__(self): + return self.valid + + __bool__ = __nonzero__ + + def handle_status(self, key, value): + if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL", + "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA", + "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", + "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY", "IMPORTED"): + pass + elif key == "BADSIG": + self.valid = False + self.status = 'signature bad' + self.key_id, self.username = value.split(None, 1) + elif key == "GOODSIG": + self.valid = True + self.status = 'signature good' + self.key_id, self.username = value.split(None, 1) + elif key == "VALIDSIG": + (self.fingerprint, + self.creation_date, + self.sig_timestamp, + self.expire_timestamp) = value.split()[:4] + # may be different if signature is made with a subkey + self.pubkey_fingerprint = value.split()[-1] + self.status = 'signature valid' + elif key == "SIG_ID": + (self.signature_id, + self.creation_date, self.timestamp) = value.split() + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "DECRYPTION_FAILED": + self.valid = False + self.key_id = value + self.status = 'decryption failed' + elif key == "NO_PUBKEY": + self.valid = False + self.key_id = value + self.status = 'no public key' + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + # these are useless in verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures with expired key, + # the relevant flag is EXPKEYSIG. + pass + elif key in ("EXPKEYSIG", "REVKEYSIG"): + # signed with expired or revoked key + self.valid = False + self.key_id = value.split()[0] + self.status = (('%s %s') % (key[:3], key[3:])).lower() + else: + raise ValueError("Unknown status message: %r" % key) + +class ImportResult(object): + "Handle status messages for --import" + + counts = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported + sec_dups not_imported'''.split() + def __init__(self, gpg): + self.gpg = gpg + self.imported = [] + self.results = [] + self.fingerprints = [] + for result in self.counts: + setattr(self, result, None) + + def __nonzero__(self): + if self.not_imported: return False + if not self.fingerprints: return False + return True + + __bool__ = __nonzero__ + + ok_reason = { + '0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + } + + problem_reason = { + '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', + } + + def handle_status(self, key, value): + if key == "IMPORTED": + # this duplicates info we already see in import_ok & import_problem + pass + elif key == "NODATA": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'No valid data found'}) + elif key == "IMPORT_OK": + reason, fingerprint = value.split() + reasons = [] + for code, text in list(self.ok_reason.items()): + if int(reason) | int(code) == int(reason): + reasons.append(text) + reasontext = '\n'.join(reasons) + "\n" + self.results.append({'fingerprint': fingerprint, + 'ok': reason, 'text': reasontext}) + self.fingerprints.append(fingerprint) + elif key == "IMPORT_PROBLEM": + try: + reason, fingerprint = value.split() + except: + reason = value + fingerprint = '' + self.results.append({'fingerprint': fingerprint, + 'problem': reason, 'text': self.problem_reason[reason]}) + elif key == "IMPORT_RES": + import_res = value.split() + for i in range(len(self.counts)): + setattr(self, self.counts[i], int(import_res[i])) + elif key == "KEYEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Key expired'}) + elif key == "SIGEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Signature expired'}) + else: + raise ValueError("Unknown status message: %r" % key) + + def summary(self): + l = [] + l.append('%d imported'%self.imported) + if self.not_imported: + l.append('%d not imported'%self.not_imported) + return ', '.join(l) + +class ListKeys(list): + ''' Handle status messages for --list-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about (info from src/DETAILS): + + crt = X.509 certificate + crs = X.509 certificate and private key available + sub = subkey (secondary key) + ssb = secret subkey (secondary key) + uat = user attribute (same as user id except for field 10). + sig = signature + rev = revocation signature + pkd = public key data (special field format, see below) + grp = reserved for gpgsm + rvk = revocation key + ''' + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def key(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [] + if self.curkey['uid']: + self.curkey['uids'].append(self.curkey['uid']) + del self.curkey['uid'] + self.append(self.curkey) + + pub = sec = key + + def fpr(self, args): + self.curkey['fingerprint'] = args[9] + self.fingerprints.append(args[9]) + + def uid(self, args): + self.curkey['uids'].append(args[9]) + self.uids.append(args[9]) + + def handle_status(self, key, value): + pass + +class Crypt(Verify): + "Handle status messages for --encrypt and --decrypt" + def __init__(self, gpg): + Verify.__init__(self, gpg) + self.data = '' + self.ok = False + self.status = '' + + def __nonzero__(self): + if self.ok: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "CARDCTRL"): + # in the case of ERROR, this is because a more specific error + # message will have come first + pass + elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", + "MISSING_PASSPHRASE", "DECRYPTION_FAILED", + "KEY_NOT_CREATED"): + self.status = key.replace("_", " ").lower() + elif key == "NEED_PASSPHRASE_SYM": + self.status = 'need symmetric passphrase' + elif key == "BEGIN_DECRYPTION": + self.status = 'decryption incomplete' + elif key == "BEGIN_ENCRYPTION": + self.status = 'encryption incomplete' + elif key == "DECRYPTION_OKAY": + self.status = 'decryption ok' + self.ok = True + elif key == "END_ENCRYPTION": + self.status = 'encryption ok' + self.ok = True + elif key == "INV_RECP": + self.status = 'invalid recipient' + elif key == "KEYEXPIRED": + self.status = 'key expired' + elif key == "SIG_CREATED": + self.status = 'sig created' + elif key == "SIGEXPIRED": + self.status = 'sig expired' + else: + Verify.handle_status(self, key, value) + +class GenKey(object): + "Handle status messages for --gen-key" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + if self.fingerprint: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.fingerprint or '' + + def handle_status(self, key, value): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"): + pass + elif key == "KEY_CREATED": + (self.type,self.fingerprint) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + +class DeleteResult(object): + "Handle status messages for --delete-key and --delete-secret-key" + def __init__(self, gpg): + self.gpg = gpg + self.status = 'ok' + + def __str__(self): + return self.status + + problem_reason = { + '1': 'No such key', + '2': 'Must delete secret key first', + '3': 'Ambigious specification', + } + + def handle_status(self, key, value): + if key == "DELETE_PROBLEM": + self.status = self.problem_reason.get(value, + "Unknown error: %r" % value) + else: + raise ValueError("Unknown status message: %r" % key) + +class Sign(object): + "Handle status messages for --sign" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + return self.fingerprint is not None + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL"): + pass + elif key == "SIG_CREATED": + (self.type, + algo, hashalgo, cls, + self.timestamp, self.fingerprint + ) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + + +class GPG(object): + + decode_errors = 'strict' + + result_map = { + 'crypt': Crypt, + 'delete': DeleteResult, + 'generate': GenKey, + 'import': ImportResult, + 'list': ListKeys, + 'sign': Sign, + 'verify': Verify, + } + + "Encapsulate access to the gpg executable" + def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, + use_agent=False, keyring=None): + """Initialize a GPG process wrapper. Options are: + + gpgbinary -- full pathname for GPG binary. + + gnupghome -- full pathname to where we can find the public and + private keyrings. Default is whatever gpg defaults to. + keyring -- name of alternative keyring file to use. If specified, + the default keyring is not used. + """ + self.gpgbinary = gpgbinary + self.gnupghome = gnupghome + self.keyring = keyring + self.verbose = verbose + self.use_agent = use_agent + self.encoding = locale.getpreferredencoding() + if self.encoding is None: # This happens on Jython! + self.encoding = sys.stdin.encoding + if gnupghome and not os.path.isdir(self.gnupghome): + os.makedirs(self.gnupghome,0x1C0) + p = self._open_subprocess(["--version"]) + result = self.result_map['verify'](self) # any result will do for this + self._collect_output(p, result, stdin=p.stdin) + if p.returncode != 0: + raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, + result.stderr)) + + def _open_subprocess(self, args, passphrase=False): + # Internal method: open a pipe to a GPG subprocess and return + # the file objects for communicating with it. + cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] + if self.gnupghome: + cmd.append('--homedir "%s" ' % self.gnupghome) + if self.keyring: + cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring) + if passphrase: + cmd.append('--batch --passphrase-fd 0') + if self.use_agent: + cmd.append('--use-agent') + cmd.extend(args) + cmd = ' '.join(cmd) + if self.verbose: + print(cmd) + logger.debug("%s", cmd) + return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + def _read_response(self, stream, result): + # Internal method: reads all the stderr output from GPG, taking notice + # only of lines that begin with the magic [GNUPG:] prefix. + # + # Calls methods on the response object for each valid token found, + # with the arg being the remainder of the status line. + lines = [] + while True: + line = stream.readline() + if len(line) == 0: + break + lines.append(line) + line = line.rstrip() + if self.verbose: + print(line) + logger.debug("%s", line) + if line[0:9] == '[GNUPG:] ': + # Chop off the prefix + line = line[9:] + L = line.split(None, 1) + keyword = L[0] + if len(L) > 1: + value = L[1] + else: + value = "" + result.handle_status(keyword, value) + result.stderr = ''.join(lines) + + def _read_data(self, stream, result): + # Read the contents of the file from GPG's stdout + chunks = [] + while True: + data = stream.read(1024) + if len(data) == 0: + break + logger.debug("chunk: %r" % data[:256]) + chunks.append(data) + if _py3k: + # Join using b'' or '', as appropriate + result.data = type(data)().join(chunks) + else: + result.data = ''.join(chunks) + + def _collect_output(self, process, result, writer=None, stdin=None): + """ + Drain the subprocesses output streams, writing the collected output + to the result. If a writer thread (writing to the subprocess) is given, + make sure it's joined before returning. If a stdin stream is given, + close it before returning. + """ + stderr = codecs.getreader(self.encoding)(process.stderr) + rr = threading.Thread(target=self._read_response, args=(stderr, result)) + rr.setDaemon(True) + logger.debug('stderr reader: %r', rr) + rr.start() + + stdout = process.stdout + dr = threading.Thread(target=self._read_data, args=(stdout, result)) + dr.setDaemon(True) + logger.debug('stdout reader: %r', dr) + dr.start() + + dr.join() + rr.join() + if writer is not None: + writer.join() + process.wait() + if stdin is not None: + try: + stdin.close() + except IOError: + pass + stderr.close() + stdout.close() + + def _handle_io(self, args, file, result, passphrase=None, binary=False): + "Handle a call to GPG - pass input data, collect output data" + # Handle a basic data call - pass data to GPG, handle the output + # including status information. Garbage In, Garbage Out :) + p = self._open_subprocess(args, passphrase is not None) + if not binary: + stdin = codecs.getwriter(self.encoding)(p.stdin) + else: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + self._collect_output(p, result, writer, stdin) + return result + + # + # SIGNATURE METHODS + # + def sign(self, message, **kwargs): + """sign message""" + f = _make_binary_stream(message, self.encoding) + result = self.sign_file(f, **kwargs) + f.close() + return result + + def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, + detach=False, binary=False): + """sign file""" + logger.debug("sign_file: %s", file) + if binary: + args = ['-s'] + else: + args = ['-sa'] + # You can't specify detach-sign and clearsign together: gpg ignores + # the detach-sign in that case. + if detach: + args.append("--detach-sign") + elif clearsign: + args.append("--clearsign") + if keyid: + args.append('--default-key "%s"' % keyid) + args.extend(['--no-version', "--comment ''"]) + result = self.result_map['sign'](self) + #We could use _handle_io here except for the fact that if the + #passphrase is bad, gpg bails and you can't write the message. + p = self._open_subprocess(args, passphrase is not None) + try: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + except IOError: + logging.exception("error writing message") + writer = None + self._collect_output(p, result, writer, stdin) + return result + + def verify(self, data): + """Verify the signature on the contents of the string 'data' + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(Passphrase='foo') + >>> key = gpg.gen_key(input) + >>> assert key + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') + >>> assert not sig + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') + >>> assert sig + >>> verify = gpg.verify(sig.data) + >>> assert verify + + """ + f = _make_binary_stream(data, self.encoding) + result = self.verify_file(f) + f.close() + return result + + def verify_file(self, file, data_filename=None): + "Verify the signature on the contents of the file-like object 'file'" + logger.debug('verify_file: %r, %r', file, data_filename) + result = self.result_map['verify'](self) + args = ['--verify'] + if data_filename is None: + self._handle_io(args, file, result, binary=True) + else: + logger.debug('Handling detached verification') + import tempfile + fd, fn = tempfile.mkstemp(prefix='pygpg') + s = file.read() + file.close() + logger.debug('Wrote to temp file: %r', s) + os.write(fd, s) + os.close(fd) + args.append(fn) + args.append('"%s"' % data_filename) + try: + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + finally: + os.unlink(fn) + return result + + # + # KEY MANAGEMENT + # + + def import_keys(self, key_data): + """ import the key_data into our keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkey1 = gpg.export_keys(print1) + >>> seckey1 = gpg.export_keys(print1,secret=True) + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> str(gpg.delete_keys(print1)) + 'Must delete secret key first' + >>> str(gpg.delete_keys(print1,secret=True)) + 'ok' + >>> str(gpg.delete_keys(print1)) + 'ok' + >>> str(gpg.delete_keys("nosuchkey")) + 'No such key' + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert not print1 in seckeys.fingerprints + >>> assert not print1 in pubkeys.fingerprints + >>> result = gpg.import_keys('foo') + >>> assert not result + >>> result = gpg.import_keys(pubkey1) + >>> pubkeys = gpg.list_keys() + >>> seckeys = gpg.list_keys(secret=True) + >>> assert not print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> result = gpg.import_keys(seckey1) + >>> assert result + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + result = self.result_map['import'](self) + logger.debug('import_keys: %r', key_data[:256]) + data = _make_binary_stream(key_data, self.encoding) + self._handle_io(['--import'], data, result, binary=True) + logger.debug('import_keys result: %r', result.__dict__) + data.close() + return result + + def recv_keys(self, keyserver, *keyids): + """Import a key from a keyserver + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA') + >>> assert result + + """ + result = self.result_map['import'](self) + logger.debug('recv_keys: %r', keyids) + data = _make_binary_stream("", self.encoding) + #data = "" + args = ['--keyserver', keyserver, '--recv-keys'] + args.extend(keyids) + self._handle_io(args, data, result, binary=True) + logger.debug('recv_keys result: %r', result.__dict__) + data.close() + return result + + def delete_keys(self, fingerprints, secret=False): + which='key' + if secret: + which='secret-key' + if _is_sequence(fingerprints): + fingerprints = ' '.join(fingerprints) + args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + result = self.result_map['delete'](self) + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + return result + + def export_keys(self, keyids, secret=False): + "export the indicated keys. 'keyid' is anything gpg accepts" + which='' + if secret: + which='-secret-key' + if _is_sequence(keyids): + keyids = ' '.join(['"%s"' % k for k in keyids]) + args = ["--armor --export%s %s" % (which, keyids)] + p = self._open_subprocess(args) + # gpg --export produces no status-fd output; stdout will be + # empty in case of failure + #stdout, stderr = p.communicate() + result = self.result_map['delete'](self) # any result will do + self._collect_output(p, result, stdin=p.stdin) + logger.debug('export_keys result: %r', result.data) + return result.data.decode(self.encoding, self.decode_errors) + + def list_keys(self, secret=False): + """ list the keys currently in the keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkeys = gpg.list_keys() + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + + which='keys' + if secret: + which='secret-keys' + args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,) + args = [args] + p = self._open_subprocess(args) + + # there might be some status thingumy here I should handle... (amk) + # ...nope, unless you care about expired sigs or keys (stevegt) + + # Get the response information + result = self.result_map['list'](self) + self._collect_output(p, result, stdin=p.stdin) + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr'.split() + for line in lines: + if self.verbose: + print(line) + logger.debug("line: %r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + + def gen_key(self, input): + """Generate a key; you might use gen_key_input() to create the + control input. + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> assert result + >>> result = gpg.gen_key('foo') + >>> assert not result + + """ + args = ["--gen-key --batch"] + result = self.result_map['generate'](self) + f = _make_binary_stream(input, self.encoding) + self._handle_io(args, f, result, binary=True) + f.close() + return result + + def gen_key_input(self, **kwargs): + """ + Generate --gen-key input per gpg doc/DETAILS + """ + parms = {} + for key, val in list(kwargs.items()): + key = key.replace('_','-').title() + parms[key] = val + parms.setdefault('Key-Type','RSA') + parms.setdefault('Key-Length',1024) + parms.setdefault('Name-Real', "Autogenerated Key") + parms.setdefault('Name-Comment', "Generated by gnupg.py") + try: + logname = os.environ['LOGNAME'] + except KeyError: + logname = os.environ['USERNAME'] + hostname = socket.gethostname() + parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), + hostname)) + out = "Key-Type: %s\n" % parms.pop('Key-Type') + for key, val in list(parms.items()): + out += "%s: %s\n" % (key, val) + out += "%commit\n" + return out + + # Key-Type: RSA + # Key-Length: 1024 + # Name-Real: ISdlink Server on %s + # Name-Comment: Created by %s + # Name-Email: isdlink@%s + # Expire-Date: 0 + # %commit + # + # + # Key-Type: DSA + # Key-Length: 1024 + # Subkey-Type: ELG-E + # Subkey-Length: 1024 + # Name-Real: Joe Tester + # Name-Comment: with stupid passphrase + # Name-Email: joe@foo.bar + # Expire-Date: 0 + # Passphrase: abc + # %pubring foo.pub + # %secring foo.sec + # %commit + + # + # ENCRYPTION + # + def encrypt_file(self, file, recipients, sign=None, + always_trust=False, passphrase=None, + armor=True, output=None, symmetric=False): + "Encrypt the message read from the file-like object 'file'" + args = ['--no-version', "--comment ''"] + if symmetric: + args.append('--symmetric') + else: + args.append('--encrypt') + if not _is_sequence(recipients): + recipients = (recipients,) + for recipient in recipients: + args.append('--recipient "%s"' % recipient) + if armor: # create ascii-armored output - set to False for binary output + args.append('--armor') + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output "%s"' % output) + if sign: + args.append('--sign --default-key "%s"' % sign) + if always_trust: + args.append("--always-trust") + result = self.result_map['crypt'](self) + self._handle_io(args, file, result, passphrase=passphrase, binary=True) + logger.debug('encrypt result: %r', result.data) + return result + + def encrypt(self, data, recipients, **kwargs): + """Encrypt the message contained in the string 'data' + + >>> import shutil + >>> if os.path.exists("keys"): + ... shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> result = gpg.encrypt("hello",print2) + >>> message = str(result) + >>> assert message != 'hello' + >>> result = gpg.decrypt(message) + >>> assert result + >>> str(result) + 'hello' + >>> result = gpg.encrypt("hello again",print1) + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status == 'need passphrase' + True + >>> result = gpg.decrypt(message,passphrase='bar') + >>> result.status in ('decryption failed', 'bad passphrase') + True + >>> assert not result + >>> result = gpg.decrypt(message,passphrase='foo') + >>> result.status == 'decryption ok' + True + >>> str(result) + 'hello again' + >>> result = gpg.encrypt("signed hello",print2,sign=print1) + >>> result.status == 'need passphrase' + True + >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') + >>> result.status == 'encryption ok' + True + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status == 'decryption ok' + True + >>> assert result.fingerprint == print1 + + """ + data = _make_binary_stream(data, self.encoding) + result = self.encrypt_file(data, recipients, **kwargs) + data.close() + return result + + def decrypt(self, message, **kwargs): + data = _make_binary_stream(message, self.encoding) + result = self.decrypt_file(data, **kwargs) + data.close() + return result + + def decrypt_file(self, file, always_trust=False, passphrase=None, + output=None): + args = ["--decrypt"] + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output "%s"' % output) + if always_trust: + args.append("--always-trust") + result = self.result_map['crypt'](self) + self._handle_io(args, file, result, passphrase, binary=True) + logger.debug('decrypt result: %r', result.data) + return result +