Dubbo扩展机制(三)Wrapper【代理】

摘要:
dubbo的核心设计原则也熟悉aop、ioc和动态编译器,这些被称为dubbo核心原则。包装机机制是自动包装扩展点。也就是说,ExtensionLoader返回的实际上是Wrapper类的一个实例,它保存了实际的扩展点实现类。扩展点可以有多个Wrapper类,也可以根据需要添加它们。Wrapper的规范Wrapper机制不是通过注释实现的,而是通过一组Wrapper规范实现的。
一、前言
Dubbo内核
dubbo所有功能都是基于dubbo内核之上完成的,dubbo内核由四部分构成,分别为SPI、Adaptive、Wrapper、Activate。而dubbo的内核设计原则,也是我们所熟悉的aop,ioc与动态编译compiler,这些称之为dubbo的内核原理。
 
Wrapper机制
即扩展点自动包装。Wrapper 类同样实现了扩展点接口,但是 Wrapper 不是扩展点的真正实现。它的用途主要是用于从 ExtensionLoader 返回扩展点时,包装在真正的扩展点实现外。即从 ExtensionLoader 中返回的实际上是 Wrapper 类的实例,Wrapper 持有了实际的扩展点实现类。
扩展点的 Wrapper 类可以有多个,也可以根据需要新增。
通过 Wrapper 类可以把所有扩展点公共逻辑移至 Wrapper 中。新加的 Wrapper 在所有的扩展点上添加了逻辑,有些类似 AOP,即 Wrapper 代理了扩展点。
 
Wrapper的规范
Wrapper 机制不是通过注解实现的,而是通过一套 Wrapper 规范实现的。
Wrapper 类在定义时需要遵循如下规范:
  • 该类要实现 SPI 接口
  • 该类中要有 SPI 接口的引用
  • 该类中必须含有一个含参的构造方法且参数只能有一个类型为SPI借口
  • 在接口实现方法中要调用 SPI 接口引用对象的相应方法
  • 该类名称以 Wrapper 结尾
 
 
二、使用示例
 
未使用Wrapper之前:
@SPI("ali")    // 默认的值支付宝支付
public interface Pay {
    // 接口的方法需要添加这个注解,在测试代码中,参数至少要有一个URL类型的参数
    @Adaptive({"paytype"})    // 付款方式
    void pay(URL url);
}

public class AliPay implements Pay {
    @Override
    public void pay(URL url) {
        System.out.println("使用支付宝支付");
    }
}

public class WechatPay implements Pay {
    @Override
    public void pay(URL url) {
        System.out.println("使用微信支付");
    }
}
在/dubbo-common/src/main/resources/META-INF/services/com.test.Pay文件下添加内容如下:
wechat = com.test.WechatPay
ali = com.test.AliPay
public static void main(String[] args) {
    ExtensionLoader<Pay> loader = ExtensionLoader.getExtensionLoader(Pay.class);        
    Pay pay = loader.getAdaptiveExtension();
    pay.pay(URL.valueOf("http://localhost:9999/xxx"));                    // 使用支付宝支付
    pay.pay(URL.valueOf("http://localhost:9999/xxx?paytype=wechat"));    // 使用微信支付
}
 上述示例是原Adaptive使用示例,在使用Wrapper之后:
  • 首先要添加一个Wrapper类
  • 并在/dubbo-common/src/main/resources/META-INF/services/com.test.Pay文件下追加“xxx = com.test.PayWrapper1”
(1)代理模式
public class PayWrapper1 implements Pay {    
    Pay pay;
    
    public PayWrapper1(Pay pay) {
        this.pay = pay;
    }

    @Override
    public void pay(URL url) {
        System.out.println("pay before...");
        pay.pay(url);
        System.out.println("pay after...");
    }
}
执行上面main方法,得出如下结果:
pay before...
使用支付宝支付
pay after...
pay before...
使用微信支付
pay after...

由此可见Wrapper是一个AOP功能

 
(2)责任链模式
我们还可以给它添加一个Wrapper2类,如下所示
public class PayWrapper2 implements Pay {
    Pay pay;
    
    public PayWrapper2(Pay pay) {
        this.pay = pay;
    }
    @Override
    public void pay(URL url) {
        System.out.println("-----pay before...");
        pay.pay(url);
        System.out.println("-----pay after...");
    }
}

并追加xxx2 = com.test.PayWrapper2

 
输出的结果如下:
-----pay before...
pay before...
使用支付宝支付
pay after...
-----pay after...

执行顺序先执行2,再执行1

 
 
三、源码分析
上面main方法等同于
public static void main(String[] args) {
    URL url = URL.valueOf("http://localhost:9999/xxx");
    String extName = url.getParameter("paytype", "ali");
    System.out.println(extName);    // ali
    ExtensionLoader<Pay> loader = ExtensionLoader.getExtensionLoader(Pay.class);    
    Pay extension = (Pay) loader.getExtension(extName);
    // extension返回的结果为PayWrapper1
    extension.pay(url);
}

Wrapper功能实现分为两个部分

一个是加载Extension时会把Wrapper类放入缓存中;
另一部分取得服务提供者实例时,将装配过的Wrapper类返回
 
我们先看第一部分,加载Wrapper类
// 维护一个线程安全的HashSet来存放Wrapper
try {
    // 尝试取得参数类型为SPI接口类型的构造函数,即判断该类是否是Wrapper类,如果不是会抛出异常;如果是,继续执行,并添加到cache中
    // 上面定义的Wrapper类如果有构造,则表示是一个真正的Wrapper
    clazz.getConstructor(type);
    Set<Class<?>> wrappers = cachedWrapperClasses;
    // 通过上面取得ExtensionLoader的代码你需要知道,每一个SPI接口都有一个ExtensionLoader,
    // 所以这里面的缓存也是每一个SPI接口都有他的Wrapper缓存,生命周期和loader的生命周期一致
    if (wrappers == null) {
        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
        wrappers = cachedWrapperClasses;
    }
    // 这里cachedWrapperClasses和wrappers用的是同一个对象地址,所以相当于往cachedWrapperClasses添加元素
    wrappers.add(clazz);
} 
 
第二部分则是组装Wrapper类
通过getExtension方法中调用了createExtension方法,createExtension会循环遍历,通过
wrapperClass.getConstructor(type).newInstance(instance) 将wrapper构造注入
private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        // cache中是否存在wrapper类,如果存在,遍历,最后返回wrapper这个实例
        if (wrapperClasses != null && wrapperClasses.size() > 0) {
            for (Class<?> wrapperClass : wrapperClasses) {
                // 注册扩展,返回 wrapperClass.getConstructor(type).newInstance(instance) 实例
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                                        type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}
injectExtension 只有一个逻辑,就是判断是否有set方法,然后属性注入
private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                if (method.getName().startsWith("set")
                    && method.getParameterTypes().length == 1
                    && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                     + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}
四、 在Dubbo中的应用
以 ProtocolFilterWrapper为例,在 dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
当服务提供者启动时,ServiceConfig开始执行onApplicationEvent方法,并开始执行服务导出
public class ServiceConfig<T> extends AbstractServiceConfig {    
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            // 组装URL
            URL local = URL.valueOf(url.toFullString())        // 常量值为injvm,在执行wrapper链时用到
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
            // protocol,Protocol$Adaptive
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }
    
}
 Protocol$Adaptive 类如下(简化版)
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements Protocol {
    public void destroy() {
        // throw Exception
    }
    public int getDefaultPort() {
        // throw Exception
    }
    public Invoker refer(Class arg0, URL arg1) throws RpcException {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");
        URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            // throw Exception
        Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
    public Exporter export(Invoker arg0) throws RpcException {
        if (arg0 == null) 
            // throw Exception
        if (arg0.getUrl() == null) 
            // throw Exception
        URL url = arg0;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            // throw Exception
        Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

通过如下两行代码

String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());    // dubbo
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);

通过扩展名,我们可以在/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 文件分析出

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper            # Wrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper        # Wrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol            # 在ServiceConfig 组装过这个协议名称
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

在没有Wrapper的情况下,得到的扩展类为com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

在有Wrarpper的情况下,得到的是最后的一个Wrapper,即com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
所以下面就调用ProtocolListenerWrapper中export方法
public class ProtocolListenerWrapper implements Protocol {    
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }
    
}
根据Wrapper责任链模式的特点,接下来执行 ProtocolFilterWrapper 
public class ProtocolFilterWrapper implements Protocol {    
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
}

再接下来执行 injvm 对应的InjvmProtocol类中的export方法

 
 
 
 
 
 
 
 
 
 

免责声明:文章转载自《Dubbo扩展机制(三)Wrapper【代理】》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇堆排序算法实现linux 启动 jmeter 报 No X11 DISPLAY variable was set, but this program performed an operation which requires it下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Java 面向切面编程(Aspect Oriented Programming,AOP)

本文内容 实例 引入 原始方法 装饰者模式 JDK 动态代理和 cglib 代理 直接使用 AOP 框架——AspectWerkz 最近跳槽了,新公司使用了 AOP 相关的技术,于是查点资料,复习一下。之前,多少知道点,但没怎么在实际项目中使用过~ 下载 demo 实例 引入 package com.cap.aop;   publi...

Dubbo学习笔记5:Dubbo整体框架分析

Dubbo的分层架构 本文将简单介绍Dubbo的分层架构设计,如下图是Dubbo官方的整体架构图: Dubbo官方提供的该架构图很复杂,一开始我们没必要深入细节,下面我们简单介绍下其中的主要模块。 其中Service和Config层为API,对于服务提供方来说,使用ServiceConfig API来代表一个要发布的服务配置对象,对于服务消费方来说,R...

SqlServer数据库主从同步

分发/订阅模式实现SqlServer主从同步 在文章开始之前,我们先了解一下几个关键的概念: 分发服务器分发服务器是负责存储在同步过程中所用复制信息的服务器。可以比喻成报刊发行商。 分发数据库分发数据库用于存储发布数据库所做的更改。它还可以存储快照和合并发布的历史信息。存在于系统数据库中,默认为destribution. 发布服务器使服务器能够成为发布...

zookeeper客户端命令行查看dubbo服务的生产者和消费者

假设zookeeper安装在192.168.5.130这台服务器上,现在我们通过命令行查看dubbo在zookeeper注册服务的生产者和消费者信息 首先通过命令切换到/usr/zookeeper-3.4.10/bin目录,然后输入 ./zkCli.sh -server 192.168.5.130:2888 (2888为zookeeper在服务器上提供服务...

Dubbo-深入配置

一、dubbo注解:   提供端暴露服务时与消费端调用远程接口可以使用注解形式配置   》服务端: 》1.原来采用接口配置,暴露服务,ref:指向真正的实现对象 <dubbo:service interface="com.ll.service.UserService" ref="userServiceImpl" /> 》2.现在采用注解...

dubbo心跳机制 (1)

此文已由作者赵计刚授权网易云社区发布。 欢迎访问网易云社区,了解更多网易技术产品运营经验。 dubbo的心跳机制: 目的:检测provider与consumer之间的connection连接是不是还连接着,如果连接断了,需要作出相应的处理。 原理: provider:dubbo的心跳默认是在heartbeat(默认是60s)内如果没有接收到消息,就会...