# coding=utf-8
"""A Python Module for interacting with the Infoblox WAPI. The module supports auth sessions via the
requests module as well as numerous shortcuts for manipulating objects within Infoblox.
"""
# Copyright (C) 2015-2021 Jesse Almanrode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests
import json
from collections import namedtuple
def ipv4addr_obj(ipaddr, **kwargs):
    """Create a new IPv4 address dictionary.

    The dictionary starts with ``configure_for_dhcp=False`` and
    ``ipv4addr=ipaddr``; any keyword arguments are then merged in and
    override those defaults when the same key is given.

    :param ipaddr: ipv4 address
    :param kwargs: key/value options for the ipv4addr object
    :return: ipv4addr dictionary with default values
    """
    ipv4_obj = {'configure_for_dhcp': False, 'ipv4addr': ipaddr}
    # Caller-supplied options win over the defaults above.
    ipv4_obj.update(kwargs)
    return ipv4_obj
class Infoblox(object):
    """Create a new instance of the Infoblox WAPI Object

    The object can be used as a context manager; the underlying requests
    session is closed when the ``with`` block exits.

    :param uri: Full url to the Infoblox WAPI
    :param username: Infoblox User with API Access
    :param password: Password for Infoblox User
    :param verify_ssl: Verify SSL Certificate
    :return: Infoblox Object
    :property view: The DNS view to add objects to (default == 'default')
    """

    # Return type assumed whenever a call does not pass ``_return_type``.
    _DEFAULT_RETURN_TYPE = "json"  # This is what the WAPI defaults to

    # (keyword suffix, WAPI search modifier) pairs handled by ``verify``.
    _KEY_MODIFIERS = (
        ("_regex", "~"),
        ("_plus", "+"),
        ("_notequal", "!"),
        ("_ignorecase", ":"),
        ("_lessthan", "<"),
        ("_greaterthan", ">"),
    )

    def __init__(self, uri, username=None, password=None, verify_ssl=False):
        # Object types and refs are appended directly to the uri, so it must
        # end with a slash.
        if not uri.endswith("/"):
            uri += "/"
        self.uri = uri
        auth = namedtuple('auth', ['username', 'password'])
        self.auth = auth(username=username, password=password)
        self.verify_ssl = verify_ssl
        self._return_type = self._DEFAULT_RETURN_TYPE
        self.returnTypes = ('json', 'json-pretty', 'xml', 'xml-pretty')
        self.session = requests.Session()
        self.view = 'default'
        if verify_ssl is False:
            # This is so you don't get weird warning messages about not verifying ssl certs
            try:
                requests.packages.urllib3.disable_warnings()
            except AttributeError:
                # This requests build does not expose a bundled urllib3.
                pass

    def __str__(self):
        # NOTE(review): this dumps every attribute, including the password
        # held in ``self.auth``. Avoid logging str(instance).
        return str(self.__dict__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release pooled connections; returning None lets exceptions propagate.
        self.session.close()
        return None

    @staticmethod
    def _listify(value):
        """Return ``value`` as a list; a non list/tuple value is wrapped in one."""
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def _decode(self, result):
        """Return the parsed body for 'json', otherwise the raw response text."""
        if self._return_type == 'json':
            return result.json()
        return result.text

    def _first_host(self, fqdn, **query):
        """Return the first host record named ``fqdn``.

        :raises IndexError: when the lookup returns no records
        """
        try:
            return self.__get_host(name=fqdn, **query)[0]
        except IndexError:
            raise IndexError("Unable to find host with name " + fqdn)

    def verify(self, **kwargs):
        """Private method for verifying the named argument data and preparing it for a wapi call

        .. note::
            Key modifiers will also be fixed if the key ends in one of the following strings:

            * _plus = +
            * _regex = ~
            * _notequal = !
            * _ignorecase = :
            * _lessthan = <
            * _greaterthan = >

        Only the trailing suffix is rewritten; the same text appearing earlier
        in the key is left alone. The instance's current return type is set
        from ``_return_type`` when given and reset to the default otherwise,
        so one call's return type does not leak into the next.

        :param kwargs: key/value pairs
        :return: Dictionary ready for wapi call
        :raises ValueError: if none of objtype, _ref or _function is given, or
            if _return_type is not one of ``self.returnTypes``
        """
        if '_ref' not in kwargs and 'objtype' not in kwargs and '_function' not in kwargs:
            raise ValueError("objtype or _ref is required!")
        if "_return_type" in kwargs:
            if kwargs['_return_type'] not in self.returnTypes:
                raise ValueError(str(kwargs['_return_type']) + " is not a valid return type!")
            self._return_type = kwargs['_return_type']
        else:
            # Nothing is sent to the server, so it will answer with its default.
            self._return_type = self._DEFAULT_RETURN_TYPE
        newdict = {}
        for key, value in kwargs.items():
            for suffix, modifier in self._KEY_MODIFIERS:
                if key.endswith(suffix):
                    key = key[:-len(suffix)] + modifier
                    break
            newdict[key] = value
        return newdict

    def get(self, **kwargs):
        """ Query the Infoblox WAPI

        Query the Infoblox WAPI for records of a specific type (with or without regular expressions)
        OR
        Query the Infoblox WAPI for a specific record by its _ref

        Any remaining key/value pairs are sent as query parameters in both
        cases.

        :param kwargs: Key/Value parameters
        :return: string of _return_type (json or xml)
        """
        kwargs = self.__verify(**kwargs)
        if "_ref" in kwargs:
            target = kwargs.pop('_ref')
            # The ref already identifies the object; objtype is not a query option.
            kwargs.pop('objtype', None)
        else:
            target = kwargs.pop('objtype')
        result = self.session.get(self.uri + target, auth=self.auth, verify=self.verify_ssl, params=kwargs)
        return self._decode(result)

    def add(self, **kwargs):
        """Add a record of a given type to Infoblox via the WAPI

        :param kwargs: key/value parameters
        :return: string of _return_type (json or xml)
        """
        kwargs = self.__verify(**kwargs)
        objtype = kwargs.pop('objtype')
        result = self.session.post(self.uri + objtype, auth=self.auth, verify=self.verify_ssl,
                                   data=json.dumps(kwargs))
        return self._decode(result)

    def delete(self, ref, **kwargs):
        """Delete a record in Infoblox based on its _ref id.

        :param ref: Reference id for a given object
        :param kwargs: key/value parameters
        :return: string of _return_type (json or xml)
        """
        kwargs['_ref'] = ref
        kwargs = self.__verify(**kwargs)
        del kwargs['_ref']
        result = self.session.delete(self.uri + ref, auth=self.auth, verify=self.verify_ssl,
                                     data=json.dumps(kwargs))
        return self._decode(result)

    def modify(self, ref, **kwargs):
        """Modify/Update an existing object

        :param ref: The _ref id of the object to update
        :param kwargs: key/value parameters
        :return: string of _return_type (json or xml)
        """
        kwargs['_ref'] = ref
        kwargs = self.__verify(**kwargs)
        del kwargs['_ref']
        result = self.session.put(self.uri + ref, auth=self.auth, verify=self.verify_ssl,
                                  data=json.dumps(kwargs))
        return self._decode(result)

    def call(self, ref, func, **kwargs):
        """Call a specific function on a given object

        :param ref: The _ref of the object
        :param func: The function to call
        :param kwargs: Data to be passed to the function
        :return: String of _return_type(json or xml)
        """
        _function = {'_function': func}
        kwargs['_ref'] = ref
        kwargs = self.__verify(**kwargs)
        del kwargs['_ref']
        result = self.session.post(self.uri + ref, auth=self.auth, verify=self.verify_ssl,
                                   params=_function, data=json.dumps(kwargs))
        return self._decode(result)

    def get_host(self, **kwargs):
        """Shortcut for finding host records

        :param kwargs: Can contain dictionary of data to search for or _ref of specific record
        :return: string of _return_type (json or xml)
        """
        if "_ref" not in kwargs:
            kwargs['objtype'] = "record:host"
        return self.__get(**kwargs)

    def get_host_by_ip(self, ipaddr):
        """Shortcut for finding a host record by its primary IPV4 address

        :param ipaddr: IPV4 address
        :return: string of _return_type (json or xml)
        """
        return self.__get_host(ipv4addr=ipaddr)

    def get_host_by_name(self, fqdn):
        """Shortcut for finding a host record by its fully qualified domain name

        :param fqdn: Fully Qualified Domain Name
        :return: string of _return_type (json or xml)
        """
        return self.__get_host(name=fqdn)

    def add_host(self, fqdn, ipaddr, **kwargs):
        """Shortcut for adding a host record with an iPV4 address

        :param fqdn: Fully Qualified Domain Name of the host to add
        :param ipaddr: IPV4 address of the host (can be a list of ipv4 addresses)
        :param kwargs: Key/Value dictionary of any extra options to add to host record
        :return: string of _return_type (json or xml)
        """
        ipv4addrs = [ipv4addr_obj(ip) for ip in self._listify(ipaddr)]
        newhost = dict(objtype='record:host', name=fqdn, ipv4addrs=ipv4addrs, view=self.view)
        newhost.update(kwargs)
        return self.__add(**newhost)

    def add_host_ip(self, fqdn, ipaddr):
        """Shortcut for adding an ip to a given host

        :param fqdn: Fully Qualified Domain name of the host
        :param ipaddr: IPV4 address of the host (can be a list of ipv4 addresses)
        :return: Modified host record
        :raises IndexError: if no host with the given name is found
        """
        host = self._first_host(fqdn)
        host['ipv4addrs'].extend(ipv4addr_obj(ip) for ip in self._listify(ipaddr))
        return self.__modify(host['_ref'], ipv4addrs=host['ipv4addrs'])

    def add_alias(self, fqdn, alias):
        """Shortcut for adding an alias/CNAME to a given host record

        Aliases already present on the host are not added a second time.

        :param fqdn: Fully Qualified Domain Name of the host
        :param alias: A fqdn name (or list of fqdns) to add as aliases/CNAMES
        :return: string of _return_type (json or xml)
        :raises IndexError: if no host with the given name is found
        """
        thishost = self._first_host(fqdn, _return_fields_plus="aliases")
        if 'aliases' not in thishost:
            thishost['aliases'] = []
        for name in self._listify(alias):
            if name not in thishost['aliases']:
                thishost['aliases'].append(name)
        return self.__modify(thishost['_ref'], aliases=thishost['aliases'])

    def delete_alias(self, fqdn, alias):
        """Shortcut for removing an alias/CNAME from a given host record

        Aliases that are not present on the host are ignored. If the host
        record has no ``aliases`` field, it is returned unmodified.

        :param fqdn: Fully Qualified Domain Name of the host
        :param alias: The fqdn of the alias (or list of fqdns) you wish to remove
        :return: string of _return_type (json or xml)
        :raises IndexError: if no host with the given name is found
        """
        thishost = self._first_host(fqdn, _return_fields_plus='aliases')
        if 'aliases' not in thishost:
            return thishost
        for name in self._listify(alias):
            if name in thishost['aliases']:
                thishost['aliases'].remove(name)
        return self.__modify(thishost['_ref'], aliases=thishost['aliases'])

    # Allowing Infoblox to be subclassed while protecting internal calls
    __verify = verify
    __get = get
    __add = add
    __delete = delete
    __modify = modify
    __call = call
    __get_host = get_host
    __get_host_by_name = get_host_by_name