You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

119 lines
5.0 KiB

# -*- encoding: utf-8 -*-
##############################################################################
#
# Tracks Authentication Attempts and Prevents Brute-force Attacks module
# Copyright (C) 2015-Today GRAP (http://www.grap.coop)
# @author Sylvain LE GAL (https://twitter.com/legalsylvain)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import logging
from openerp import fields, http, registry, SUPERUSER_ID
from openerp.http import request
from openerp.addons.web.controllers.main import Home, ensure_db
_logger = logging.getLogger(__name__)
class LoginController(Home):
    """Login controller that logs authentication attempts and bans remotes.

    Extends the standard ``Home`` controller: every POST to the login route
    is recorded in ``res.authentication.attempt`` and, once the number of
    attempts returned by ``search_last_failed`` reaches the configured
    maximum, the remote address is stored in ``res.banned.remote``.
    """

    @http.route()
    def web_login(self, redirect=None, **kw):
        """Record the login attempt, then delegate to the standard login.

        On a POST request the method:

        * reads the ``auth_brute_force.max_attempt_qty`` and
          ``auth_brute_force.environ_log`` configuration parameters;
        * checks whether the remote address is already banned; if so the
          submitted password is blanked, otherwise an authentication is
          tried with the submitted credentials;
        * stores the attempt (date, login, remote, filtered WSGI environ and
          result) and commits it;
        * bans the remote when the attempt failed and the number of attempts
          returned by ``search_last_failed`` reaches the maximum.

        Non-POST requests are passed straight to the parent implementation.

        :param redirect: forwarded unchanged to the parent ``web_login``.
        :param kw: forwarded unchanged to the parent ``web_login``.
        :return: the value returned by the parent ``web_login``.
        :raises KeyError: if the POST carries no ``login`` parameter (or no
            ``password`` parameter when the remote is not banned).
        :raises IndexError: if the ``auth_brute_force.max_attempt_qty``
            configuration parameter does not exist.
        """
        if request.httprequest.method == 'POST':
            ensure_db()
            remote = request.httprequest.remote_addr
            login = request.params['login']

            # Get registry and cursor
            db_registry = registry(request.session.db)
            config_obj = db_registry['ir.config_parameter']
            attempt_obj = db_registry['res.authentication.attempt']
            banned_remote_obj = db_registry['res.banned.remote']
            cursor = attempt_obj.pool.cursor()
            try:
                # Get Settings
                max_attempts_qty = int(config_obj.search_read(
                    cursor, SUPERUSER_ID,
                    [('key', '=', 'auth_brute_force.max_attempt_qty')],
                    ['value'])[0]['value'])
                environ_log = config_obj.search_read(
                    cursor, SUPERUSER_ID,
                    [('key', '=', 'auth_brute_force.environ_log')],
                    ['value'])

                # Test if remote user is banned
                banned = banned_remote_obj.search(cursor, SUPERUSER_ID, [
                    ('remote', '=', remote)])

                # Always bound, so the checks below never depend on
                # short-circuit evaluation to avoid a NameError.
                result = False
                if banned:
                    _logger.warning(
                        "Authentication tried from remote '%s'. The request"
                        " has been ignored because the remote has been"
                        " banned after %d attempts without success. Login"
                        " tried : '%s'.",
                        remote, max_attempts_qty, login)
                    # Blank the password before the request reaches the
                    # parent login handler.
                    request.params['password'] = ''
                else:
                    # Try to authenticate
                    result = request.session.authenticate(
                        request.session.db, login,
                        request.params['password'])

                # Log attempt
                cursor.commit()

                environ = ''
                if environ_log:
                    filter_value = environ_log[0]['value']
                    log_everything = filter_value == '*'
                    filter_keys = set(
                        k.strip() for k in filter_value.split(','))
                    environ = ''.join(
                        '%s=%s\n' % (key, value)
                        for key, value in request.httprequest.environ.items()
                        if log_everything or key in filter_keys)

                if banned:
                    attempt_result = 'banned'
                else:
                    attempt_result = 'successfull' if result else 'failed'

                attempt_obj.create(cursor, SUPERUSER_ID, {
                    'attempt_date': fields.Datetime.now(),
                    'login': login,
                    'remote': remote,
                    'environ': environ,
                    'result': attempt_result,
                })
                cursor.commit()

                if not banned and not result:
                    # Get last bad attempts quantity
                    attempts_qty = len(attempt_obj.search_last_failed(
                        cursor, SUPERUSER_ID, remote))

                    if max_attempts_qty <= attempts_qty:
                        # We ban the remote
                        _logger.warning(
                            "Authentication failed from remote '%s'. "
                            "The remote has been banned. Login tried : '%s'.",
                            remote, login)
                        banned_remote_obj.create(cursor, SUPERUSER_ID, {
                            'remote': remote,
                            'ban_date': fields.Datetime.now(),
                        })
                        cursor.commit()
                    else:
                        _logger.warning(
                            "Authentication failed from remote '%s'."
                            " Login tried : '%s'. Attempt %d / %d.",
                            remote, login, attempts_qty, max_attempts_qty)
            finally:
                # Release the manually opened cursor even when a lookup,
                # the authentication or a write raises.
                cursor.close()

        return super(LoginController, self).web_login(redirect=redirect, **kw)