First working code

This commit is contained in:
Helge 2020-12-22 17:26:09 +01:00
parent d6b0bf9f05
commit 867ad7a9c2
2 changed files with 88 additions and 117 deletions

2
.vscode/launch.json vendored
View file

@ -10,7 +10,7 @@
"request": "launch",
"program": "/home/pi/dev/certbot/venv3/bin/certbot",
"console": "integratedTerminal",
"args": ["certonly", "-a", "dns-ionos", "-d", "example.com", "--config-dir", "my_debug/config", "--work-dir", "my_debug/work", "--logs-dir", "my_debug/logs"]
"args": ["certonly", "-a", "dns-ionos", "-d", "*.erbehome.de", "--config-dir", "my_debug/config", "--work-dir", "my_debug/work", "--logs-dir", "my_debug/logs"]
}
]
}

View file

@ -78,50 +78,64 @@ class _ionosClient(object):
def __init__(self, endpoint, prefix, secret):
logger.debug("creating ionosclient")
self.endpoint = endpoint
self.prefix = prefix
self.secret = secret
self.session = requests.Session()
self.session_id = None
self.headers = {}
self.headers['accept'] = 'application/json'
self.headers['X-API-Key'] = prefix + '.' + secret
def _login(self):
if self.session_id is not None:
return
logger.debug("logging in")
logindata = {"prefix": self.prefix, "secret": self.secret}
self.session_id = self._api_request("login", logindata)
logger.debug("session id is %s", self.session_id)
def _find_managed_zone_id(self, domain):
"""
Find the managed zone for a given domain.
def _api_request(self, action, data):
if self.session_id is not None:
data["session_id"] = self.session_id
:param str domain: The domain for which to find the managed zone.
:returns: The ID of the managed zone, if found.
:rtype: str
"""
logger.debug("get zones")
zones = self._api_request(type='get', action="/dns/v1/zones")
logger.debug("zones found %s", zones)
for zone in zones:
# get the zone id
if zone['name'] == domain:
return zone['id'], zone['name']
return None
def _api_request(self, type, action, data = None):
url = self._get_url(action)
resp = self.session.get(url, json=data)
resp = None
if type == 'get':
resp = requests.get(url, headers=self.headers)
elif type == 'put':
headers = self.headers
headers['Content-Type'] = 'application/json'
resp = requests.put(url, headers=headers, data=json.dumps(data))
elif type == 'patch':
headers = self.headers
headers['Content-Type'] = 'application/json'
resp = requests.patch(url, headers=headers, data=json.dumps(data))
elif type == 'delete':
resp = requests.delete(url, headers=self.headers)
else:
raise errors.PluginError(
"HTTP Error during request. Unknown type {0}".format(type)
)
logger.debug("API REquest to URL: %s", url)
if resp.status_code != 200:
error_msg = resp.reason + " " + resp.text['message']
raise errors.PluginError(
"HTTP Error during login {0}".format(resp.status_code)
"HTTP Error during request {0}:{1}".format(resp.status_code, error_msg)
)
try:
result = resp.json()
except:
raise errors.PluginError(
"API response with non JSON: {0}".format(resp.text)
)
if result["code"] == "ok":
return result["response"]
elif result["code"] == "remote_fault":
raise errors.PluginError(
"API response with an error: {0}".format(result["message"])
)
else:
raise errors.PluginError("API response unknown {0}".format(resp.text))
result = None
if type == 'get':
try:
result = resp.json()
except:
raise errors.PluginError(
"API response with non JSON: {0}".format(resp.text)
)
return result
def _get_url(self, action):
return "{0}?{1}".format(self.endpoint, action)
def _get_server_id(self, zone_id):
zone = self._api_request("dns_zone_get", {"primary_id": zone_id})
return zone["server_id"]
return "{0}{1}".format(self.endpoint, action)
def add_txt_record(self, domain, record_name, record_content, record_ttl):
"""
@ -133,25 +147,19 @@ class _ionosClient(object):
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the IONOS API
"""
self._login()
zone_id, zone_name = self._find_managed_zone_id(domain, record_name)
zone_id, zone_name = self._find_managed_zone_id(domain)
if zone_id is None:
raise errors.PluginError("Domain not known")
logger.debug("domain found: %s with id: %s", zone_name, zone_id)
o_record_name = record_name
record_name = record_name.replace(zone_name, "")[:-1]
logger.debug(
"using record_name: %s from original: %s", record_name, o_record_name
)
record = self.get_existing_txt(zone_id, record_name, record_content)
record = self.get_existing_txt(zone_id, record_name)
if record is not None:
if record["data"] == record_content:
if record["content"] == record_content:
logger.info("already there, id {0}".format(record["id"]))
return
else:
logger.info("update {0}".format(record["id"]))
logger.info("update txt record")
self._update_txt_record(
zone_id, record["id"], record_name, record_content, record_ttl
zone_id, record["id"], record_content, record_ttl
)
else:
logger.info("insert new txt record")
@ -167,82 +175,48 @@ class _ionosClient(object):
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the IONOS API
"""
self._login()
zone_id, zone_name = self._find_managed_zone_id(domain, record_name)
zone_id, zone_name = self._find_managed_zone_id(domain)
if zone_id is None:
raise errors.PluginError("Domain not known")
logger.debug("domain found: %s with id: %s", zone_name, zone_id)
o_record_name = record_name
record_name = record_name.replace(zone_name, "")[:-1]
logger.debug(
"using record_name: %s from original: %s", record_name, o_record_name
)
record = self.get_existing_txt(zone_id, record_name, record_content)
record = self.get_existing_txt(zone_id, record_name)
if record is not None:
if record["data"] == record_content:
#seem record "content" is double quoted. Remove quotes
content = record["content"]
# or, if they only occur at start...
content = content.lstrip('\"')
content = content.rstrip('\"')
if content == record_content:
logger.debug("delete TXT record: %s", record["id"])
self._delete_txt_record(record["id"])
self._delete_txt_record(zone_id, record["id"])
def _prepare_rr_data(self, zone_id, record_name, record_content, record_ttl):
server_id = self._get_server_id(zone_id)
data = {
"client_id": None,
"rr_type": "TXT",
"params": {
"server_id": server_id,
"name": record_name,
"active": "Y",
"type": "TXT",
"data": record_content,
"zone": zone_id,
"ttl": record_ttl,
"update_serial": False,
"stamp": time.strftime('%Y-%m-%d %H:%M:%S'),
},
}
return data
def _update_txt_record(self, zone_id, primary_id, record_content, record_ttl):
data = {}
data['disabled'] = False
data['content'] = record_content
data['ttl'] = record_ttl
data['prio'] = 0
logger.debug("update with data: %s", data)
self._api_request(type='put', action='/dns/v1/zones/{0}/records/{1}'.format(zone_id,primary_id), data=data)
def _insert_txt_record(self, zone_id, record_name, record_content, record_ttl):
data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl)
data = {}
data['disabled'] = False
data['type'] = 'TXT'
data['name'] = record_name
data['content'] = record_content
data['ttl'] = record_ttl
data['prio'] = 0
records = []
records.append(data)
logger.debug("insert with data: %s", data)
result = self._api_request("dns_txt_add", data)
self._api_request(type='patch', action='/dns/v1/zones/{0}'.format(zone_id), data=records)
def _update_txt_record(
self, zone_id, primary_id, record_name, record_content, record_ttl
):
data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl)
data["primary_id"] = primary_id
logger.debug("update with data: %s", data)
result = self._api_request("dns_txt_update", data)
def _delete_txt_record(self, zone_id, primary_id):
logger.debug("delete id: %s", primary_id)
self._api_request(type='delete', action='/dns/v1/zones/{0}/records/{1}'.format(zone_id,primary_id))
def _delete_txt_record(self, primary_id):
data = {"primary_id": primary_id}
logger.debug("delete with data: %s", data)
result = self._api_request("dns_txt_delete", data)
def _find_managed_zone_id(self, domain, record_name):
"""
Find the managed zone for a given domain.
:param str domain: The domain for which to find the managed zone.
:returns: The ID of the managed zone, if found.
:rtype: str
:raises certbot.errors.PluginError: if the managed zone cannot be found.
"""
zone_dns_name_guesses = [record_name] + dns_common.base_domain_name_guesses(domain)
for zone_name in zone_dns_name_guesses:
# get the zone id
try:
logger.debug("looking for zone: %s", zone_name)
zone_id = self._api_request("dns_zone_get_id", {"origin": zone_name})
return zone_id, zone_name
except errors.PluginError as e:
pass
return None
def get_existing_txt(self, zone_id, record_name, record_content):
def get_existing_txt(self, zone_id, record_name):
"""
Get existing TXT records from the RRset for the record name.
@ -256,14 +230,11 @@ class _ionosClient(object):
:rtype: `string` or `None`
"""
self._login()
read_zone_data = {"zone_id": zone_id}
zone_data = self._api_request("dns_rr_get_all_by_zone", read_zone_data)
for entry in zone_data:
zone_data = self._api_request(type='get', action='/dns/v1/zones/{0}'.format(zone_id))
for entry in zone_data['records']:
if (
entry["name"] == record_name
and entry["type"] == "TXT"
and entry["data"] == record_content
):
return entry
return None