Update the bundled gnupg.py

This commit is contained in:
mathieui 2016-11-29 21:49:20 +01:00
parent 9eff396227
commit 0dece5c7e1

View file

@ -27,21 +27,22 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved.
Modifications Copyright (C) 2008-2016 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
__version__ = "0.3.8.dev0"
__version__ = "0.4.0.dev0"
__author__ = "Vinay Sajip"
__date__ = "$07-Dec-2014 18:46:17$"
__date__ = "$10-Sep-2016 08:38:35$"
try:
from io import StringIO
except ImportError:
except ImportError: # pragma: no cover
from cStringIO import StringIO
import codecs
import locale
import logging
import os
import re
@ -51,6 +52,13 @@ from subprocess import PIPE
import sys
import threading
STARTUPINFO = None
if os.name == 'nt': # pragma: no cover
try:
from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
except ImportError:
STARTUPINFO = None
try:
import logging.NullHandler as NullHandler
except ImportError:
@ -72,7 +80,7 @@ if not logger.handlers:
logger.addHandler(NullHandler())
# We use the test below because it works for Jython as well as CPython
if os.path.__name__ == 'ntpath':
if os.path.__name__ == 'ntpath': # pragma: no cover
# On Windows, we don't need shell quoting, other than worrying about
# paths with spaces in them.
def shell_quote(s):
@ -97,7 +105,7 @@ else:
command shells
:rtype: The passed-in type
"""
if not isinstance(s, string_types):
if not isinstance(s, string_types): # pragma: no cover
raise TypeError('Expected string type, got %s' % type(s))
if not s:
result = "''"
@ -111,8 +119,18 @@ else:
# Now that we use shell=False, we shouldn't need to quote arguments.
# Use no_quote instead of shell_quote to remind us of where quoting
# was needed.
# was needed. However, note that we still need, on 2.x, to encode any
# Unicode argument with the file system encoding - see Issue #41 and
# Python issue #1759845 ("subprocess.call fails with unicode strings in
# command line").
# Allows the encoding used to be overridden in special cases by setting
# this module attribute appropriately.
fsencoding = sys.getfilesystemencoding()
def no_quote(s):
if not _py3k and isinstance(s, text_type):
s = s.encode(fsencoding)
return s
def _copy_data(instream, outstream):
@ -120,17 +138,23 @@ def _copy_data(instream, outstream):
sent = 0
if hasattr(sys.stdin, 'encoding'):
enc = sys.stdin.encoding
else:
else: # pragma: no cover
enc = 'ascii'
while True:
data = instream.read(1024)
# See issue #39: read can fail when e.g. a text stream is provided
# for what is actually a binary file
try:
data = instream.read(1024)
except UnicodeError:
logger.warning('Exception occurred while reading', exc_info=1)
break
if not data:
break
sent += len(data)
logger.debug("sending chunk (%d): %r", sent, data[:256])
# logger.debug("sending chunk (%d): %r", sent, data[:256])
try:
outstream.write(data)
except UnicodeError:
except UnicodeError: # pragma: no cover
outstream.write(data.encode(enc))
except:
# Can sometimes get 'broken pipe' errors even when the data has all
@ -139,7 +163,7 @@ def _copy_data(instream, outstream):
break
try:
outstream.close()
except IOError:
except IOError: # pragma: no cover
logger.warning('Exception occurred while closing: ignored', exc_info=1)
logger.debug("closed output, %d bytes sent", sent)
@ -163,7 +187,7 @@ def _make_memory_stream(s):
try:
from io import BytesIO
rv = BytesIO(s)
except ImportError:
except ImportError: # pragma: no cover
rv = StringIO(s)
return rv
@ -221,20 +245,21 @@ class Verify(object):
"DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR",
"FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC",
"GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA",
"PROGRESS"):
"PROGRESS", "PINENTRY_LAUNCHED", "NEWSIG",
"KEY_CONSIDERED"):
pass
elif key == "BADSIG":
elif key == "BADSIG": # pragma: no cover
self.valid = False
self.status = 'signature bad'
self.key_id, self.username = value.split(None, 1)
elif key == "ERRSIG":
elif key == "ERRSIG": # pragma: no cover
self.valid = False
(self.key_id,
algo, hash_algo,
cls,
self.timestamp) = value.split()[:5]
self.status = 'signature error'
elif key == "EXPSIG":
elif key == "EXPSIG": # pragma: no cover
self.valid = False
self.status = 'signature expired'
self.key_id, self.username = value.split(None, 1)
@ -253,21 +278,21 @@ class Verify(object):
elif key == "SIG_ID":
(self.signature_id,
self.creation_date, self.timestamp) = value.split()
elif key == "DECRYPTION_FAILED":
elif key == "DECRYPTION_FAILED": # pragma: no cover
self.valid = False
self.key_id = value
self.status = 'decryption failed'
elif key == "NO_PUBKEY":
elif key == "NO_PUBKEY": # pragma: no cover
self.valid = False
self.key_id = value
self.status = 'no public key'
elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"):
elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"): # pragma: no cover
# these are useless in verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures with expired key,
# the relevant flag is EXPKEYSIG or REVKEYSIG.
pass
elif key in ("EXPKEYSIG", "REVKEYSIG"):
elif key in ("EXPKEYSIG", "REVKEYSIG"): # pragma: no cover
# signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
@ -276,10 +301,15 @@ class Verify(object):
else:
self.key_status = 'signing key was revoked'
self.status = self.key_status
elif key == "UNEXPECTED":
elif key in ("UNEXPECTED", "FAILURE"): # pragma: no cover
self.valid = False
self.key_id = value
self.status = 'unexpected data'
if key == "UNEXPECTED":
self.status = 'unexpected data'
else:
# N.B. there might be other reasons
if not self.status:
self.status = 'incorrect passphrase'
else:
raise ValueError("Unknown status message: %r" % key)
@ -322,10 +352,10 @@ class ImportResult(object):
}
def handle_status(self, key, value):
if key == "IMPORTED":
if key in ("IMPORTED", "KEY_CONSIDERED"):
# this duplicates info we already see in import_ok & import_problem
pass
elif key == "NODATA":
elif key == "NODATA": # pragma: no cover
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'No valid data found'})
elif key == "IMPORT_OK":
@ -338,7 +368,7 @@ class ImportResult(object):
self.results.append({'fingerprint': fingerprint,
'ok': reason, 'text': reasontext})
self.fingerprints.append(fingerprint)
elif key == "IMPORT_PROBLEM":
elif key == "IMPORT_PROBLEM": # pragma: no cover
try:
reason, fingerprint = value.split()
except:
@ -350,19 +380,19 @@ class ImportResult(object):
import_res = value.split()
for i, count in enumerate(self.counts):
setattr(self, count, int(import_res[i]))
elif key == "KEYEXPIRED":
elif key == "KEYEXPIRED": # pragma: no cover
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Key expired'})
elif key == "SIGEXPIRED":
elif key == "SIGEXPIRED": # pragma: no cover
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Signature expired'})
else:
else: # pragma: no cover
raise ValueError("Unknown status message: %r" % key)
def summary(self):
l = []
l.append('%d imported' % self.imported)
if self.not_imported:
if self.not_imported: # pragma: no cover
l.append('%d not imported' % self.not_imported)
return ', '.join(l)
@ -403,8 +433,12 @@ class SearchKeys(list):
def get_fields(self, args):
result = {}
for i, var in enumerate(self.FIELDS):
result[var] = args[i]
if i < len(args):
result[var] = args[i]
else:
result[var] = 'unavailable'
result['uids'] = []
result['sigs'] = []
return result
def pub(self, args):
@ -419,11 +453,11 @@ class SearchKeys(list):
self.curkey['uids'].append(uid)
self.uids.append(uid)
def handle_status(self, key, value):
def handle_status(self, key, value): # pragma: no cover
pass
class ListKeys(SearchKeys):
''' Handle status messages for --list-keys.
''' Handle status messages for --list-keys, --list-sigs.
Handle pub and uid (relating the latter to the former).
@ -431,7 +465,6 @@ class ListKeys(SearchKeys):
crt = X.509 certificate
crs = X.509 certificate and private key available
ssb = secret subkey (secondary key)
uat = user attribute (same as user id except for field 10).
sig = signature
rev = revocation signature
@ -441,7 +474,12 @@ class ListKeys(SearchKeys):
'''
UID_INDEX = 9
FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split()
FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid sig'.split()
def __init__(self, gpg):
super(ListKeys, self).__init__(gpg)
self.in_subkey = False
self.key_map = {}
def key(self, args):
self.curkey = curkey = self.get_fields(args)
@ -450,17 +488,35 @@ class ListKeys(SearchKeys):
del curkey['uid']
curkey['subkeys'] = []
self.append(curkey)
self.in_subkey = False
pub = sec = key
def fpr(self, args):
self.curkey['fingerprint'] = args[9]
self.fingerprints.append(args[9])
fp = args[9]
if fp in self.key_map: # pragma: no cover
raise ValueError('Unexpected fingerprint collision: %s' % fp)
if not self.in_subkey:
self.curkey['fingerprint'] = fp
self.fingerprints.append(fp)
self.key_map[fp] = self.curkey
else:
self.curkey['subkeys'][-1].append(fp)
self.key_map[fp] = self.curkey
def sub(self, args):
subkey = [args[4], args[11]]
subkey = [args[4], args[11]] # keyid, type
self.curkey['subkeys'].append(subkey)
self.in_subkey = True
def ssb(self, args):
subkey = [args[4], None] # keyid, type
self.curkey['subkeys'].append(subkey)
self.in_subkey = True
def sig(self, args):
# keyid, uid, sigclass
self.curkey['sigs'].append((args[4], args[9], args[10]))
class ScanKeys(ListKeys):
''' Handle status messages for --with-fingerprint.'''
@ -470,6 +526,7 @@ class ScanKeys(ListKeys):
# use the last value args[-1] instead of args[11]
subkey = [args[4], args[-1]]
self.curkey['subkeys'].append(subkey)
self.in_subkey = True
class TextHandler(object):
def _as_text(self):
@ -501,10 +558,12 @@ class Crypt(Verify, TextHandler):
def handle_status(self, key, value):
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS",
"CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"):
"CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS",
"PINENTRY_LAUNCHED", "KEY_CONSIDERED"):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
if key == "NODATA":
self.status = "no data was provided"
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
"MISSING_PASSPHRASE", "DECRYPTION_FAILED",
"KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"):
@ -521,13 +580,13 @@ class Crypt(Verify, TextHandler):
elif key == "END_ENCRYPTION":
self.status = 'encryption ok'
self.ok = True
elif key == "INV_RECP":
elif key == "INV_RECP": # pragma: no cover
self.status = 'invalid recipient'
elif key == "KEYEXPIRED":
elif key == "KEYEXPIRED": # pragma: no cover
self.status = 'key expired'
elif key == "SIG_CREATED":
elif key == "SIG_CREATED": # pragma: no cover
self.status = 'sig created'
elif key == "SIGEXPIRED":
elif key == "SIGEXPIRED": # pragma: no cover
self.status = 'sig expired'
else:
Verify.handle_status(self, key, value)
@ -549,13 +608,26 @@ class GenKey(object):
return self.fingerprint or ''
def handle_status(self, key, value):
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"):
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED",
"PINENTRY_LAUNCHED", "ERROR", "KEY_CONSIDERED"):
pass
elif key == "KEY_CREATED":
(self.type,self.fingerprint) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)
class ExportResult(GenKey):
"""Handle status messages for --export[-secret-key].
For now, just use an existing class to base it on - if needed, we
can override handle_status for more specific message handling.
"""
def handle_status(self, key, value):
if key in ("EXPORTED", "EXPORT_RES"):
pass
else:
super(ExportResult, self).handle_status(key, value)
class DeleteResult(object):
"Handle status messages for --delete-key and --delete-secret-key"
def __init__(self, gpg):
@ -572,10 +644,12 @@ class DeleteResult(object):
}
def handle_status(self, key, value):
if key == "DELETE_PROBLEM":
if key == "DELETE_PROBLEM": # pragma: no cover
self.status = self.problem_reason.get(value,
"Unknown error: %r" % value)
else:
elif key == "KEY_CONSIDERED":
pass
else: # pragma: no cover
raise ValueError("Unknown status message: %r" % key)
def __nonzero__(self):
@ -601,18 +675,19 @@ class Sign(TextHandler):
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR",
"NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN",
"SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"):
"SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS",
"PINENTRY_LAUNCHED", "FAILURE", "ERROR", "KEY_CONSIDERED"):
pass
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
elif key in ("KEYEXPIRED", "SIGEXPIRED"): # pragma: no cover
self.status = 'key expired'
elif key == "KEYREVOKED":
elif key == "KEYREVOKED": # pragma: no cover
self.status = 'key revoked'
elif key == "SIG_CREATED":
(self.type,
algo, self.hash_algo, cls,
self.timestamp, self.fingerprint
) = value.split()
else:
else: # pragma: no cover
raise ValueError("Unknown status message: %r" % key)
VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
@ -633,6 +708,7 @@ class GPG(object):
'search': SearchKeys,
'sign': Sign,
'verify': Verify,
'export': ExportResult,
}
"Encapsulate access to the gpg executable"
@ -667,7 +743,7 @@ class GPG(object):
self.secret_keyring = secret_keyring
self.verbose = verbose
self.use_agent = use_agent
if isinstance(options, str):
if isinstance(options, str): # pragma: no cover
options = [options]
self.options = options
# Changed in 0.3.7 to use Latin-1 encoding rather than
@ -677,14 +753,19 @@ class GPG(object):
self.encoding = 'latin-1'
if gnupghome and not os.path.isdir(self.gnupghome):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
try:
p = self._open_subprocess(["--version"])
except OSError:
msg = 'Unable to run gpg - it may not be available.'
logger.exception(msg)
raise OSError(msg)
result = self.result_map['verify'](self) # any result will do for this
self._collect_output(p, result, stdin=p.stdin)
if p.returncode != 0:
if p.returncode != 0: # pragma: no cover
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
m = VERSION_RE.match(result.data)
if not m:
if not m: # pragma: no cover
self.version = None
else:
dot = '.'.encode('ascii')
@ -697,6 +778,10 @@ class GPG(object):
a passphrase will be sent to GPG, else False.
"""
cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty']
cmd.extend(['--debug', 'ipc'])
if passphrase and hasattr(self, 'version'):
if self.version >= (2, 1):
cmd[1:1] = ['--pinentry-mode', 'loopback']
if self.gnupghome:
cmd.extend(['--homedir', no_quote(self.gnupghome)])
if self.keyring:
@ -708,7 +793,7 @@ class GPG(object):
cmd.extend(['--secret-keyring', no_quote(fn)])
if passphrase:
cmd.extend(['--batch', '--passphrase-fd', '0'])
if self.use_agent:
if self.use_agent: # pragma: no cover
cmd.append('--use-agent')
if self.options:
cmd.extend(self.options)
@ -718,12 +803,35 @@ class GPG(object):
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
# def debug_print(cmd):
# result = []
# for c in cmd:
# if ' ' not in c:
# result.append(c)
# else:
# if '"' not in c:
# result.append('"%s"' % c)
# elif "'" not in c:
# result.append("'%s'" % c)
# else:
# result.append(c) # give up
# return ' '.join(cmd)
from subprocess import list2cmdline as debug_print
cmd = self.make_args(args, passphrase)
if self.verbose:
pcmd = ' '.join(cmd)
print(pcmd)
logger.debug("%s", cmd)
return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
if self.verbose: # pragma: no cover
print(debug_print(cmd))
if not STARTUPINFO:
si = None
else: # pragma: no cover
si = STARTUPINFO()
si.dwFlags = STARTF_USESHOWWINDOW
si.wShowWindow = SW_HIDE
result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE,
startupinfo=si)
logger.debug("%s: %s", result.pid, debug_print(cmd))
return result
def _read_response(self, stream, result):
# Internal method: reads all the stderr output from GPG, taking notice
@ -738,7 +846,7 @@ class GPG(object):
break
lines.append(line)
line = line.rstrip()
if self.verbose:
if self.verbose: # pragma: no cover
print(line)
logger.debug("%s", line)
if line[0:9] == '[GNUPG:] ':
@ -795,7 +903,7 @@ class GPG(object):
if stdin is not None:
try:
stdin.close()
except IOError:
except IOError: # pragma: no cover
pass
stderr.close()
stdout.close()
@ -805,7 +913,7 @@ class GPG(object):
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
p = self._open_subprocess(args, passphrase is not None)
if not binary:
if not binary: # pragma: no cover
stdin = codecs.getwriter(self.encoding)(p.stdin)
else:
stdin = p.stdin
@ -830,13 +938,13 @@ class GPG(object):
if os.path.exists(output):
# We need to avoid an overwrite confirmation message
args.extend(['--batch', '--yes'])
args.extend(['--output', output])
args.extend(['--output', no_quote(output)])
def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
detach=False, binary=False, output=None):
"""sign file"""
logger.debug("sign_file: %s", file)
if binary:
if binary: # pragma: no cover
args = ['-s']
else:
args = ['-sa']
@ -860,7 +968,7 @@ class GPG(object):
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
writer = _threaded_copy_data(file, stdin)
except IOError:
except IOError: # pragma: no cover
logging.exception("error writing message")
writer = None
self._collect_output(p, result, writer, stdin)
@ -869,8 +977,9 @@ class GPG(object):
def verify(self, data):
"""Verify the signature on the contents of the string 'data'
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input(Passphrase='foo')
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
>>> input = gpg.gen_key_input(passphrase='foo')
>>> key = gpg.gen_key(input)
>>> assert key
>>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
@ -925,49 +1034,8 @@ class GPG(object):
#
def import_keys(self, key_data):
""" import the key_data into our keyring
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> pubkey1 = gpg.export_keys(print1)
>>> seckey1 = gpg.export_keys(print1,secret=True)
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> str(gpg.delete_keys(print1))
'Must delete secret key first'
>>> str(gpg.delete_keys(print1,secret=True))
'ok'
>>> str(gpg.delete_keys(print1))
'ok'
>>> str(gpg.delete_keys("nosuchkey"))
'No such key'
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert not print1 in seckeys.fingerprints
>>> assert not print1 in pubkeys.fingerprints
>>> result = gpg.import_keys('foo')
>>> assert not result
>>> result = gpg.import_keys(pubkey1)
>>> pubkeys = gpg.list_keys()
>>> seckeys = gpg.list_keys(secret=True)
>>> assert not print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> result = gpg.import_keys(seckey1)
>>> assert result
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> assert print2 in pubkeys.fingerprints
"""
Import the key_data into our keyring.
"""
result = self.result_map['import'](self)
logger.debug('import_keys: %r', key_data[:256])
@ -982,9 +1050,10 @@ class GPG(object):
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
>>> os.chmod('keys', 0x1C0)
>>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378')
>>> result = gpg.recv_keys('pgp.mit.edu', '92905378')
>>> assert result
"""
@ -1016,26 +1085,48 @@ class GPG(object):
data.close()
return result
def delete_keys(self, fingerprints, secret=False):
def delete_keys(self, fingerprints, secret=False, passphrase=None):
which='key'
if secret:
if secret: # pragma: no cover
if self.version >= (2, 1) and passphrase is None:
raise ValueError('For GnuPG >= 2.1, deleting secret keys '
'needs a passphrase to be provided')
which='secret-key'
if _is_sequence(fingerprints):
if _is_sequence(fingerprints): # pragma: no cover
fingerprints = [no_quote(s) for s in fingerprints]
else:
fingerprints = [no_quote(fingerprints)]
args = ['--batch', '--delete-%s' % which]
args.extend(fingerprints)
result = self.result_map['delete'](self)
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
if not secret or self.version < (2, 1):
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
else:
# Need to send in a passphrase.
f = _make_binary_stream('', self.encoding)
try:
self._handle_io(args, f, result, passphrase=passphrase,
binary=True)
finally:
f.close()
return result
def export_keys(self, keyids, secret=False, armor=True, minimal=False):
"export the indicated keys. 'keyid' is anything gpg accepts"
def export_keys(self, keyids, secret=False, armor=True, minimal=False,
passphrase=None):
"""
Export the indicated keys. A 'keyid' is anything gpg accepts.
Since GnuPG 2.1, you can't export secret keys without providing a
passphrase.
"""
which=''
if secret:
which='-secret-key'
if self.version >= (2, 1) and passphrase is None:
raise ValueError('For GnuPG >= 2.1, exporting secret keys '
'needs a passphrase to be provided')
if _is_sequence(keyids):
keyids = [no_quote(k) for k in keyids]
else:
@ -1043,17 +1134,30 @@ class GPG(object):
args = ['--export%s' % which]
if armor:
args.append('--armor')
if minimal:
if minimal: # pragma: no cover
args.extend(['--export-options','export-minimal'])
args.extend(keyids)
p = self._open_subprocess(args)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
#stdout, stderr = p.communicate()
result = self.result_map['delete'](self) # any result will do
self._collect_output(p, result, stdin=p.stdin)
result = self.result_map['export'](self)
if not secret or self.version < (2, 1):
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
else:
# Need to send in a passphrase.
f = _make_binary_stream('', self.encoding)
try:
self._handle_io(args, f, result, passphrase=passphrase,
binary=True)
finally:
f.close()
logger.debug('export_keys result: %r', result.data)
return result.data.decode(self.encoding, self.decode_errors)
# Issue #49: Return bytes if armor not specified, else text
result = result.data
if armor:
result = result.decode(self.encoding, self.decode_errors)
return result
def _get_list_output(self, p, kind):
# Get the response information
@ -1061,43 +1165,51 @@ class GPG(object):
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self.encoding,
self.decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub'.split()
valid_keywords = 'pub uid sec fpr sub ssb sig'.split()
for line in lines:
if self.verbose:
if self.verbose: # pragma: no cover
print(line)
logger.debug("line: %r", line.rstrip())
if not line:
if not line: # pragma: no cover
break
L = line.strip().split(':')
if not L:
if not L: # pragma: no cover
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
return result
def list_keys(self, secret=False):
def list_keys(self, secret=False, keys=None, sigs=False):
""" list the keys currently in the keyring
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
>>> input = gpg.gen_key_input(passphrase='foo')
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> fp1 = result.fingerprint
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> fp2 = result.fingerprint
>>> pubkeys = gpg.list_keys()
>>> assert print1 in pubkeys.fingerprints
>>> assert print2 in pubkeys.fingerprints
>>> assert fp1 in pubkeys.fingerprints
>>> assert fp2 in pubkeys.fingerprints
"""
which='keys'
if sigs:
which = 'sigs'
else: which='keys'
if secret:
which='secret-keys'
args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint",
"--with-colons"]
args = ['--list-%s' % which, '--fixed-list-mode',
'--fingerprint', '--fingerprint', # get subkey FPs, too
'--with-colons']
if keys:
if isinstance(keys, string_types):
keys = [keys]
args.extend(keys)
p = self._open_subprocess(args)
return self._get_list_output(p, 'list')
@ -1109,7 +1221,7 @@ class GPG(object):
The function achieves this by running:
$ gpg --with-fingerprint --with-colons filename
"""
args = ['--with-fingerprint', '--with-colons']
args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode']
args.append(no_quote(filename))
p = self._open_subprocess(args)
return self._get_list_output(p, 'scan')
@ -1119,13 +1231,14 @@ class GPG(object):
>>> import shutil
>>> shutil.rmtree('keys')
>>> gpg = GPG(gnupghome='keys')
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
>>> os.chmod('keys', 0x1C0)
>>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
>>> assert result, 'Failed using default keyserver'
>>> keyserver = 'keyserver.ubuntu.com'
>>> result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
>>> assert result, 'Failed using keyserver.ubuntu.com'
>>> #keyserver = 'keyserver.ubuntu.com'
>>> #result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
>>> #assert result, 'Failed using keyserver.ubuntu.com'
"""
query = query.strip()
@ -1143,13 +1256,13 @@ class GPG(object):
self.decode_errors).splitlines()
valid_keywords = ['pub', 'uid']
for line in lines:
if self.verbose:
if self.verbose: # pragma: no cover
print(line)
logger.debug('line: %r', line.rstrip())
if not line: # sometimes get blank lines on Windows
continue
L = line.strip().split(':')
if not L:
if not L: # pragma: no cover
continue
keyword = L[0]
if keyword in valid_keywords:
@ -1160,8 +1273,9 @@ class GPG(object):
"""Generate a key; you might use gen_key_input() to create the
control input.
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
>>> input = gpg.gen_key_input(passphrase='foo')
>>> result = gpg.gen_key(input)
>>> assert result
>>> result = gpg.gen_key('foo')
@ -1187,10 +1301,8 @@ class GPG(object):
parms.setdefault('Key-Type','RSA')
parms.setdefault('Key-Length',2048)
parms.setdefault('Name-Real', "Autogenerated Key")
try:
logname = os.environ['LOGNAME']
except KeyError:
logname = os.environ['USERNAME']
logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or
'unspecified')
hostname = socket.gethostname()
parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'),
hostname))
@ -1249,11 +1361,11 @@ class GPG(object):
args.append('--armor')
if output: # write the output to a file with the specified name
self.set_output_without_confirmation(args, output)
if sign is True:
if sign is True: # pragma: no cover
args.append('--sign')
elif sign:
elif sign: # pragma: no cover
args.extend(['--sign', '--default-key', no_quote(sign)])
if always_trust:
if always_trust: # pragma: no cover
args.append('--always-trust')
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
@ -1266,39 +1378,40 @@ class GPG(object):
>>> import shutil
>>> if os.path.exists("keys"):
... shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input(passphrase='foo')
>>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
>>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome="keys")
>>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1')
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> input = gpg.gen_key_input()
>>> fp1 = result.fingerprint
>>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2')
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> result = gpg.encrypt("hello",print2)
>>> fp2 = result.fingerprint
>>> result = gpg.encrypt("hello",fp2)
>>> message = str(result)
>>> assert message != 'hello'
>>> result = gpg.decrypt(message)
>>> result = gpg.decrypt(message, passphrase='pp2')
>>> assert result
>>> str(result)
'hello'
>>> result = gpg.encrypt("hello again",print1)
>>> result = gpg.encrypt("hello again", fp1)
>>> message = str(result)
>>> result = gpg.decrypt(message,passphrase='bar')
>>> result = gpg.decrypt(message, passphrase='bar')
>>> result.status in ('decryption failed', 'bad passphrase')
True
>>> assert not result
>>> result = gpg.decrypt(message,passphrase='foo')
>>> result = gpg.decrypt(message, passphrase='pp1')
>>> result.status == 'decryption ok'
True
>>> str(result)
'hello again'
>>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
>>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1')
>>> result.status == 'encryption ok'
True
>>> message = str(result)
>>> result = gpg.decrypt(message)
>>> result = gpg.decrypt(message, passphrase='pp2')
>>> result.status == 'decryption ok'
True
>>> assert result.fingerprint == print1
>>> assert result.fingerprint == fp1
"""
data = _make_binary_stream(data, self.encoding)
@ -1317,7 +1430,7 @@ class GPG(object):
args = ["--decrypt"]
if output: # write the output to a file with the specified name
self.set_output_without_confirmation(args, output)
if always_trust:
if always_trust: # pragma: no cover
args.append("--always-trust")
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase, binary=True)