mirror of https://github.com/muk-it/muk_base
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
84 lines
3.5 KiB
84 lines
3.5 KiB
###################################################################################
|
|
#
|
|
# Copyright (C) 2018 MuK IT GmbH
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
###################################################################################
|
|
|
|
import urllib
|
|
import logging
|
|
|
|
import requests
|
|
import werkzeug
|
|
|
|
from odoo.http import request
|
|
from odoo.tools import config
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
def get_route(url):
|
|
url_parts = url.split('?')
|
|
path = url_parts[0]
|
|
query_string = url_parts[1] if len(url_parts) > 1 else None
|
|
router = request.httprequest.app.get_db_router(request.db).bind('')
|
|
match = router.match(path, query_args=query_string)
|
|
method = router.match(path, query_args=query_string)[0]
|
|
params = dict(urllib.parse.parse_qsl(query_string))
|
|
if len(match) > 1:
|
|
params.update(match[1])
|
|
return method, params, path
|
|
|
|
def make_error_response(status, message):
|
|
exception = werkzeug.exceptions.HTTPException()
|
|
exception.code = status
|
|
exception.description = message
|
|
return exception
|
|
|
|
def get_response(url):
|
|
if not bool(urllib.parse.urlparse(url).netloc):
|
|
base_url = request.env['ir.config_parameter'].sudo().get_param('web.base.url')
|
|
method, params, path = get_route(url)
|
|
params.update({'csrf_token': request.csrf_token()})
|
|
session = requests.Session()
|
|
session.cookies['session_id'] = request.session.sid
|
|
try:
|
|
response = session.post("%s%s" % (base_url, path), params, verify=False)
|
|
return response.status_code, response.headers, response.content
|
|
except requests.exceptions.SSLError as exception:
|
|
_logger.info("Trying custom certificate")
|
|
custom_cert = config.get("muk_custom_certificate", False)
|
|
try:
|
|
_logger.info("Using Certificate: {}".format(custom_cert))
|
|
response = session.post("%s%s" % (base_url, path), params, verify=custom_cert)
|
|
return response.status_code, response.headers, response.reason
|
|
except Exception as e:
|
|
try:
|
|
_logger.info("Custom Certificate didn't work")
|
|
response = session.post("%s%s" % (base_url, path), params, verify=False)
|
|
return response.status_code, response.headers, response.reason
|
|
except Exception as e:
|
|
_logger.exception("Request failed!")
|
|
return 501, [], str(e)
|
|
else:
|
|
try:
|
|
response = requests.post(url)
|
|
return response.status_code, response.headers, response.content
|
|
except requests.exceptions.RequestException as exception:
|
|
try:
|
|
return exception.response.status_code, exception.response.headers, exception.response.reason
|
|
except Exception as e:
|
|
_logger.exception("Request failed!")
|
|
return 501, [], str(e)
|
|
|