gns3api - Simple python module to access the GNS3 API
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

gns3api.py 9.5KB


  1. """
  2. Access GNS3 controller via API
  3. """
  4. import os
  5. import sys
  6. import ssl
  7. import json
  8. from base64 import b64encode
  9. try:
  10. import configparser
  11. except ImportError: # fallback to Python 2 module
  12. import ConfigParser as configparser
  13. try:
  14. import http.client as http_client
  15. from urllib.parse import urlsplit
  16. GNS3BaseException = OSError
  17. except ImportError: # fallback to Python 2 module
  18. import httplib as http_client
  19. from urlparse import urlsplit
  20. GNS3BaseException = IOError
  21. class GNS3ApiException(GNS3BaseException):
  22. """
  23. GNS3 API Exceptions, base class
  24. """
  25. def __init__(self, *args):
  26. super(GNS3ApiException, self).__init__()
  27. self.args = args
  28. class GNS3ConfigurationError(GNS3ApiException):
  29. """
  30. GNS3 configuration error
  31. """
  32. def __init__(self, message="Missing/invalid GNS3 configuration"):
  33. GNS3ApiException.__init__(self, message)
  34. class HTTPClientError(GNS3ApiException):
  35. """
  36. HTTP client library error
  37. """
  38. def __str__(self):
  39. return ": ".join(str(x) for x in self.args)
  40. class HTTPError(GNS3ApiException):
  41. """
  42. HTTP response error
  43. """
  44. def __str__(self):
  45. if len(self.args) >= 2:
  46. return '[Status {}] '.format(self.args[0]) + \
  47. " ".join(str(x) for x in self.args[1:])
  48. return str(self.args[0])
  49. class GNS3Api:
  50. """
  51. GNS3 API - an API to GNS3
  52. """
  53. def __init__(self, url=None, user=None, password=None,
  54. profile=None, version=None, verify=True):
  55. """
  56. GNS3 API
  57. :param url: Server URL, if None the connection parameters
  58. are read from the GNS3 configuration file
  59. :param user: User name, None for no authentification
  60. :param password: Password
  61. :param profile: GNS3 configuration profile
  62. :param version: API version, None for autodetect
  63. :param verify: Verify CERT (on https), default True
  64. False: no CERT verification
  65. True: verification using the system CA certificates
  66. file: verification using the file and the system CA
  67. """
  68. if not url:
  69. (url, user, password) = \
  70. GNS3Api.get_controller_connection(profile, version)
  71. # split URL
  72. try:
  73. url_tuple = urlsplit(url, "http")
  74. if not url_tuple.netloc: # fix missing "//" before host
  75. url_tuple = urlsplit(url_tuple.scheme + "://" + url_tuple.path)
  76. proto = url_tuple.scheme
  77. host = url_tuple.hostname
  78. port = url_tuple.port or 3080
  79. if user is None:
  80. user = url_tuple.username
  81. if password is None:
  82. password = url_tuple.password
  83. except ValueError as err:
  84. raise HTTPClientError("UrlError", str(err))
  85. if host == '0.0.0.0':
  86. host = '127.0.0.1'
  87. elif host == '::':
  88. host = '::1'
  89. if ':' in host: # IPv6
  90. self.controller = "{}://[{}]:{}".format(proto, host, port)
  91. else:
  92. self.controller = "{}://{}:{}".format(proto, host, port)
  93. self.status_code = None
  94. # authentication
  95. self._auth = {}
  96. if user is not None and user != '':
  97. if password is None:
  98. password = ''
  99. self._auth['Authorization'] = 'Basic ' + \
  100. b64encode((user+':'+password).encode('utf-8')).decode('ascii')
  101. # open connection
  102. try:
  103. if proto == 'http':
  104. self._conn = http_client.HTTPConnection(host, port, timeout=10)
  105. elif proto == 'https':
  106. context = ssl.create_default_context()
  107. if isinstance(verify, str):
  108. context.check_hostname = False
  109. context.load_verify_locations(cafile=verify)
  110. elif not verify:
  111. context.check_hostname = False
  112. context.verify_mode = ssl.CERT_NONE
  113. self._conn = http_client.HTTPSConnection(host, port, timeout=10,
  114. context=context)
  115. else:
  116. raise HTTPClientError("UnknownProtocol", proto)
  117. self._conn.connect()
  118. except (IOError, OSError, http_client.HTTPException) as err:
  119. raise HTTPClientError(type(err).__name__, str(err))
  120. @staticmethod
  121. def get_controller_settings(profile=None, version=None):
  122. """
  123. Get GNS3 controller settings
  124. :param profile: GNS3 configuration profile
  125. :param version: API version, None for autodetect
  126. :returns: controller settings
  127. """
  128. # find config base directory
  129. if sys.platform.startswith('win'):
  130. fn_conf = os.path.join(os.path.expandvars('%APPDATA%'), 'GNS3')
  131. fn_file = 'gns3_server.ini'
  132. else:
  133. fn_conf = os.path.join(os.path.expanduser('~'), '.config', 'GNS3')
  134. fn_file = 'gns3_server.conf'
  135. # add version
  136. if version is None: # search for highest version
  137. version = ""
  138. version_split = []
  139. for name in os.listdir(fn_conf):
  140. try:
  141. name_split = list(map(int, name.split('.')))
  142. if name_split > version_split:
  143. version = name
  144. version_split = name_split
  145. except ValueError:
  146. pass
  147. else: # use only <major>.<minor> part
  148. version = '.'.join(version.split('.', 2)[0:2])
  149. if version not in ('', '2.0', '2.1'):
  150. fn_conf = os.path.join(fn_conf, version)
  151. # add profile
  152. if profile and profile != "default":
  153. fn_conf = os.path.join(fn_conf, 'profiles', profile)
  154. # add config filename
  155. fn_conf = os.path.join(fn_conf, fn_file)
  156. # parse config
  157. config = configparser.ConfigParser()
  158. serv_conf = None
  159. try:
  160. if config.read(fn_conf):
  161. serv_conf = dict(config.items('Server'))
  162. except (IOError, OSError, configparser.Error) as err:
  163. raise GNS3ConfigurationError("Error reading GNS3 configuration: {}".format(err))
  164. if serv_conf is None:
  165. raise GNS3ConfigurationError("Missing GNS3 configuration file '{}'".format(fn_conf))
  166. # check for mandatory host key
  167. if 'host' not in serv_conf:
  168. raise GNS3ConfigurationError("GNS3 configuration: Missing host")
  169. return serv_conf
  170. @staticmethod
  171. def get_controller_connection(profile=None, version=None):
  172. """
  173. Get GNS3 controller connection parameters
  174. :param profile: GNS3 configuration profile
  175. :param version: API version, None for autodetect
  176. :returns: Tuple of url, user, password
  177. """
  178. serv_conf = GNS3Api.get_controller_settings(profile, version)
  179. host = serv_conf['host']
  180. proto = serv_conf.get('protocol', 'http')
  181. port = int(serv_conf.get('port', 3080))
  182. user = serv_conf.get('user', None)
  183. password = serv_conf.get('password', None)
  184. # create URL
  185. if ':' in host: # IPv6
  186. url = "{}://[{}]:{}".format(proto, host, port)
  187. else:
  188. url = "{}://{}:{}".format(proto, host, port)
  189. return (url, user, password)
  190. def request(self, method, path, args=None, timeout=60):
  191. """
  192. API request
  193. :param method: HTTP method ('GET'/'PUT'/'POST'/'DELETE')
  194. :param path: URL path, can be a list or tuple
  195. :param args: arguments to the API endpoint
  196. :param timeout: timeout, default 60
  197. :returns: result
  198. """
  199. # json encode args
  200. if args is None:
  201. body = None
  202. else:
  203. body = json.dumps(args, separators=(',', ':'))
  204. # methods are upper case
  205. method.upper()
  206. # make path variable to an URL path
  207. if isinstance(path, (list, tuple)):
  208. path = "/".join(str(x) for x in path)
  209. else:
  210. path = str(path)
  211. if not path.startswith("/"):
  212. path = "/" + path
  213. # send request
  214. if self._conn.timeout != timeout:
  215. self._conn.timeout = timeout
  216. if self._conn.sock:
  217. self._conn.sock.settimeout(timeout)
  218. headers = {'Content-Type': 'application/json',
  219. 'User-Agent': 'GNS3Api'}
  220. headers.update(self._auth)
  221. try:
  222. # send request / get response
  223. self._conn.request(method, path, body, headers=headers)
  224. resp = self._conn.getresponse()
  225. data = resp.read()
  226. if resp.getheader('Content-Type') == 'application/json':
  227. result = json.loads(data.decode('utf-8', errors='ignore'))
  228. else:
  229. result = data
  230. except (IOError, OSError, http_client.HTTPException) as err:
  231. raise HTTPClientError(type(err).__name__, str(err))
  232. # check for errors
  233. self.status_code = resp.status
  234. if self.status_code < 200 or self.status_code >= 300:
  235. try:
  236. message = result['message']
  237. except (TypeError, KeyError):
  238. if data is not None and data != b'':
  239. message = data.decode('utf-8', errors='ignore')
  240. else:
  241. message = resp.reason
  242. raise HTTPError(self.status_code, message)
  243. return result
  244. def close(self):
  245. """
  246. Closes HTTP(S) connection
  247. """
  248. self._conn.close()