Source code for glance.tests.unit.test_db_metadef

# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Intel Corporation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_utils import encodeutils

from glance.common import exception
import glance.context
import glance.db
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils

TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
TENANT3 = '5a3e60e8-cfa9-4a9e-a90a-62b42cea92b8'
TENANT4 = 'c6c87f25-8a94-47ed-8c83-053c25f42df4'

USER1 = '54492ba0-f4df-4e4e-be62-27f4d76b29cf'

NAMESPACE1 = 'namespace1'
NAMESPACE2 = 'namespace2'
NAMESPACE3 = 'namespace3'
NAMESPACE4 = 'namespace4'

PROPERTY1 = 'Property1'
PROPERTY2 = 'Property2'
PROPERTY3 = 'Property3'

OBJECT1 = 'Object1'
OBJECT2 = 'Object2'
OBJECT3 = 'Object3'

TAG1 = 'Tag1'
TAG2 = 'Tag2'
TAG3 = 'Tag3'
TAG4 = 'Tag4'
TAG5 = 'Tag5'

RESOURCE_TYPE1 = 'ResourceType1'
RESOURCE_TYPE2 = 'ResourceType2'
RESOURCE_TYPE3 = 'ResourceType3'


def _db_namespace_fixture(**kwargs):
    namespace = {
        'namespace': None,
        'display_name': None,
        'description': None,
        'visibility': True,
        'protected': False,
        'owner': None
    }
    namespace.update(kwargs)
    return namespace


def _db_property_fixture(name, **kwargs):
    property = {
        'name': name,
        'json_schema': {"type": "string", "title": "title"},
    }
    property.update(kwargs)
    return property


def _db_object_fixture(name, **kwargs):
    obj = {
        'name': name,
        'description': None,
        'json_schema': {},
        'required': '[]',
    }
    obj.update(kwargs)
    return obj


def _db_tag_fixture(name, **kwargs):
    obj = {
        'name': name
    }
    obj.update(kwargs)
    return obj


def _db_tags_fixture(names=None):
    tags = []
    if names:
        tag_name_list = names
    else:
        tag_name_list = [TAG1, TAG2, TAG3]

    for tag_name in tag_name_list:
        tags.append(_db_tag_fixture(tag_name))
    return tags


def _db_resource_type_fixture(name, **kwargs):
    obj = {
        'name': name,
        'protected': False,
    }
    obj.update(kwargs)
    return obj


def _db_namespace_resource_type_fixture(name, **kwargs):
    obj = {
        'name': name,
        'properties_target': None,
        'prefix': None,
    }
    obj.update(kwargs)
    return obj


[docs]class TestMetadefRepo(test_utils.BaseTestCase):
[docs] def setUp(self): super(TestMetadefRepo, self).setUp() self.db = unit_test_utils.FakeDB(initialize=False) self.context = glance.context.RequestContext(user=USER1, tenant=TENANT1) self.namespace_repo = glance.db.MetadefNamespaceRepo(self.context, self.db) self.property_repo = glance.db.MetadefPropertyRepo(self.context, self.db) self.object_repo = glance.db.MetadefObjectRepo(self.context, self.db) self.tag_repo = glance.db.MetadefTagRepo(self.context, self.db) self.resource_type_repo = glance.db.MetadefResourceTypeRepo( self.context, self.db) self.namespace_factory = glance.domain.MetadefNamespaceFactory() self.property_factory = glance.domain.MetadefPropertyFactory() self.object_factory = glance.domain.MetadefObjectFactory() self.tag_factory = glance.domain.MetadefTagFactory() self.resource_type_factory = glance.domain.MetadefResourceTypeFactory() self._create_namespaces() self._create_properties() self._create_objects() self._create_tags() self._create_resource_types()
def _create_namespaces(self): self.namespaces = [ _db_namespace_fixture(namespace=NAMESPACE1, display_name='1', description='desc1', visibility='private', protected=True, owner=TENANT1), _db_namespace_fixture(namespace=NAMESPACE2, display_name='2', description='desc2', visibility='public', protected=False, owner=TENANT1), _db_namespace_fixture(namespace=NAMESPACE3, display_name='3', description='desc3', visibility='private', protected=True, owner=TENANT3), _db_namespace_fixture(namespace=NAMESPACE4, display_name='4', description='desc4', visibility='public', protected=True, owner=TENANT3) ] [self.db.metadef_namespace_create(None, namespace) for namespace in self.namespaces] def _create_properties(self): self.properties = [ _db_property_fixture(name=PROPERTY1), _db_property_fixture(name=PROPERTY2), _db_property_fixture(name=PROPERTY3) ] [self.db.metadef_property_create(self.context, NAMESPACE1, property) for property in self.properties] [self.db.metadef_property_create(self.context, NAMESPACE4, property) for property in self.properties] def _create_objects(self): self.objects = [ _db_object_fixture(name=OBJECT1, description='desc1'), _db_object_fixture(name=OBJECT2, description='desc2'), _db_object_fixture(name=OBJECT3, description='desc3'), ] [self.db.metadef_object_create(self.context, NAMESPACE1, object) for object in self.objects] [self.db.metadef_object_create(self.context, NAMESPACE4, object) for object in self.objects] def _create_tags(self): self.tags = [ _db_tag_fixture(name=TAG1), _db_tag_fixture(name=TAG2), _db_tag_fixture(name=TAG3), ] [self.db.metadef_tag_create(self.context, NAMESPACE1, tag) for tag in self.tags] [self.db.metadef_tag_create(self.context, NAMESPACE4, tag) for tag in self.tags] def _create_resource_types(self): self.resource_types = [ _db_resource_type_fixture(name=RESOURCE_TYPE1, protected=False), _db_resource_type_fixture(name=RESOURCE_TYPE2, protected=False), _db_resource_type_fixture(name=RESOURCE_TYPE3, protected=True), ] [self.db.metadef_resource_type_create(self.context, resource_type) for resource_type in self.resource_types]
[docs] def test_get_namespace(self): namespace = self.namespace_repo.get(NAMESPACE1) self.assertEqual(NAMESPACE1, namespace.namespace) self.assertEqual('desc1', namespace.description) self.assertEqual('1', namespace.display_name) self.assertEqual(TENANT1, namespace.owner) self.assertTrue(namespace.protected) self.assertEqual('private', namespace.visibility)
[docs] def test_get_namespace_not_found(self): fake_namespace = "fake_namespace" exc = self.assertRaises(exception.NotFound, self.namespace_repo.get, fake_namespace) self.assertIn(fake_namespace, encodeutils.exception_to_unicode(exc))
[docs] def test_get_namespace_forbidden(self): self.assertRaises(exception.NotFound, self.namespace_repo.get, NAMESPACE3)
[docs] def test_list_namespace(self): namespaces = self.namespace_repo.list() namespace_names = set([n.namespace for n in namespaces]) self.assertEqual(set([NAMESPACE1, NAMESPACE2, NAMESPACE4]), namespace_names)
[docs] def test_list_private_namespaces(self): filters = {'visibility': 'private'} namespaces = self.namespace_repo.list(filters=filters) namespace_names = set([n.namespace for n in namespaces]) self.assertEqual(set([NAMESPACE1]), namespace_names)
[docs] def test_add_namespace(self): # NOTE(pawel-koniszewski): Change db_namespace_fixture to # namespace_factory when namespace primary key in DB # will be changed from Integer to UUID namespace = _db_namespace_fixture(namespace='added_namespace', display_name='fake', description='fake_desc', visibility='public', protected=True, owner=TENANT1) self.assertEqual('added_namespace', namespace['namespace']) self.db.metadef_namespace_create(None, namespace) retrieved_namespace = self.namespace_repo.get(namespace['namespace']) self.assertEqual('added_namespace', retrieved_namespace.namespace)
[docs] def test_save_namespace(self): namespace = self.namespace_repo.get(NAMESPACE1) namespace.display_name = 'save_name' namespace.description = 'save_desc' self.namespace_repo.save(namespace) namespace = self.namespace_repo.get(NAMESPACE1) self.assertEqual('save_name', namespace.display_name) self.assertEqual('save_desc', namespace.description)
[docs] def test_remove_namespace(self): namespace = self.namespace_repo.get(NAMESPACE1) self.namespace_repo.remove(namespace) self.assertRaises(exception.NotFound, self.namespace_repo.get, NAMESPACE1)
[docs] def test_remove_namespace_not_found(self): fake_name = 'fake_name' namespace = self.namespace_repo.get(NAMESPACE1) namespace.namespace = fake_name exc = self.assertRaises(exception.NotFound, self.namespace_repo.remove, namespace) self.assertIn(fake_name, encodeutils.exception_to_unicode(exc))
[docs] def test_get_property(self): property = self.property_repo.get(NAMESPACE1, PROPERTY1) namespace = self.namespace_repo.get(NAMESPACE1) self.assertEqual(PROPERTY1, property.name) self.assertEqual(namespace.namespace, property.namespace.namespace)
[docs] def test_get_property_not_found(self): exc = self.assertRaises(exception.NotFound, self.property_repo.get, NAMESPACE2, PROPERTY1) self.assertIn(PROPERTY1, encodeutils.exception_to_unicode(exc))
[docs] def test_list_property(self): properties = self.property_repo.list(filters={'namespace': NAMESPACE1}) property_names = set([p.name for p in properties]) self.assertEqual(set([PROPERTY1, PROPERTY2, PROPERTY3]), property_names)
[docs] def test_list_property_empty_result(self): properties = self.property_repo.list(filters={'namespace': NAMESPACE2}) property_names = set([p.name for p in properties]) self.assertEqual(set([]), property_names)
[docs] def test_list_property_namespace_not_found(self): exc = self.assertRaises(exception.NotFound, self.property_repo.list, filters={'namespace': 'not-a-namespace'}) self.assertIn('not-a-namespace', encodeutils.exception_to_unicode(exc))
[docs] def test_add_property(self): # NOTE(pawel-koniszewski): Change db_property_fixture to # property_factory when property primary key in DB # will be changed from Integer to UUID property = _db_property_fixture(name='added_property') self.assertEqual('added_property', property['name']) self.db.metadef_property_create(self.context, NAMESPACE1, property) retrieved_property = self.property_repo.get(NAMESPACE1, 'added_property') self.assertEqual('added_property', retrieved_property.name)
[docs] def test_add_property_namespace_forbidden(self): # NOTE(pawel-koniszewski): Change db_property_fixture to # property_factory when property primary key in DB # will be changed from Integer to UUID property = _db_property_fixture(name='added_property') self.assertEqual('added_property', property['name']) self.assertRaises(exception.Forbidden, self.db.metadef_property_create, self.context, NAMESPACE3, property)
[docs] def test_add_property_namespace_not_found(self): # NOTE(pawel-koniszewski): Change db_property_fixture to # property_factory when property primary key in DB # will be changed from Integer to UUID property = _db_property_fixture(name='added_property') self.assertEqual('added_property', property['name']) self.assertRaises(exception.NotFound, self.db.metadef_property_create, self.context, 'not_a_namespace', property)
[docs] def test_save_property(self): property = self.property_repo.get(NAMESPACE1, PROPERTY1) property.schema = '{"save": "schema"}' self.property_repo.save(property) property = self.property_repo.get(NAMESPACE1, PROPERTY1) self.assertEqual(PROPERTY1, property.name) self.assertEqual('{"save": "schema"}', property.schema)
[docs] def test_remove_property(self): property = self.property_repo.get(NAMESPACE1, PROPERTY1) self.property_repo.remove(property) self.assertRaises(exception.NotFound, self.property_repo.get, NAMESPACE1, PROPERTY1)
[docs] def test_remove_property_not_found(self): fake_name = 'fake_name' property = self.property_repo.get(NAMESPACE1, PROPERTY1) property.name = fake_name self.assertRaises(exception.NotFound, self.property_repo.remove, property)
[docs] def test_get_object(self): object = self.object_repo.get(NAMESPACE1, OBJECT1) namespace = self.namespace_repo.get(NAMESPACE1) self.assertEqual(OBJECT1, object.name) self.assertEqual('desc1', object.description) self.assertEqual(['[]'], object.required) self.assertEqual({}, object.properties) self.assertEqual(namespace.namespace, object.namespace.namespace)
[docs] def test_get_object_not_found(self): exc = self.assertRaises(exception.NotFound, self.object_repo.get, NAMESPACE2, OBJECT1) self.assertIn(OBJECT1, encodeutils.exception_to_unicode(exc))
[docs] def test_list_object(self): objects = self.object_repo.list(filters={'namespace': NAMESPACE1}) object_names = set([o.name for o in objects]) self.assertEqual(set([OBJECT1, OBJECT2, OBJECT3]), object_names)
[docs] def test_list_object_empty_result(self): objects = self.object_repo.list(filters={'namespace': NAMESPACE2}) object_names = set([o.name for o in objects]) self.assertEqual(set([]), object_names)
[docs] def test_list_object_namespace_not_found(self): exc = self.assertRaises(exception.NotFound, self.object_repo.list, filters={'namespace': 'not-a-namespace'}) self.assertIn('not-a-namespace', encodeutils.exception_to_unicode(exc))
[docs] def test_add_object(self): # NOTE(pawel-koniszewski): Change db_object_fixture to # object_factory when object primary key in DB # will be changed from Integer to UUID object = _db_object_fixture(name='added_object') self.assertEqual('added_object', object['name']) self.db.metadef_object_create(self.context, NAMESPACE1, object) retrieved_object = self.object_repo.get(NAMESPACE1, 'added_object') self.assertEqual('added_object', retrieved_object.name)
[docs] def test_add_object_namespace_forbidden(self): # NOTE(pawel-koniszewski): Change db_object_fixture to # object_factory when object primary key in DB # will be changed from Integer to UUID object = _db_object_fixture(name='added_object') self.assertEqual('added_object', object['name']) self.assertRaises(exception.Forbidden, self.db.metadef_object_create, self.context, NAMESPACE3, object)
[docs] def test_add_object_namespace_not_found(self): # NOTE(pawel-koniszewski): Change db_object_fixture to # object_factory when object primary key in DB # will be changed from Integer to UUID object = _db_object_fixture(name='added_object') self.assertEqual('added_object', object['name']) self.assertRaises(exception.NotFound, self.db.metadef_object_create, self.context, 'not-a-namespace', object)
[docs] def test_save_object(self): object = self.object_repo.get(NAMESPACE1, OBJECT1) object.required = ['save_req'] object.description = 'save_desc' self.object_repo.save(object) object = self.object_repo.get(NAMESPACE1, OBJECT1) self.assertEqual(OBJECT1, object.name) self.assertEqual(['save_req'], object.required) self.assertEqual('save_desc', object.description)
[docs] def test_remove_object(self): object = self.object_repo.get(NAMESPACE1, OBJECT1) self.object_repo.remove(object) self.assertRaises(exception.NotFound, self.object_repo.get, NAMESPACE1, OBJECT1)
[docs] def test_remove_object_not_found(self): fake_name = 'fake_name' object = self.object_repo.get(NAMESPACE1, OBJECT1) object.name = fake_name self.assertRaises(exception.NotFound, self.object_repo.remove, object)
[docs] def test_list_resource_type(self): resource_type = self.resource_type_repo.list( filters={'namespace': NAMESPACE1}) self.assertEqual(0, len(resource_type))
[docs] def test_get_tag(self): tag = self.tag_repo.get(NAMESPACE1, TAG1) namespace = self.namespace_repo.get(NAMESPACE1) self.assertEqual(TAG1, tag.name) self.assertEqual(namespace.namespace, tag.namespace.namespace)
[docs] def test_get_tag_not_found(self): exc = self.assertRaises(exception.NotFound, self.tag_repo.get, NAMESPACE2, TAG1) self.assertIn(TAG1, encodeutils.exception_to_unicode(exc))
[docs] def test_list_tag(self): tags = self.tag_repo.list(filters={'namespace': NAMESPACE1}) tag_names = set([t.name for t in tags]) self.assertEqual(set([TAG1, TAG2, TAG3]), tag_names)
[docs] def test_list_tag_empty_result(self): tags = self.tag_repo.list(filters={'namespace': NAMESPACE2}) tag_names = set([t.name for t in tags]) self.assertEqual(set([]), tag_names)
[docs] def test_list_tag_namespace_not_found(self): exc = self.assertRaises(exception.NotFound, self.tag_repo.list, filters={'namespace': 'not-a-namespace'}) self.assertIn('not-a-namespace', encodeutils.exception_to_unicode(exc))
[docs] def test_add_tag(self): # NOTE(pawel-koniszewski): Change db_tag_fixture to # tag_factory when tag primary key in DB # will be changed from Integer to UUID tag = _db_tag_fixture(name='added_tag') self.assertEqual('added_tag', tag['name']) self.db.metadef_tag_create(self.context, NAMESPACE1, tag) retrieved_tag = self.tag_repo.get(NAMESPACE1, 'added_tag') self.assertEqual('added_tag', retrieved_tag.name)
[docs] def test_add_tags(self): tags = self.tag_repo.list(filters={'namespace': NAMESPACE1}) tag_names = set([t.name for t in tags]) self.assertEqual(set([TAG1, TAG2, TAG3]), tag_names) tags = _db_tags_fixture([TAG3, TAG4, TAG5]) self.db.metadef_tag_create_tags(self.context, NAMESPACE1, tags) tags = self.tag_repo.list(filters={'namespace': NAMESPACE1}) tag_names = set([t.name for t in tags]) self.assertEqual(set([TAG3, TAG4, TAG5]), tag_names)
[docs] def test_add_duplicate_tags_with_pre_existing_tags(self): tags = self.tag_repo.list(filters={'namespace': NAMESPACE1}) tag_names = set([t.name for t in tags]) self.assertEqual(set([TAG1, TAG2, TAG3]), tag_names) tags = _db_tags_fixture([TAG5, TAG4, TAG5]) self.assertRaises(exception.Duplicate, self.db.metadef_tag_create_tags, self.context, NAMESPACE1, tags) tags = self.tag_repo.list(filters={'namespace': NAMESPACE1}) tag_names = set([t.name for t in tags]) self.assertEqual(set([TAG1, TAG2, TAG3]), tag_names)
[docs] def test_add_tag_namespace_forbidden(self): # NOTE(pawel-koniszewski): Change db_tag_fixture to # tag_factory when tag primary key in DB # will be changed from Integer to UUID tag = _db_tag_fixture(name='added_tag') self.assertEqual('added_tag', tag['name']) self.assertRaises(exception.Forbidden, self.db.metadef_tag_create, self.context, NAMESPACE3, tag)
[docs] def test_add_tag_namespace_not_found(self): # NOTE(pawel-koniszewski): Change db_tag_fixture to # tag_factory when tag primary key in DB # will be changed from Integer to UUID tag = _db_tag_fixture(name='added_tag') self.assertEqual('added_tag', tag['name']) self.assertRaises(exception.NotFound, self.db.metadef_tag_create, self.context, 'not-a-namespace', tag)
[docs] def test_save_tag(self): tag = self.tag_repo.get(NAMESPACE1, TAG1) self.tag_repo.save(tag) tag = self.tag_repo.get(NAMESPACE1, TAG1) self.assertEqual(TAG1, tag.name)
[docs] def test_remove_tag(self): tag = self.tag_repo.get(NAMESPACE1, TAG1) self.tag_repo.remove(tag) self.assertRaises(exception.NotFound, self.tag_repo.get, NAMESPACE1, TAG1)
[docs] def test_remove_tag_not_found(self): fake_name = 'fake_name' tag = self.tag_repo.get(NAMESPACE1, TAG1) tag.name = fake_name self.assertRaises(exception.NotFound, self.tag_repo.remove, tag)