gns3api - Simple Python module to access the GNS3 API
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

gns3api.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. # Copyright (C) 2017-2019 Bernhard Ehlers
  2. #
  3. # This program is free software: you can redistribute it and/or modify
  4. # it under the terms of the GNU General Public License as published by
  5. # the Free Software Foundation, either version 3 of the License, or
  6. # (at your option) any later version.
  7. #
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. """
  16. Access GNS3 controller via API
  17. """
  18. import os
  19. import sys
  20. import ssl
  21. import json
  22. from base64 import b64encode
  23. import configparser
  24. import http.client
  25. from urllib.parse import urlsplit
  26. class GNS3ApiException(OSError):
  27. """
  28. GNS3 API Exceptions, base class
  29. """
  30. def __init__(self, *args):
  31. super().__init__()
  32. self.args = args
  33. class GNS3ConfigurationError(GNS3ApiException):
  34. """
  35. GNS3 configuration error
  36. """
  37. def __init__(self, message="Missing/invalid GNS3 configuration"):
  38. GNS3ApiException.__init__(self, message)
  39. class HTTPClientError(GNS3ApiException):
  40. """
  41. HTTP client library error
  42. """
  43. def __str__(self):
  44. return ": ".join(str(x) for x in self.args)
  45. class HTTPError(GNS3ApiException):
  46. """
  47. HTTP response error
  48. """
  49. def __str__(self):
  50. if len(self.args) >= 2:
  51. return '[Status {}] '.format(self.args[0]) + \
  52. " ".join(str(x) for x in self.args[1:])
  53. return str(self.args[0])
  54. class GNS3Api:
  55. """
  56. GNS3 API - an API to GNS3
  57. """
  58. def __init__(self, url=None, user=None, password=None,
  59. profile=None, version=None, verify=True):
  60. """
  61. GNS3 API
  62. :param url: Server URL, if None the connection parameters
  63. are read from the GNS3 configuration file
  64. :param user: User name, None for no authentification
  65. :param password: Password
  66. :param profile: GNS3 configuration profile
  67. :param version: API version, None for autodetect
  68. :param verify: Verify CERT (on https), default True
  69. False: no CERT verification
  70. True: verification using the system CA certificates
  71. file: verification using the file and the system CA
  72. """
  73. if not url:
  74. (url, user, password) = \
  75. GNS3Api.get_controller_connection(profile, version)
  76. # split URL
  77. try:
  78. url_tuple = urlsplit(url, "http")
  79. if not url_tuple.netloc: # fix missing "//" before host
  80. url_tuple = urlsplit(url_tuple.scheme + "://" + url_tuple.path)
  81. proto = url_tuple.scheme
  82. host = url_tuple.hostname
  83. port = url_tuple.port or 3080
  84. if user is None:
  85. user = url_tuple.username
  86. if password is None:
  87. password = url_tuple.password
  88. except ValueError as err:
  89. raise HTTPClientError("UrlError", str(err))
  90. if host == '0.0.0.0':
  91. host = '127.0.0.1'
  92. elif host == '::':
  93. host = '::1'
  94. if ':' in host: # IPv6
  95. self.controller = "{}://[{}]:{}".format(proto, host, port)
  96. else:
  97. self.controller = "{}://{}:{}".format(proto, host, port)
  98. # authentication
  99. self._auth = {}
  100. if user is not None and user != '':
  101. if password is None:
  102. password = ''
  103. self._auth['Authorization'] = 'Basic ' + \
  104. b64encode((user+':'+password).encode('utf-8')).decode('ascii')
  105. # open connection
  106. self.status_code = None
  107. try:
  108. if proto == 'http':
  109. self._conn = http.client.HTTPConnection(host, port, timeout=10)
  110. elif proto == 'https':
  111. context = ssl.create_default_context()
  112. if isinstance(verify, str):
  113. context.check_hostname = False
  114. context.load_verify_locations(cafile=verify)
  115. elif not verify:
  116. context.check_hostname = False
  117. context.verify_mode = ssl.CERT_NONE
  118. self._conn = http.client.HTTPSConnection(host, port, timeout=10,
  119. context=context)
  120. else:
  121. raise HTTPClientError("UnknownProtocol", proto)
  122. self._conn.connect()
  123. except (OSError, http.client.HTTPException) as err:
  124. raise HTTPClientError(type(err).__name__, str(err))
  125. @staticmethod
  126. def get_controller_settings(profile=None, version=None):
  127. """
  128. Get GNS3 controller settings
  129. :param profile: GNS3 configuration profile
  130. :param version: API version, None for autodetect
  131. :returns: controller settings
  132. """
  133. # find config base directory
  134. if sys.platform.startswith('win'):
  135. fn_conf = os.path.join(os.path.expandvars('%APPDATA%'), 'GNS3')
  136. fn_file = 'gns3_server.ini'
  137. else:
  138. fn_conf = os.path.join(os.path.expanduser('~'), '.config', 'GNS3')
  139. fn_file = 'gns3_server.conf'
  140. # add version
  141. if version is None: # search for highest version
  142. version = ""
  143. version_split = []
  144. for name in os.listdir(fn_conf):
  145. try:
  146. name_split = list(map(int, name.split('.')))
  147. if name_split > version_split:
  148. version = name
  149. version_split = name_split
  150. except ValueError:
  151. pass
  152. else: # use only <major>.<minor> part
  153. version = '.'.join(version.split('.', 2)[0:2])
  154. if version not in ('', '2.0', '2.1'):
  155. fn_conf = os.path.join(fn_conf, version)
  156. # add profile
  157. if profile and profile != "default":
  158. fn_conf = os.path.join(fn_conf, 'profiles', profile)
  159. # add config filename
  160. fn_conf = os.path.join(fn_conf, fn_file)
  161. # parse config
  162. config = configparser.ConfigParser()
  163. serv_conf = None
  164. try:
  165. if config.read(fn_conf):
  166. serv_conf = config['Server']
  167. except (OSError, KeyError, configparser.Error) as err:
  168. raise GNS3ConfigurationError("Error reading GNS3 configuration: {}".format(err))
  169. if serv_conf is None:
  170. raise GNS3ConfigurationError("Missing GNS3 configuration file '{}'".format(fn_conf))
  171. # check for mandatory host key
  172. if 'host' not in serv_conf:
  173. raise GNS3ConfigurationError("GNS3 configuration: Missing host")
  174. return serv_conf
  175. @staticmethod
  176. def get_controller_connection(profile=None, version=None):
  177. """
  178. Get GNS3 controller connection parameters
  179. :param profile: GNS3 configuration profile
  180. :param version: API version, None for autodetect
  181. :returns: Tuple of url, user, password
  182. """
  183. serv_conf = GNS3Api.get_controller_settings(profile, version)
  184. host = serv_conf['host']
  185. proto = serv_conf.get('protocol', 'http')
  186. port = int(serv_conf.get('port', 3080))
  187. if serv_conf.get('auth'):
  188. user = serv_conf.get('user', None)
  189. password = serv_conf.get('password', None)
  190. else:
  191. user = None
  192. password = None
  193. # create URL
  194. if ':' in host: # IPv6
  195. url = "{}://[{}]:{}".format(proto, host, port)
  196. else:
  197. url = "{}://{}:{}".format(proto, host, port)
  198. return (url, user, password)
  199. def request_file(self, method, path, args=None, timeout=60):
  200. """
  201. API request
  202. :param method: HTTP method ('GET'/'PUT'/'POST'/'DELETE')
  203. :param path: URL path, can be a list or tuple
  204. :param args: arguments to the API endpoint
  205. :param timeout: timeout, default 60
  206. :returns: HTTPResponse, a file object
  207. """
  208. headers = {'User-Agent': 'GNS3Api'}
  209. self.status_code = None
  210. # json encode args
  211. if isinstance(args, dict):
  212. body = json.dumps(args, separators=(',', ':'))
  213. headers['Content-Type'] = 'application/json'
  214. else:
  215. body = args
  216. headers['Content-Type'] = 'application/octet-stream'
  217. # methods are upper case
  218. method.upper()
  219. # make path variable to an URL path
  220. if isinstance(path, (list, tuple)):
  221. path = "/".join(str(x) for x in path)
  222. else:
  223. path = str(path)
  224. if not path.startswith("/"):
  225. path = "/" + path
  226. # update timeout
  227. if self._conn.timeout != timeout:
  228. self._conn.timeout = timeout
  229. if self._conn.sock:
  230. self._conn.sock.settimeout(timeout)
  231. # send request / get response
  232. headers.update(self._auth)
  233. try:
  234. self._conn.request(method, path, body, headers=headers)
  235. resp = self._conn.getresponse()
  236. self.status_code = resp.status
  237. except (OSError, http.client.HTTPException) as err:
  238. raise HTTPClientError(type(err).__name__, str(err))
  239. # check for errors
  240. if self.status_code < 200 or self.status_code >= 300:
  241. try:
  242. message = resp.read()
  243. except (OSError, http.client.HTTPException):
  244. message = resp.reason
  245. else:
  246. if message:
  247. message = message.decode('utf-8', errors='ignore')
  248. if resp.getheader('Content-Type') == 'application/json':
  249. message = json.loads(message)
  250. try:
  251. message = message['message']
  252. except (TypeError, KeyError):
  253. pass
  254. else:
  255. message = resp.reason
  256. raise HTTPError(self.status_code, message)
  257. return resp
  258. def request(self, method, path, args=None, timeout=60):
  259. """
  260. API request
  261. :param method: HTTP method ('GET'/'PUT'/'POST'/'DELETE')
  262. :param path: URL path, can be a list or tuple
  263. :param args: arguments to the API endpoint
  264. :param timeout: timeout, default 60
  265. :returns: result
  266. """
  267. resp = self.request_file(method, path, args, timeout)
  268. # read response
  269. try:
  270. result = resp.read()
  271. except (OSError, http.client.HTTPException) as err:
  272. raise HTTPClientError(type(err).__name__, str(err))
  273. if resp.getheader('Content-Type') == 'application/json':
  274. result = json.loads(result.decode('utf-8', errors='ignore'))
  275. return result
  276. def close(self):
  277. """
  278. Closes HTTP(S) connection
  279. """
  280. self._conn.close()