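# Copy this to ".ansible/plugins/lookup"
from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase
from keepasshttplib import keepasshttplib, encrypter
# Referenced by the ConnectionError handler in _test_connection; the original
# file used the name without importing it, so the handler raised NameError.
import requests

DOCUMENTATION = """
    lookup: keepass
    author: Martin Bauer
    version_added: '0.2'
    short_description: fetch data from KeePass over KeePassHTTP
    description:
        - This lookup returns a username or password queried by the URL of the keepass entry
    options:
      _terms:
        description:
          - first is the URL to search for
          - second is a property name of the entry, e.g. username or password
        required: True
    notes:
      - https://github.com/viczem/ansible-keepass

    example:
      - "{{ lookup('keepass', 'urlOfEntry', 'password') }}"
"""


class LookupModule(LookupBase):
    """Ansible lookup plugin that reads one credential field from KeePass.

    The value handed back to the playbook is a plaintext secret. Tasks that
    consume it should set ``no_log: true`` so the value does not end up in
    task output or logs.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the lookup base and create the KeePassHTTP client."""
        super(LookupModule, self).__init__(*args, **kwargs)
        self.k = keepasshttplib.Keepasshttplib()

    def run(self, terms, variables=None, **kwargs):
        """Return the requested field of the KeePass entry matching a URL.

        Args:
            terms: one or two items. The first is the URL of the entry; the
                optional second is the field name ('username', 'user',
                'pass', 'passwd' or 'password'). With a single term the
                password is returned.
            variables: unused; part of the lookup plugin interface.
            **kwargs: unused; part of the lookup plugin interface.

        Returns:
            A one-element list holding the username or the password.

        Raises:
            AnsibleError: on a malformed request, an unsupported field name,
                an unreachable or unassociated KeePass, a failure while
                fetching the entry, or when no entry matches the URL.
        """
        if not terms or len(terms) > 2:
            raise AnsibleError('Keepass wrong request format')

        if len(terms) == 1:
            entry_path, entry_attr = terms[0], 'password'
        else:
            entry_path, entry_attr = terms[0], terms[1]

        # Reject unsupported field names before talking to KeePass, so a
        # malformed request never causes a credential to be fetched.
        if entry_attr not in ('username', 'user', 'pass', 'passwd', 'password'):
            raise AnsibleError("Keepass wrong entry")

        if not self._test_connection():
            raise AnsibleError('Keepass is closed!')

        try:
            auth = self.k.get_credentials(entry_path)
        except Exception as e:
            # The original formatted an undefined name (host_name) here, which
            # replaced the intended AnsibleError with a NameError.
            raise AnsibleError('Keepass error obtaining entry {}: {}'.format(entry_path, e))

        if not auth:
            # Without this guard the return below hit an unbound local when
            # no entry matched.
            raise AnsibleError('Keepass no entry found for {}'.format(entry_path))

        # auth is indexed as (username, password).
        ret = auth[0] if entry_attr.startswith('user') else auth[1]
        return [ret]

    def _test_connection(self):
        """Check that KeePassHTTP is reachable and accepts the stored key.

        Returns:
            The result of ``test_associate`` for the key and id read from
            the keyring.

        Raises:
            AnsibleError: when the HTTP connection to KeePass fails.
        """
        key = self.k.get_key_from_keyring()
        if key is None:
            # NOTE(review): a freshly generated key has presumably never been
            # associated with the database, so this path likely reports
            # "closed" instead of "not associated" - confirm against
            # keepasshttplib.
            key = encrypter.generate_key()
        id_ = self.k.get_id_from_keyring()
        try:
            return self.k.test_associate(key, id_)
        except requests.exceptions.ConnectionError as e:
            raise AnsibleError('Keepass Connection Error: {}'.format(e))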