From 0dece5c7e1dfa453ed7d4ca43c95378dd6036ebd Mon Sep 17 00:00:00 2001 From: mathieui Date: Tue, 29 Nov 2016 21:49:20 +0100 Subject: [PATCH] Update the bundled gnupg.py --- plugins/gpg/gnupg.py | 453 +++++++++++++++++++++++++++---------------- 1 file changed, 283 insertions(+), 170 deletions(-) diff --git a/plugins/gpg/gnupg.py b/plugins/gpg/gnupg.py index 8a06156a..1882ad87 100644 --- a/plugins/gpg/gnupg.py +++ b/plugins/gpg/gnupg.py @@ -27,21 +27,22 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() and so does not work on Windows). Renamed to gnupg.py to avoid confusion with the previous versions. -Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved. +Modifications Copyright (C) 2008-2016 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ -__version__ = "0.3.8.dev0" +__version__ = "0.4.0.dev0" __author__ = "Vinay Sajip" -__date__ = "$07-Dec-2014 18:46:17$" +__date__ = "$10-Sep-2016 08:38:35$" try: from io import StringIO -except ImportError: +except ImportError: # pragma: no cover from cStringIO import StringIO import codecs +import locale import logging import os import re @@ -51,6 +52,13 @@ from subprocess import PIPE import sys import threading +STARTUPINFO = None +if os.name == 'nt': # pragma: no cover + try: + from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE + except ImportError: + STARTUPINFO = None + try: import logging.NullHandler as NullHandler except ImportError: @@ -72,7 +80,7 @@ if not logger.handlers: logger.addHandler(NullHandler()) # We use the test below because it works for Jython as well as CPython -if os.path.__name__ == 'ntpath': +if os.path.__name__ == 'ntpath': # pragma: no cover # On Windows, we don't need shell quoting, other than worrying about # paths with spaces in them. def shell_quote(s): @@ -97,7 +105,7 @@ else: command shells :rtype: The passed-in type """ - if not isinstance(s, string_types): + if not isinstance(s, string_types): # pragma: no cover raise TypeError('Expected string type, got %s' % type(s)) if not s: result = "''" @@ -111,8 +119,18 @@ else: # Now that we use shell=False, we shouldn't need to quote arguments. # Use no_quote instead of shell_quote to remind us of where quoting -# was needed. +# was needed. However, note that we still need, on 2.x, to encode any +# Unicode argument with the file system encoding - see Issue #41 and +# Python issue #1759845 ("subprocess.call fails with unicode strings in +# command line"). + +# Allows the encoding used to be overridden in special cases by setting +# this module attribute appropriately. +fsencoding = sys.getfilesystemencoding() + def no_quote(s): + if not _py3k and isinstance(s, text_type): + s = s.encode(fsencoding) return s def _copy_data(instream, outstream): @@ -120,17 +138,23 @@ def _copy_data(instream, outstream): sent = 0 if hasattr(sys.stdin, 'encoding'): enc = sys.stdin.encoding - else: + else: # pragma: no cover enc = 'ascii' while True: - data = instream.read(1024) + # See issue #39: read can fail when e.g. a text stream is provided + # for what is actually a binary file + try: + data = instream.read(1024) + except UnicodeError: + logger.warning('Exception occurred while reading', exc_info=1) + break if not data: break sent += len(data) - logger.debug("sending chunk (%d): %r", sent, data[:256]) + # logger.debug("sending chunk (%d): %r", sent, data[:256]) try: outstream.write(data) - except UnicodeError: + except UnicodeError: # pragma: no cover outstream.write(data.encode(enc)) except: # Can sometimes get 'broken pipe' errors even when the data has all @@ -139,7 +163,7 @@ def _copy_data(instream, outstream): break try: outstream.close() - except IOError: + except IOError: # pragma: no cover logger.warning('Exception occurred while closing: ignored', exc_info=1) logger.debug("closed output, %d bytes sent", sent) @@ -163,7 +187,7 @@ def _make_memory_stream(s): try: from io import BytesIO rv = BytesIO(s) - except ImportError: + except ImportError: # pragma: no cover rv = StringIO(s) return rv @@ -221,20 +245,21 @@ class Verify(object): "DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR", "FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC", "GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA", - "PROGRESS"): + "PROGRESS", "PINENTRY_LAUNCHED", "NEWSIG", + "KEY_CONSIDERED"): pass - elif key == "BADSIG": + elif key == "BADSIG": # pragma: no cover self.valid = False self.status = 'signature bad' self.key_id, self.username = value.split(None, 1) - elif key == "ERRSIG": + elif key == "ERRSIG": # pragma: no cover self.valid = False (self.key_id, algo, hash_algo, cls, self.timestamp) = value.split()[:5] self.status = 'signature error' - elif key == "EXPSIG": + elif key == "EXPSIG": # pragma: no cover self.valid = False self.status = 'signature expired' self.key_id, self.username = value.split(None, 1) @@ -253,21 +278,21 @@ class Verify(object): elif key == "SIG_ID": (self.signature_id, self.creation_date, self.timestamp) = value.split() - elif key == "DECRYPTION_FAILED": + elif key == "DECRYPTION_FAILED": # pragma: no cover self.valid = False self.key_id = value self.status = 'decryption failed' - elif key == "NO_PUBKEY": + elif key == "NO_PUBKEY": # pragma: no cover self.valid = False self.key_id = value self.status = 'no public key' - elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"): + elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"): # pragma: no cover # these are useless in verify, since they are spit out for any # pub/subkeys on the key, not just the one doing the signing. # if we want to check for signatures with expired key, # the relevant flag is EXPKEYSIG or REVKEYSIG. pass - elif key in ("EXPKEYSIG", "REVKEYSIG"): + elif key in ("EXPKEYSIG", "REVKEYSIG"): # pragma: no cover # signed with expired or revoked key self.valid = False self.key_id = value.split()[0] @@ -276,10 +301,15 @@ class Verify(object): else: self.key_status = 'signing key was revoked' self.status = self.key_status - elif key == "UNEXPECTED": + elif key in ("UNEXPECTED", "FAILURE"): # pragma: no cover self.valid = False self.key_id = value - self.status = 'unexpected data' + if key == "UNEXPECTED": + self.status = 'unexpected data' + else: + # N.B. there might be other reasons + if not self.status: + self.status = 'incorrect passphrase' else: raise ValueError("Unknown status message: %r" % key) @@ -322,10 +352,10 @@ class ImportResult(object): } def handle_status(self, key, value): - if key == "IMPORTED": + if key in ("IMPORTED", "KEY_CONSIDERED"): # this duplicates info we already see in import_ok & import_problem pass - elif key == "NODATA": + elif key == "NODATA": # pragma: no cover self.results.append({'fingerprint': None, 'problem': '0', 'text': 'No valid data found'}) elif key == "IMPORT_OK": @@ -338,7 +368,7 @@ class ImportResult(object): self.results.append({'fingerprint': fingerprint, 'ok': reason, 'text': reasontext}) self.fingerprints.append(fingerprint) - elif key == "IMPORT_PROBLEM": + elif key == "IMPORT_PROBLEM": # pragma: no cover try: reason, fingerprint = value.split() except: @@ -350,19 +380,19 @@ class ImportResult(object): import_res = value.split() for i, count in enumerate(self.counts): setattr(self, count, int(import_res[i])) - elif key == "KEYEXPIRED": + elif key == "KEYEXPIRED": # pragma: no cover self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Key expired'}) - elif key == "SIGEXPIRED": + elif key == "SIGEXPIRED": # pragma: no cover self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Signature expired'}) - else: + else: # pragma: no cover raise ValueError("Unknown status message: %r" % key) def summary(self): l = [] l.append('%d imported' % self.imported) - if self.not_imported: + if self.not_imported: # pragma: no cover l.append('%d not imported' % self.not_imported) return ', '.join(l) @@ -403,8 +433,12 @@ class SearchKeys(list): def get_fields(self, args): result = {} for i, var in enumerate(self.FIELDS): - result[var] = args[i] + if i < len(args): + result[var] = args[i] + else: + result[var] = 'unavailable' result['uids'] = [] + result['sigs'] = [] return result def pub(self, args): @@ -419,11 +453,11 @@ class SearchKeys(list): self.curkey['uids'].append(uid) self.uids.append(uid) - def handle_status(self, key, value): + def handle_status(self, key, value): # pragma: no cover pass class ListKeys(SearchKeys): - ''' Handle status messages for --list-keys. + ''' Handle status messages for --list-keys, --list-sigs. Handle pub and uid (relating the latter to the former). @@ -431,7 +465,6 @@ class ListKeys(SearchKeys): crt = X.509 certificate crs = X.509 certificate and private key available - ssb = secret subkey (secondary key) uat = user attribute (same as user id except for field 10). sig = signature rev = revocation signature @@ -441,7 +474,12 @@ class ListKeys(SearchKeys): ''' UID_INDEX = 9 - FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split() + FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid sig'.split() + + def __init__(self, gpg): + super(ListKeys, self).__init__(gpg) + self.in_subkey = False + self.key_map = {} def key(self, args): self.curkey = curkey = self.get_fields(args) @@ -450,17 +488,35 @@ class ListKeys(SearchKeys): del curkey['uid'] curkey['subkeys'] = [] self.append(curkey) + self.in_subkey = False pub = sec = key def fpr(self, args): - self.curkey['fingerprint'] = args[9] - self.fingerprints.append(args[9]) + fp = args[9] + if fp in self.key_map: # pragma: no cover + raise ValueError('Unexpected fingerprint collision: %s' % fp) + if not self.in_subkey: + self.curkey['fingerprint'] = fp + self.fingerprints.append(fp) + self.key_map[fp] = self.curkey + else: + self.curkey['subkeys'][-1].append(fp) + self.key_map[fp] = self.curkey def sub(self, args): - subkey = [args[4], args[11]] + subkey = [args[4], args[11]] # keyid, type self.curkey['subkeys'].append(subkey) + self.in_subkey = True + def ssb(self, args): + subkey = [args[4], None] # keyid, type + self.curkey['subkeys'].append(subkey) + self.in_subkey = True + + def sig(self, args): + # keyid, uid, sigclass + self.curkey['sigs'].append((args[4], args[9], args[10])) class ScanKeys(ListKeys): ''' Handle status messages for --with-fingerprint.''' @@ -470,6 +526,7 @@ class ScanKeys(ListKeys): # use the last value args[-1] instead of args[11] subkey = [args[4], args[-1]] self.curkey['subkeys'].append(subkey) + self.in_subkey = True class TextHandler(object): def _as_text(self): @@ -501,10 +558,12 @@ class Crypt(Verify, TextHandler): def handle_status(self, key, value): if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS", - "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"): + "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS", + "PINENTRY_LAUNCHED", "KEY_CONSIDERED"): # in the case of ERROR, this is because a more specific error # message will have come first - pass + if key == "NODATA": + self.status = "no data was provided" elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "DECRYPTION_FAILED", "KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"): @@ -521,13 +580,13 @@ class Crypt(Verify, TextHandler): elif key == "END_ENCRYPTION": self.status = 'encryption ok' self.ok = True - elif key == "INV_RECP": + elif key == "INV_RECP": # pragma: no cover self.status = 'invalid recipient' - elif key == "KEYEXPIRED": + elif key == "KEYEXPIRED": # pragma: no cover self.status = 'key expired' - elif key == "SIG_CREATED": + elif key == "SIG_CREATED": # pragma: no cover self.status = 'sig created' - elif key == "SIGEXPIRED": + elif key == "SIGEXPIRED": # pragma: no cover self.status = 'sig expired' else: Verify.handle_status(self, key, value) @@ -549,13 +608,26 @@ class GenKey(object): return self.fingerprint or '' def handle_status(self, key, value): - if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED", + "PINENTRY_LAUNCHED", "ERROR", "KEY_CONSIDERED"): pass elif key == "KEY_CREATED": (self.type,self.fingerprint) = value.split() else: raise ValueError("Unknown status message: %r" % key) +class ExportResult(GenKey): + """Handle status messages for --export[-secret-key]. + + For now, just use an existing class to base it on - if needed, we + can override handle_status for more specific message handling. + """ + def handle_status(self, key, value): + if key in ("EXPORTED", "EXPORT_RES"): + pass + else: + super(ExportResult, self).handle_status(key, value) + class DeleteResult(object): "Handle status messages for --delete-key and --delete-secret-key" def __init__(self, gpg): @@ -572,10 +644,12 @@ class DeleteResult(object): } def handle_status(self, key, value): - if key == "DELETE_PROBLEM": + if key == "DELETE_PROBLEM": # pragma: no cover self.status = self.problem_reason.get(value, "Unknown error: %r" % value) - else: + elif key == "KEY_CONSIDERED": + pass + else: # pragma: no cover raise ValueError("Unknown status message: %r" % key) def __nonzero__(self): @@ -601,18 +675,19 @@ class Sign(TextHandler): if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN", - "SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"): + "SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS", + "PINENTRY_LAUNCHED", "FAILURE", "ERROR", "KEY_CONSIDERED"): pass - elif key in ("KEYEXPIRED", "SIGEXPIRED"): + elif key in ("KEYEXPIRED", "SIGEXPIRED"): # pragma: no cover self.status = 'key expired' - elif key == "KEYREVOKED": + elif key == "KEYREVOKED": # pragma: no cover self.status = 'key revoked' elif key == "SIG_CREATED": (self.type, algo, self.hash_algo, cls, self.timestamp, self.fingerprint ) = value.split() - else: + else: # pragma: no cover raise ValueError("Unknown status message: %r" % key) VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I) @@ -633,6 +708,7 @@ class GPG(object): 'search': SearchKeys, 'sign': Sign, 'verify': Verify, + 'export': ExportResult, } "Encapsulate access to the gpg executable" @@ -667,7 +743,7 @@ class GPG(object): self.secret_keyring = secret_keyring self.verbose = verbose self.use_agent = use_agent - if isinstance(options, str): + if isinstance(options, str): # pragma: no cover options = [options] self.options = options # Changed in 0.3.7 to use Latin-1 encoding rather than @@ -677,14 +753,19 @@ class GPG(object): self.encoding = 'latin-1' if gnupghome and not os.path.isdir(self.gnupghome): os.makedirs(self.gnupghome,0x1C0) - p = self._open_subprocess(["--version"]) + try: + p = self._open_subprocess(["--version"]) + except OSError: + msg = 'Unable to run gpg - it may not be available.' + logger.exception(msg) + raise OSError(msg) result = self.result_map['verify'](self) # any result will do for this self._collect_output(p, result, stdin=p.stdin) - if p.returncode != 0: + if p.returncode != 0: # pragma: no cover raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, result.stderr)) m = VERSION_RE.match(result.data) - if not m: + if not m: # pragma: no cover self.version = None else: dot = '.'.encode('ascii') @@ -697,6 +778,10 @@ class GPG(object): a passphrase will be sent to GPG, else False. """ cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty'] + cmd.extend(['--debug', 'ipc']) + if passphrase and hasattr(self, 'version'): + if self.version >= (2, 1): + cmd[1:1] = ['--pinentry-mode', 'loopback'] if self.gnupghome: cmd.extend(['--homedir', no_quote(self.gnupghome)]) if self.keyring: @@ -708,7 +793,7 @@ class GPG(object): cmd.extend(['--secret-keyring', no_quote(fn)]) if passphrase: cmd.extend(['--batch', '--passphrase-fd', '0']) - if self.use_agent: + if self.use_agent: # pragma: no cover cmd.append('--use-agent') if self.options: cmd.extend(self.options) @@ -718,12 +803,35 @@ class GPG(object): def _open_subprocess(self, args, passphrase=False): # Internal method: open a pipe to a GPG subprocess and return # the file objects for communicating with it. + + # def debug_print(cmd): + # result = [] + # for c in cmd: + # if ' ' not in c: + # result.append(c) + # else: + # if '"' not in c: + # result.append('"%s"' % c) + # elif "'" not in c: + # result.append("'%s'" % c) + # else: + # result.append(c) # give up + # return ' '.join(cmd) + from subprocess import list2cmdline as debug_print + cmd = self.make_args(args, passphrase) - if self.verbose: - pcmd = ' '.join(cmd) - print(pcmd) - logger.debug("%s", cmd) - return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE) + if self.verbose: # pragma: no cover + print(debug_print(cmd)) + if not STARTUPINFO: + si = None + else: # pragma: no cover + si = STARTUPINFO() + si.dwFlags = STARTF_USESHOWWINDOW + si.wShowWindow = SW_HIDE + result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, + startupinfo=si) + logger.debug("%s: %s", result.pid, debug_print(cmd)) + return result def _read_response(self, stream, result): # Internal method: reads all the stderr output from GPG, taking notice @@ -738,7 +846,7 @@ class GPG(object): break lines.append(line) line = line.rstrip() - if self.verbose: + if self.verbose: # pragma: no cover print(line) logger.debug("%s", line) if line[0:9] == '[GNUPG:] ': @@ -795,7 +903,7 @@ class GPG(object): if stdin is not None: try: stdin.close() - except IOError: + except IOError: # pragma: no cover pass stderr.close() stdout.close() @@ -805,7 +913,7 @@ class GPG(object): # Handle a basic data call - pass data to GPG, handle the output # including status information. Garbage In, Garbage Out :) p = self._open_subprocess(args, passphrase is not None) - if not binary: + if not binary: # pragma: no cover stdin = codecs.getwriter(self.encoding)(p.stdin) else: stdin = p.stdin @@ -830,13 +938,13 @@ class GPG(object): if os.path.exists(output): # We need to avoid an overwrite confirmation message args.extend(['--batch', '--yes']) - args.extend(['--output', output]) + args.extend(['--output', no_quote(output)]) def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, detach=False, binary=False, output=None): """sign file""" logger.debug("sign_file: %s", file) - if binary: + if binary: # pragma: no cover args = ['-s'] else: args = ['-sa'] @@ -860,7 +968,7 @@ class GPG(object): if passphrase: _write_passphrase(stdin, passphrase, self.encoding) writer = _threaded_copy_data(file, stdin) - except IOError: + except IOError: # pragma: no cover logging.exception("error writing message") writer = None self._collect_output(p, result, writer, stdin) @@ -869,8 +977,9 @@ class GPG(object): def verify(self, data): """Verify the signature on the contents of the string 'data' - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input(Passphrase='foo') + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') >>> key = gpg.gen_key(input) >>> assert key >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') @@ -925,49 +1034,8 @@ class GPG(object): # def import_keys(self, key_data): - """ import the key_data into our keyring - - >>> import shutil - >>> shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() - >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint - >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint - >>> pubkey1 = gpg.export_keys(print1) - >>> seckey1 = gpg.export_keys(print1,secret=True) - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> str(gpg.delete_keys(print1)) - 'Must delete secret key first' - >>> str(gpg.delete_keys(print1,secret=True)) - 'ok' - >>> str(gpg.delete_keys(print1)) - 'ok' - >>> str(gpg.delete_keys("nosuchkey")) - 'No such key' - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert not print1 in seckeys.fingerprints - >>> assert not print1 in pubkeys.fingerprints - >>> result = gpg.import_keys('foo') - >>> assert not result - >>> result = gpg.import_keys(pubkey1) - >>> pubkeys = gpg.list_keys() - >>> seckeys = gpg.list_keys(secret=True) - >>> assert not print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> result = gpg.import_keys(seckey1) - >>> assert result - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> assert print2 in pubkeys.fingerprints - + """ + Import the key_data into our keyring. """ result = self.result_map['import'](self) logger.debug('import_keys: %r', key_data[:256]) @@ -982,9 +1050,10 @@ class GPG(object): >>> import shutil >>> shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") >>> os.chmod('keys', 0x1C0) - >>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378') + >>> result = gpg.recv_keys('pgp.mit.edu', '92905378') >>> assert result """ @@ -1016,26 +1085,48 @@ class GPG(object): data.close() return result - def delete_keys(self, fingerprints, secret=False): + def delete_keys(self, fingerprints, secret=False, passphrase=None): which='key' - if secret: + if secret: # pragma: no cover + if self.version >= (2, 1) and passphrase is None: + raise ValueError('For GnuPG >= 2.1, deleting secret keys ' + 'needs a passphrase to be provided') which='secret-key' - if _is_sequence(fingerprints): + if _is_sequence(fingerprints): # pragma: no cover fingerprints = [no_quote(s) for s in fingerprints] else: fingerprints = [no_quote(fingerprints)] args = ['--batch', '--delete-%s' % which] args.extend(fingerprints) result = self.result_map['delete'](self) - p = self._open_subprocess(args) - self._collect_output(p, result, stdin=p.stdin) + if not secret or self.version < (2, 1): + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + else: + # Need to send in a passphrase. + f = _make_binary_stream('', self.encoding) + try: + self._handle_io(args, f, result, passphrase=passphrase, + binary=True) + finally: + f.close() return result - def export_keys(self, keyids, secret=False, armor=True, minimal=False): - "export the indicated keys. 'keyid' is anything gpg accepts" + def export_keys(self, keyids, secret=False, armor=True, minimal=False, + passphrase=None): + """ + Export the indicated keys. A 'keyid' is anything gpg accepts. + + Since GnuPG 2.1, you can't export secret keys without providing a + passphrase. + """ + which='' if secret: which='-secret-key' + if self.version >= (2, 1) and passphrase is None: + raise ValueError('For GnuPG >= 2.1, exporting secret keys ' + 'needs a passphrase to be provided') if _is_sequence(keyids): keyids = [no_quote(k) for k in keyids] else: @@ -1043,17 +1134,30 @@ class GPG(object): args = ['--export%s' % which] if armor: args.append('--armor') - if minimal: + if minimal: # pragma: no cover args.extend(['--export-options','export-minimal']) args.extend(keyids) - p = self._open_subprocess(args) # gpg --export produces no status-fd output; stdout will be # empty in case of failure #stdout, stderr = p.communicate() - result = self.result_map['delete'](self) # any result will do - self._collect_output(p, result, stdin=p.stdin) + result = self.result_map['export'](self) + if not secret or self.version < (2, 1): + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + else: + # Need to send in a passphrase. + f = _make_binary_stream('', self.encoding) + try: + self._handle_io(args, f, result, passphrase=passphrase, + binary=True) + finally: + f.close() logger.debug('export_keys result: %r', result.data) - return result.data.decode(self.encoding, self.decode_errors) + # Issue #49: Return bytes if armor not specified, else text + result = result.data + if armor: + result = result.decode(self.encoding, self.decode_errors) + return result def _get_list_output(self, p, kind): # Get the response information @@ -1061,43 +1165,51 @@ class GPG(object): self._collect_output(p, result, stdin=p.stdin) lines = result.data.decode(self.encoding, self.decode_errors).splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() + valid_keywords = 'pub uid sec fpr sub ssb sig'.split() for line in lines: - if self.verbose: + if self.verbose: # pragma: no cover print(line) logger.debug("line: %r", line.rstrip()) - if not line: + if not line: # pragma: no cover break L = line.strip().split(':') - if not L: + if not L: # pragma: no cover continue keyword = L[0] if keyword in valid_keywords: getattr(result, keyword)(L) return result - def list_keys(self, secret=False): + def list_keys(self, secret=False, keys=None, sigs=False): """ list the keys currently in the keyring >>> import shutil >>> shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint + >>> fp1 = result.fingerprint >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint + >>> fp2 = result.fingerprint >>> pubkeys = gpg.list_keys() - >>> assert print1 in pubkeys.fingerprints - >>> assert print2 in pubkeys.fingerprints + >>> assert fp1 in pubkeys.fingerprints + >>> assert fp2 in pubkeys.fingerprints """ - which='keys' + if sigs: + which = 'sigs' + else: which='keys' if secret: which='secret-keys' - args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint", - "--with-colons"] + args = ['--list-%s' % which, '--fixed-list-mode', + '--fingerprint', '--fingerprint', # get subkey FPs, too + '--with-colons'] + if keys: + if isinstance(keys, string_types): + keys = [keys] + args.extend(keys) p = self._open_subprocess(args) return self._get_list_output(p, 'list') @@ -1109,7 +1221,7 @@ class GPG(object): The function achieves this by running: $ gpg --with-fingerprint --with-colons filename """ - args = ['--with-fingerprint', '--with-colons'] + args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode'] args.append(no_quote(filename)) p = self._open_subprocess(args) return self._get_list_output(p, 'scan') @@ -1119,13 +1231,14 @@ class GPG(object): >>> import shutil >>> shutil.rmtree('keys') - >>> gpg = GPG(gnupghome='keys') + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys') >>> os.chmod('keys', 0x1C0) >>> result = gpg.search_keys('') >>> assert result, 'Failed using default keyserver' - >>> keyserver = 'keyserver.ubuntu.com' - >>> result = gpg.search_keys('', keyserver) - >>> assert result, 'Failed using keyserver.ubuntu.com' + >>> #keyserver = 'keyserver.ubuntu.com' + >>> #result = gpg.search_keys('', keyserver) + >>> #assert result, 'Failed using keyserver.ubuntu.com' """ query = query.strip() @@ -1143,13 +1256,13 @@ class GPG(object): self.decode_errors).splitlines() valid_keywords = ['pub', 'uid'] for line in lines: - if self.verbose: + if self.verbose: # pragma: no cover print(line) logger.debug('line: %r', line.rstrip()) if not line: # sometimes get blank lines on Windows continue L = line.strip().split(':') - if not L: + if not L: # pragma: no cover continue keyword = L[0] if keyword in valid_keywords: @@ -1160,8 +1273,9 @@ class GPG(object): """Generate a key; you might use gen_key_input() to create the control input. - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') >>> result = gpg.gen_key(input) >>> assert result >>> result = gpg.gen_key('foo') @@ -1187,10 +1301,8 @@ class GPG(object): parms.setdefault('Key-Type','RSA') parms.setdefault('Key-Length',2048) parms.setdefault('Name-Real', "Autogenerated Key") - try: - logname = os.environ['LOGNAME'] - except KeyError: - logname = os.environ['USERNAME'] + logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or + 'unspecified') hostname = socket.gethostname() parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), hostname)) @@ -1249,11 +1361,11 @@ class GPG(object): args.append('--armor') if output: # write the output to a file with the specified name self.set_output_without_confirmation(args, output) - if sign is True: + if sign is True: # pragma: no cover args.append('--sign') - elif sign: + elif sign: # pragma: no cover args.extend(['--sign', '--default-key', no_quote(sign)]) - if always_trust: + if always_trust: # pragma: no cover args.append('--always-trust') result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase=passphrase, binary=True) @@ -1266,39 +1378,40 @@ class GPG(object): >>> import shutil >>> if os.path.exists("keys"): ... shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input(passphrase='foo') + >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg') + >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys") + >>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1') >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint - >>> input = gpg.gen_key_input() + >>> fp1 = result.fingerprint + >>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2') >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint - >>> result = gpg.encrypt("hello",print2) + >>> fp2 = result.fingerprint + >>> result = gpg.encrypt("hello",fp2) >>> message = str(result) >>> assert message != 'hello' - >>> result = gpg.decrypt(message) + >>> result = gpg.decrypt(message, passphrase='pp2') >>> assert result >>> str(result) 'hello' - >>> result = gpg.encrypt("hello again",print1) + >>> result = gpg.encrypt("hello again", fp1) >>> message = str(result) - >>> result = gpg.decrypt(message,passphrase='bar') + >>> result = gpg.decrypt(message, passphrase='bar') >>> result.status in ('decryption failed', 'bad passphrase') True >>> assert not result - >>> result = gpg.decrypt(message,passphrase='foo') + >>> result = gpg.decrypt(message, passphrase='pp1') >>> result.status == 'decryption ok' True >>> str(result) 'hello again' - >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') + >>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1') >>> result.status == 'encryption ok' True >>> message = str(result) - >>> result = gpg.decrypt(message) + >>> result = gpg.decrypt(message, passphrase='pp2') >>> result.status == 'decryption ok' True - >>> assert result.fingerprint == print1 + >>> assert result.fingerprint == fp1 """ data = _make_binary_stream(data, self.encoding) @@ -1317,7 +1430,7 @@ class GPG(object): args = ["--decrypt"] if output: # write the output to a file with the specified name self.set_output_without_confirmation(args, output) - if always_trust: + if always_trust: # pragma: no cover args.append("--always-trust") result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase, binary=True)