Analysis of the Code Flow for Adding a Neutron Security Group via Nova




Prerequisite Knowledge

  • neutron-server startup flow (including the registration of the core plugin and extensions that handle HTTP requests)
  • neutron-openvswitch-agent startup flow
  • neutron openvswitch (or Linux bridge) firewall security group implementation
  • neutron security group and security group rule creation flows
  • neutron RPC and callback mechanisms
  • nova client (or openstack client) command line and code flow
  • nova-api startup flow (including the registration of the controllers that handle HTTP requests)
  • nova-compute startup flow
  • nova instance creation flow
  • WSGI routing rules
  • the Paste Deploy WSGI framework

We can use the nova command line to add a security group to an instance (also called a virtual machine or cloud host; "instance" is used below), provided the instance has already been created and the neutron security group and its rules already exist. The commands are as follows:

[root@vs2-controller ~(VisionStack)]$ nova secgroup-list   ### list security groups
+--------------------------------------+------------+------------------------+
| Id                                   | Name       | Description            |
+--------------------------------------+------------+------------------------+
| d9cdf493-bd74-4189-8065-21f0d24cd503 | 1          |                        |
| c8f3fb48-f203-4f92-ac7e-0bcad86f6add | default    | Default security group |
| 8eb76d3e-1f74-4261-963a-c061669d1e7f | hello1234  |                        |
| 9fc5cfa4-7787-4c15-b496-1a8575fbe15f | hello41231 | hello4wqeqw            |
+--------------------------------------+------------+------------------------+

[root@vs2-controller ~(VisionStack)]$ nova list | grep e27f9125   ### list instances
| e27f9125-0465-4f05-b2ad-35c5ab1cc979 | wkm-test           | ACTIVE            | -          | Running     | network1=10.0.0.10                    |

[root@vs2-controller ~(VisionStack)]$ nova add-secgroup e27f9125-0465-4f05-b2ad-35c5ab1cc979 9fc5cfa4-7787-4c15-b496-1a8575fbe15f   ### add the security group to the instance

[root@vs2-controller ~(VisionStack)]$ nova show e27f9125-0465-4f05-b2ad-35c5ab1cc979 | grep security_groups     ### show the security groups attached to the instance
| security_groups                      | hello41231

The purpose of this article is to analyze the code flow that the "nova add-secgroup" operation actually follows through the nova and neutron projects. The OpenStack version analyzed is Mitaka.

My knowledge is limited, so if you spot any mistakes, corrections are welcome!

Code Flow in the Nova Project

The nova-side code flow is fairly simple. The nova add-secgroup command is wrapped by the client into an HTTP request sent to the nova-api service. The equivalent curl command (which you can obtain with nova --debug add-secgroup $UUID $SGID) is:

curl -g -i -X POST http://vs2-controller:8774/v2.1/8005649576f14399a88e38ef7e17a831/servers/e27f9125-0465-4f05-b2ad-35c5ab1cc979/action -H "User-Agent: python-novaclient" -H "Content-Type: application/json" -H "Accept: application/json" -H "X-OpenStack-Nova-API-Version: 2.25" -H "X-Auth-Token: {SHA1}cbb848c1305c0e63a495b03f19bb9f74fb30a9f8" -d '{"addSecurityGroup": {"name": "9fc5cfa4-7787-4c15-b496-1a8575fbe15f"}}'
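
For illustration, here is a minimal sketch of the same request using the Python requests library; the endpoint, project ID, server UUID, and token are placeholders to substitute from your own environment:

import requests

# Placeholders: substitute your own endpoint, project ID, server UUID and token.
NOVA_ENDPOINT = "http://vs2-controller:8774/v2.1/<project_id>"
SERVER_UUID = "e27f9125-0465-4f05-b2ad-35c5ab1cc979"

resp = requests.post(
    NOVA_ENDPOINT + "/servers/" + SERVER_UUID + "/action",
    headers={
        "Content-Type": "application/json",
        "X-Auth-Token": "<keystone-token>",
        "X-OpenStack-Nova-API-Version": "2.25",
    },
    # The action body: "name" may be a security group name or UUID.
    json={"addSecurityGroup": {"name": "9fc5cfa4-7787-4c15-b496-1a8575fbe15f"}},
)
assert resp.status_code == 202  # nova-api returns 202 Accepted on success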

After nova-api receives the request, it hands it off to nova.api.openstack.compute.security_groups.SecurityGroupActionController; the handler method is:

    @extensions.expected_errors((400, 404, 409))
    @wsgi.response(202)
    @wsgi.action('addSecurityGroup')
    def _addSecurityGroup(self, req, id, body):
        context = req.environ['nova.context']
        authorize(context)

        group_name = self._parse(body, 'addSecurityGroup')
        try:
            return self._invoke(self.security_group_api.add_to_instance,
                                context, id, group_name)
        except (exception.SecurityGroupNotFound,
                exception.InstanceNotFound) as exp:
            raise exc.HTTPNotFound(explanation=exp.format_message())
        except exception.NoUniqueMatch as exp:
            raise exc.HTTPConflict(explanation=exp.format_message())
        except (exception.SecurityGroupCannotBeApplied,
                exception.SecurityGroupExistsForInstance) as exp:
            raise exc.HTTPBadRequest(explanation=exp.format_message())

This code is fairly straightforward. _invoke needs no explanation; a glance makes it clear. The key is figuring out what self.security_group_api is, since it determines the code path that follows. As a rule, a class's attributes are initialized in __init__, so let's look there first:

class SecurityGroupActionController(wsgi.Controller):
    def __init__(self, *args, **kwargs):
        super(SecurityGroupActionController, self).__init__(*args, **kwargs)
        self.security_group_api = (
            openstack_driver.get_openstack_security_group_driver(
                skip_policy_check=True))
        self.compute_api = compute.API(
            security_group_api=self.security_group_api, skip_policy_check=True)

Yes, it is initialized right here. That raises the question of what openstack_driver is and where it comes from (my three questions when reading code: what is it? where does it come from? where does it go?). get_openstack_security_group_driver lives in nova/network/security_group/openstack_driver.py:

NOVA_DRIVER = ('nova.compute.api.SecurityGroupAPI')
NEUTRON_DRIVER = ('nova.network.security_group.neutron_driver.'
                  'SecurityGroupAPI')
DRIVER_CACHE = {}


def _get_openstack_security_group_driver(skip_policy_check=False):
    if is_neutron_security_groups():
        return importutils.import_object(NEUTRON_DRIVER,
                                         skip_policy_check=skip_policy_check)
    elif CONF.security_group_api.lower() == 'nova':
        return importutils.import_object(NOVA_DRIVER,
                                         skip_policy_check=skip_policy_check)
    else:
        return importutils.import_object(CONF.security_group_api,
                                         skip_policy_check=skip_policy_check)


def get_openstack_security_group_driver(skip_policy_check=False):
    if skip_policy_check not in DRIVER_CACHE:
        DRIVER_CACHE[skip_policy_check] = _get_openstack_security_group_driver(
            skip_policy_check)
    return DRIVER_CACHE[skip_policy_check]


def is_neutron_security_groups():
    return (CONF.security_group_api.lower() == 'neutron'
            or nova.network.is_neutron())
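
As an aside, importutils.import_object simply imports the dotted class path and instantiates it, roughly equivalent to:

from oslo_utils import importutils

obj = importutils.import_object(
    'nova.network.security_group.neutron_driver.SecurityGroupAPI',
    skip_policy_check=True)
# ...which is roughly equivalent to:
# from nova.network.security_group.neutron_driver import SecurityGroupAPI
# obj = SecurityGroupAPI(skip_policy_check=True)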

As you can see, _get_openstack_security_group_driver imports the corresponding class. There is also an is_neutron_security_groups method; the CONF.security_group_api.lower() == 'neutron' half of its condition is misleading, because that option defaults to 'nova', so in practice it is the nova.network.is_neutron() half that decides. is_neutron is in nova/network/__init__.py:

def is_neutron():
    """Does this configuration mean we're neutron.

    This logic exists as a separate config option
    """
    legacy_class = oslo_config.cfg.CONF.network_api_class
    use_neutron = oslo_config.cfg.CONF.use_neutron

    if legacy_class not in (NEUTRON_NET_API, NOVA_NET_API):
        # Someone actually used this option, this gets a pass for now,
        # but will just go away once deleted.
        return None
    elif legacy_class == NEUTRON_NET_API and not use_neutron:
        # If they specified neutron via class, we should respect that
        LOG.warn(_LW("Config mismatch. The network_api_class specifies %s, "
                     "however use_neutron is not set to True. Using Neutron "
                     "networking for now, however please set use_neutron to "
                     "True in your configuration as network_api_class is "
                     "deprecated and will be removed."), legacy_class)
        return True
    elif use_neutron:
        return True
    else:
        return False
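
For reference, the relevant part of our /etc/nova/nova.conf looks roughly like this (a sketch matching the description below, not a verbatim copy of the file):

[DEFAULT]
# network_api_class is left unset (default), so use_neutron decides
use_neutron = True
# security_group_api keeps its default of 'nova'; is_neutron() still
# returns True, so the neutron driver is selected anyway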

With network_api_class left at its default and use_neutron=True in our environment, is_neutron() returns True. Going back to openstack_driver.py and SecurityGroupActionController, security_group_api therefore resolves to NEUTRON_DRIVER = ('nova.network.security_group.neutron_driver.SecurityGroupAPI'), so self.security_group_api.add_to_instance is nova.network.security_group.neutron_driver.SecurityGroupAPI#add_to_instance:

    @compute_api.wrap_check_security_groups_policy
    def add_to_instance(self, context, instance, security_group_name):
        """Add security group to the instance."""

        neutron = neutronapi.get_client(context)
        try:
            security_group_id = neutronv20.find_resourceid_by_name_or_id(
                neutron, 'security_group',
                security_group_name,
                context.project_id)
        except n_exc.NeutronClientNoUniqueMatch as e:
            raise exception.NoUniqueMatch(six.text_type(e))
        except n_exc.NeutronClientException as e:
            exc_info = sys.exc_info()
            if e.status_code == 404:
                msg = (_("Security group %(name)s is not found for "
                         "project %(project)s") %
                       {'name': security_group_name,
                        'project': context.project_id})
                self.raise_not_found(msg)
            else:
                LOG.exception(_LE("Neutron Error:"))
                six.reraise(*exc_info)
        params = {'device_id': instance.uuid}
        try:
            ports = neutron.list_ports(**params).get('ports')
        except n_exc.NeutronClientException:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("Neutron Error:"))

        if not ports:
            msg = (_("instance_id %s could not be found as device id on"
                   " any ports") % instance.uuid)
            self.raise_not_found(msg)

        for port in ports:
            if not self._has_security_group_requirements(port):
                LOG.warning(_LW("Cannot add security group %(name)s to "
                                "%(instance)s since the port %(port_id)s "
                                "does not meet security requirements"),
                            {'name': security_group_name,
                             'instance': instance.uuid,
                             'port_id': port['id']})
                raise exception.SecurityGroupCannotBeApplied()
            if 'security_groups' not in port:
                port['security_groups'] = []
            port['security_groups'].append(security_group_id)
            updated_port = {'security_groups': port['security_groups']}
            try:
                LOG.info(_LI("Adding security group %(security_group_id)s to "
                             "port %(port_id)s"),
                         {'security_group_id': security_group_id,
                          'port_id': port['id']})
                neutron.update_port(port['id'], {'port': updated_port})
            except Exception:
                with excutils.save_and_reraise_exception():
                    LOG.exception(_LE("Neutron Error:"))

This code is also clear and simple: first look up the security group by name or ID, then find the instance's ports by its UUID, and finally iterate over the ports, calling the update_port API to apply the security group to each one.
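
To make the last two steps concrete, here is a minimal standalone sketch using python-neutronclient (the credentials are placeholders, and the security group ID is assumed already known; inside nova the client is built from the request context instead):

from neutronclient.v2_0 import client as neutron_client

# Placeholder credentials for illustration only.
neutron = neutron_client.Client(username="admin", password="<password>",
                                tenant_name="admin",
                                auth_url="http://vs2-controller:5000/v2.0")

sg_id = "9fc5cfa4-7787-4c15-b496-1a8575fbe15f"
instance_uuid = "e27f9125-0465-4f05-b2ad-35c5ab1cc979"

# Find all ports whose device_id is the instance UUID.
ports = neutron.list_ports(device_id=instance_uuid)["ports"]

# Append the security group to each port and push the update.
for port in ports:
    sgs = port.get("security_groups", []) + [sg_id]
    neutron.update_port(port["id"], {"port": {"security_groups": sgs}})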

That concludes the nova-side code flow.

Code Flow in the Neutron Project

The main flow of adding a security group to an instance via the nova command line actually happens in the neutron project. The neutron client wraps update_port into an HTTP request to neutron-server. On receipt, neutron-server forwards it to the base controller, which uses the previously registered controller/URL mappings together with the HTTP method (here the method is PUT and the URL is /v2.0/ports/{port_id}, which maps to update_port) to dynamically locate the core plugin and extensions that handle the request (understanding that part requires analyzing the neutron-server startup flow). Our configured core plugin is ml2, so the first stop is neutron.plugins.ml2.plugin.Ml2Plugin#update_port:

    def update_port(self, context, id, port):
        attrs = port[attributes.PORT]
        need_port_update_notify = False
        session = context.session
        bound_mech_contexts = []

        with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
                session.begin(subtransactions=True):
            port_db, binding = db.get_locked_port_and_binding(session, id)
            if not port_db:
                raise exc.PortNotFound(port_id=id)
            mac_address_updated = self._check_mac_update_allowed(
                port_db, attrs, binding)
            need_port_update_notify |= mac_address_updated
            original_port = self._make_port_dict(port_db)
            updated_port = super(Ml2Plugin, self).update_port(context, id,
                                                              port)
            self.extension_manager.process_update_port(context, attrs,
                                                       updated_port)
            self._portsec_ext_port_update_processing(updated_port, context,
                                                     port, id)

            if (psec.PORTSECURITY in attrs) and (
                        original_port[psec.PORTSECURITY] !=
                        updated_port[psec.PORTSECURITY]):
                need_port_update_notify = True
            # TODO(QoS): Move out to the extension framework somehow.
            # Follow https://review.openstack.org/#/c/169223 for a solution.
            if (qos_consts.QOS_POLICY_ID in attrs and
                    original_port[qos_consts.QOS_POLICY_ID] !=
                    updated_port[qos_consts.QOS_POLICY_ID]):
                need_port_update_notify = True

            if addr_pair.ADDRESS_PAIRS in attrs:
                need_port_update_notify |= (
                    self.update_address_pairs_on_port(context, id, port,
                                                      original_port,
                                                      updated_port))
            need_port_update_notify |= self.update_security_group_on_port(
                context, id, port, original_port, updated_port)
            network = self.get_network(context, original_port['network_id'])
            need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
                context, id, port, updated_port)
            levels = db.get_binding_levels(session, id, binding.host)
            mech_context = driver_context.PortContext(
                self, context, updated_port, network, binding, levels,
                original_port=original_port)
            need_port_update_notify |= self._process_port_binding(
                mech_context, attrs)
            # For DVR router interface ports we need to retrieve the
            # DVRPortbinding context instead of the normal port context.
            # The normal Portbinding context does not have the status
            # of the ports that are required by the l2pop to process the
            # postcommit events.

            # NOTE:Sometimes during the update_port call, the DVR router
            # interface port may not have the port binding, so we cannot
            # create a generic bindinglist that will address both the
            # DVR and non-DVR cases here.
            # TODO(Swami): This code need to be revisited.
            if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                dvr_binding_list = db.get_dvr_port_bindings(session, id)
                for dvr_binding in dvr_binding_list:
                    levels = db.get_binding_levels(session, id,
                                                   dvr_binding.host)
                    dvr_mech_context = driver_context.PortContext(
                        self, context, updated_port, network,
                        dvr_binding, levels, original_port=original_port)
                    self.mechanism_manager.update_port_precommit(
                        dvr_mech_context)
                    bound_mech_contexts.append(dvr_mech_context)
            else:
                self.mechanism_manager.update_port_precommit(mech_context)
                bound_mech_contexts.append(mech_context)

        # Notifications must be sent after the above transaction is complete
        kwargs = {
            'context': context,
            'port': updated_port,
            'mac_address_updated': mac_address_updated,
            'original_port': original_port,
        }
        registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)

        # Note that DVR Interface ports will have bindings on
        # multiple hosts, and so will have multiple mech_contexts,
        # while other ports typically have just one.
        # Since bound_mech_contexts has both the DVR and non-DVR
        # contexts we can manage just with a single for loop.
        try:
            for mech_context in bound_mech_contexts:
                self.mechanism_manager.update_port_postcommit(
                    mech_context)
        except ml2_exc.MechanismDriverError:
            LOG.error(_LE("mechanism_manager.update_port_postcommit "
                          "failed for port %s"), id)

        self.check_and_notify_security_group_member_changed(
            context, original_port, updated_port)
        need_port_update_notify |= self.is_security_group_member_updated(
            context, original_port, updated_port)

        if original_port['admin_state_up'] != updated_port['admin_state_up']:
            need_port_update_notify = True
        # NOTE: In the case of DVR ports, the port-binding is done after
        # router scheduling when sync_routers is called and so this call
        # below may not be required for DVR routed interfaces. But still
        # since we don't have the mech_context for the DVR router interfaces
        # at certain times, we just pass the port-context and return it, so
        # that we don't disturb other methods that are expecting a return
        # value.
        bound_context = self._bind_port_if_needed(
            mech_context,
            allow_notify=True,
            need_notify=need_port_update_notify)
        return bound_context.current

This flow is considerably more complex, but keep one thing in mind: the update_port call from nova passes only two arguments, the port ID and a port body containing nothing but the security_groups attribute:

port['security_groups'].append(security_group_id)
updated_port = {'security_groups': port['security_groups']}
neutron.update_port(port['id'], {'port': updated_port})
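
On the wire this becomes a PUT to neutron-server, roughly like the following (token and IDs are placeholders):

curl -i -X PUT http://vs2-controller:9696/v2.0/ports/<port_id> \
  -H "Content-Type: application/json" -H "X-Auth-Token: <token>" \
  -d '{"port": {"security_groups": ["<existing-sg-id>", "9fc5cfa4-7787-4c15-b496-1a8575fbe15f"]}}'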

Knowing this, many irrelevant branches of the code can be ignored. The relevant branches, condensed, are:

    def update_port(self, context, id, port):
        attrs = port[attributes.PORT]
        need_port_update_notify = False
        session = context.session
        bound_mech_contexts = []

        with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
                session.begin(subtransactions=True):
            port_db, binding = db.get_locked_port_and_binding(session, id)
            if not port_db:
                raise exc.PortNotFound(port_id=id)
            ### the MAC address is not updated

            original_port = self._make_port_dict(port_db)
            ### update the port record in the DB
            updated_port = super(Ml2Plugin, self).update_port(context, id,
                                                              port)
            ### attrs has no port_security_enabled field, only security_groups

            ### attrs has no qos_policy_id field either

            ### attrs has no allowed_address_pairs field either

            ### update the port's security group information in the database
            need_port_update_notify |= self.update_security_group_on_port(
                context, id, port, original_port, updated_port)

            ### attrs has no extra_dhcp_opts field either

            ### the port is already bound to a host; no need to bind again

            ### the port's device_owner is not a DVR interface
            if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                ......
            else:
                ### the openvswitch mechanism driver does not implement
                ### update_port_precommit, so this presumably falls through to
                ### neutron.plugins.ml2.driver_api.MechanismDriver in the base
                ### class, which simply passes (mech_context construction elided)
                self.mechanism_manager.update_port_precommit(mech_context)
                bound_mech_contexts.append(mech_context)

        # Notifications must be sent after the above transaction is complete
        ### notify the various extensions that the port update is complete, so
        ### they can run any follow-up actions of their own
        registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)

        ### like update_port_precommit, update_port_postcommit is also a pass
        try:
            for mech_context in bound_mech_contexts:
                self.mechanism_manager.update_port_postcommit(
                    mech_context)
        except ml2_exc.MechanismDriverError:
            LOG.error(_LE("mechanism_manager.update_port_postcommit "
                          "failed for port %s"), id)

        ### the key execution path, analyzed separately below
        self.check_and_notify_security_group_member_changed(
            context, original_port, updated_port)
        ### the port's security_groups attribute changed, so
        ### need_port_update_notify is True
        need_port_update_notify |= self.is_security_group_member_updated(
            context, original_port, updated_port)

        ### no need to bind the port
check_and_notify_security_group_member_changed is the key path. It is implemented in the base class neutron.db.securitygroups_rpc_base.SecurityGroupServerRpcMixin, and its job is to send a port-security-group-update notification over RPC to the neutron-openvswitch-agent service. Let's analyze it separately:

    def check_and_notify_security_group_member_changed(
            self, context, original_port, updated_port):
        sg_change = not utils.compare_elements(
            original_port.get(ext_sg.SECURITYGROUPS),
            updated_port.get(ext_sg.SECURITYGROUPS))
        if sg_change:  ### the port's security groups changed; our add operation
                       ### goes from none to one newly added group
            self.notify_security_groups_member_updated_bulk(
                context, [original_port, updated_port])
        elif original_port['fixed_ips'] != updated_port['fixed_ips']:
            self.notify_security_groups_member_updated(context, updated_port)

    ### also a method of SecurityGroupServerRpcMixin (condensed here)
    def notify_security_groups_member_updated_bulk(self, context, ports):
        """Notify update event of security group members for ports."""
        sg_provider_updated_networks = set()
        sec_groups = set()
        for port in ports:
            ### the port is neither a DHCP port nor a router port
            ### (the if/elif branches handling those cases are elided)
            else:
                sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))

        if sec_groups:
            self.notifier.security_groups_member_updated(
                context, list(sec_groups))  ### notify agents of the update

At this point we hit the three code questions again: what is notifier? Where does it come from? Where do we find security_groups_member_updated? Because the flow above jumps between base and derived classes many times, we have to return to the original call site and trace forward again. The class jumps analyzed so far are:

  1. neutron.plugins.ml2.plugin.Ml2Plugin#update_port
  2. neutron.db.securitygroups_rpc_base.SecurityGroupServerRpcMixin#check_and_notify_security_group_member_changed

SecurityGroupServerRpcMixin is a base class of Ml2Plugin, so self.notifier must live either in SecurityGroupServerRpcMixin or in Ml2Plugin. There is no direct self.notifier initialization in Ml2Plugin's __init__, but it does call self._start_rpc_notifiers(), which contains the initialization. (In fact I found _start_rpc_notifiers by searching for the string "self.notifier = " and then looking up its callers, which led back to __init__. This brute-force search is a trick I use often and find quite fast and effective.)

    @log_helpers.log_method_call
    def _start_rpc_notifiers(self):
        """Initialize RPC notifiers for agents."""
        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )
### neutron.plugins.ml2.rpc.AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,  ### note this base class
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API."""

The security_groups_member_updated method we are after is not implemented in AgentNotifierApi itself, so it must come from a base class:

    ### neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupAgentRpcApiMixin
    def security_groups_member_updated(self, context, security_groups):
        """Notify member updated security groups."""
        if not security_groups:
            return
        cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
                                    topic=self._get_security_group_topic(),
                                    fanout=True)
        cctxt.cast(context, 'security_groups_member_updated',
                   security_groups=security_groups)
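
Underneath, self.client is an oslo.messaging RPC client, and cast is an asynchronous fire-and-forget call fanned out to every agent listening on the topic. A bare-bones sketch of what such a fanout cast looks like (the topic name here is illustrative, not copied from neutron's code):

from oslo_config import cfg
import oslo_messaging

transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='q-agent-notifier-security_group-update',
                               fanout=True)
client = oslo_messaging.RPCClient(transport, target)
# cast() returns immediately; no result comes back from the agents.
client.cast({}, 'security_groups_member_updated',
            security_groups=['<security-group-id>'])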

security_groups_member_updated above casts the RPC request to the agent side (the registration of the RPC consumers on the neutron-openvswitch-agent side is covered by the agent startup flow). The agent-side callback is:

    ### neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupAgentRpcCallbackMixin
    def security_groups_member_updated(self, context, **kwargs):
        """Callback for security group member update.

        :param security_groups: list of updated security_groups
        """
        security_groups = kwargs.get('security_groups', [])
        LOG.debug("Security group member updated on remote: %s",
                  security_groups)
        if not self.sg_agent:
            return self._security_groups_agent_not_set()
        self.sg_agent.security_groups_member_updated(security_groups)

Next come the three questions for self.sg_agent: what is it? Where does it come from? Where does it go? From the neutron-openvswitch-agent startup flow, we know self.sg_agent is defined and initialized in neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#__init__:

class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,   ### note this base class
                      l2population_rpc.L2populationRpcCallBackTunnelMixin,
                      dvr_rpc.DVRAgentRpcCallbackMixin):
    ......
    def __init__(self, bridge_classes, conf=None):
         '''Constructor.
         ......
        # Security group agent support
        self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
                self.sg_plugin_rpc, self.local_vlan_map,
                defer_refresh_firewall=True,  ### note this argument is True
                integration_bridge=self.int_br)

    ### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
    ### self.sg_agent.security_groups_member_updated ultimately calls this method
    def security_groups_member_updated(self, security_groups):
        LOG.info(_LI("Security group "
                 "member updated %r"), security_groups)
        self._security_group_updated(
            security_groups,
            'security_group_source_groups',
            'sg_member')

    ### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
    def _security_group_updated(self, security_groups, attribute, action_type):
        devices = []
        sec_grp_set = set(security_groups)
        for device in self.firewall.ports.values():
            if sec_grp_set & set(device.get(attribute, [])):
                devices.append(device['device'])
        if devices:
            if self.use_enhanced_rpc:  ### negotiated with the server over rpc; should be True here
                self.firewall.security_group_updated(action_type, sec_grp_set)
            if self.defer_refresh_firewall:  ### True, passed in at instantiation above
                ### defer the firewall refresh; presumably the openvswitch-agent
                ### polling loop periodically checks whether a refresh is needed,
                ### and if so runs the same refresh_firewall as the else branch
                LOG.debug("Adding %s devices to the list of devices "
                          "for which firewall needs to be refreshed",
                          devices)
                self.devices_to_refilter |= set(devices)
            else:
                self.refresh_firewall(devices)

    ### self.firewall is likewise found by walking up the inheritance chain;
    ### per our configuration, the firewall driver is
    ### neutron.agent.linux.openvswitch_firewall.firewall.OVSFirewallDriver
    def security_group_updated(self, action_type, sec_group_ids,
                               device_ids=None):
        """This method is obsolete

        The current driver only supports enhanced rpc calls into security group
        agent. This method is never called from that place.
        """
        ### does nothing

So we have to wait for the polling loop to call refresh_firewall. From the neutron-openvswitch-agent startup flow, the polling execution path is as follows (a condensed sketch of the deferred-refresh pattern follows the list):

  1. neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.main
  2. neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#daemon_loop
  3. neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#rpc_loop
  4. neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#process_network_ports
  5. neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc#setup_port_filters
  6. neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc#refresh_firewall
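
Here is that condensed sketch of the deferred-refresh pattern (a simplification for illustration, not a copy of the agent code): the RPC callback only records which devices are stale, and the polling loop does the actual refresh.

class DeferredRefreshSketch(object):
    def __init__(self):
        self.devices_to_refilter = set()

    def security_group_member_updated(self, devices):
        # RPC callback: just remember which devices need refiltering.
        self.devices_to_refilter |= set(devices)

    def rpc_loop_iteration(self):
        # Polling loop: pick up the deferred devices and refresh them.
        if self.devices_to_refilter:
            updated, self.devices_to_refilter = self.devices_to_refilter, set()
            self.refresh_firewall(updated)

    def refresh_firewall(self, devices):
        print("refreshing firewall for %s" % sorted(devices))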
The following ipdb backtrace, captured with a breakpoint in add_flows_from_rules, confirms this call chain:

ipdb> bt
  /usr/bin/neutron-openvswitch-agent(10)<module>()
      8 
      9 if __name__ == "__main__":
---> 10     sys.exit(main())

  /opt/stack/neutron/neutron/cmd/eventlet/plugins/ovs_neutron_agent.py(20)main()
     18 
     19 def main():
---> 20     agent_main.main()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/main.py(49)main()
     47     common_config.setup_logging()
     48     n_utils.log_opt_values(LOG)
---> 49     mod.main()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/main.py(36)main()
     34         'br_tun': br_tun.OVSTunnelBridge,
     35     }
---> 36     ovs_neutron_agent.main(bridge_classes)

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2156)main()
   2154         LOG.error(_LE("%s Agent terminated!"), e)
   2155         sys.exit(1)
-> 2156     agent.daemon_loop()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2079)daemon_loop()
   2078 
-> 2079             self.rpc_loop(polling_manager=pm)
   2080 

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2029)rpc_loop()
   2028                         failed_devices = self.process_network_ports(
-> 2029                             port_info, ovs_restarted)
   2030                         if need_clean_stale_flow:

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(1643)process_network_ports()
   1642         self.sg_agent.setup_port_filters(added_ports,
-> 1643                                          port_info.get('updated', set()))
   1644         failed_devices['added'] |= self._bind_devices(need_binding_devices)

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(310)setup_port_filters()
    309                           len(updated_devices))
--> 310                 self.refresh_firewall(updated_devices)
    311 

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(147)decorated_function()
    146                 return func(self,  # pylint: disable=not-callable
--> 147                             *args, **kwargs)
    148         return decorated_function

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(263)refresh_firewall()
    262                 LOG.debug("Update port filter for %s", device['device'])
--> 263                 self.firewall.update_port_filter(device)
    264 

  /opt/stack/neutron/neutron/agent/linux/openvswitch_firewall/firewall.py(297)update_port_filter()
    296         self.initialize_port_flows(of_port)
--> 297         self.add_flows_from_rules(of_port)
    298 

> /opt/stack/neutron/neutron/agent/linux/openvswitch_firewall/firewall.py(667)add_flows_from_rules()
    666         LOG.debug('Creating flow rules for port %s that is port %d in OVS',
1-> 667                   port.id, port.ofport)
    668         rules_generator = self.create_rules_generator_for_port(port)

Addendum (2017-09-20): debugging with ipdb breakpoints confirmed that the flow analyzed above is correct.

I did not analyze this flow in close detail, but it should be essentially as described.

    ### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
    @skip_if_noopfirewall_or_firewall_disabled
    def refresh_firewall(self, device_ids=None):
        LOG.info(_LI("Refresh firewall rules"))
        if not device_ids:
            device_ids = self.firewall.ports.keys()
            if not device_ids:
                LOG.info(_LI("No ports here to refresh firewall"))
                return
        if self.use_enhanced_rpc:
            devices_info = self.plugin_rpc.security_group_info_for_devices(
                self.context, device_ids)
            devices = devices_info['devices']
            security_groups = devices_info['security_groups']
            security_group_member_ips = devices_info['sg_member_ips']
        else:
            devices = self.plugin_rpc.security_group_rules_for_devices(
                self.context, device_ids)

        with self.firewall.defer_apply():
            if self.use_enhanced_rpc:
                LOG.debug("Update security group information for ports %s",
                          devices.keys())
                self._update_security_group_info(
                    security_groups, security_group_member_ips)
            for device in devices.values():
                LOG.debug("Update port filter for %s", device['device'])
                self.firewall.update_port_filter(device)   ### this is the key path
    ### neutron.agent.linux.openvswitch_firewall.firewall.OVSFirewallDriver
    def update_port_filter(self, port):
        """Update rules for given port

        Current existing filtering rules are removed and new ones are generated
        based on current loaded security group rules and members.

        """
        if not firewall.port_sec_enabled(port):
            self.remove_port_filter(port)
            return
        elif not self.is_port_managed(port):
            self.prepare_port_filter(port)
            return
        ### fetch the neutron.agent.linux.openvswitch_firewall.firewall.OFPort object
        of_port = self.get_or_create_ofport(port)
        # TODO(jlibosva): Handle firewall blink
        self.delete_all_port_flows(of_port)
        self.initialize_port_flows(of_port)
        self.add_flows_from_rules(of_port)

self.delete_all_port_flows(of_port), self.initialize_port_flows(of_port), and self.add_flows_from_rules(of_port) respectively clear the port's OpenFlow flows, install the initial flows, and add the flows derived from the security group rules. The details are not analyzed further here; ultimately the flow rules are installed on the br-int bridge via the ovs-ofctl command, which also involves translating security group rules into OpenFlow rules.
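
Although the flow generation is not traced here, you can observe the end result on the compute node with something like the following (the output format varies by OVS and neutron version):

ovs-ofctl dump-flows br-int   ### dump the OpenFlow rules installed on br-int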

I did not have time to analyze this part carefully either; it appears to go through the br-int bridge driver registered during startup (in neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py, __init__ contains self.int_br = self.br_int_cls(ovs_conf.integration_bridge)).

On the relationship between RPC class names and their roles: after reading the docstrings of these RPC classes and analyzing how they are used, my conclusions are as follows (I am not certain they are correct):

  • SecurityGroupServerRpcApi: note the "ServerRpc" part, which marks this as the RPC client that sends to neutron-server; the caller (client side) is an agent or extension, and neutron-server is the server side
  • SecurityGroupServerRpcCallback: the server-side callback for the RPC client above; it handles the RPC request and returns a result (a result is returned only for synchronous call-type requests; asynchronous cast requests return nothing)
  • SecurityGroupAgentRpcApiMixin: "AgentRpc" marks this as the RPC client that sends to an agent or extension, so the caller (RPC client side) is neutron-server, and the RPC server side is neutron-openvswitch-agent or another extension
  • SecurityGroupAgentRpcCallbackMixin: the server-side callback handler for the RPC client above

Other RPC classes follow the same relationship between naming and purpose. Knowing this rule makes it much easier to understand the RPC call relationships and which service a given piece of code runs in; a toy sketch of the pairing follows.
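
To make the convention concrete (illustrative names, not neutron code):

class FooAgentRpcApi(object):
    """Runs inside neutron-server: the RPC client that notifies agents."""
    def __init__(self, client):
        self.client = client  # an oslo.messaging RPCClient

    def foo_updated(self, context, foo_ids):
        cctxt = self.client.prepare(topic='foo-agent-topic', fanout=True)
        cctxt.cast(context, 'foo_updated', foo_ids=foo_ids)


class FooAgentRpcCallbackMixin(object):
    """Mixed into the agent: handles the 'foo_updated' cast above."""
    def foo_updated(self, context, **kwargs):
        foo_ids = kwargs.get('foo_ids', [])
        # ...react to the update on the agent side...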