2020-01-08 20:18:23 +01:00
|
|
|
# Copy this to ".ansible/plugins/lookup"
|
|
|
|
from ansible.errors import AnsibleError
|
|
|
|
from ansible.plugins.lookup import LookupBase
|
|
|
|
from keepasshttplib import keepasshttplib, encrypter
|
2021-07-20 15:35:12 +02:00
|
|
|
import requests
|
2020-01-08 20:18:23 +01:00
|
|
|
|
|
|
|
DOCUMENTATION = """
|
|
|
|
lookup: keepass
|
|
|
|
author: Martin Bauer <bauer_martin@gmx.de>
|
|
|
|
version_added: '0.2'
|
|
|
|
short_description: fetch data from KeePass over KeePassHTTP
|
|
|
|
description:
|
|
|
|
- This lookup returns a username or password queried by the URL of the keepass entry
|
|
|
|
options:
|
|
|
|
_terms:
|
|
|
|
description:
|
|
|
|
- first is the URL to search for
|
|
|
|
- second is a property name of the entry, e.g. username or password
|
|
|
|
required: True
|
|
|
|
notes:
|
|
|
|
- https://github.com/viczem/ansible-keepass
|
|
|
|
|
|
|
|
example:
|
|
|
|
- "{{ lookup('keepass', 'urlOfEntry', 'password') }}"
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
class LookupModule(LookupBase):
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
super(LookupModule, self).__init__(*args, **kwargs)
|
|
|
|
self.k = keepasshttplib.Keepasshttplib()
|
|
|
|
|
|
|
|
def run(self, terms, variables=None, **kwargs):
|
|
|
|
if not terms or len(terms) > 2:
|
|
|
|
raise AnsibleError('Keepass wrong request format')
|
|
|
|
if len(terms) == 1:
|
|
|
|
entry_path, entry_attr = terms[0], 'password'
|
|
|
|
else:
|
|
|
|
entry_path, entry_attr = terms[0], terms[1]
|
|
|
|
|
2021-07-20 15:35:12 +02:00
|
|
|
#if not self._test_connection():
|
|
|
|
# raise AnsibleError('Keepass is closed!')
|
2020-01-08 20:18:23 +01:00
|
|
|
try:
|
|
|
|
auth = self.k.get_credentials(entry_path)
|
|
|
|
except Exception as e:
|
2021-07-20 15:35:12 +02:00
|
|
|
raise AnsibleError('Keepass error obtaining entry {}: {}'.format(entry_path, e))
|
2020-01-08 20:18:23 +01:00
|
|
|
if auth:
|
|
|
|
if entry_attr not in ('username', 'user', 'pass', 'passwd', 'password'):
|
|
|
|
raise AnsibleError("Keepass wrong entry")
|
|
|
|
|
|
|
|
ret = auth[0] if entry_attr.startswith('user') else auth[1]
|
|
|
|
return [ret]
|
|
|
|
|
|
|
|
def _test_connection(self):
|
|
|
|
key = self.k.get_key_from_keyring()
|
|
|
|
if key is None:
|
|
|
|
key = encrypter.generate_key()
|
|
|
|
id_ = self.k.get_id_from_keyring()
|
|
|
|
try:
|
|
|
|
return self.k.test_associate(key, id_)
|
|
|
|
except requests.exceptions.ConnectionError as e:
|
|
|
|
raise AnsibleError('Keepass Connection Error: {}'.format(e))
|