什么是过滤器?
再前面的几个章节中我们已经实现了将我们的网关服务注册到注册中心,并且成功的从配置中心拉取了配置。 那么接下来我们就要开始实现一个网关服务的核心了,也就是过滤器链。 过滤器链是由多个过滤器组成的,一个过滤器执行完毕其过滤流程之后,会转发该请求到下一个过滤器继续执行。从而完成对请求和响应的处理。 并且如果了解SpringCloudGateway的,就会知道,过滤器分为全局过滤器和局部过滤器。 前者对所有请求进行处理,而局部过滤器SpringCloud已经默认帮助我们实现了,当然我们也可以自己继承并实现。
过滤器按照链条的方式对请求进行处理,如果了解网关项目的人应该是知道的,当所有过滤器请求处理完毕之后,会有一个路由过滤器将请求发送到对应的后台服务进行处理,也就是将请求进行转发,当后台服务处理完毕之后,就会再次返回请求。 如果再过滤器链处理请求的过程中出现了异常,我们也可以使用过滤器链的方式来进行捕获。 而如果请求正常转发并且处理完毕了,我们就可以使用 context.writeAndFlush方法将数据写回并返回。大概的流程如下:
可以通过这篇文章简单了解一下Gateway中的网关 SpringCLoudGateway实现URL加密与数字签名
了解完毕过滤器的简单概念之后,我们开始分析如何实现过滤器。 首先按照SpringCloudGateway(下文统称scg)的方法,定义一个Filter顶层接口。并且我们需要实现Ordered接口来设定处理优先级。 并且我们同时设定一个切面来对过滤器进行增强处理,方便我们之后得到过滤器的一些信息,同时方便我们实现可插拔式的开发,方便我们按照SPI的方式进行开发。 之后,我们还需要设定工厂生产类,FilterFactory,来让它帮助我们生产链表并且执行。 而我们还需要用到过滤器链表,也就是scg中的GatewayFilterChain。 同时,我们还需要对过滤器链条工厂进行具体实现,设定一个类:GatewayFilterChainFactory
因此我们可以得到,Filter作为过滤器的顶层接口,其子类需要实现这个接口并实现具体的过滤方法。
FilterAspect用于提供过滤器AOP功能,方便我们对过滤器进行管理。
FilterFactory过滤器工厂,用于构建过滤器链表并且提供根据过滤器ID获取过滤器的方法。
GatewayFilterChain提供具体的添加过滤器和执行过滤器链处理逻辑的方法。
GatewayFilterChainFactory实现FilterFactory,实现具体的构造过滤器链的方法,并且提供根据ID获取过滤器的实际方法。
这里贴出具体的代码实现: 首先是过滤器链类,用于存储实际的过滤器,并且提供过滤器执行方法。
public class GatewayFilterChain {
private List<Filter> filters = new ArrayList<>();
public GatewayFilterChain addFilter(Filter filter){
filters.add(filter);
return this;
}
public GatewayFilterChain addFilterList(List<Filter> filter){
filters.addAll(filter);
return this;
}
/**
* 执行过滤器处理流程
* @param ctx
* @return
* @throws Exception
*/
public GatewayContext doFilter(GatewayContext ctx) throws Exception {
if(filters.isEmpty()){
return ctx;
}
try {
for(Filter fl: filters){
fl.doFilter(ctx);
}
}catch (Exception e){
log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
throw e;
}
return ctx;
}
}
接下来提供过滤器链工厂,过滤器链工厂的作用是存储过滤器配置信息,创建过滤器链,并且提供获取过滤器的方法。 而过滤器配置信息来源于我们之前设定的配置中心。
@Slf4j
public class GatewayFilterChainFactory implements FilterFactory {
private static class SingletonInstance {
private static final GatewayFilterChainFactory INSTANCE = new GatewayFilterChainFactory();
}
/**
* 饿汉式获取单例
* @return
*/
public static GatewayFilterChainFactory getInstance() {
return SingletonInstance.INSTANCE;
}
private Map<String, Filter> processorFilterIdMap = new ConcurrentHashMap<>();
/**
* 通过ServiceLoader的方式添加我们实现的过滤器类
* 将其保存到系统缓存中
*/
public GatewayFilterChainFactory() {
ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
serviceLoader.stream().forEach(filterProvider -> {
Filter filter = filterProvider.get();
FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
log.info("load filter success:{},{},{},{}", filter.getClass(),
annotation.id(), annotation.name(), annotation.order());
if (annotation != null) {
//添加到过滤集合
String filterId = annotation.id();
if (StringUtils.isEmpty(filterId)) {
filterId = filter.getClass().getName();
}
processorFilterIdMap.put(filterId, filter);
}
});
}
//测试功能是否成功
public static void main(String[] args) {
new GatewayFilterChainFactory();
}
/**
* 对网关请求上下文
* 通过配置中心选定对应的配置
* @param ctx
* @return
* @throws Exception
*/
@Override
public GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception {
GatewayFilterChain chain = new GatewayFilterChain();
List<Filter> filters = new ArrayList<>();
//获取过滤器配置规则 是我们再配置中心进行配置的
//这是由于我们的过滤器链是由我们的规则定义的
Rule rule = ctx.getRule();
if (rule != null) {
//获取所有的过滤器
Set<Rule.FilterConfig> filterConfigs = rule.getFilterConfigs();
Iterator iterator = filterConfigs.iterator();
Rule.FilterConfig filterConfig;
while (iterator.hasNext()) {
filterConfig = (Rule.FilterConfig) iterator.next();
if (filterConfig == null) {
continue;
}
String filterId = filterConfig.getId();
if (StringUtils.isNotEmpty(filterId) && getFilterInfo(filterId) != null) {
Filter filter = getFilterInfo(filterId);
filters.add(filter);
}
}
}
//添加路由过滤器-因为我们的网关最后要执行的就是路由转发
filters.add(new RouterFilter());
//排序
filters.sort(Comparator.comparingInt(Filter::getOrder));
//添加到链表中
chain.addFilterList(filters);
return chain;
}
@Override
public Filter getFilterInfo(String filterId) throws Exception {
return processorFilterIdMap.get(filterId);
}
}
编写负载均衡过滤器
负载均衡的定义与实现
再编写负载均衡过滤器之前,得先了解什么是负载均衡。
负载均衡(Load Balancing)是一种计算机网络和服务器架构的技术,旨在分配网络请求、数据流或负载到多个服务器或计算资源,以确保高可用性、提高性能和避免任何单一服务器或资源的过载。负载均衡在分布式系统和网络应用中起着重要作用,它可以帮助应对流量波动和提供冗余性,从而提高系统的可靠性和性能。
而对于负载均衡的实现,我们有如下几种方式:
- DNS负载均衡
- 硬件负载均衡
- 软件负载均衡
DNS负载均衡(地理级别): 原理:DNS负载均衡通过DNS服务器将域名解析请求映射到多个不同的IP地址,每个IP地址对应一个负载均衡器或服务器。DNS服务器将解析的IP地址返回给客户端,然后客户端将请求发送到其中一个IP地址。
优点:相对简单,不需要额外的硬件或软件负载均衡器,易于实施和扩展。
缺点:DNS负载均衡不具备智能的流量分发能力,无法动态调整负载,也无法处理服务器故障检测和恢复。客户端缓存DNS记录可能会导致不均匀的流量分布。 DNS负载均衡无法检测后端服务是否存活,可能会出现请求宕机服务的情况。
硬件负载均衡: 原理:硬件负载均衡是通过专用的硬件设备来分发流量到后端服务器。这些硬件设备通常具有性能优势,可以处理大量的连接和请求。有兴趣的可以搜索F5和A10负载均衡器。
优点:高性能,专门设计用于负载均衡任务,通常具有高可用性和可靠性。支持先进的负载均衡算法和流量管理。
缺点:相对昂贵,需要购买专门的硬件设备。配置和管理可能较复杂,需要专业知识。
软件负载均衡: 原理:软件负载均衡是通过在普通服务器上运行负载均衡软件来分发流量。这些软件可以是开源或商业的,如Nginx、HAProxy、LVS等。
优点:相对经济,可以运行在通用硬件上,易于部署和管理。提供了多种负载均衡算法和高级配置选项。
缺点:性能可能受限于服务器硬件,对于极高流量负载可能需要增加服务器数量。可用性和可靠性可能不如专门的硬件设备。
这里推荐详细了解Nginx和LVS负载均衡的区别。
而再实际生产过程中这些负载均衡并不是单独使用的,而是一起配合。 1、DNS负载均衡用于实现地理级别的负载均衡 2、硬件负载均衡用于实现集群级别的负载均衡 3、软件负载均衡用于实现机器级别的负载均衡
负载均衡算法
静态负载均衡算法:轮询、比率、优先权 其中比较常用的就是轮询,特点如下: 1、顺序循环的连接队列中每个服务器,一旦某个服务器发生异常,那么就将其从队列中移除 2、优点:实现简单、高效,易于水平扩展 3、缺点:请求目的节点不确定,不适合存有写的场景
动态态负载均衡算法:最少连接数、最快响应速度、动态性能分配、动态服务器补充、服务质 量等 动态负载均衡算法比较常用的有动态性能分配,它是通过BIG-IP收集到的应用程序和应用服务器的各项性能参数,动态调整流量分配。一般我们会配合Prometheus来实现。
设计实现
首先依旧是创建我们的顶层接口,这个接口用于帮助我们获取根据负载均衡策略选择到的后端服务实例。
public interface LoadBalanceGatewayRule {
/**
* 通过上下文参数获取服务实例
* @param ctx
* @return
*/
ServiceInstance choose(GatewayContext ctx);
/**
* 通过服务ID拿到对应的服务实例
* @param serviceId
* @return
*/
ServiceInstance choose(String serviceId);
}
之后我们对这个接口进行实现,我们首先实现比较简单的随机负载均衡策略。 其实现方式为根据我们的服务id,然后保存当前服务id对应的所有服务实例,之后我们就可以从服务实例中随机返回一个即可。
@Slf4j
public class RandomLoadBalanceRule implements LoadBalanceGatewayRule {
private final String serviceId;
/**
* 服务列表
*/
private Set<ServiceInstance> serviceInstanceSet;
public RandomLoadBalanceRule(String serviceId) {
this.serviceId = serviceId;
}
private static ConcurrentHashMap<String, RandomLoadBalanceRule> serviceMap = new ConcurrentHashMap<>();
public static RandomLoadBalanceRule getInstance(String serviceId) {
RandomLoadBalanceRule loadBalanceRule = serviceMap.get(serviceId);
if (loadBalanceRule == null) {
loadBalanceRule = new RandomLoadBalanceRule(serviceId);
serviceMap.put(serviceId, loadBalanceRule);
}
return loadBalanceRule;
}
@Override
public ServiceInstance choose(GatewayContext ctx) {
String serviceId = ctx.getUniqueId();
return choose(serviceId);
}
@Override
public ServiceInstance choose(String serviceId) {
Set<ServiceInstance> serviceInstanceSet =
DynamicConfigManager.getInstance().getServiceInstanceByUniqueId(serviceId);
if (serviceInstanceSet.isEmpty()) {
log.warn("No instance available for:{}", serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
List<ServiceInstance> instances = new ArrayList<ServiceInstance>(serviceInstanceSet);
int index = ThreadLocalRandom.current().nextInt(instances.size());
ServiceInstance instance = (ServiceInstance) instances.get(index);
return instance;
}
}
而对于轮询负载均衡策略,我们就需要维护一个全局的索引编号,然后每次执行都不断自增,然后对服务实例数量进行取余,就可以知道要执行的后端实例是哪一个。
@Slf4j
public class RoundRobinLoadBalanceRule implements LoadBalanceGatewayRule {
private AtomicInteger position = new AtomicInteger(1);
private final String serviceId;
public RoundRobinLoadBalanceRule(String serviceId) {
this.serviceId = serviceId;
}
private static ConcurrentHashMap<String, RoundRobinLoadBalanceRule> serviceMap = new ConcurrentHashMap<>();
public static RoundRobinLoadBalanceRule getInstance(String serviceId) {
RoundRobinLoadBalanceRule loadBalanceRule = serviceMap.get(serviceId);
if (loadBalanceRule == null) {
loadBalanceRule = new RoundRobinLoadBalanceRule(serviceId);
serviceMap.put(serviceId, loadBalanceRule);
}
return loadBalanceRule;
}
@Override
public ServiceInstance choose(GatewayContext ctx) {
return choose(ctx.getUniqueId());
}
@Override
public ServiceInstance choose(String serviceId) {
Set<ServiceInstance> serviceInstanceSet =
DynamicConfigManager.getInstance().getServiceInstanceByUniqueId(serviceId);
if (serviceInstanceSet.isEmpty()) {
log.warn("No instance available for:{}", serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
List<ServiceInstance> instances = new ArrayList<ServiceInstance>(serviceInstanceSet);
if (instances.isEmpty()) {
log.warn("No instance available for service:{}", serviceId);
return null;
} else {
int pos = Math.abs(this.position.incrementAndGet());
return instances.get(pos % instances.size());
}
}
}
最后,我们就可以根据请求头中设定的要使用的负载均衡策略对我们实现的负载均衡策略进行选择了。
@Slf4j
@FilterAspect(id=LOAD_BALANCE_FILTER_ID,
name = LOAD_BALANCE_FILTER_NAME,
order = LOAD_BALANCE_FILTER_ORDER)
public class LoadBalanceFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx){
//拿到服务id
String serviceId = ctx.getUniqueId();
//从请求上下文中获取负载均衡策略
LoadBalanceGatewayRule gatewayLoadBalanceRule = getLoadBalanceRule(ctx);
ServiceInstance serviceInstance = gatewayLoadBalanceRule.choose(serviceId);
System.out.println("IP为"+serviceInstance.getIp()+",端口号:"+serviceInstance.getPort());
GatewayRequest request = ctx.getRequest();
if(serviceInstance != null && request != null){
String host = serviceInstance.getIp()+":"+serviceInstance.getPort();
request.setModifyHost(host);
}else{
log.warn("No instance available for :{}",serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
}
/**
* 根据配置获取负载均衡器
*
* @param ctx
* @return
*/
public LoadBalanceGatewayRule getLoadBalanceRule(GatewayContext ctx) {
LoadBalanceGatewayRule loadBalanceRule = null;
Rule configRule = ctx.getRule();
if (configRule != null) {
Set<Rule.FilterConfig> filterConfigs = configRule.getFilterConfigs();
Iterator iterator = filterConfigs.iterator();
Rule.FilterConfig filterConfig;
while (iterator.hasNext()) {
filterConfig = (Rule.FilterConfig) iterator.next();
if (filterConfig == null) {
continue;
}
String filterId = filterConfig.getId();
if (filterId.equals(LOAD_BALANCE_FILTER_ID)) {
String config = filterConfig.getConfig();
String strategy = LOAD_BALANCE_STRATEGY_RANDOM;
if (StringUtils.isNotEmpty(config)) {
Map<String, String> mapTypeMap = JSON.parseObject(config, Map.class);
strategy = mapTypeMap.getOrDefault(LOAD_BALANCE_KEY, strategy);
}
switch (strategy) {
case LOAD_BALANCE_STRATEGY_RANDOM:
loadBalanceRule = RandomLoadBalanceRule.getInstance(configRule.getServiceId());
break;
case LOAD_BALANCE_STRATEGY_ROUND_ROBIN:
loadBalanceRule = RoundRobinLoadBalanceRule.getInstance(configRule.getServiceId());
break;
default:
log.warn("No loadBalance strategy for service:{}", strategy);
loadBalanceRule = RandomLoadBalanceRule.getInstance(configRule.getServiceId());
break;
}
}
}
}
return loadBalanceRule;
}
}
那么到这里为止我们就成功实现了负载均衡策略过滤器。