Prerequisite knowledge
- neutron-server startup flow (including how the core plugin and extensions that handle HTTP requests are registered)
- neutron-openvswitch-agent startup flow
- the neutron openvswitch (or Linux bridge) firewall security group implementation
- neutron security group and security group rule creation flows
- neutron RPC and callback mechanisms
- nova client (or openstack client) command line usage and code flow
- nova-api startup flow (including how the controllers that handle HTTP requests are registered)
- nova-compute startup flow
- nova instance creation flow
- WSGI routing rules
- the paste deploy WSGI framework
We can use the nova command line to add a security group to a server (also called a virtual machine or instance; "instance" from here on), provided the instance has already been created successfully and the neutron security group and its rules already exist. The commands look like this:
```
[root@vs2-controller ~(VisionStack)]$ nova secgroup-list  ### list security groups
+--------------------------------------+------------+------------------------+
| Id                                   | Name       | Description            |
+--------------------------------------+------------+------------------------+
| d9cdf493-bd74-4189-8065-21f0d24cd503 | 1          |                        |
| c8f3fb48-f203-4f92-ac7e-0bcad86f6add | default    | Default security group |
| 8eb76d3e-1f74-4261-963a-c061669d1e7f | hello1234  |                        |
| 9fc5cfa4-7787-4c15-b496-1a8575fbe15f | hello41231 | hello4wqeqw            |
+--------------------------------------+------------+------------------------+
[root@vs2-controller ~(VisionStack)]$ nova list | grep e27f9125  ### list instances
| e27f9125-0465-4f05-b2ad-35c5ab1cc979 | wkm-test | ACTIVE | - | Running | network1=10.0.0.10 |
[root@vs2-controller ~(VisionStack)]$ nova add-secgroup e27f9125-0465-4f05-b2ad-35c5ab1cc979 9fc5cfa4-7787-4c15-b496-1a8575fbe15f  ### add the security group to the instance
[root@vs2-controller ~(VisionStack)]$ nova show e27f9125-0465-4f05-b2ad-35c5ab1cc979 | grep security_groups  ### show the security groups attached to the instance
| security_groups | hello41231 |
```
The goal of this article is to analyze the code paths that the "nova add-secgroup" operation actually executes in the nova and neutron projects. The OpenStack version analyzed is Mitaka.
My understanding is limited; if you spot any mistakes, please don't hesitate to point them out!
Code flow in the Nova project
The nova side is fairly simple: the nova add-secgroup command is wrapped by the client into an HTTP request and sent to the nova-api service. The equivalent curl command (obtainable via nova --debug add-secgroup $UUID $SGID) is:
```
curl -g -i -X POST http://vs2-controller:8774/v2.1/8005649576f14399a88e38ef7e17a831/servers/e27f9125-0465-4f05-b2ad-35c5ab1cc979/action -H "User-Agent: python-novaclient" -H "Content-Type: application/json" -H "Accept: application/json" -H "X-OpenStack-Nova-API-Version: 2.25" -H "X-Auth-Token: {SHA1}cbb848c1305c0e63a495b03f19bb9f74fb30a9f8" -d '{"addSecurityGroup": {"name": "9fc5cfa4-7787-4c15-b496-1a8575fbe15f"}}'
```
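For comparison, the same action can also be triggered from Python with python-novaclient; a minimal sketch with placeholder credentials (not part of the original trace):

```python
from novaclient import client

# auth values are placeholders for illustration
nova = client.Client('2.1', 'admin', 'password', 'project',
                     'http://vs2-controller:5000/v2.0')
nova.servers.add_security_group(
    'e27f9125-0465-4f05-b2ad-35c5ab1cc979',      # instance uuid
    '9fc5cfa4-7787-4c15-b496-1a8575fbe15f')      # security group name or id
```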
When nova-api receives the request, it hands it to nova.api.openstack.compute.security_groups.SecurityGroupActionController for the actual processing; the handler method is:
```python
@extensions.expected_errors((400, 404, 409))
@wsgi.response(202)
@wsgi.action('addSecurityGroup')
def _addSecurityGroup(self, req, id, body):
    context = req.environ['nova.context']
    authorize(context)
    group_name = self._parse(body, 'addSecurityGroup')

    try:
        return self._invoke(self.security_group_api.add_to_instance,
                            context, id, group_name)
    except (exception.SecurityGroupNotFound,
            exception.InstanceNotFound) as exp:
        raise exc.HTTPNotFound(explanation=exp.format_message())
    except exception.NoUniqueMatch as exp:
        raise exc.HTTPConflict(explanation=exp.format_message())
    except (exception.SecurityGroupCannotBeApplied,
            exception.SecurityGroupExistsForInstance) as exp:
        raise exc.HTTPBadRequest(explanation=exp.format_message())
```
This code is straightforward. _invoke needs no explanation, a glance is enough; the important thing is to work out what self.security_group_api is, because it determines the code path that follows. As a rule, a class's attributes are initialized in __init__, so let's look there first:
```python
class SecurityGroupActionController(wsgi.Controller):
    def __init__(self, *args, **kwargs):
        super(SecurityGroupActionController, self).__init__(*args, **kwargs)
        self.security_group_api = (
            openstack_driver.get_openstack_security_group_driver(
                skip_policy_check=True))
        self.compute_api = compute.API(
            security_group_api=self.security_group_api,
            skip_policy_check=True)
```
Yes, it is initialized right here. That raises the question of what openstack_driver is and where it comes from (my three questions when reading code: what is it? where does it come from? where does it go?). We find get_openstack_security_group_driver in nova/network/security_group/openstack_driver.py:
```python
NOVA_DRIVER = ('nova.compute.api.SecurityGroupAPI')
NEUTRON_DRIVER = ('nova.network.security_group.neutron_driver.'
                  'SecurityGroupAPI')
DRIVER_CACHE = {}


def _get_openstack_security_group_driver(skip_policy_check=False):
    if is_neutron_security_groups():
        return importutils.import_object(NEUTRON_DRIVER,
                                         skip_policy_check=skip_policy_check)
    elif CONF.security_group_api.lower() == 'nova':
        return importutils.import_object(NOVA_DRIVER,
                                         skip_policy_check=skip_policy_check)
    else:
        return importutils.import_object(CONF.security_group_api,
                                         skip_policy_check=skip_policy_check)


def get_openstack_security_group_driver(skip_policy_check=False):
    if skip_policy_check not in DRIVER_CACHE:
        DRIVER_CACHE[skip_policy_check] = _get_openstack_security_group_driver(
            skip_policy_check)
    return DRIVER_CACHE[skip_policy_check]


def is_neutron_security_groups():
    return (CONF.security_group_api.lower() == 'neutron'
            or nova.network.is_neutron())
```
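A quick aside on the mechanism: importutils.import_object imports a class by its dotted path and instantiates it with the given arguments. A toy demonstration, assuming oslo.utils is installed (the class used here is arbitrary; anything importable works):

```python
from oslo_utils import importutils

# import the class named by the dotted path, then call it with the kwargs
obj = importutils.import_object('collections.OrderedDict', a=1)
print(type(obj))  # <class 'collections.OrderedDict'>
print(obj)        # OrderedDict([('a', 1)])
```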
As you can see, _get_openstack_security_group_driver imports the matching driver class. Note the is_neutron_security_groups helper: the CONF.security_group_api.lower() == 'neutron' half of its condition is misleading, because that option defaults to 'nova'; in practice it is the second half, nova.network.is_neutron(), that decides. is_neutron lives in nova/network/__init__.py:
```python
def is_neutron():
    """Does this configuration mean we're neutron.

    This logic exists as a separate config option
    """
    legacy_class = oslo_config.cfg.CONF.network_api_class
    use_neutron = oslo_config.cfg.CONF.use_neutron

    if legacy_class not in (NEUTRON_NET_API, NOVA_NET_API):
        # Someone actually used this option, this gets a pass for now,
        # but will just go away once deleted.
        return None
    elif legacy_class == NEUTRON_NET_API and not use_neutron:
        # If they specified neutron via class, we should respect that
        LOG.warn(_LW("Config mismatch. The network_api_class specifies %s, "
                     "however use_neutron is not set to True. Using Neutron "
                     "networking for now, however please set use_neutron to "
                     "True in your configuration as network_api_class is "
                     "deprecated and will be removed."), legacy_class)
        return True
    elif use_neutron:
        return True
    else:
        return False
```
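is_neutron inspects the network_api_class and use_neutron options. For reference, the relevant part of our /etc/nova/nova.conf looks like this (a sketch showing only the settings discussed here, not a complete config):

```ini
[DEFAULT]
# network_api_class is not set, i.e. left at its (deprecated) default
use_neutron = True
# security_group_api is not set either; it defaults to 'nova', but with
# use_neutron=True the neutron driver is selected anyway via is_neutron()
```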
Since network_api_class is left at its default and use_neutron=True, is_neutron() returns True. Back in openstack_driver.py and SecurityGroupActionController, security_group_api therefore resolves to NEUTRON_DRIVER, i.e. nova.network.security_group.neutron_driver.SecurityGroupAPI, so self.security_group_api.add_to_instance calls nova.network.security_group.neutron_driver.SecurityGroupAPI#add_to_instance:
```python
@compute_api.wrap_check_security_groups_policy
def add_to_instance(self, context, instance, security_group_name):
    """Add security group to the instance."""
    neutron = neutronapi.get_client(context)
    try:
        security_group_id = neutronv20.find_resourceid_by_name_or_id(
            neutron, 'security_group',
            security_group_name,
            context.project_id)
    except n_exc.NeutronClientNoUniqueMatch as e:
        raise exception.NoUniqueMatch(six.text_type(e))
    except n_exc.NeutronClientException as e:
        exc_info = sys.exc_info()
        if e.status_code == 404:
            msg = (_("Security group %(name)s is not found for "
                     "project %(project)s") %
                   {'name': security_group_name,
                    'project': context.project_id})
            self.raise_not_found(msg)
        else:
            LOG.exception(_LE("Neutron Error:"))
            six.reraise(*exc_info)
    params = {'device_id': instance.uuid}
    try:
        ports = neutron.list_ports(**params).get('ports')
    except n_exc.NeutronClientException:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE("Neutron Error:"))

    if not ports:
        msg = (_("instance_id %s could not be found as device id on"
                 " any ports") % instance.uuid)
        self.raise_not_found(msg)

    for port in ports:
        if not self._has_security_group_requirements(port):
            LOG.warning(_LW("Cannot add security group %(name)s to "
                            "%(instance)s since the port %(port_id)s "
                            "does not meet security requirements"),
                        {'name': security_group_name,
                         'instance': instance.uuid,
                         'port_id': port['id']})
            raise exception.SecurityGroupCannotBeApplied()
        if 'security_groups' not in port:
            port['security_groups'] = []
        port['security_groups'].append(security_group_id)
        updated_port = {'security_groups': port['security_groups']}
        try:
            LOG.info(_LI("Adding security group %(security_group_id)s to "
                         "port %(port_id)s"),
                     {'security_group_id': security_group_id,
                      'port_id': port['id']})
            neutron.update_port(port['id'], {'port': updated_port})
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("Neutron Error:"))
```
This code is also clear: it first resolves the security group by name or id, then finds the instance's ports by the instance uuid, and finally loops over the ports calling the update_port API to add the security group to each port.
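Condensed to its essence, what nova does in this method could be reproduced standalone with python-neutronclient, roughly like this (a sketch with placeholder credentials, omitting the error handling shown above; not nova's actual code):

```python
from neutronclient.v2_0 import client as neutron_client

# auth values are placeholders for illustration
neutron = neutron_client.Client(
    username='admin', password='password', tenant_name='project',
    auth_url='http://vs2-controller:5000/v2.0')

instance_uuid = 'e27f9125-0465-4f05-b2ad-35c5ab1cc979'      # from `nova list`
security_group_id = '9fc5cfa4-7787-4c15-b496-1a8575fbe15f'  # from `nova secgroup-list`

# find every port bound to the instance ...
ports = neutron.list_ports(device_id=instance_uuid)['ports']
# ... and append the security group to each port's security_groups list
for port in ports:
    sgs = port.get('security_groups', []) + [security_group_id]
    neutron.update_port(port['id'], {'port': {'security_groups': sgs}})
```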
That is the end of the nova side of the code flow.
Code flow in the Neutron project
The bulk of the work of adding a security group to an instance via the nova command line actually happens in the neutron project. The neutron client wraps update_port into an HTTP request and sends it to neutron-server; neutron-server hands the request to the base controller, which uses the controller/URL mappings registered earlier plus the HTTP method (here the method is PUT and the resource is /ports, which maps to update_port) to dynamically locate the core plugin and extensions that should handle it (this dispatch logic requires analyzing the neutron-server startup flow). Since our configured core plugin is ml2, we land first in neutron.plugins.ml2.plugin.Ml2Plugin#update_port:
```python
def update_port(self, context, id, port):
    attrs = port[attributes.PORT]
    need_port_update_notify = False
    session = context.session
    bound_mech_contexts = []

    with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
            session.begin(subtransactions=True):
        port_db, binding = db.get_locked_port_and_binding(session, id)
        if not port_db:
            raise exc.PortNotFound(port_id=id)
        mac_address_updated = self._check_mac_update_allowed(
            port_db, attrs, binding)
        need_port_update_notify |= mac_address_updated
        original_port = self._make_port_dict(port_db)
        updated_port = super(Ml2Plugin, self).update_port(context, id,
                                                          port)
        self.extension_manager.process_update_port(context, attrs,
                                                   updated_port)
        self._portsec_ext_port_update_processing(updated_port, context,
                                                 port, id)
        if (psec.PORTSECURITY in attrs) and (
                original_port[psec.PORTSECURITY] !=
                updated_port[psec.PORTSECURITY]):
            need_port_update_notify = True
        # TODO(QoS): Move out to the extension framework somehow.
        # Follow https://review.openstack.org/#/c/169223 for a solution.
        if (qos_consts.QOS_POLICY_ID in attrs and
                original_port[qos_consts.QOS_POLICY_ID] !=
                updated_port[qos_consts.QOS_POLICY_ID]):
            need_port_update_notify = True

        if addr_pair.ADDRESS_PAIRS in attrs:
            need_port_update_notify |= (
                self.update_address_pairs_on_port(context, id, port,
                                                  original_port,
                                                  updated_port))
        need_port_update_notify |= self.update_security_group_on_port(
            context, id, port, original_port, updated_port)
        network = self.get_network(context, original_port['network_id'])
        need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
            context, id, port, updated_port)
        levels = db.get_binding_levels(session, id, binding.host)
        mech_context = driver_context.PortContext(
            self, context, updated_port, network, binding, levels,
            original_port=original_port)
        need_port_update_notify |= self._process_port_binding(
            mech_context, attrs)
        # For DVR router interface ports we need to retrieve the
        # DVRPortbinding context instead of the normal port context.
        # The normal Portbinding context does not have the status
        # of the ports that are required by the l2pop to process the
        # postcommit events.

        # NOTE:Sometimes during the update_port call, the DVR router
        # interface port may not have the port binding, so we cannot
        # create a generic bindinglist that will address both the
        # DVR and non-DVR cases here.
        # TODO(Swami): This code need to be revisited.
        if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            dvr_binding_list = db.get_dvr_port_bindings(session, id)
            for dvr_binding in dvr_binding_list:
                levels = db.get_binding_levels(session, id,
                                               dvr_binding.host)
                dvr_mech_context = driver_context.PortContext(
                    self, context, updated_port, network,
                    dvr_binding, levels,
                    original_port=original_port)
                self.mechanism_manager.update_port_precommit(
                    dvr_mech_context)
                bound_mech_contexts.append(dvr_mech_context)
        else:
            self.mechanism_manager.update_port_precommit(mech_context)
            bound_mech_contexts.append(mech_context)

    # Notifications must be sent after the above transaction is complete
    kwargs = {
        'context': context,
        'port': updated_port,
        'mac_address_updated': mac_address_updated,
        'original_port': original_port,
    }
    registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)

    # Note that DVR Interface ports will have bindings on
    # multiple hosts, and so will have multiple mech_contexts,
    # while other ports typically have just one.
    # Since bound_mech_contexts has both the DVR and non-DVR
    # contexts we can manage just with a single for loop.
    try:
        for mech_context in bound_mech_contexts:
            self.mechanism_manager.update_port_postcommit(
                mech_context)
    except ml2_exc.MechanismDriverError:
        LOG.error(_LE("mechanism_manager.update_port_postcommit "
                      "failed for port %s"), id)
    self.check_and_notify_security_group_member_changed(
        context, original_port, updated_port)
    need_port_update_notify |= self.is_security_group_member_updated(
        context, original_port, updated_port)
    if original_port['admin_state_up'] != updated_port['admin_state_up']:
        need_port_update_notify = True
    # NOTE: In the case of DVR ports, the port-binding is done after
    # router scheduling when sync_routers is called and so this call
    # below may not be required for DVR routed interfaces. But still
    # since we don't have the mech_context for the DVR router interfaces
    # at certain times, we just pass the port-context and return it, so
    # that we don't disturb other methods that are expecting a return
    # value.
    bound_context = self._bind_port_if_needed(
        mech_context,
        allow_notify=True,
        need_notify=need_port_update_notify)
    return bound_context.current
```
This flow is fairly complex, but keep one thing in mind: the update_port call made by nova passes only two things, the port_id and a port body containing nothing but the security_groups attribute:
```python
port['security_groups'].append(security_group_id)
updated_port = {'security_groups': port['security_groups']}
neutron.update_port(port['id'], {'port': updated_port})
```
Knowing this, we can ignore many irrelevant branches. Trimmed down to the relevant path, the code looks like this:
```python
def update_port(self, context, id, port):
    attrs = port[attributes.PORT]
    need_port_update_notify = False
    session = context.session
    bound_mech_contexts = []

    with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
            session.begin(subtransactions=True):
        port_db, binding = db.get_locked_port_and_binding(session, id)
        if not port_db:
            raise exc.PortNotFound(port_id=id)
        ### the mac address is not being updated
        original_port = self._make_port_dict(port_db)
        ### update the port record in the db
        updated_port = super(Ml2Plugin, self).update_port(context, id,
                                                          port)
        ### attrs has no port_security_enabled field, only security_groups
        ### attrs has no qos_policy_id field either
        ### attrs has no allowed_address_pairs field either
        ### update the port's security group info in the database
        need_port_update_notify |= self.update_security_group_on_port(
            context, id, port, original_port, updated_port)
        ### attrs has no extra_dhcp_opts field either
        ### the port is already bound to a host, no need to bind it again
        ### the port is not a DVR interface
        if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            ......
        else:
            ### the openvswitch mechanism driver does not implement
            ### update_port_precommit; presumably the call resolves to this
            ### method on the base class
            ### neutron.plugins.ml2.driver_api.MechanismDriver, which
            ### simply passes
            self.mechanism_manager.update_port_precommit(mech_context)
            bound_mech_contexts.append(mech_context)

    # Notifications must be sent after the above transaction is complete
    ### notify the various extensions that the port update is done, so they
    ### can kick off any follow-up work of their own
    registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)

    ### update_port_postcommit is like update_port_precommit: also a pass
    try:
        for mech_context in bound_mech_contexts:
            self.mechanism_manager.update_port_postcommit(
                mech_context)
    except ml2_exc.MechanismDriverError:
        LOG.error(_LE("mechanism_manager.update_port_postcommit "
                      "failed for port %s"), id)
    ### the key execution path, analyzed separately below
    self.check_and_notify_security_group_member_changed(
        context, original_port, updated_port)
    ### the port's security_groups attribute changed, so
    ### need_port_update_notify is True
    need_port_update_notify |= self.is_security_group_member_updated(
        context, original_port, updated_port)
    ### no need to bind the port
```
check_and_notify_security_group_member_changed is the key step. It is implemented in the base class neutron.db.securitygroups_rpc_base.SecurityGroupServerRpcMixin, and its job is to send an RPC notification about the port's security group change to the neutron-openvswitch-agent service. Let's analyze it separately:
```python
def check_and_notify_security_group_member_changed(
        self, context, original_port, updated_port):
    sg_change = not utils.compare_elements(
        original_port.get(ext_sg.SECURITYGROUPS),
        updated_port.get(ext_sg.SECURITYGROUPS))
    if sg_change:
        ### the port's security groups changed; in our case it is an add,
        ### going from no security group to one
        self.notify_security_groups_member_updated_bulk(
            context, [original_port, updated_port])
    elif original_port['fixed_ips'] != updated_port['fixed_ips']:
        self.notify_security_groups_member_updated(context, updated_port)
```
```python
### still a method of SecurityGroupServerRpcMixin
def notify_security_groups_member_updated_bulk(self, context, ports):
    """Notify update event of security group members for ports."""
    sg_provider_updated_networks = set()
    sec_groups = set()
    for port in ports:
        ### the port is neither a dhcp port nor a router port
        ......
        else:
            sec_groups |= set(port.get(ext_sg.SECURITYGROUPS))

    if sec_groups:
        self.notifier.security_groups_member_updated(
            context, list(sec_groups))  ### notify agents that security groups were updated
```
Here we hit the three code questions again: what is notifier? Where does it come from? Where is security_groups_member_updated defined? Since the flow above jumps back and forth between base and derived classes, we have to return to the original call site and trace our way down again. The class hops analyzed so far:
- neutron.plugins.ml2.plugin.Ml2Plugin#update_port
- neutron.db.securitygroups_rpc_base.SecurityGroupServerRpcMixin#check_and_notify_security_group_member_changed
SecurityGroupServerRpcMixin is a base class of Ml2Plugin, so if self.notifier isn't defined in SecurityGroupServerRpcMixin it must be in Ml2Plugin. Ml2Plugin's __init__ doesn't initialize self.notifier directly, but it does call self._start_rpc_notifiers(), which contains the initialization. (In fact, I found _start_rpc_notifiers by searching for the string "self.notifier = " and then looking up its callers, which led back to __init__; a quick-and-dirty search technique I use a lot, and one I find fast and effective.)
```python
@log_helpers.log_method_call
def _start_rpc_notifiers(self):
    """Initialize RPC notifiers for agents."""
    self.notifier = rpc.AgentNotifierApi(topics.AGENT)
    self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
        dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
    )
```
```python
### neutron.plugins.ml2.rpc.AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,  ### note this base class
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.
```
The security_groups_member_updated method we're after is not implemented in AgentNotifierApi itself, so we look it up in the base classes:
```python
### neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupAgentRpcApiMixin
def security_groups_member_updated(self, context, security_groups):
    """Notify member updated security groups."""
    if not security_groups:
        return
    cctxt = self.client.prepare(version=self.SG_RPC_VERSION,
                                topic=self._get_security_group_topic(),
                                fanout=True)
    cctxt.cast(context, 'security_groups_member_updated',
               security_groups=security_groups)
```
The code above sends the RPC request to the agent side (the registration of the RPC consumers on the neutron-openvswitch-agent side is covered by the agent startup flow). The agent-side callback is:
```python
### neutron.api.rpc.handlers.securitygroups_rpc.SecurityGroupAgentRpcCallbackMixin
def security_groups_member_updated(self, context, **kwargs):
    """Callback for security group member update.

    :param security_groups: list of updated security_groups
    """
    security_groups = kwargs.get('security_groups', [])
    LOG.debug("Security group member updated on remote: %s",
              security_groups)
    if not self.sg_agent:
        return self._security_groups_agent_not_set()
    self.sg_agent.security_groups_member_updated(security_groups)
```
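Before chasing self.sg_agent, it helps to make the cast mechanics concrete. Below is a minimal, self-contained sketch of the oslo.messaging pattern used by the notifier above; the topic string and payload are illustrative rather than the exact values neutron builds. prepare() pins the version/topic/fanout settings, and cast() sends a one-way RPC (no reply) to every consumer listening on the fanout topic:

```python
from oslo_config import cfg
import oslo_messaging

transport = oslo_messaging.get_transport(cfg.CONF)
# illustrative fanout topic; neutron derives the real one from topics.AGENT
target = oslo_messaging.Target(topic='q-agent-notifier-security_group-update')
client = oslo_messaging.RPCClient(transport, target)

# fanout=True delivers the message to every agent subscribed to the topic;
# cast() is fire-and-forget, the caller does not wait for any result
cctxt = client.prepare(fanout=True)
cctxt.cast({}, 'security_groups_member_updated',
           security_groups=['9fc5cfa4-7787-4c15-b496-1a8575fbe15f'])
```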
Next come the three questions for self.sg_agent: what is it? Where does it come from? Where does it go? From the neutron-openvswitch-agent startup flow we learn that self.sg_agent is defined and initialized in neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#__init__:
```python
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,  ### note this base class
                      l2population_rpc.L2populationRpcCallBackTunnelMixin,
                      dvr_rpc.DVRAgentRpcCallbackMixin):
    ......
    def __init__(self, bridge_classes, conf=None):
        '''Constructor.
        ......
        # Security group agent support
        self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
            self.context, self.sg_plugin_rpc, self.local_vlan_map,
            defer_refresh_firewall=True,  ### note this argument is True
            integration_bridge=self.int_br)
```
```python
### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
### self.sg_agent.security_groups_member_updated ultimately calls this method
def security_groups_member_updated(self, security_groups):
    LOG.info(_LI("Security group "
                 "member updated %r"), security_groups)
    self._security_group_updated(
        security_groups,
        'security_group_source_groups',
        'sg_member')
```
```python
### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
def _security_group_updated(self, security_groups, attribute, action_type):
    devices = []
    sec_grp_set = set(security_groups)
    for device in self.firewall.ports.values():
        if sec_grp_set & set(device.get(attribute, [])):
            devices.append(device['device'])
    if devices:
        if self.use_enhanced_rpc:
            ### whether enhanced rpc is available is confirmed with the
            ### server via rpc; it should be True here
            self.firewall.security_group_updated(action_type, sec_grp_set)
        if self.defer_refresh_firewall:
            ### True, passed in at instantiation above. Defer the firewall
            ### refresh: the openvswitch-agent polling task periodically
            ### checks whether a refresh is needed and, if so, takes the
            ### refresh_firewall path in the else branch below
            LOG.debug("Adding %s devices to the list of devices "
                      "for which firewall needs to be refreshed",
                      devices)
            self.devices_to_refilter |= set(devices)
        else:
            self.refresh_firewall(devices)
```
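The deferral idea itself is simple. Here is a toy illustration of the pattern, with names of my own invention rather than neutron's: the RPC callback merely records which devices are dirty, and a periodic loop later does the expensive refresh in one batch.

```python
class DeferredRefresher(object):
    """Toy version of the defer_refresh_firewall pattern."""

    def __init__(self):
        self.devices_to_refilter = set()

    def on_member_updated(self, devices):
        # RPC callback path: just mark the devices and return immediately
        self.devices_to_refilter |= set(devices)

    def poll(self):
        # polling-loop path: drain the set and refresh everything in one batch
        devices, self.devices_to_refilter = self.devices_to_refilter, set()
        for device in sorted(devices):
            print("refreshing firewall for %s" % device)
```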
```python
### self.firewall is likewise found by following the class hierarchy up to
### the base classes; per our configuration, the firewall driver is
### neutron.agent.linux.openvswitch_firewall.firewall.OVSFirewallDriver
def security_group_updated(self, action_type, sec_group_ids,
                           device_ids=None):
    """This method is obsolete

    The current driver only supports enhanced rpc calls into security
    group agent. This method is never called from that place.
    """
    ### does nothing at all
```
So we have to wait for the polling loop to call refresh_firewall. From the neutron-openvswitch-agent startup flow, the polling call chain is:
- neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.main
- neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#daemon_loop
- neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#rpc_loop
- neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent.OVSNeutronAgent#process_network_ports
- neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc#setup_port_filters
- neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc#refresh_firewall
```
ipdb> bt
  /usr/bin/neutron-openvswitch-agent(10)<module>()
      8
      9 if __name__ == "__main__":
---> 10     sys.exit(main())

  /opt/stack/neutron/neutron/cmd/eventlet/plugins/ovs_neutron_agent.py(20)main()
     18
     19 def main():
---> 20     agent_main.main()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/main.py(49)main()
     47     common_config.setup_logging()
     48     n_utils.log_opt_values(LOG)
---> 49     mod.main()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/main.py(36)main()
     34         'br_tun': br_tun.OVSTunnelBridge,
     35     }
---> 36     ovs_neutron_agent.main(bridge_classes)

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2156)main()
   2154         LOG.error(_LE("%s Agent terminated!"), e)
   2155         sys.exit(1)
->  2156     agent.daemon_loop()

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2079)daemon_loop()
   2078
->  2079         self.rpc_loop(polling_manager=pm)
   2080

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(2029)rpc_loop()
   2028                     failed_devices = self.process_network_ports(
->  2029                         port_info, ovs_restarted)
   2030                     if need_clean_stale_flow:

  /opt/stack/neutron/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py(1643)process_network_ports()
   1642             self.sg_agent.setup_port_filters(added_ports,
->  1643                                              port_info.get('updated', set()))
   1644             failed_devices['added'] |= self._bind_devices(need_binding_devices)

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(310)setup_port_filters()
    309                      len(updated_devices))
--> 310             self.refresh_firewall(updated_devices)
    311

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(147)decorated_function()
    146             return func(self,  # pylint: disable=not-callable
--> 147                         *args, **kwargs)
    148         return decorated_function

  /opt/stack/neutron/neutron/agent/securitygroups_rpc.py(263)refresh_firewall()
    262                 LOG.debug("Update port filter for %s", device['device'])
--> 263                 self.firewall.update_port_filter(device)
    264

  /opt/stack/neutron/neutron/agent/linux/openvswitch_firewall/firewall.py(297)update_port_filter()
    296         self.initialize_port_flows(of_port)
--> 297         self.add_flows_from_rules(of_port)
    298

> /opt/stack/neutron/neutron/agent/linux/openvswitch_firewall/firewall.py(667)add_flows_from_rules()
    666         LOG.debug('Creating flow rules for port %s that is port %d in OVS',
1-> 667                   port.id, port.ofport)
    668         rules_generator = self.create_rules_generator_for_port(port)
```
Update 2017-09-20: after stepping through with ipdb breakpoints, I confirmed that the flow analyzed above is correct.
I haven't traced every step of this chain in detail, but it should be essentially as described.
```python
### neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc
@skip_if_noopfirewall_or_firewall_disabled
def refresh_firewall(self, device_ids=None):
    LOG.info(_LI("Refresh firewall rules"))
    if not device_ids:
        device_ids = self.firewall.ports.keys()
        if not device_ids:
            LOG.info(_LI("No ports here to refresh firewall"))
            return
    if self.use_enhanced_rpc:
        devices_info = self.plugin_rpc.security_group_info_for_devices(
            self.context, device_ids)
        devices = devices_info['devices']
        security_groups = devices_info['security_groups']
        security_group_member_ips = devices_info['sg_member_ips']
    else:
        devices = self.plugin_rpc.security_group_rules_for_devices(
            self.context, device_ids)

    with self.firewall.defer_apply():
        if self.use_enhanced_rpc:
            LOG.debug("Update security group information for ports %s",
                      devices.keys())
            self._update_security_group_info(
                security_groups, security_group_member_ips)
        for device in devices.values():
            LOG.debug("Update port filter for %s", device['device'])
            self.firewall.update_port_filter(device)  ### the key step
```
```python
### neutron.agent.linux.openvswitch_firewall.firewall.OVSFirewallDriver
def update_port_filter(self, port):
    """Update rules for given port

    Current existing filtering rules are removed and new ones are generated
    based on current loaded security group rules and members.
    """
    if not firewall.port_sec_enabled(port):
        self.remove_port_filter(port)
        return
    elif not self.is_port_managed(port):
        self.prepare_port_filter(port)
        return
    ### fetch the neutron.agent.linux.openvswitch_firewall.firewall.OFPort
    ### object for this port
    of_port = self.get_or_create_ofport(port)

    # TODO(jlibosva): Handle firewall blink
    self.delete_all_port_flows(of_port)
    self.initialize_port_flows(of_port)
    self.add_flows_from_rules(of_port)
```
The three calls self.delete_all_port_flows(of_port), self.initialize_port_flows(of_port) and self.add_flows_from_rules(of_port) respectively clear the port's existing OpenFlow flows, install the initial flows, and add flows derived from the security group rules. I won't trace their execution in detail; ultimately the flow rules are pushed to the br-int bridge via the ovs-ofctl command, and along the way security group rules are translated into OpenFlow flow rules.
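For a rough flavor of the end result, here is a hypothetical, heavily simplified flow; it is not captured from a real deployment, and the actual driver spreads rules across several tables and matches on conntrack state and registers. A security group rule allowing ingress TCP port 22 might be rendered as something like:

```
ovs-ofctl add-flow br-int "table=82,priority=70,ct_state=+new,tcp,reg5=0x1,tp_dst=22,actions=ct(commit),output:1"
```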
I also didn't have time to analyze this part carefully; it appears to go through the br-int bridge driver registered during startup (in neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py, __init__ runs self.int_br = self.br_int_cls(ovs_conf.integration_bridge)).
On the relationship between RPC class names and their roles: after reading the docstrings of these RPC classes and working out what the related classes do, I reached the following conclusions (I'm not certain they're all correct):
- SecurityGroupServerRpcApi: note the "ServerRpc" part, which indicates an RPC client that sends requests to neutron-server; the caller (client side) is an agent or an extension, and neutron-server is the server side
- SecurityGroupServerRpcCallback: the server-side callbacks for the RPC client above, which handle the RPC request and return a result (results only apply to synchronous "call"-type RPC requests; asynchronous "cast" requests return nothing)
- SecurityGroupAgentRpcApiMixin: "AgentRpc" indicates an RPC client that sends requests to agents or extensions; the caller (RPC client side) is neutron-server, and the RPC server side is neutron-openvswitch-agent or another extension
- SecurityGroupAgentRpcCallbackMixin: the server-side callbacks handling the RPC client requests above
Other RPC classes follow the same naming pattern. Once you know the rule, it becomes much easier to understand the RPC call relationships and to tell which service a given piece of code actually runs in.
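Schematically, the convention pairs an *Api class on the calling side with a *Callback class of the same name prefix on the serving side. The skeleton below is illustrative only, not real neutron code:

```python
class FooAgentRpcApiMixin(object):
    """Lives in neutron-server: the RPC *client* that targets agents."""
    def foo_updated(self, context, foo_id):
        cctxt = self.client.prepare(fanout=True)
        cctxt.cast(context, 'foo_updated', foo_id=foo_id)


class FooAgentRpcCallbackMixin(object):
    """Lives in the agent: the RPC *server* side that handles the cast."""
    def foo_updated(self, context, **kwargs):
        pass  # react to the update here
```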