Source code for octavia_tempest_plugin.tests.api.v2.test_l7rule

# Copyright 2018 GoDaddy
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import time
from uuid import UUID

from dateutil import parser
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators

from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
from octavia_tempest_plugin.tests import waiters

CONF = config.CONF


class L7RuleAPITest(test_base.LoadBalancerBaseTest):
    """Test the l7rule object API.

    Every test drives the API through the ``mem_*`` clients. Where an RBAC
    mode is configured, the test also hands a list of credential names to a
    ``check_*_RBAC_enforcement`` helper. That list is selected by
    ``CONF.load_balancer.RBAC_test_type``. If the option matches none of
    OWNERADMIN, KEYSTONE_DEFAULT_ROLES or ADVANCED, the list stays empty and
    the RBAC helper is not called, so the test then verifies functional
    behavior only.
    """

    @classmethod
    def resource_setup(cls):
        """Setup resources needed by the tests.

        Creates, in order, a load balancer, an HTTP listener on port 80, an
        HTTP round-robin pool attached to that listener, and a REJECT
        l7policy on the listener. Each object's ID is stored on the class
        and its cleanup is registered right after creation. The load
        balancer is awaited ACTIVE before the next object is created.
        """
        super(L7RuleAPITest, cls).resource_setup()

        lb_name = data_utils.rand_name("lb_member_lb1_l7rule")
        lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
                     const.NAME: lb_name}

        cls._setup_lb_network_kwargs(lb_kwargs)

        lb = cls.mem_lb_client.create_loadbalancer(**lb_kwargs)
        cls.lb_id = lb[const.ID]
        # Register cleanup before waiting so a failed or timed-out build
        # still gets cleaned up.
        cls.addClassResourceCleanup(
            cls.mem_lb_client.cleanup_loadbalancer,
            cls.lb_id)

        # The initial load balancer build uses its own lb_build_* interval
        # and timeout; the child objects below use build_*.
        waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
                                cls.lb_id, const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.lb_build_interval,
                                CONF.load_balancer.lb_build_timeout)

        listener_name = data_utils.rand_name("lb_member_listener1_l7rule")
        listener_kwargs = {
            const.NAME: listener_name,
            const.PROTOCOL: const.HTTP,
            const.PROTOCOL_PORT: '80',
            const.LOADBALANCER_ID: cls.lb_id,
        }
        listener = cls.mem_listener_client.create_listener(**listener_kwargs)
        cls.listener_id = listener[const.ID]
        cls.addClassResourceCleanup(
            cls.mem_listener_client.cleanup_listener,
            cls.listener_id,
            lb_client=cls.mem_lb_client, lb_id=cls.lb_id)

        waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
                                cls.lb_id, const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.build_interval,
                                CONF.load_balancer.build_timeout)

        pool_name = data_utils.rand_name("lb_member_pool1_l7rule")
        pool_kwargs = {
            const.NAME: pool_name,
            const.PROTOCOL: const.HTTP,
            const.LB_ALGORITHM: const.LB_ALGORITHM_ROUND_ROBIN,
            const.LISTENER_ID: cls.listener_id,
        }

        pool = cls.mem_pool_client.create_pool(**pool_kwargs)
        cls.pool_id = pool[const.ID]
        cls.addClassResourceCleanup(
            cls.mem_pool_client.cleanup_pool,
            cls.pool_id,
            lb_client=cls.mem_lb_client, lb_id=cls.lb_id)

        waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
                                cls.lb_id, const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.build_interval,
                                CONF.load_balancer.build_timeout)

        # The shared l7policy that the create/show/update/delete tests
        # attach their rules to.
        l7policy_name = data_utils.rand_name("lb_member_l7policy1_l7rule")
        l7policy_kwargs = {
            const.NAME: l7policy_name,
            const.LISTENER_ID: cls.listener_id,
            const.ACTION: const.REJECT,
        }
        l7policy = cls.mem_l7policy_client.create_l7policy(**l7policy_kwargs)
        cls.l7policy_id = l7policy[const.ID]
        cls.addClassResourceCleanup(
            cls.mem_l7policy_client.cleanup_l7policy,
            cls.l7policy_id,
            lb_client=cls.mem_lb_client, lb_id=cls.lb_id)

        waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
                                cls.lb_id, const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.build_interval,
                                CONF.load_balancer.build_timeout)

    # Note: This test also covers basic l7rule show API
    @decorators.idempotent_id('55ac1337-189d-40a6-b614-47d7a8e991f6')
    def test_l7rule_create(self):
        """Tests l7rule create and basic show APIs.

        * Tests that users without the loadbalancer member role cannot
          create l7rules.
        * Create a fully populated l7rule.
        * Show l7rule details.
        * Validate the show reflects the requested values.
        """
        l7rule_kwargs = {
            const.ADMIN_STATE_UP: True,
            const.L7POLICY_ID: self.l7policy_id,
            const.TYPE: const.HEADER,
            const.VALUE: 'myvalue-create',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey-create',
            const.INVERT: False,
        }

        # Tags are only sent when API version 2.5 is supported.
        # NOTE(review): the gate is asked of mem_l7policy_client here and of
        # mem_listener_client at the end of this test; presumably both
        # report the same result - confirm.
        if self.mem_l7policy_client.is_version_supported(
                self.api_version, '2.5'):
            l7_rule_tags = ["Hello", "World"]
            l7rule_kwargs.update({
                const.TAGS: l7_rule_tags
            })

        # Test that a user without the loadbalancer role cannot
        # create an L7 rule.
        # The lists below name the credentials expected to be allowed. If
        # RBAC_test_type matches none of the three modes, the list stays
        # empty and the RBAC check is skipped.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if expected_allowed:
            self.check_create_RBAC_enforcement(
                'L7RuleClient', 'create_l7rule',
                expected_allowed,
                status_method=self.mem_lb_client.show_loadbalancer,
                obj_id=self.lb_id, **l7rule_kwargs)

        l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
        self.addClassResourceCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule[const.ID], l7policy_id=self.l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer, self.lb_id,
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout)
        # wait_for_status returns the shown object; keep the latest view of
        # the rule for the assertions below.
        l7rule = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule,
            l7rule[const.ID], const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=self.l7policy_id)
        # The wait for operating status ONLINE is skipped when
        # test_with_noop is set.
        if not CONF.load_balancer.test_with_noop:
            l7rule = waiters.wait_for_status(
                self.mem_l7rule_client.show_l7rule,
                l7rule[const.ID], const.OPERATING_STATUS,
                const.ONLINE,
                CONF.load_balancer.build_interval,
                CONF.load_balancer.build_timeout,
                l7policy_id=self.l7policy_id)

        # Parsing is the check: these calls raise if the timestamps or the
        # ID are malformed.
        parser.parse(l7rule[const.CREATED_AT])
        parser.parse(l7rule[const.UPDATED_AT])
        UUID(l7rule[const.ID])
        # Operating status for a l7rule will be ONLINE if it is enabled:
        if l7rule[const.ADMIN_STATE_UP]:
            self.assertEqual(const.ONLINE, l7rule[const.OPERATING_STATUS])
        else:
            self.assertEqual(const.OFFLINE, l7rule[const.OPERATING_STATUS])

        equal_items = [const.ADMIN_STATE_UP, const.TYPE, const.VALUE,
                       const.COMPARE_TYPE, const.KEY, const.INVERT]

        for item in equal_items:
            self.assertEqual(l7rule_kwargs[item], l7rule[item])

        # Tags are compared without regard to order.
        if self.mem_listener_client.is_version_supported(
                self.api_version, '2.5'):
            self.assertCountEqual(l7rule_kwargs[const.TAGS],
                                  l7rule[const.TAGS])

    @decorators.idempotent_id('69095254-f106-4fb6-9f54-7a78cc14fb51')
    def test_l7rule_list(self):
        """Tests l7rule list API and field filtering.

        * Create a clean l7policy.
        * Create three l7rules.
        * Validates that other accounts cannot list the l7rules.
        * List the l7rules using the default sort order.
        * List the l7rules using descending sort order.
        * List the l7rules using ascending sort order.
        * List the l7rules returning one field at a time.
        * List the l7rules returning two fields.
        * List the l7rules filtering to one of the three.
        * List the l7rules filtered, one field, and sorted.
        """
        # IDs of L7 rules created in the test
        test_ids = []

        # A dedicated l7policy keeps rules created by the other tests out of
        # the list results. It uses per-test cleanup (addCleanup) rather
        # than class-level cleanup.
        l7policy_name = data_utils.rand_name("lb_member_l7policy2_l7rule-list")
        l7policy = self.mem_l7policy_client.create_l7policy(
            name=l7policy_name, listener_id=self.listener_id,
            action=const.REJECT)
        l7policy_id = l7policy[const.ID]
        self.addCleanup(
            self.mem_l7policy_client.cleanup_l7policy, l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)

        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
                                self.lb_id,
                                const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.build_interval,
                                CONF.load_balancer.build_timeout)

        # The rule values are '2', '1', '3' in creation order, so sorting by
        # value gives a different order than sorting by creation time.
        l7rule1_kwargs = {
            const.L7POLICY_ID: l7policy_id,
            const.ADMIN_STATE_UP: True,
            const.TYPE: const.HEADER,
            const.VALUE: '2',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey2-list',
        }

        if self.mem_lb_client.is_version_supported(
                self.api_version, '2.5'):
            l7rule1_tags = ["English", "Mathematics",
                            "Marketing", "Creativity"]
            l7rule1_kwargs.update({const.TAGS: l7rule1_tags})

        l7rule1 = self.mem_l7rule_client.create_l7rule(
            **l7rule1_kwargs)
        self.addCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule1[const.ID], l7policy_id=l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)
        l7rule1 = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule, l7rule1[const.ID],
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=l7policy_id)
        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
                                self.lb_id,
                                const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.check_interval,
                                CONF.load_balancer.check_timeout)
        test_ids.append(l7rule1[const.ID])

        # Time resolution for created_at is only to the second, and we need to
        # ensure that each object has a distinct creation time. Delaying one
        # second is both a simple and a reliable way to accomplish this.
        time.sleep(1)

        l7rule2_kwargs = {
            const.L7POLICY_ID: l7policy_id,
            const.ADMIN_STATE_UP: True,
            const.TYPE: const.HEADER,
            const.VALUE: '1',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey1-list',
        }

        if self.mem_lb_client.is_version_supported(
                self.api_version, '2.5'):
            l7rule2_tags = ["English", "Spanish",
                            "Soft_skills", "Creativity"]
            l7rule2_kwargs.update({const.TAGS: l7rule2_tags})

        l7rule2 = self.mem_l7rule_client.create_l7rule(
            **l7rule2_kwargs)
        self.addCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule2[const.ID], l7policy_id=l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)
        l7rule2 = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule, l7rule2[const.ID],
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=l7policy_id)
        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
                                self.lb_id,
                                const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.check_interval,
                                CONF.load_balancer.check_timeout)
        test_ids.append(l7rule2[const.ID])

        # Time resolution for created_at is only to the second, and we need to
        # ensure that each object has a distinct creation time. Delaying one
        # second is both a simple and a reliable way to accomplish this.
        time.sleep(1)

        # The third rule is the only one created with admin_state_up False;
        # the combined-params query below relies on that.
        l7rule3_kwargs = {
            const.L7POLICY_ID: l7policy_id,
            const.ADMIN_STATE_UP: False,
            const.TYPE: const.HEADER,
            const.VALUE: '3',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey3-list',
        }

        if self.mem_lb_client.is_version_supported(
                self.api_version, '2.5'):
            l7rule3_tags = ["English", "Project_management",
                            "Communication", "Creativity"]
            l7rule3_kwargs.update({const.TAGS: l7rule3_tags})

        l7rule3 = self.mem_l7rule_client.create_l7rule(
            **l7rule3_kwargs)
        self.addCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule3[const.ID], l7policy_id=l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)
        l7rule3 = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule, l7rule3[const.ID],
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=l7policy_id)
        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
                                self.lb_id,
                                const.PROVISIONING_STATUS,
                                const.ACTIVE,
                                CONF.load_balancer.check_interval,
                                CONF.load_balancer.check_timeout)
        test_ids.append(l7rule3[const.ID])

        # Test credentials that should see these L7 rules can see them.
        # NOTE(review): the OWNERADMIN list here omits 'os_roles_lb_admin',
        # while the OWNERADMIN list for check_list_RBAC_enforcement below
        # includes it - confirm the difference is intentional.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_system_reader', 'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_system_reader',
                                'os_roles_lb_admin', 'os_roles_lb_member',
                                'os_roles_lb_global_observer']
        if expected_allowed:
            self.check_list_IDs_RBAC_enforcement(
                'L7RuleClient', 'list_l7rules', expected_allowed, test_ids,
                l7policy_id)

        # Test that users without the lb member role cannot list L7 rules.
        # Note: The parent policy ID blocks non-owners from listing
        #       L7 Rules.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        # Note: os_admin is here because it evaluates to "project_admin"
        #       in oslo_policy, and keystone considers "project_admin"
        #       a superscope of "project_reader". This means it can read
        #       objects in the "admin" credential's project.
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_system_reader', 'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_system_reader',
                                'os_roles_lb_admin',
                                'os_roles_lb_global_observer',
                                'os_roles_lb_member']
        if expected_allowed:
            self.check_list_RBAC_enforcement(
                'L7RuleClient', 'list_l7rules', expected_allowed,
                l7policy_id)

        # Check the default sort order, created_at
        l7rules = self.mem_l7rule_client.list_l7rules(l7policy_id)
        self.assertEqual(l7rule1[const.VALUE],
                         l7rules[0][const.VALUE])
        self.assertEqual(l7rule2[const.VALUE],
                         l7rules[1][const.VALUE])
        self.assertEqual(l7rule3[const.VALUE],
                         l7rules[2][const.VALUE])

        # Test sort descending by `value`
        # Expected order: rule3 ('3'), rule1 ('2'), rule2 ('1').
        l7rules = self.mem_l7rule_client.list_l7rules(
            l7policy_id, query_params='{sort}={value}:{desc}'.format(
                sort=const.SORT, value=const.VALUE, desc=const.DESC))
        self.assertEqual(l7rule1[const.VALUE],
                         l7rules[1][const.VALUE])
        self.assertEqual(l7rule2[const.VALUE],
                         l7rules[2][const.VALUE])
        self.assertEqual(l7rule3[const.VALUE],
                         l7rules[0][const.VALUE])

        # Test sort ascending by `value`
        # Expected order: rule2 ('1'), rule1 ('2'), rule3 ('3').
        l7rules = self.mem_l7rule_client.list_l7rules(
            l7policy_id, query_params='{sort}={value}:{asc}'.format(
                sort=const.SORT, value=const.VALUE, asc=const.ASC))
        self.assertEqual(l7rule1[const.VALUE],
                         l7rules[1][const.VALUE])
        self.assertEqual(l7rule2[const.VALUE],
                         l7rules[0][const.VALUE])
        self.assertEqual(l7rule3[const.VALUE],
                         l7rules[2][const.VALUE])

        # Test fields
        # Each response object must carry exactly the one requested field.
        for field in const.SHOW_L7RULE_RESPONSE_FIELDS:
            l7rules = self.mem_l7rule_client.list_l7rules(
                l7policy_id, query_params='{fields}={field}'.format(
                    fields=const.FIELDS, field=field))
            self.assertEqual(1, len(l7rules[0]))
            self.assertEqual(l7rule1[field], l7rules[0][field])
            self.assertEqual(l7rule2[field], l7rules[1][field])
            self.assertEqual(l7rule3[field], l7rules[2][field])

        # Test multiple fields at the same time
        l7rules = self.mem_l7rule_client.list_l7rules(
            l7policy_id,
            query_params='{fields}={admin}&{fields}={created}'.format(
                fields=const.FIELDS, admin=const.ADMIN_STATE_UP,
                created=const.CREATED_AT))
        self.assertEqual(2, len(l7rules[0]))
        self.assertTrue(l7rules[0][const.ADMIN_STATE_UP])
        parser.parse(l7rules[0][const.CREATED_AT])
        self.assertTrue(l7rules[1][const.ADMIN_STATE_UP])
        parser.parse(l7rules[1][const.CREATED_AT])
        self.assertFalse(l7rules[2][const.ADMIN_STATE_UP])
        parser.parse(l7rules[2][const.CREATED_AT])

        # Test filtering
        l7rules = self.mem_l7rule_client.list_l7rules(
            l7policy_id,
            query_params='{value}={rule_value}'.format(
                value=const.VALUE,
                rule_value=l7rule2[const.VALUE]))
        self.assertEqual(1, len(l7rules))
        self.assertEqual(l7rule2[const.VALUE],
                         l7rules[0][const.VALUE])

        # Test combined params
        # Filtering on admin_state_up=true leaves rule1 and rule2, because
        # rule3 was created with it False.
        l7rules = self.mem_l7rule_client.list_l7rules(
            l7policy_id,
            query_params='{admin}={true}&'
                         '{fields}={value}&{fields}={id}&'
                         '{sort}={value}:{desc}'.format(
                             admin=const.ADMIN_STATE_UP,
                             true=const.ADMIN_STATE_UP_TRUE,
                             fields=const.FIELDS, value=const.VALUE,
                             id=const.ID, sort=const.SORT, desc=const.DESC))
        # Should get two l7rules
        self.assertEqual(2, len(l7rules))
        # l7rules should have two fields
        self.assertEqual(2, len(l7rules[0]))
        # Should be in descending order by `value`
        self.assertEqual(l7rule2[const.VALUE],
                         l7rules[1][const.VALUE])
        self.assertEqual(l7rule1[const.VALUE],
                         l7rules[0][const.VALUE])

        # Creating a list of 3 l7rules, each one contains different tags
        # NOTE(review): the tag checks below run in Python over the objects
        # already fetched above. No list_l7rules request with a tag query is
        # issued here, so server-side tag filtering is not exercised by this
        # section.
        if self.mem_l7rule_client.is_version_supported(
                self.api_version, '2.5'):
            list_of_l7rules = [l7rule1, l7rule2, l7rule3]
            test_list = []
            for l7rule in list_of_l7rules:

                # If tags "English" and "Creativity" are in the l7rule's tags
                # and "Spanish" is not, add the l7rule to the list
                if "English" in l7rule[const.TAGS] and "Creativity" in (
                    l7rule[const.TAGS]) and "Spanish" not in (
                        l7rule[const.TAGS]):
                    test_list.append(l7rule[const.VALUE])

            # Tests if only the first and the third ones have those tags
            self.assertEqual(
                [l7rule1[const.VALUE], l7rule3[const.VALUE]], test_list)

            # Tests that filtering by an empty tag will return an empty list
            # (as written, this asserts that none of the three rules carries
            # an empty-string tag)
            self.assertTrue(not any(["" in l7rule[const.TAGS]
                                     for l7rule in list_of_l7rules]))

    @decorators.idempotent_id('b80b34c3-09fc-467b-8027-7350adb17070')
    def test_l7rule_show(self):
        """Tests l7rule show API.

        * Create a fully populated l7rule.
        * Show l7rule details.
        * Validate the show reflects the requested values.
        * Validates that other accounts cannot see the l7rule.
        """
        l7rule_kwargs = {
            const.ADMIN_STATE_UP: True,
            const.L7POLICY_ID: self.l7policy_id,
            const.TYPE: const.HEADER,
            const.VALUE: 'myvalue-show',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey-show',
            const.INVERT: False,
        }

        l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
        self.addClassResourceCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule[const.ID], l7policy_id=self.l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer, self.lb_id,
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout)
        l7rule = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule,
            l7rule[const.ID], const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=self.l7policy_id)
        # The wait for operating status ONLINE is skipped when
        # test_with_noop is set.
        if not CONF.load_balancer.test_with_noop:
            l7rule = waiters.wait_for_status(
                self.mem_l7rule_client.show_l7rule,
                l7rule[const.ID], const.OPERATING_STATUS,
                const.ONLINE,
                CONF.load_balancer.build_interval,
                CONF.load_balancer.build_timeout,
                l7policy_id=self.l7policy_id)

        # Parsing is the check: these calls raise if the timestamps or the
        # ID are malformed.
        parser.parse(l7rule[const.CREATED_AT])
        parser.parse(l7rule[const.UPDATED_AT])
        UUID(l7rule[const.ID])
        # Operating status for a l7rule will be ONLINE if it is enabled:
        if l7rule[const.ADMIN_STATE_UP]:
            self.assertEqual(const.ONLINE, l7rule[const.OPERATING_STATUS])
        else:
            self.assertEqual(const.OFFLINE, l7rule[const.OPERATING_STATUS])

        equal_items = [const.ADMIN_STATE_UP, const.TYPE, const.VALUE,
                       const.COMPARE_TYPE, const.KEY, const.INVERT]

        for item in equal_items:
            self.assertEqual(l7rule_kwargs[item], l7rule[item])

        # Test that the appropriate users can see or not see the L7 rule
        # based on the API RBAC.
        # If RBAC_test_type matches none of the three modes, the list stays
        # empty and the RBAC check is skipped.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_system_reader', 'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_system_reader',
                                'os_roles_lb_admin',
                                'os_roles_lb_global_observer',
                                'os_roles_lb_member']
        if expected_allowed:
            self.check_show_RBAC_enforcement(
                'L7RuleClient', 'show_l7rule',
                expected_allowed, l7rule[const.ID],
                l7policy_id=self.l7policy_id)

    @decorators.idempotent_id('f8cee23b-89b6-4f3a-a842-1463daf42cf7')
    def test_l7rule_update(self):
        """Tests l7rule update API.

        * Create a fully populated l7rule.
        * Show l7rule details.
        * Validate the show reflects the initial values.
        * Validates that other accounts cannot update the l7rule.
        * Update the l7rule details.
        * Show l7rule details.
        * Validate the show reflects the updated values.
        """
        # The rule starts with admin_state_up False so that a change to True
        # is detectable after the RBAC check and after the real update.
        l7rule_kwargs = {
            const.ADMIN_STATE_UP: False,
            const.L7POLICY_ID: self.l7policy_id,
            const.TYPE: const.HEADER,
            const.VALUE: 'myvalue-update',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey-update',
            const.INVERT: False,
        }

        if self.mem_listener_client.is_version_supported(
                self.api_version, '2.5'):
            l7_rule_tags = ["Hello", "World"]
            l7rule_kwargs.update({
                const.TAGS: l7_rule_tags
            })

        l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
        self.addClassResourceCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule[const.ID], l7policy_id=self.l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer, self.lb_id,
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout)
        l7rule = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule,
            l7rule[const.ID], const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=self.l7policy_id)

        # Parsing is the check: these calls raise if the timestamps or the
        # ID are malformed.
        parser.parse(l7rule[const.CREATED_AT])
        parser.parse(l7rule[const.UPDATED_AT])
        UUID(l7rule[const.ID])
        # Operating status for a l7rule will be ONLINE if it is enabled:
        if l7rule[const.ADMIN_STATE_UP]:
            self.assertEqual(const.ONLINE, l7rule[const.OPERATING_STATUS])
        else:
            self.assertEqual(const.OFFLINE, l7rule[const.OPERATING_STATUS])

        equal_items = [const.ADMIN_STATE_UP, const.TYPE, const.VALUE,
                       const.COMPARE_TYPE, const.KEY, const.INVERT]

        if self.mem_listener_client.is_version_supported(
                self.api_version, '2.5'):
            self.assertCountEqual(l7rule_kwargs[const.TAGS],
                                  l7rule[const.TAGS])

        for item in equal_items:
            self.assertEqual(l7rule_kwargs[item], l7rule[item])

        # Test that a user, without the loadbalancer member role, cannot
        # update this L7 rule.
        # If RBAC_test_type matches none of the three modes, the list stays
        # empty and the RBAC check is skipped.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if expected_allowed:
            self.check_update_RBAC_enforcement(
                'L7RuleClient', 'update_l7rule',
                expected_allowed, None, None, l7rule[const.ID],
                l7policy_id=self.l7policy_id, admin_state_up=True)

        # Assert we didn't go into PENDING_*
        # The RBAC helper is passed admin_state_up=True, so a rule that is
        # still ACTIVE with admin_state_up False was left unchanged by it.
        l7rule_check = self.mem_l7rule_client.show_l7rule(
            l7rule[const.ID], l7policy_id=self.l7policy_id)
        self.assertEqual(const.ACTIVE, l7rule_check[const.PROVISIONING_STATUS])
        self.assertFalse(l7rule_check[const.ADMIN_STATE_UP])

        # Every field set at creation gets a different value in the update.
        l7rule_update_kwargs = {
            const.L7POLICY_ID: self.l7policy_id,
            const.ADMIN_STATE_UP: True,
            const.TYPE: const.COOKIE,
            const.VALUE: 'myvalue-UPDATED',
            const.COMPARE_TYPE: const.CONTAINS,
            const.KEY: 'mykey-UPDATED',
            const.INVERT: True,
        }

        if self.mem_listener_client.is_version_supported(
                self.api_version, '2.5'):
            l7rule_update_kwargs.update({
                const.TAGS: ["Hola", "Mundo"]
            })

        l7rule = self.mem_l7rule_client.update_l7rule(
            l7rule[const.ID], **l7rule_update_kwargs)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer, self.lb_id,
            const.PROVISIONING_STATUS, const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout)
        l7rule = waiters.wait_for_status(
            self.mem_l7rule_client.show_l7rule,
            l7rule[const.ID], const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout,
            l7policy_id=self.l7policy_id)

        # Operating status for a l7rule will be ONLINE if it is enabled:
        if l7rule[const.ADMIN_STATE_UP]:
            self.assertEqual(const.ONLINE, l7rule[const.OPERATING_STATUS])
        else:
            self.assertEqual(const.OFFLINE, l7rule[const.OPERATING_STATUS])

        # Test changed items (which is all of them, for l7rules)
        equal_items = [const.ADMIN_STATE_UP, const.TYPE, const.VALUE,
                       const.COMPARE_TYPE, const.KEY, const.INVERT]

        if self.mem_listener_client.is_version_supported(
                self.api_version, '2.5'):
            self.assertCountEqual(l7rule_update_kwargs[const.TAGS],
                                  l7rule[const.TAGS])

        for item in equal_items:
            self.assertEqual(l7rule_update_kwargs[item], l7rule[item])

    @decorators.idempotent_id('8e15d68d-70e7-4cf3-82bc-9604384654a0')
    def test_l7rule_delete(self):
        """Tests l7rule create and delete APIs.

        * Creates a l7rule.
        * Validates that other accounts cannot delete the l7rule
        * Deletes the l7rule.
        * Validates the l7rule is in the DELETED state.
        """
        l7rule_kwargs = {
            const.L7POLICY_ID: self.l7policy_id,
            const.TYPE: const.HEADER,
            const.VALUE: 'myvalue-delete',
            const.COMPARE_TYPE: const.EQUAL_TO,
            const.KEY: 'mykey-delete',
        }
        l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
        # Cleanup is registered even though the test deletes the rule
        # itself, so the rule is still removed if the test fails before the
        # delete call.
        self.addClassResourceCleanup(
            self.mem_l7rule_client.cleanup_l7rule,
            l7rule[const.ID], l7policy_id=self.l7policy_id,
            lb_client=self.mem_lb_client, lb_id=self.lb_id)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer,
            self.lb_id, const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.build_interval,
            CONF.load_balancer.build_timeout)

        # Test that a user without the loadbalancer role cannot delete this
        # L7 rule.
        # If RBAC_test_type matches none of the three modes, the list stays
        # empty and the RBAC check is skipped.
        expected_allowed = []
        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
            expected_allowed = ['os_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
                                'os_roles_lb_member']
        if expected_allowed:
            self.check_delete_RBAC_enforcement(
                'L7RuleClient', 'delete_l7rule',
                expected_allowed, None, None, l7rule[const.ID],
                l7policy_id=self.l7policy_id)

        # The delete by the owning member client follows the RBAC check.
        self.mem_l7rule_client.delete_l7rule(l7rule[const.ID],
                                             l7policy_id=self.l7policy_id)

        # NOTE(review): going by the waiter's name, either a DELETED
        # provisioning status or a not-found response ends the wait -
        # confirm against waiters.
        waiters.wait_for_deleted_status_or_not_found(
            self.mem_l7rule_client.show_l7rule, l7rule[const.ID],
            const.PROVISIONING_STATUS,
            CONF.load_balancer.check_interval,
            CONF.load_balancer.check_timeout,
            l7policy_id=self.l7policy_id)

        waiters.wait_for_status(
            self.mem_lb_client.show_loadbalancer,
            self.lb_id, const.PROVISIONING_STATUS,
            const.ACTIVE,
            CONF.load_balancer.check_interval,
            CONF.load_balancer.check_timeout)