# -*- coding: utf-8 -*-
# Copyright 2017 Tecnativa - Jairo Llopis
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from threading import current_thread
from urllib import urlencode

from decorator import decorator
from mock import patch
from werkzeug.utils import redirect

from odoo import http
from odoo.tests.common import at_install, HttpCase, post_install
from odoo.tools import mute_logger

from ..models import res_authentication_attempt, res_users


GARBAGE_LOGGERS = (
    "werkzeug",
    res_authentication_attempt.__name__,
    res_users.__name__,
)


# HACK https://github.com/odoo/odoo/pull/24833
def skip_unless_addons_installed(*addons):
    """Decorator to skip a test unless some addons are installed.

    :param *str addons:
        Addon names that should be installed.

    :param reason:
        Explain why you must skip this test.
    """

    @decorator
    def _wrapper(method, self, *args, **kwargs):
        installed = self.addons_installed(*addons)
        if not installed:
            missing = set(addons) - installed
            self.skipTest("Required addons not installed: %s" %
                          ",".join(sorted(missing)))
        return method(self, *args, **kwargs)

    return _wrapper


@at_install(False)
@post_install(True)
# Skip CSRF validation on tests
@patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True)
# Skip specific browser forgery on redirections
@patch(http.__name__ + ".redirect_with_hash", side_effect=redirect)
# Faster tests without calls to geolocation API
@patch(res_authentication_attempt.__name__ + ".urlopen", return_value="")
class BruteForceCase(HttpCase):
    def setUp(self):
        super(BruteForceCase, self).setUp()
        # Some tests could retain environ from last test and produce fake
        # results without this patch
        # HACK https://github.com/odoo/odoo/issues/24183
        # TODO Remove in v12
        try:
            del current_thread().environ
        except AttributeError:
            pass
        # Complex password to avoid conflicts with `password_security`
        self.good_password = "Admin$%02584"
        self.data_demo = {
            "login": "demo",
            "password": "Demo%&/(908409**",
        }
        with self.cursor() as cr:
            env = self.env(cr)
            env["ir.config_parameter"].set_param(
                "auth_brute_force.max_by_ip_user", 3)
            env["ir.config_parameter"].set_param(
                "auth_brute_force.max_by_ip", 4)
            # Clean attempts to be able to count in tests
            env["res.authentication.attempt"].search([]).unlink()
            # Make sure involved users have good passwords
            env.user.password = self.good_password
            env["res.users"].search([
                ("login", "=", self.data_demo["login"]),
            ]).password = self.data_demo["password"]

    # HACK https://github.com/odoo/odoo/pull/24833
    def addons_installed(self, *addons):
        """Know if the specified addons are installed."""
        found = self.env["ir.module.module"].search([
            ("name", "in", addons),
            ("state", "not in", ["uninstalled", "uninstallable"]),
        ])
        return set(addons) - set(found.mapped("name"))

    @skip_unless_addons_installed("web")
    @mute_logger(*GARBAGE_LOGGERS)
    def test_web_login_existing(self, *args):
        """Remote is banned with real user on web login form."""
        data1 = {
            "login": "admin",
            "password": "1234",  # Wrong
        }
        # Make sure user is logged out
        self.url_open("/web/session/logout", timeout=30)
        # Fail 3 times
        for n in range(3):
            response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
            # If you fail, you get /web/login again
            self.assertTrue(
                response.geturl().endswith("/web/login"),
                "Unexpected URL %s" % response.geturl(),
            )
        # Admin banned, demo not
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    "demo",
                ),
            )
        # Now I know the password, but login is rejected too
        data1["password"] = self.good_password
        response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
        self.assertTrue(
            response.geturl().endswith("/web/login"),
            "Unexpected URL %s" % response.geturl(),
        )
        # IP has been banned, demo user cannot login
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    "demo",
                ),
            )
        # Attempts recorded
        with self.cursor() as cr:
            env = self.env(cr)
            failed = env["res.authentication.attempt"].search([
                ("result", "=", "failed"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(failed), 3)
            banned = env["res.authentication.attempt"].search([
                ("result", "=", "banned"),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(banned), 1)
            # Unban
            banned.action_whitelist_add()
        # Try good login, it should work now
        response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
        self.assertTrue(response.geturl().endswith("/web"))

    @skip_unless_addons_installed("web")
    @mute_logger(*GARBAGE_LOGGERS)
    def test_web_login_unexisting(self, *args):
        """Remote is banned with fake user on web login form."""
        data1 = {
            "login": "administrator",  # Wrong
            "password": self.good_password,
        }
        # Make sure user is logged out
        self.url_open("/web/session/logout", timeout=30)
        # Fail 3 times
        for n in range(3):
            response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
            # If you fail, you get /web/login again
            self.assertTrue(
                response.geturl().endswith("/web/login"),
                "Unexpected URL %s" % response.geturl(),
            )
        # Admin banned, demo not
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    self.data_demo["login"],
                ),
            )
        # Demo user can login
        response = self.url_open(
            "/web/login",
            bytes(urlencode(self.data_demo)),
            30,
        )
        # If you pass, you get /web
        self.assertTrue(
            response.geturl().endswith("/web"),
            "Unexpected URL %s" % response.geturl(),
        )
        self.url_open("/web/session/logout", timeout=30)
        # Attempts recorded
        with self.cursor() as cr:
            env = self.env(cr)
            failed = env["res.authentication.attempt"].search([
                ("result", "=", "failed"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(failed), 3)
            banned = env["res.authentication.attempt"].search([
                ("result", "=", "banned"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(banned), 0)

    @mute_logger(*GARBAGE_LOGGERS)
    def test_xmlrpc_login_existing(self, *args):
        """Remote is banned with real user on XML-RPC login."""
        data1 = {
            "login": "admin",
            "password": "1234",  # Wrong
        }
        # Fail 3 times
        for n in range(3):
            self.assertFalse(self.xmlrpc_common.authenticate(
                self.env.cr.dbname, data1["login"], data1["password"], {}))
        # Admin banned, demo not
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    "demo",
                ),
            )
        # Now I know the password, but login is rejected too
        data1["password"] = self.good_password
        self.assertFalse(self.xmlrpc_common.authenticate(
            self.env.cr.dbname, data1["login"], data1["password"], {}))
        # IP has been banned, demo user cannot login
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    "demo",
                ),
            )
        # Attempts recorded
        with self.cursor() as cr:
            env = self.env(cr)
            failed = env["res.authentication.attempt"].search([
                ("result", "=", "failed"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(failed), 3)
            banned = env["res.authentication.attempt"].search([
                ("result", "=", "banned"),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(banned), 1)
            # Unban
            banned.action_whitelist_add()
        # Try good login, it should work now
        self.assertTrue(self.xmlrpc_common.authenticate(
            self.env.cr.dbname, data1["login"], data1["password"], {}))

    @mute_logger(*GARBAGE_LOGGERS)
    def test_xmlrpc_login_unexisting(self, *args):
        """Remote is banned with fake user on XML-RPC login."""
        data1 = {
            "login": "administrator",  # Wrong
            "password": self.good_password,
        }
        # Fail 3 times
        for n in range(3):
            self.assertFalse(self.xmlrpc_common.authenticate(
                self.env.cr.dbname, data1["login"], data1["password"], {}))
        # Admin banned, demo not
        with self.cursor() as cr:
            env = self.env(cr)
            self.assertFalse(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    self.data_demo["login"],
                ),
            )
        # Demo user can login
        self.assertTrue(self.xmlrpc_common.authenticate(
            self.env.cr.dbname,
            self.data_demo["login"],
            self.data_demo["password"],
            {},
        ))
        # Attempts recorded
        with self.cursor() as cr:
            env = self.env(cr)
            failed = env["res.authentication.attempt"].search([
                ("result", "=", "failed"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(failed), 3)
            banned = env["res.authentication.attempt"].search([
                ("result", "=", "banned"),
                ("login", "=", data1["login"]),
                ("remote", "=", "127.0.0.1"),
            ])
            self.assertEqual(len(banned), 0)

    @mute_logger(*GARBAGE_LOGGERS)
    def test_orm_login_existing(self, *args):
        """No bans on ORM login with an existing user."""
        data1 = {
            "login": "admin",
            "password": "1234",  # Wrong
        }
        with self.cursor() as cr:
            env = self.env(cr)
            # Fail 3 times
            for n in range(3):
                self.assertFalse(
                    env["res.users"].authenticate(
                        cr.dbname, data1["login"], data1["password"], {}))
            self.assertEqual(
                env["res.authentication.attempt"].search(count=True, args=[]),
                0,
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            # Now I know the password, and login works
            data1["password"] = self.good_password
            self.assertTrue(
                env["res.users"].authenticate(
                    cr.dbname, data1["login"], data1["password"], {}))

    @mute_logger(*GARBAGE_LOGGERS)
    def test_orm_login_unexisting(self, *args):
        """No bans on ORM login with an unexisting user."""
        data1 = {
            "login": "administrator",  # Wrong
            "password": self.good_password,
        }
        with self.cursor() as cr:
            env = self.env(cr)
            # Fail 3 times
            for n in range(3):
                self.assertFalse(
                    env["res.users"].authenticate(
                        cr.dbname, data1["login"], data1["password"], {}))
            self.assertEqual(
                env["res.authentication.attempt"].search(count=True, args=[]),
                0,
            )
            self.assertTrue(
                env["res.authentication.attempt"]._trusted(
                    "127.0.0.1",
                    data1["login"],
                ),
            )
            # Now I know the user, and login works
            data1["login"] = "admin"
            self.assertTrue(
                env["res.users"].authenticate(
                    cr.dbname, data1["login"], data1["password"], {}))