Source code for horizon.middleware.operation_log

# Copyright 2016 NEC Corporation.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import json
import logging
import re
from urllib import parse

from django.conf import settings
from django.contrib import messages as django_messages
from django.core.exceptions import MiddlewareNotUsed


from horizon.utils import settings as setting_utils

LOG = logging.getLogger(__name__)


[docs] class OperationLogMiddleware(object): """Middleware to output operation log. This log can includes information below: - ``domain name`` - ``domain id`` - ``project name`` - ``project id`` - ``user name`` - ``user id`` - ``request scheme`` - ``referer url`` - ``request url`` - ``message`` - ``method`` - ``http status`` - ``request parameters`` and log format is defined in OPERATION_LOG_OPTIONS. """ @property def OPERATION_LOG(self): # In order to allow to access from mock in test cases. return self._logger def __init__(self, get_response): if not settings.OPERATION_LOG_ENABLED: raise MiddlewareNotUsed self.get_response = get_response # set configurations _available_methods = ['POST', 'GET', 'PUT', 'DELETE'] _methods = setting_utils.get_dict_config( 'OPERATION_LOG_OPTIONS', 'target_methods') self.target_methods = [x for x in _methods if x in _available_methods] self.mask_fields = setting_utils.get_dict_config( 'OPERATION_LOG_OPTIONS', 'mask_fields') self.format = setting_utils.get_dict_config( 'OPERATION_LOG_OPTIONS', 'format') self._logger = logging.getLogger('horizon.operation_log') self._ignored_urls = [re.compile(url) for url in setting_utils.get_dict_config( 'OPERATION_LOG_OPTIONS', 'ignore_urls')] def __call__(self, request): response = self.get_response(request) response = self._process_response(request, response) return response def _process_response(self, request, response): """Log user operation.""" log_format = self._get_log_format(request) if not log_format: return response params = self._get_parameters_from_request(request) # log a message displayed to user messages = django_messages.get_messages(request) result_message = None if messages: result_message = ', '.join('%s: %s' % (message.tags, message) for message in messages) elif 'action' in request.POST: result_message = request.POST['action'] params['message'] = result_message params['http_status'] = response.status_code self.OPERATION_LOG.info(log_format, params) return response
[docs] def process_exception(self, request, exception): """Log error info when exception occurred.""" log_format = self._get_log_format(request) if log_format is None: return params = self._get_parameters_from_request(request, True) params['message'] = exception params['http_status'] = '-' self.OPERATION_LOG.info(log_format, params)
def _get_log_format(self, request): """Return operation log format.""" request_url = parse.unquote(request.path) # Log the /auth/password/ form even when user is not logged in. if '/auth/password/' not in request_url: user = getattr(request, 'user', None) if not user: return if not request.user.is_authenticated: return method = request.method.upper() if not (method in self.target_methods): return for rule in self._ignored_urls: if rule.search(request_url): return return self.format def _get_parameters_from_request(self, request, exception=False): """Get parameters to log in OPERATION_LOG.""" user = request.user referer_url = None try: referer_dic = parse.urlsplit( parse.unquote(request.META.get('HTTP_REFERER'))) referer_url = referer_dic[2] if referer_dic[3]: referer_url += "?" + referer_dic[3] if isinstance(referer_url, str): referer_url = referer_url.decode('utf-8') except Exception: pass request_url = parse.unquote(request.path) if request.META['QUERY_STRING']: request_url += '?' + request.META['QUERY_STRING'] return { 'client_ip': request.META.get('REMOTE_ADDR', None), 'domain_name': getattr(user, 'domain_name', None), 'domain_id': getattr(user, 'domain_id', None), 'project_name': getattr(user, 'project_name', None), 'project_id': getattr(user, 'project_id', None), 'user_name': getattr(user, 'username', None), 'user_id': request.session.get('user_id', None), 'request_scheme': request.scheme, 'referer_url': referer_url, 'request_url': request_url, 'method': request.method if not exception else None, 'param': self._get_request_param(request), } def _get_request_param(self, request): """Change POST data to JSON string and mask data.""" params = {} try: params = request.POST.copy() if not params: params = json.loads(request.body) except Exception: pass for key in params: # replace a value to a masked characters if key in self.mask_fields: params[key] = '*' * 8 # when a file uploaded (E.g create image) files = request.FILES.values() if list(files): filenames = ', '.join( [up_file.name for up_file in files]) params['file_name'] = filenames try: return json.dumps(params, ensure_ascii=False) except Exception: return 'Unserializable Object'