#
# Copyright 2013 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base classes for DB backend implementation test"""
import datetime
import operator
import mock
from oslo_utils import timeutils
from panko import storage
from panko.storage import models
from panko.tests import db as tests_db
[docs]class EventTestBase(tests_db.TestBase):
    """Separate test base class.
    We don't want to inherit all the Meter stuff.
    """
[docs]    def setUp(self):
        super(EventTestBase, self).setUp()
        self.prepare_data() 
[docs]    def prepare_data(self):
        self.models = []
        base = 0
        self.start = datetime.datetime(2013, 12, 31, 5, 0)
        now = self.start
        for event_type in ['Foo', 'Bar', 'Zoo', 'Foo', 'Bar', 'Zoo']:
            trait_models = [models.Trait(name, dtype, value)
                            for name, dtype, value in [
                                ('trait_A', models.Trait.TEXT_TYPE,
                                    "my_%s_text" % event_type),
                                ('trait_B', models.Trait.INT_TYPE,
                                    base + 1),
                                ('trait_C', models.Trait.FLOAT_TYPE,
                                    float(base) + 0.123456),
                                ('trait_D', models.Trait.DATETIME_TYPE,
                                    now)]]
            self.models.append(
                models.Event("id_%s_%d" % (event_type, base),
                             event_type, now, trait_models,
                             {'status': {'nested': 'started'}}))
            base += 100
            now = now + datetime.timedelta(hours=1)
        self.end = now
        self.conn.record_events(self.models)  
@tests_db.run_with('sqlite', 'mysql', 'pgsql')
[docs]class EventTTLTest(EventTestBase):
    @mock.patch.object(timeutils, 'utcnow')
[docs]    def test_clear_expired_data(self, mock_utcnow):
        mock_utcnow.return_value = datetime.datetime(2013, 12, 31, 10, 0)
        self.conn.clear_expired_data(3600)
        events = list(self.conn.get_events(storage.EventFilter()))
        self.assertEqual(2, len(events))
        event_types = list(self.conn.get_event_types())
        self.assertEqual(['Bar', 'Zoo'], event_types)
        for event_type in event_types:
            trait_types = list(self.conn.get_trait_types(event_type))
            self.assertEqual(4, len(trait_types))
            traits = list(self.conn.get_traits(event_type))
            self.assertEqual(4, len(traits))  
@tests_db.run_with('sqlite', 'mysql', 'pgsql', 'mongodb')
[docs]class EventTest(EventTestBase):
[docs]    def test_duplicate_message_id(self):
        now = datetime.datetime.utcnow()
        m = [models.Event("1", "Foo", now, None, {}),
             models.Event("1", "Zoo", now, [], {})]
        with mock.patch('%s.LOG' %
                        self.conn.record_events.__module__) as log:
            self.conn.record_events(m)
            self.assertEqual(1, log.info.call_count) 
[docs]    def test_bad_event(self):
        now = datetime.datetime.utcnow()
        broken_event = models.Event("1", "Foo", now, None, {})
        del(broken_event.__dict__['raw'])
        m = [broken_event, broken_event]
        with mock.patch('%s.LOG' %
                        self.conn.record_events.__module__) as log:
            self.assertRaises(AttributeError, self.conn.record_events, m)
            # ensure that record_events does not break on first error but
            # delays exception and tries to record each event.
            self.assertEqual(2, log.exception.call_count)  
[docs]class GetEventTest(EventTestBase):
[docs]    def test_generated_is_datetime(self):
        event_filter = storage.EventFilter(self.start, self.end)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(6, len(events))
        for i, event in enumerate(events):
            self.assertIsInstance(event.generated, datetime.datetime)
            self.assertEqual(event.generated,
                             self.models[i].generated)
            model_traits = self.models[i].traits
            for j, trait in enumerate(event.traits):
                if trait.dtype == models.Trait.DATETIME_TYPE:
                    self.assertIsInstance(trait.value, datetime.datetime)
                    self.assertEqual(trait.value, model_traits[j].value) 
[docs]    def test_simple_get(self):
        event_filter = storage.EventFilter(self.start, self.end)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(6, len(events))
        start_time = None
        for i, type in enumerate(['Foo', 'Bar', 'Zoo']):
            self.assertEqual(type, events[i].event_type)
            self.assertEqual(4, len(events[i].traits))
            # Ensure sorted results ...
            if start_time is not None:
                # Python 2.6 has no assertLess :(
                self.assertTrue(start_time < events[i].generated)
            start_time = events[i].generated 
[docs]    def test_simple_get_event_type(self):
        expected_trait_values = {
            'id_Bar_100': {
                'trait_A': 'my_Bar_text',
                'trait_B': 101,
                'trait_C': 100.123456,
                'trait_D': self.start + datetime.timedelta(hours=1)
            },
            'id_Bar_400': {
                'trait_A': 'my_Bar_text',
                'trait_B': 401,
                'trait_C': 400.123456,
                'trait_D': self.start + datetime.timedelta(hours=4)
            }
        }
        event_filter = storage.EventFilter(self.start, self.end, "Bar")
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Bar", events[0].event_type)
        self.assertEqual("Bar", events[1].event_type)
        self.assertEqual(4, len(events[0].traits))
        self.assertEqual(4, len(events[1].traits))
        for event in events:
            trait_values = expected_trait_values.get(event.message_id,
                                                     None)
            if not trait_values:
                self.fail("Unexpected event ID returned:" % event.message_id)
            for trait in event.traits:
                expected_val = trait_values.get(trait.name)
                if not expected_val:
                    self.fail("Unexpected trait type: %s" % trait.dtype)
                self.assertEqual(expected_val, trait.value) 
[docs]    def test_get_event_trait_filter(self):
        trait_filters = [{'key': 'trait_B', 'integer': 101}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Bar", events[0].event_type)
        self.assertEqual(4, len(events[0].traits)) 
[docs]    def test_get_event_trait_filter_op_string(self):
        trait_filters = [{'key': 'trait_A', 'string': 'my_Foo_text',
                          'op': 'eq'}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Foo", events[0].event_type)
        self.assertEqual(4, len(events[0].traits))
        trait_filters[0].update({'key': 'trait_A', 'op': 'lt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Bar", events[0].event_type)
        trait_filters[0].update({'key': 'trait_A', 'op': 'le'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Bar", events[1].event_type)
        trait_filters[0].update({'key': 'trait_A', 'op': 'ne'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Zoo", events[3].event_type)
        trait_filters[0].update({'key': 'trait_A', 'op': 'gt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Zoo", events[0].event_type)
        trait_filters[0].update({'key': 'trait_A', 'op': 'ge'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Foo", events[2].event_type) 
[docs]    def test_get_event_trait_filter_op_integer(self):
        trait_filters = [{'key': 'trait_B', 'integer': 101, 'op': 'eq'}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Bar", events[0].event_type)
        self.assertEqual(4, len(events[0].traits))
        trait_filters[0].update({'key': 'trait_B', 'op': 'lt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Foo", events[0].event_type)
        trait_filters[0].update({'key': 'trait_B', 'op': 'le'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Bar", events[1].event_type)
        trait_filters[0].update({'key': 'trait_B', 'op': 'ne'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(5, len(events))
        self.assertEqual("Zoo", events[4].event_type)
        trait_filters[0].update({'key': 'trait_B', 'op': 'gt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Zoo", events[0].event_type)
        trait_filters[0].update({'key': 'trait_B', 'op': 'ge'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(5, len(events))
        self.assertEqual("Foo", events[2].event_type) 
[docs]    def test_get_event_trait_filter_op_float(self):
        trait_filters = [{'key': 'trait_C', 'float': 300.123456, 'op': 'eq'}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Foo", events[0].event_type)
        self.assertEqual(4, len(events[0].traits))
        trait_filters[0].update({'key': 'trait_C', 'op': 'lt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(3, len(events))
        self.assertEqual("Zoo", events[2].event_type)
        trait_filters[0].update({'key': 'trait_C', 'op': 'le'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Bar", events[1].event_type)
        trait_filters[0].update({'key': 'trait_C', 'op': 'ne'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(5, len(events))
        self.assertEqual("Zoo", events[2].event_type)
        trait_filters[0].update({'key': 'trait_C', 'op': 'gt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        self.assertEqual("Bar", events[0].event_type)
        trait_filters[0].update({'key': 'trait_C', 'op': 'ge'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(3, len(events))
        self.assertEqual("Zoo", events[2].event_type) 
[docs]    def test_get_event_trait_filter_op_datetime(self):
        trait_filters = [{'key': 'trait_D',
                          'datetime': self.start + datetime.timedelta(hours=2),
                          'op': 'eq'}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Zoo", events[0].event_type)
        self.assertEqual(4, len(events[0].traits))
        trait_filters[0].update({'key': 'trait_D', 'op': 'lt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(2, len(events))
        trait_filters[0].update({'key': 'trait_D', 'op': 'le'})
        self.assertEqual("Bar", events[1].event_type)
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(3, len(events))
        self.assertEqual("Bar", events[1].event_type)
        trait_filters[0].update({'key': 'trait_D', 'op': 'ne'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(5, len(events))
        self.assertEqual("Foo", events[2].event_type)
        trait_filters[0].update({'key': 'trait_D', 'op': 'gt'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(3, len(events))
        self.assertEqual("Zoo", events[2].event_type)
        trait_filters[0].update({'key': 'trait_D', 'op': 'ge'})
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(4, len(events))
        self.assertEqual("Bar", events[2].event_type) 
[docs]    def test_get_event_multiple_trait_filter(self):
        trait_filters = [{'key': 'trait_B', 'integer': 1},
                         {'key': 'trait_A', 'string': 'my_Foo_text'},
                         {'key': 'trait_C', 'float': 0.123456}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("Foo", events[0].event_type)
        self.assertEqual(4, len(events[0].traits)) 
[docs]    def test_get_event_multiple_trait_filter_expect_none(self):
        trait_filters = [{'key': 'trait_B', 'integer': 1},
                         {'key': 'trait_A', 'string': 'my_Zoo_text'}]
        event_filter = storage.EventFilter(self.start, self.end,
                                           traits_filter=trait_filters)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(0, len(events)) 
[docs]    def test_get_event_types(self):
        event_types = [e for e in
                       self.conn.get_event_types()]
        self.assertEqual(3, len(event_types))
        self.assertIn("Bar", event_types)
        self.assertIn("Foo", event_types)
        self.assertIn("Zoo", event_types) 
[docs]    def test_get_trait_types(self):
        trait_types = [tt for tt in
                       self.conn.get_trait_types("Foo")]
        self.assertEqual(4, len(trait_types))
        trait_type_names = map(lambda x: x['name'], trait_types)
        self.assertIn("trait_A", trait_type_names)
        self.assertIn("trait_B", trait_type_names)
        self.assertIn("trait_C", trait_type_names)
        self.assertIn("trait_D", trait_type_names) 
[docs]    def test_get_trait_types_unknown_event(self):
        trait_types = [tt for tt in
                       self.conn.get_trait_types("Moo")]
        self.assertEqual(0, len(trait_types)) 
[docs]    def test_get_traits(self):
        traits = self.conn.get_traits("Bar")
        # format results in a way that makes them easier to work with
        trait_dict = {}
        for trait in traits:
            trait_dict[trait.name] = trait.dtype
        self.assertIn("trait_A", trait_dict)
        self.assertEqual(models.Trait.TEXT_TYPE, trait_dict["trait_A"])
        self.assertIn("trait_B", trait_dict)
        self.assertEqual(models.Trait.INT_TYPE, trait_dict["trait_B"])
        self.assertIn("trait_C", trait_dict)
        self.assertEqual(models.Trait.FLOAT_TYPE, trait_dict["trait_C"])
        self.assertIn("trait_D", trait_dict)
        self.assertEqual(models.Trait.DATETIME_TYPE,
                         trait_dict["trait_D"]) 
[docs]    def test_get_all_traits(self):
        traits = self.conn.get_traits("Foo")
        traits = sorted([t for t in traits], key=operator.attrgetter('dtype'))
        self.assertEqual(8, len(traits))
        trait = traits[0]
        self.assertEqual("trait_A", trait.name)
        self.assertEqual(models.Trait.TEXT_TYPE, trait.dtype) 
[docs]    def test_simple_get_event_no_traits(self):
        new_events = [models.Event("id_notraits", "NoTraits",
                      self.start, [], {})]
        self.conn.record_events(new_events)
        event_filter = storage.EventFilter(
            self.start, self.end, "NoTraits")
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        self.assertEqual("id_notraits", events[0].message_id)
        self.assertEqual("NoTraits", events[0].event_type)
        self.assertEqual(0, len(events[0].traits)) 
[docs]    def test_simple_get_no_filters(self):
        event_filter = storage.EventFilter(None, None, None)
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(6, len(events)) 
[docs]    def test_get_by_message_id(self):
        new_events = [models.Event("id_testid", "MessageIDTest",
                                   self.start, [], {})]
        self.conn.record_events(new_events)
        event_filter = storage.EventFilter(message_id="id_testid")
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertEqual(1, len(events))
        event = events[0]
        self.assertEqual("id_testid", event.message_id) 
[docs]    def test_simple_get_raw(self):
        event_filter = storage.EventFilter()
        events = [event for event in self.conn.get_events(event_filter)]
        self.assertTrue(events)
        self.assertEqual({'status': {'nested': 'started'}}, events[0].raw) 
[docs]    def test_trait_type_enforced_on_none(self):
        new_events = [models.Event(
            "id_testid", "MessageIDTest", self.start,
            [models.Trait('text', models.Trait.TEXT_TYPE, ''),
             models.Trait('int', models.Trait.INT_TYPE, 0),
             models.Trait('float', models.Trait.FLOAT_TYPE, 0.0)],
            {})]
        self.conn.record_events(new_events)
        event_filter = storage.EventFilter(message_id="id_testid")
        events = [event for event in self.conn.get_events(event_filter)]
        options = [(models.Trait.TEXT_TYPE, ''),
                   (models.Trait.INT_TYPE, 0.0),
                   (models.Trait.FLOAT_TYPE, 0.0)]
        for trait in events[0].traits:
            options.remove((trait.dtype, trait.value))