Keystone认证和授权流程分析




本文基于mitaka版本keystone、nova源码进行分析

密码管理

创建用户、更新用户密码时会保存加密后的密码到keystone数据库password表中,密码采用passlib.hash库进行sha512算法加盐计算多轮hash后保存,加密后密码类似:

$6$rounds=10000$Lu4gJYSkJijDptF5$AUhTPBXXWZXAwLHyDQFYbZ5uo282V5SX.s4OK4/I6oNm88XiCqUvomTOUqLtN0ZV7WhfZHcalNP7ClIh0QAXk.

使用$符号进行分割,对应格式为:$6$rounds=rounds$salt$checksum

第一段的$6$表示是passlib中的sha512-crypt hashes算法,第二段中的rounds=10000表示经过多少次sha512 hash运算,第三段的salt是加密用的盐,明文保存,第四段是密码真正的hash加密密文。其中salt是由passlib随机生成的16位字符串,每个用户的密码都不同,或者说每次生成密文时都随机产生一次,保证所有用户salt都不一样,这样即使被拖库后基于彩虹表的密码反推破解方式就非常困难,每个用户的密码都要根据salt字符串生成一份彩虹表,然后加上加密rounds数量10000次,极大的增加了破解难度,从而达到破解密码成本比账号本身价值高的目的,所谓得不偿失,也就不太乐意去干了。另外也加salt方式也可以有效避免拖库后被拿去撞库攻击其他网站。参考:http://blog.jobbole.com/61872/

参考资料:https://passlib.readthedocs.io/en/stable/lib/passlib.hash.sha256_crypt.html

上面的链接是sha256的,sha512算法跟sha256除了加密后的字符数量长一倍(更安全)之外,没有明显区别(sha256的算法标记位是$5$),具体差异参考:https://passlib.readthedocs.io/en/stable/lib/passlib.hash.sha512_crypt.html#passlib.hash.sha512_crypt

passlib代码写的比较绕(至少我看起来是这样。。。可能是我比较弱。。。),总是找不到具体执行加密的地方,但是大致看明白了,有两种方式,一种是使用操作系统hash加密算法库,另一种是如果操作系统没提供相关算法,就使用passlib内置算法(就是自己写的一套算法实现),相比较而言操作系统提供的算法库更快更高效,一般Linux都提供了常见的加密算法,sha512就是其中之一。

#=============================================================================
# host OS helpers
#=============================================================================

try:
    from crypt import crypt as _crypt   #### 系统自带算法
except ImportError: # pragma: no cover
    _crypt = None
    has_crypt = False
    def safe_crypt(secret, hash):
        return None
else:
    has_crypt = True
    _NULL = '\x00'

keystone创建用户流程

### 根据用户管理API(url),以及keystone-paste.ini配置的composite、app、filter,
### 找到users相关api对应的controller(使用v3版本)
class Routers(wsgi.RoutersBase):

    def append_v3_routers(self, mapper, routers):
        user_controller = controllers.UserV3()
        routers.append(
            router.Router(user_controller,
                          'users', 'user',
                          resource_descriptions=self.v3_resources))
@dependency.requires('identity_api')
## 这个装饰器没太看明白怎么加载的identity_api,应该是根@dependency.provider('identity_api')对应的
class UserV3(controller.V3Controller):
    collection_name = 'users'
    member_name = 'user'

    ......

    @controller.protected()
    @validation.validated(schema.user_create, 'user')
    def create_user(self, context, user):
        # The manager layer will generate the unique ID for users
        ref = self._normalize_dict(user)
        ref = self._normalize_domain_id(context, ref)
        initiator = notifications._get_request_audit_info(context)
        ref = self.identity_api.create_user(ref, initiator)  ### 通过上面的装饰器加载的identity_api
        return UserV3.wrap_member(context, ref)
## 接下来应该是走到这里
@notifications.listener
@dependency.provider('identity_api')   ### identity_api dependency的provider
@dependency.requires('assignment_api', 'credential_api', 'id_mapping_api',
 'resource_api', 'revoke_api', 'shadow_users_api')
class Manager(manager.Manager):
    ......
    @domains_configured
    @exception_translated('user')
    def create_user(self, user_ref, initiator=None):
        user = user_ref.copy()
        user['name'] = clean.user_name(user['name'])
        user.setdefault('enabled', True)
        user['enabled'] = clean.user_enabled(user['enabled'])
        domain_id = user['domain_id']
        self.resource_api.get_domain(domain_id)

        # For creating a user, the domain is in the object itself
        domain_id = user_ref['domain_id']
        driver = self._select_identity_driver(domain_id)  ## 加载sql后端
        user = self._clear_domain_id_if_domain_unaware(driver, user)
        # Generate a local ID - in the future this might become a function of
        # the underlying driver so that it could conform to rules set down by
        # that particular driver type.
        user['id'] = uuid.uuid4().hex
        ref = driver.create_user(user['id'], user)
        notifications.Audit.created(self._USER, user['id'], initiator)
        return self._set_domain_id_and_mapping(
            ref, domain_id, driver, mapping.EntityType.USER)
    # user crud

    @sql.handle_conflicts(conflict_type='user')
    def create_user(self, user_id, user):
        user = utils.hash_user_password(user)   ### 生成加密hash密码
        with sql.session_for_write() as session:
            user_ref = User.from_dict(user)
            session.add(user_ref)   ### 写入数据库
            return identity.filter_user(user_ref.to_dict())
def hash_user_password(user):
    """Hash a user dict's password without modifying the passed-in dict."""
    password = user.get('password')
    if password is None:
        return user

    return dict(user, password=hash_password(password))


def hash_password(password):
    """Hash a password. Hard."""
    ### 密码最长4096位,超出默认不严格限制仅截断后打印warning日志,根据配置项确定
    ### max_length = CONF.identity.max_password_length(默认4096)
    ### CONF.strict_password_check(默认False),打开严格检查超出后直接抛异常
    password_utf8 = verify_length_and_trunc_password(password).encode('utf-8')
    return passlib.hash.sha512_crypt.encrypt(   ### 调用passlib生成加密密码,rounds默认10000
        password_utf8, rounds=CONF.crypt_strength)

修改密码流程

修改密码场景首先需要用旧密码获取token之后进行修改,或者管理员权限进行修改。整体流程与创建用户非常接近,参考上面的创建用户流程即可分析清楚。更新密码有专门的API接口和对应的method,也可以通过update user API来实现,keystone代码流程最终都是走的update_user。

class Routers(wsgi.RoutersBase):
    def append_v3_routers(self, mapper, routers):
        ......
        self._add_resource(
            mapper, user_controller,
            path='/users/{user_id}/password',   ### 修改密码专用API的url
            post_action='change_password',   ### controller中对应的method
            rel=json_home.build_v3_resource_relation('user_change_password'),
            path_vars={
                'user_id': json_home.Parameters.USER_ID,
            })
    @sql.handle_conflicts(conflict_type='user')
    def update_user(self, user_id, user):
        with sql.session_for_write() as session:  ## 更新数据库
            user_ref = self._get_user(session, user_id)
            old_user_dict = user_ref.to_dict()
            user = utils.hash_user_password(user)  ### 加密新密码
            for k in user:
                old_user_dict[k] = user[k]
            new_user = User.from_dict(old_user_dict)
            for attr in User.attributes:
                if attr != 'id':
                    setattr(user_ref, attr, getattr(new_user, attr))
            user_ref.extra = new_user.extra
            return identity.filter_user(
                user_ref.to_dict(include_extra_dict=True))

用户认证

获取token

认证就是用户登录或者说获取token的过程(登录就是一种比较特殊的获取token的过程,可参考之前写的一篇文章:OpenStack Dashboard用户登录为啥不需要输入租户名),因此这里只讨论根据用户名密码租户等信息生成token的过程,根据token操作的API(url),以及keystone-paste.ini配置的composite、app、filter,找到对应的router和controller:

class Routers(wsgi.RoutersBase):

    def append_v3_routers(self, mapper, routers):
        auth_controller = controllers.Auth()

        self._add_resource(
            mapper, auth_controller,
            path='/auth/tokens',
            get_action='validate_token',
            head_action='check_token',
            post_action='authenticate_for_token',
            delete_action='revoke_token',
            rel=json_home.build_v3_resource_relation('auth_tokens'))
@dependency.requires('assignment_api', 'catalog_api', 'identity_api',
                     'resource_api', 'token_provider_api', 'trust_api')
### 跟上面创建用户的@dependency.requires原理类似,需要找到对应的provider
class Auth(controller.V3Controller):

    # Note(atiwari): From V3 auth controller code we are
    # calling protection() wrappers, so we need to setup
    # the member_name and  collection_name attributes of
    # auth controller code.
    # In the absence of these attributes, default 'entity'
    # string will be used to represent the target which is
    # generic. Policy can be defined using 'entity' but it
    # would not reflect the exact entity that is in context.
    # We are defining collection_name = 'tokens' and
    # member_name = 'token' to facilitate policy decisions.
    collection_name = 'tokens'
    member_name = 'token'

    def __init__(self, *args, **kw):
        super(Auth, self).__init__(*args, **kw)
        config.setup_authentication()

    def authenticate_for_token(self, context, auth=None):
        """Authenticate user and issue a token."""
        include_catalog = 'nocatalog' not in context['query_string']

        try:
            auth_info = AuthInfo.create(context, auth=auth)
            auth_context = AuthContext(extras={},
                                       method_names=[],
                                       bind={})
            self.authenticate(context, auth_info, auth_context)
            if auth_context.get('access_token_id'):
                auth_info.set_scope(None, auth_context['project_id'], None)
            self._check_and_set_default_scoping(auth_info, auth_context)
            (domain_id, project_id, trust, unscoped) = auth_info.get_scope()

            method_names = auth_info.get_method_names()
            method_names += auth_context.get('method_names', [])
            # make sure the list is unique
            method_names = list(set(method_names))
            expires_at = auth_context.get('expires_at')
            # NOTE(morganfainberg): define this here so it is clear what the
            # argument is during the issue_v3_token provider call.
            metadata_ref = None

            token_audit_id = auth_context.get('audit_id')
            ### token_provider_api的provider是keystone.token.provider.Manager
            (token_id, token_data) = self.token_provider_api.issue_v3_token(
                auth_context['user_id'], method_names, expires_at, project_id,
                domain_id, auth_context, trust, metadata_ref, include_catalog,
                parent_audit_id=token_audit_id)

            # NOTE(wanghong): We consume a trust use only when we are using
            # trusts and have successfully issued a token.
            if trust:
                self.trust_api.consume_use(trust['id'])

            return render_token_data_response(token_id, token_data,
                                              created=True)
        except exception.TrustNotFound as e:
            raise exception.Unauthorized(e)
@dependency.provider('token_provider_api')
@dependency.requires('assignment_api', 'revoke_api')
class Manager(manager.Manager):
    """Default pivot point for the token provider backend.

    See :mod:`keystone.common.manager.Manager` for more details on how this
    dynamically calls the backend.

    """
    ### setup.cfg的entry_points的namespace:
    ### keystone.token.provider =
    ###     fernet = keystone.token.providers.fernet:Provider
    ###     uuid = keystone.token.providers.uuid:Provider
    ###     pki = keystone.token.providers.pki:Provider
    ###     pkiz = keystone.token.providers.pkiz:Provider
    driver_namespace = 'keystone.token.provider'
    ......
    def __init__(self):
        ### [token]
        ### provider = fernet  # 我们用的是这个,好处见下面的链接,对我来说最好的地方是不需要保存token到数据库
        ### expiration = 6000
        super(Manager, self).__init__(CONF.token.provider)
        self._register_callback_listeners()
    def issue_v3_token(self, user_id, method_names, expires_at=None,
                       project_id=None, domain_id=None, auth_context=None,
                       trust=None, metadata_ref=None, include_catalog=True,
                       parent_audit_id=None):
        token_id, token_data = self.driver.issue_v3_token(  ### 调用fernet token driver
            user_id, method_names, expires_at, project_id, domain_id,
            auth_context, trust, metadata_ref, include_catalog,
            parent_audit_id)

        ......
        if self._needs_persistence:
            self._create_token(token_id, data)
        return token_id, token_data
    def issue_v3_token(self, *args, **kwargs):
        token_id, token_data = super(Provider, self).issue_v3_token(
            *args, **kwargs)   ### 通过base class的方法生成
        self._build_issued_at_info(token_id, token_data)
        return token_id, token_data
    def issue_v3_token(self, user_id, method_names, expires_at=None,
                       project_id=None, domain_id=None, auth_context=None,
                       trust=None, metadata_ref=None, include_catalog=True,
                       parent_audit_id=None):

        ......
        token_data = self.v3_token_data_helper.get_token_data(  ### 生成token信息
            user_id,
            method_names,
            domain_id=domain_id,
            project_id=project_id,
            expires=expires_at,
            trust=trust,
            bind=auth_context.get('bind') if auth_context else None,
            token=token_ref,
            include_catalog=include_catalog,
            access_token=access_token,
            audit_info=parent_audit_id)

        token_id = self._get_token_id(token_data)  ### 生成token id
        return token_id, token_data
    ### 回到子类keystone.token.providers.fernet.core.Provider#_get_token_id
    def _get_token_id(self, token_data):
        ......
        return self.token_formatter.create_token(
            user_id,
            expires_at,
            audit_ids,
            methods=methods,
            domain_id=domain_id,
            project_id=project_id,
            trust_id=trust_id,
            federated_info=federated_info,
            access_token_id=access_token_id
        )
class TokenFormatter(object):
    """Packs and unpacks payloads into tokens for transport."""

    @property
    def crypto(self):
        """Return a cryptography instance.

        You can extend this class with a custom crypto @property to provide
        your own token encoding / decoding. For example, using a different
        cryptography library (e.g. ``python-keyczar``) or to meet arbitrary
        security requirements.

        This @property just needs to return an object that implements
        ``encrypt(plaintext)`` and ``decrypt(ciphertext)``.

        """
        keys = utils.load_keys()

        if not keys:
            raise exception.KeysNotFound()

        fernet_instances = [fernet.Fernet(key) for key in keys]
        return fernet.MultiFernet(fernet_instances)

    def pack(self, payload):
        """Pack a payload for transport as a token.

        :type payload: six.binary_type
        :rtype: six.text_type

        """
        # base64 padding (if any) is not URL-safe
        return self.crypto.encrypt(payload).rstrip(b'=').decode('utf-8')  ## 生成fernet token

验证token

api入口跟获取类似,

    @controller.protected()  ## 注意这个装饰器
    def validate_token(self, context):
        token_id = context.get('subject_token_id')
        include_catalog = 'nocatalog' not in context['query_string']
        token_data = self.token_provider_api.validate_v3_token(
            token_id)
        if not include_catalog and 'catalog' in token_data['token']:
            del token_data['token']['catalog']
        return render_token_data_response(token_id, token_data)
    def validate_v3_token(self, token_id):
        if not token_id:
            raise exception.TokenNotFound(_('No token in the request'))

        try:
            # NOTE(lbragstad): Only go to persistent storage if we have a token
            # to fetch from the backend (the driver persists the token).
            # Otherwise the information about the token must be in the token
            # id.
            if not self._needs_persistence:  ### fernet是不需要持久化到数据库的
                token_ref = self.validate_non_persistent_token(token_id)
            ......
            self._is_valid_token(token_ref)  ### 验证根据id解析出来的token是否有效
            return token_ref
        except exception.Unauthorized as e:
            LOG.debug('Unable to validate token: %s', e)
            raise exception.TokenNotFound(token_id=token_id)

    @MEMOIZE
    def validate_non_persistent_token(self, token_id):
        return self.driver.validate_non_persistent_token(token_id)
    ### 调用fernet driver keystone.token.providers.fernet.core.Provider,
    ### 未实现该方法,找到base class keystone.token.providers.common.BaseProvider
    def validate_non_persistent_token(self, token_id):
        try:
            (user_id, methods, audit_ids, domain_id, project_id, trust_id,
                federated_info, access_token_id, created_at, expires_at) = (
                    self.token_formatter.validate_token(token_id))  ### 根据id解析token信息
        except exception.ValidationError as e:
            raise exception.TokenNotFound(e)

        ......

        return self.v3_token_data_helper.get_token_data( ### 组装token data
            user_id,
            method_names=methods,
            domain_id=domain_id,
            project_id=project_id,
            issued_at=created_at,
            expires=expires_at,
            trust=trust_ref,
            token=token_dict,
            access_token=access_token,
            audit_info=audit_ids)
    def validate_token(self, token):
        """Validates a Fernet token and returns the payload attributes.

        :type token: six.text_type

        """
        ### 根据id反向解析token信息,执行的是:self.crypto.decrypt(token.encode('utf-8'))
        serialized_payload = self.unpack(token)
        versioned_payload = msgpack.unpackb(serialized_payload)
        version, payload = versioned_payload[0], versioned_payload[1:]

        for payload_class in PAYLOAD_CLASSES:
            if version == payload_class.version:
                (user_id, methods, project_id, domain_id, expires_at,
                 audit_ids, trust_id, federated_info, access_token_id) = (
                    payload_class.disassemble(payload))
                break
        else:
            # If the token_format is not recognized, raise ValidationError.
            raise exception.ValidationError(_(
                'This is not a recognized Fernet payload version: %s') %
                version)

        # rather than appearing in the payload, the creation time is encoded
        # into the token format itself
        created_at = TokenFormatter.creation_time(token)
        created_at = ks_utils.isotime(at=created_at, subsecond=True)
        expires_at = timeutils.parse_isotime(expires_at)
        expires_at = ks_utils.isotime(at=expires_at, subsecond=True)

        return (user_id, methods, audit_ids, domain_id, project_id, trust_id,
                federated_info, access_token_id, created_at, expires_at)
    def _is_valid_token(self, token):
        """Verify the token is valid format and has not expired."""
        current_time = timeutils.normalize_time(timeutils.utcnow())

        try:
            # Get the data we need from the correct location (V2 and V3 tokens
            # differ in structure, Try V3 first, fall back to V2 second)
            token_data = token.get('token', token.get('access'))
            expires_at = token_data.get('expires_at',
                                        token_data.get('expires'))
            if not expires_at:
                expires_at = token_data['token']['expires']
            expiry = timeutils.normalize_time(
                timeutils.parse_isotime(expires_at))
        except Exception:
            LOG.exception(_LE('Unexpected error or malformed token '
                              'determining token expiry: %s'), token)
            raise exception.TokenNotFound(_('Failed to validate token'))

        if current_time < expiry:
            self.check_revocation(token)
            # Token has not expired and has not been revoked.
            return None
        else:
            raise exception.TokenNotFound(_('Failed to validate token'))

@controller.protected()这个装饰器里面做的事情比较多,检查了x-auth-token(权限比较高的认证token)和x-subject-token(被验证的token),

def protected(callback=None):
    """Wraps API calls with role based access controls (RBAC).

    This handles both the protection of the API parameters as well as any
    target entities for single-entity API calls.

    More complex API calls (for example that deal with several different
    entities) should pass in a callback function, that will be subsequently
    called to check protection for these multiple entities. This callback
    function should gather the appropriate entities needed and then call
    check_protection() in the V3Controller class.

    """
    def wrapper(f):
        @functools.wraps(f)
        def inner(self, context, *args, **kwargs):
            if 'is_admin' in context and context['is_admin']:
                LOG.warning(_LW('RBAC: Bypassing authorization'))
            elif callback is not None:
                prep_info = {'f_name': f.__name__,
                             'input_attr': kwargs}
                callback(self, context, prep_info, *args, **kwargs)
            else:
                action = 'identity:%s' % f.__name__
                creds = _build_policy_check_credentials(self, action, ## 这里检查x-auth-token
                                                        context, kwargs)

                policy_dict = {}

                # Check to see if we need to include the target entity in our
                # policy checks.  We deduce this by seeing if the class has
                # specified a get_member() method and that kwargs contains the
                # appropriate entity id.
                if (hasattr(self, 'get_member_from_driver') and
                        self.get_member_from_driver is not None):
                    key = '%s_id' % self.member_name
                    if key in kwargs:
                        ref = self.get_member_from_driver(kwargs[key])
                        policy_dict['target'] = {self.member_name: ref}

                # TODO(henry-nash): Move this entire code to a member
                # method inside v3 Auth
                if context.get('subject_token_id') is not None:
                    token_ref = token_model.KeystoneToken(
                        token_id=context['subject_token_id'],
                        token_data=self.token_provider_api.validate_token(
                            context['subject_token_id']))  ## 这里检查x-subject-token
                    policy_dict.setdefault('target', {})
                    policy_dict['target'].setdefault(self.member_name, {})
                    policy_dict['target'][self.member_name]['user_id'] = (
                        token_ref.user_id)
                    try:
                        user_domain_id = token_ref.user_domain_id
                    except exception.UnexpectedError:
                        user_domain_id = None
                    if user_domain_id:
                        policy_dict['target'][self.member_name].setdefault(
                            'user', {})
                        policy_dict['target'][self.member_name][
                            'user'].setdefault('domain', {})
                        policy_dict['target'][self.member_name]['user'][
                            'domain']['id'] = (
                                user_domain_id)

                # Add in the kwargs, which means that any entity provided as a
                # parameter for calls like create and update will be included.
                policy_dict.update(kwargs)
                self.policy_api.enforce(creds,
                                        action,
                                        utils.flatten_dict(policy_dict))
                LOG.debug('RBAC: Authorization granted')
            return f(self, context, *args, **kwargs)
        return inner
    return wrapper
## 上面两个token认证都是走这个方法
    def validate_token(self, token_id, belongs_to=None):
        unique_id = utils.generate_unique_id(token_id)
        # NOTE(morganfainberg): Ensure we never use the long-form token_id
        # (PKI) as part of the cache_key.
        token = self._validate_token(unique_id)  ## 解析token信息(fernet格式可unpack解密)
        self._token_belongs_to(token, belongs_to) ## 检查token所属project,belongs_to=None,不做检查
        self._is_valid_token(token)  ## 检查token有效期
        return token
    @MEMOIZE  ## 注意这个装饰器,cache token管理(进程内或者配置的memcache servers)
              ## 已用过的token可以从缓存中直接读取信息,第一次用的就缓存起来
    def _validate_token(self, token_id):
        if not token_id:
            raise exception.TokenNotFound(_('No token in the request'))

        try:
            if not self._needs_persistence:
                # NOTE(lbragstad): This will validate v2 and v3 non-persistent
                # tokens.  ### fernet格式token是非持久化的,走这里
                return self.driver.validate_non_persistent_token(token_id)
            token_ref = self._persistence.get_token(token_id)
            version = self.get_token_version(token_ref)
            if version == self.V3:
                return self.driver.validate_v3_token(token_ref)
        except exception.Unauthorized as e:
            LOG.debug('Unable to validate token: %s', e)
            raise exception.TokenNotFound(token_id=token_id)
        if version == self.V2:
            return self.driver.validate_v2_token(token_ref)
        raise exception.UnsupportedTokenVersionException()

validate_non_persistent_token这个方法上面已经介绍过。

可以看出,一次token认证过程,keystone/token/providers/common.py(507)get_token_data()这个方法走了3次,一次x-auth-token(protected装饰器里),两次x-subject-token(protected装饰器里和主流程里)。

这里可以看出两次x-subject-token流程貌似有点多余,但我还没看明白为啥要两次。

以nova-api服务为例进行第三方服务token验证流程分析。

[composite:openstack_compute_api_v21]
use = call:nova.api.auth:pipeline_factory_v21
noauth2 = cors compute_req_id faultwrap sizelimit noauth2 osapi_compute_app_v21
keystone = cors compute_req_id faultwrap sizelimit authtoken keystonecontext osapi_compute_app_v21


[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

可以看到通过keystonemiddleware来进行token认证,keystonemiddleware中会根据token格式判断是否可以本地验证,如PKI格式等,否则就需要发送请求给keystone服务进行token的验证,成功后,把response传递给下一个filter(keystonecontext),keystonecontext filter会把token信息转换成nova的context对象,然后这个context就可以在请求经过的所有nova子服务中进行流转使用。keystonecontext实现是nova.api.auth.NovaKeystoneContext#__call__,返回的对象是context.RequestContext。

keystonemiddleware的用法参考:

关于@webob.dec.wsgify(RequestClass=wsgi.Request),这部分我也没看懂,大致就是把方法转换成WSGI app,比较省事儿。参考:

用户授权

角色管理

keystone角色的增删改查流程就不分析了,跟上面的用户很类似。

根据角色进行授权

keystone默认有两种角色,admin和_member_,分别表示管理员和普通用户,也可以增加自己的角色,但是需要修改授权策略配置文件,授权配置文件一般在服务配置目录下,比如/etc/nova/policy.json、/etc/neutron/policy.json(json格式的授权策略配置已经要被废弃了,后续会切换到yaml格式),以nova的配置为例:

{
    "context_is_admin":  "role:admin",   ### 检查用户角色,是admin的通过
    "admin_or_owner":  "is_admin:True or project_id:%(project_id)s",  ### admin角色或者操作者project_id与资源的project_id相同的通过
    "default": "rule:admin_or_owner",  ### 默认规则是admin或者project_id通过
    ### 我们也可以定义自己的策略,比如project_id和user_id完全匹配的才可以执行操作
    ### "project_and_user": "project_id:%(project_id)s and user_id:%(user_id)s"

    "cells_scheduler_filter:TargetCellFilter": "is_admin:True",

    ### 这些是nova/compute/api.py里面各种操作的授权策略配置
    "compute:create": "rule:admin_or_owner",
    "compute:create:attach_network": "rule:admin_or_owner",
    "compute:create:attach_volume": "rule:admin_or_owner",
    "compute:create:forced_host": "is_admin:True",
    ......
    "admin_api": "is_admin:True",
    ### 这些是扩展api的授权策略配置
    "compute_extension:accounts": "rule:admin_api",
    "compute_extension:admin_actions": "rule:admin_api",
    "compute_extension:admin_actions:pause": "rule:admin_or_owner",
    "compute_extension:admin_actions:unpause": "rule:admin_or_owner",
    "compute_extension:admin_actions:suspend": "rule:admin_or_owner",
    ......
    ### nova/network/api.py各种操作的授权配置
    "network:get_all": "rule:admin_or_owner",
    "network:get": "rule:admin_or_owner",
    "network:create": "rule:admin_or_owner",
    "network:delete": "rule:admin_or_owner",
    ......
    ### nova/api/openstack/compute相关接口的授权配置
    "os_compute_api:servers:detail:get_all_tenants": "is_admin:True",
    "os_compute_api:servers:index:get_all_tenants": "is_admin:True",
    "os_compute_api:servers:confirm_resize": "rule:admin_or_owner",
    "os_compute_api:servers:create": "rule:admin_or_owner",
    "os_compute_api:servers:create:attach_network": "rule:admin_or_owner",
    "os_compute_api:servers:create:attach_volume": "rule:admin_or_owner",
    ......
}

以nova创建虚拟机接口为例:

    @wsgi.response(202)
    @extensions.expected_errors((400, 403, 409, 413))
    @validation.schema(schema_server_create_v20, '2.0', '2.0')
    @validation.schema(schema_server_create, '2.1', '2.18')
    @validation.schema(schema_server_create_v219, '2.19')
    def create(self, req, body):
        """Creates a new server for a given user."""

        context = req.environ['nova.context']
        ......
        target = {
            'project_id': context.project_id,
            'user_id': context.user_id,
            'availability_zone': availability_zone}
        authorize(context, target, 'create')  ### create操作授权策略检查

        # TODO(Shao He, Feng) move this policy check to os-availabilty-zone
        # extension after refactor it.
        parse_az = self.compute_api.parse_availability_zone
        availability_zone, host, node = parse_az(context, availability_zone)
        if host or node:
            authorize(context, {}, 'create:forced_host')  ### create:forced_host操作授权检查
ALIAS = 'servers'
......
authorize = extensions.os_compute_authorizer(ALIAS)
# This will be deprecated after policy cleanup finished
def core_authorizer(api_name, extension_name):
    ### create操作授权检查时target为project_id/user_id/az信息,action为create
    ### create:force_host操作授权检查时为{},action为create:force_host
    ### api_name为os_compute_api,extension_name为servers
    def authorize(context, target=None, action=None):
        if target is None:
            target = {'project_id': context.project_id,
                      'user_id': context.user_id}
        if action is None:
            act = '%s:%s' % (api_name, extension_name)
        else:
            act = '%s:%s:%s' % (api_name, extension_name, action)
        ### 所以两个操作的最终act为:"os_compute_api:servers:create"和"os_compute_api:servers:create:forced_host"
        nova.policy.enforce(context, act, target)
    return authorize

# NOTE(alex_xu): The functions os_compute_authorizer and
# os_compute_soft_authorizer are used to policy enforcement for OpenStack
# Compute API, now Nova V2.1 REST API will invoke it.
#

def os_compute_authorizer(extension_name):
    return core_authorizer('os_compute_api', extension_name)

nova.policy.enforce(context, act, target)最终是使用oslo_policy.policy的enforce方法来进行授权检查的,该方法会首先加载授权配置文件中的全部规则,并且缓存起来,只有当传入了force_reload参数或者策略配置文件修改时间有变动时,才重新加载,所以修改策略配置文件后,下次发送请求给API服务即可立即生效。

oslo_policy这个库代码不多,单步调试也不复杂,主要流程在于策略的加载检查,我也没有去实际的调试,后面用到再说吧。

其他WSGI框架使用的认证和授权方案

上面写了那么多,实际我只有一个想法,解决困扰了我很久的问题:web服务端如何实现认证和授权?是不是每个后端接口都要手工加上认证、授权相关方法?用户怎么登录?登录的用户有什么权限、能执行哪些操作?

所以看完OpenStack相关服务的认证和授权方式之后,又查看了一些python web框架的认证、授权方式,比如最简单的bottle.py,它本身并没有实现相关功能,而是集成第三方库Bottle-Cork(当然不止这一个库,还有其他的),示例代码:http://cork.firelet.net/example_webapp_decorated.htmlhttp://cork.firelet.net/example_webapp.html,一种是用装饰器实现认证、授权,第二种是显式的加到方法里去。

而OpenStack则是基于paste deploy库的authtoken filter来实现的认证,授权是显式的写在需要进行授权检查的操作方法里。

Django框架是MVC集成模式,所以这方面做的很完善:http://python.usyiyi.cn/translate/django_182/topics/auth/default.html,有很多现成的方法可以直接用起来。web请求可以配合session来记录用户登录信息,就不必要每次发请求都要把用户名密码发给后端,或者把token带到header里面。

OpenStack各个服务只是实现HTTP RestFul API(包含model和controller),并没有web视图(views),所以也不涉及session,只能通过token来进行请求的认证和授权(token有过期时间限制,不用每次发送用户名密码,更安全)。

使用Flask框架设计RestFul API认证:http://www.pythondoc.com/flask-restful/third.html

可以看出也是基于token的认证,并且文章还解释了使用用户名密码方式认证的问题,与上面我提到的一样,只是多了一条,http客户端必须要明文保存用户名密码,否则无法发送到服务端,这样也是不安全的。

另外上面的文章还提到,基于token的认证跟flask处理基于session cookies的认证方法是类似的,都是基于一个叫itsdangerous的库,因此可以实现cookies中缓存的认证信息在多久之后就失效,防止永不失效或者失效时间过长带来的安全风险。

还有其他很多的认证方法比如OAuth及OAuth2.0,还有OpenID、LADP等,这部分没研究。但目标应该都是大同小异,减少客户端每次发送用户名密码带来的安全风险,并保证token的安全不易破解。

参考: