faz_query

Metadata

Name: faz_query

Description: Provides information on data objects within FortiAnalyzer so that playbooks can perform conditionals.

Author(s): Luke Weighall (@lweighall)

Ansible Version Added/Required: 2.8

Dev Status: No status updates, yet. Contact Authors.

Parameters

adom

  • Description: The ADOM the configuration should belong to.
  • Required: False
  • default: root

custom_dict

  • Description: ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FAZ JSON API!

    DICTIONARY JSON FORMAT ONLY – Custom dictionary/datagram to send to the endpoint.

  • Required: False

custom_endpoint

  • Description: ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FAZ JSON API!

    The HTTP Endpoint on FortiAnalyzer you wish to GET from.

  • Required: False

device_ip

  • Description: The IP of the device you want to query.
  • Required: False

device_serial

  • Description: The serial number of the device you want to query.
  • Required: False

device_unique_name

  • Description: The desired “friendly” name of the device you want to query.
  • Required: False

nodes

  • Description: A LIST of firewalls in the cluster you want to verify i.e. [“firewall_A”,”firewall_B”].
  • Required: False

object

  • Description: The data object we wish to query (device, package, rule, etc). Will expand choices as improves.
  • Required: True
  • choices: [‘device’, ‘cluster_nodes’, ‘task’, ‘custom’]

task_id

  • Description: The ID of the task you wish to query status on. If left blank and object = ‘task’ a list of tasks are returned.
  • Required: False

Functions

  • faz_get_custom
def faz_get_custom(faz, paramgram):
    """
    :param faz: The faz object instance from fortianalyzer.py
    :type faz: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiAnalyzer
    :rtype: dict
    """
    # IF THE CUSTOM DICTIONARY (OFTEN CONTAINING FILTERS) IS DEFINED CREATED THAT
    if paramgram["custom_dict"] is not None:
        datagram = paramgram["custom_dict"]
    else:
        datagram = dict()

    # SET THE CUSTOM ENDPOINT PROVIDED
    url = paramgram["custom_endpoint"]
    # MAKE THE CALL AND RETURN RESULTS
    response = faz.process_request(url, datagram, FAZMethods.GET)
    return response
  • faz_get_task_status
def faz_get_task_status(faz, paramgram):
    """
    :param faz: The faz object instance from fortianalyzer.py
    :type faz: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiAnalyzer
    :rtype: dict
    """
    # IF THE TASK_ID IS DEFINED, THEN GET THAT SPECIFIC TASK
    # OTHERWISE, GET ALL RECENT TASKS IN A LIST
    if paramgram["task_id"] is not None:

        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task/{task_id}'.format(task_id=paramgram["task_id"])
        response = faz.process_request(url, datagram, FAZMethods.GET)
    else:
        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task'
        response = faz.process_request(url, datagram, FAZMethods.GET)
    return response
  • main
def main():
    argument_spec = dict(
        adom=dict(required=False, type="str", default="root"),
        object=dict(required=True, type="str", choices=["task", "custom"]),
        custom_endpoint=dict(required=False, type="str"),
        custom_dict=dict(required=False, type="dict"),
        task_id=dict(required=False, type="str")
    )

    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False, )
    paramgram = {
        "adom": module.params["adom"],
        "object": module.params["object"],
        "task_id": module.params["task_id"],
        "custom_endpoint": module.params["custom_endpoint"],
        "custom_dict": module.params["custom_dict"]
    }
    module.paramgram = paramgram
    faz = None
    if module._socket_path:
        connection = Connection(module._socket_path)
        faz = FortiAnalyzerHandler(connection, module)
        faz.tools = FAZCommon()
    else:
        module.fail_json(**FAIL_SOCKET_MSG)

    results = DEFAULT_RESULT_OBJ

    try:
        # IF OBJECT IS TASK
        if paramgram["object"] == "task":
            results = faz_get_task_status(faz, paramgram)
            if results[0] != 0:
                module.fail_json(**results[1])
            if results[0] == 0:
                module.exit_json(**results[1])
    except Exception as err:
        raise FAZBaseException(err)

    try:
        # IF OBJECT IS CUSTOM
        if paramgram["object"] == "custom":
            results = faz_get_custom(faz, paramgram)
            if results[0] != 0:
                module.fail_json(msg="QUERY FAILED -- Please check syntax check JSON guide if needed.")
            if results[0] == 0:
                results_len = len(results[1])
                if results_len > 0:
                    results_combine = dict()
                    if isinstance(results[1], dict):
                        results_combine["results"] = results[1]
                    if isinstance(results[1], list):
                        results_combine["results"] = results[1][0:results_len]
                    module.exit_json(msg="Custom Query Success", **results_combine)
                else:
                    module.exit_json(msg="NO RESULTS")
    except Exception as err:
        raise FAZBaseException(err)

    # PROCESS RESULTS
    try:
        faz.govern_response(module=module, results=results,
                            ansible_facts=faz.construct_ansible_facts(results, module.params, paramgram))
    except BaseException as err:
        raise FAZBaseException(msg="An error occurred with govern_response(). Error: " + str(err))

    return module.exit_json(**results[1])

Module Source Code

#!/usr/bin/python
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#

from __future__ import absolute_import, division, print_function
__metaclass__ = type

ANSIBLE_METADATA = {
    "metadata_version": "1.1",
    "status": ["preview"],
    "supported_by": "community"
}

DOCUMENTATION = '''
---
module: faz_query
version_added: "2.8"
notes:
    - Full Documentation at U(https://ftnt-ansible-docs.readthedocs.io/en/latest/).
author: Luke Weighall (@lweighall)
short_description: Query FortiAnalyzer data objects for use in Ansible workflows.
description:
  - Provides information on data objects within FortiAnalyzer so that playbooks can perform conditionals.

options:
  adom:
    description:
      - The ADOM the configuration should belong to.
    required: false
    default: root

  object:
    description:
      - The data object we wish to query (device, package, rule, etc). Will expand choices as improves.
    required: true
    choices:
    - device
    - cluster_nodes
    - task
    - custom

  custom_endpoint:
    description:
        - ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FAZ JSON API!
        - The HTTP Endpoint on FortiAnalyzer you wish to GET from.
    required: false

  custom_dict:
    description:
        - ADVANCED USERS ONLY! REQUIRES KNOWLEDGE OF FAZ JSON API!
        - DICTIONARY JSON FORMAT ONLY -- Custom dictionary/datagram to send to the endpoint.
    required: false

  device_ip:
    description:
      - The IP of the device you want to query.
    required: false

  device_unique_name:
    description:
      - The desired "friendly" name of the device you want to query.
    required: false

  device_serial:
    description:
      - The serial number of the device you want to query.
    required: false

  task_id:
    description:
      - The ID of the task you wish to query status on. If left blank and object = 'task' a list of tasks are returned.
    required: false

  nodes:
    description:
      - A LIST of firewalls in the cluster you want to verify i.e. ["firewall_A","firewall_B"].
    required: false
'''


EXAMPLES = '''
- name: GET STATUS OF TASK ID
  faz_query:
    adom: "ansible"
    object: "task"
    task_id: "3"

- name: USE CUSTOM TYPE TO QUERY AVAILABLE SCRIPTS
  faz_query:
    adom: "ansible"
    object: "custom"
    custom_endpoint: "/dvmdb/adom/ansible/script"
    custom_dict: { "type": "cli" }
'''

RETURN = """
api_result:
  description: full API response, includes status code and message
  returned: always
  type: str
"""

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.connection import Connection
from ansible.module_utils.network.fortianalyzer.fortianalyzer import FortiAnalyzerHandler
from ansible.module_utils.network.fortianalyzer.common import FAZBaseException
from ansible.module_utils.network.fortianalyzer.common import FAZCommon
from ansible.module_utils.network.fortianalyzer.common import FAZMethods
from ansible.module_utils.network.fortianalyzer.common import DEFAULT_RESULT_OBJ
from ansible.module_utils.network.fortianalyzer.common import FAIL_SOCKET_MSG


def faz_get_custom(faz, paramgram):
    """
    :param faz: The faz object instance from fortianalyzer.py
    :type faz: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiAnalyzer
    :rtype: dict
    """
    # IF THE CUSTOM DICTIONARY (OFTEN CONTAINING FILTERS) IS DEFINED CREATED THAT
    if paramgram["custom_dict"] is not None:
        datagram = paramgram["custom_dict"]
    else:
        datagram = dict()

    # SET THE CUSTOM ENDPOINT PROVIDED
    url = paramgram["custom_endpoint"]
    # MAKE THE CALL AND RETURN RESULTS
    response = faz.process_request(url, datagram, FAZMethods.GET)
    return response


def faz_get_task_status(faz, paramgram):
    """
    :param faz: The faz object instance from fortianalyzer.py
    :type faz: class object
    :param paramgram: The formatted dictionary of options to process
    :type paramgram: dict
    :return: The response from the FortiAnalyzer
    :rtype: dict
    """
    # IF THE TASK_ID IS DEFINED, THEN GET THAT SPECIFIC TASK
    # OTHERWISE, GET ALL RECENT TASKS IN A LIST
    if paramgram["task_id"] is not None:

        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task/{task_id}'.format(task_id=paramgram["task_id"])
        response = faz.process_request(url, datagram, FAZMethods.GET)
    else:
        datagram = {
            "adom": paramgram["adom"]
        }
        url = '/task/task'
        response = faz.process_request(url, datagram, FAZMethods.GET)
    return response


def main():
    argument_spec = dict(
        adom=dict(required=False, type="str", default="root"),
        object=dict(required=True, type="str", choices=["task", "custom"]),
        custom_endpoint=dict(required=False, type="str"),
        custom_dict=dict(required=False, type="dict"),
        task_id=dict(required=False, type="str")
    )

    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False, )
    paramgram = {
        "adom": module.params["adom"],
        "object": module.params["object"],
        "task_id": module.params["task_id"],
        "custom_endpoint": module.params["custom_endpoint"],
        "custom_dict": module.params["custom_dict"]
    }
    module.paramgram = paramgram
    faz = None
    if module._socket_path:
        connection = Connection(module._socket_path)
        faz = FortiAnalyzerHandler(connection, module)
        faz.tools = FAZCommon()
    else:
        module.fail_json(**FAIL_SOCKET_MSG)

    results = DEFAULT_RESULT_OBJ

    try:
        # IF OBJECT IS TASK
        if paramgram["object"] == "task":
            results = faz_get_task_status(faz, paramgram)
            if results[0] != 0:
                module.fail_json(**results[1])
            if results[0] == 0:
                module.exit_json(**results[1])
    except Exception as err:
        raise FAZBaseException(err)

    try:
        # IF OBJECT IS CUSTOM
        if paramgram["object"] == "custom":
            results = faz_get_custom(faz, paramgram)
            if results[0] != 0:
                module.fail_json(msg="QUERY FAILED -- Please check syntax check JSON guide if needed.")
            if results[0] == 0:
                results_len = len(results[1])
                if results_len > 0:
                    results_combine = dict()
                    if isinstance(results[1], dict):
                        results_combine["results"] = results[1]
                    if isinstance(results[1], list):
                        results_combine["results"] = results[1][0:results_len]
                    module.exit_json(msg="Custom Query Success", **results_combine)
                else:
                    module.exit_json(msg="NO RESULTS")
    except Exception as err:
        raise FAZBaseException(err)

    # PROCESS RESULTS
    try:
        faz.govern_response(module=module, results=results,
                            ansible_facts=faz.construct_ansible_facts(results, module.params, paramgram))
    except BaseException as err:
        raise FAZBaseException(msg="An error occurred with govern_response(). Error: " + str(err))

    return module.exit_json(**results[1])


if __name__ == "__main__":
    main()