不为有趣之事,何遣有涯之生
不失其所者久,死而不亡者寿

SpringCloud微服务系列(4)

hzqiuxm阅读(388)评论(0)

服务容错保护

基本介绍

为什么要服务容错保护

我们现在已经了解,微服务架构中,系统是分成好多个服务单元的,各个但隐患之间通过注册中心建立联系。
服务多了,出问题的概率同样也就增大了,问题可能来自依赖的服务也可能来自网络。不管如何肯定会导致服务调用故障或者延迟,而这些问题会直接导致依赖调用方的服务也出现问题。
这样一层影响一层,再随着请求的增加,任务的积压,最终可能导致服务瘫痪,触发雪崩。

举个例子,比如下图所示的一个电子合同服务调用:

上图调用关系非常简单,假设在用户服务调用签署服务的时候需要调用计费服务来判断当前用户资源是否允许操作时,计费服务因自身处理逻辑等原因造成了响应缓慢,所以签署服务的线程将被挂起,以等待计费服务的响应,在漫长的等待后用户服务会被告知调用签署服务失败。如果是在高并发的场景下,挂起的线程很多,使得后来的签署服务请求都被阻塞,最终导致签署服务无法使用。这样还没结束,如果签署服务无法使用,那么依赖它的合同服务和用户服务也会出现线程挂起,到阻塞到服务不可用,然后继续向外围蔓延。
你看,本来计费服务的问题,最后导致原来正常的签署服务,合同服务,用户服务都不可用,就像雪山上出现雪崩一样,连锁反应。

为了解决这个问题,微服务架构中引入了断路器模式来进行对服务容错保护。其实很像我们家里的断路器,如果没有断路器,电流过载了(例如功率过大、短路等),电路不断开,电路就会升温,甚至是烧断电路、起火。有了断路器之后,当电流过载时,会自动切断电路(跳闸或保险丝熔断),从而保护了整条电路与家庭的安全。当电流过载的问题被解决后,只要将关闭断路器,电路就又可以工作了。

Hystrix介绍

针对上述问题,SpringCloud就采用了Spring Cloud Hysrix 来实现了断路器、线程隔离等一系列保护功能。它也是基于Netflix的开源框架Hystrix来实现的,该框架的目标就是通过控制哪些访问远程系统的节点,从而对延迟或故障提供强大的容错能力。

Hystrix包含的功能有:服务降级、服务熔断、线程和信号隔离、请求缓存、请求合并以及服务监控等。
这里有个注意点,平时我发现很多人把Hystrix称之为断路器,其实是不准确的,Hystrix包含的远不只是一个断路器,按我的理解Hystrix包含了:断路器、命令对象、线程池、请求队列、信号量、健康检查对象等等组件。不然光光只是断路器,可实现不了上述我们介绍的那些功能。

简单示例

我们对上一章中的Ribbon服务进行改造,让其具备容错保护功能。

  • 第一步:加入依赖
compile('org.springframework.cloud:spring-cloud-starter-hystrix')
  • 第二步:加入注解,启动容错保护
@EnableCircuitBreaker //开启断路器容错保护
@EnableDiscoveryClient
@SpringBootApplication
public class ServiceRibbonApplication {

   public static void main(String[] args) {
      SpringApplication.run(ServiceRibbonApplication.class, args);
   }


   @Bean
   @LoadBalanced
   RestTemplate restTemplate() {
      return new RestTemplate();
   }
}

注意,我们其实可以用注解@SpringCloudApplication来替代上面的3个注解

  • 第三步:在调用服务方法上加上注解,指定回调方法
/**
 * 负载方法
 * @param name
 * @return
 * SERVICE-HI :虚拟主机名
 * restTemplate+Ribbon 可以应对简单的URL 如果url复杂,使用Feign,它整合了Ribbon
 */

@HystrixCommand(fallbackMethod = "hiFallback")
public String hiService(String name) {
    return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
}

  • 第四步:新增指定的方法,完成降级逻辑
/**
 * 降级服务处理
 * 加入Hysrix使用回调方法,使用注解方式时必须和调用服务在一个类中
 * @return
 */
public String hiFallback(String name){//注意这里的参数要和上面的一样,否则会找不到该方法

    return "hi service has a error!";
}

我们启动测试下,架构如下:

访问测试:localhost:8764/hi?name=hzqiuxm

结果会随机出现成功和失败的情况

hi hzqiuxm, i am from port8763
hi service has a error!hzqiuxm
hi hzqiuxm, i am from port8763
hi hzqiuxm, i am from port8762

从上述结果可以看出,断路器已经设置成功了

核心

Hystrix原理分析

工作流程
  • 1、创建HystrixCommand或HystrixObservableCommand对象,以“命令”方式实现对服务调用操作封装
  • 2、执行命令
  • 3、是否被缓存,是的话缓存结果会立即返回
  • 4、断路器是否打开,打开的话立即返回
  • 5、线程池、请求队列、信号量资源判断,不够时执行第8步
  • 6、请求依赖的服务
  • 7、计算断路器的健康度,根据成功、失败、拒绝、超时等信息计算是否打开/闭合断路器
  • 8、如果需要进行,fallback降级处理
  • 9、返回成功响应

断路器原理

断路器在HystrixCommand或HystrixObservableCommand执行时,起到了关键作用,它是Hystrix的核心部件。那么它是怎么决策和计算健康度的呢?
我们先看看断路器HystrixCircuitBreaker接口中定义的主要方法和类:

boolean allowRequest(); //每个Hystrix命令的请求通过它判断是否被执行
boolean isOpen(); //判断断路器的开发/关闭状态
void markSuccess(); //从半开状态到关闭
void markNonSuccess(); //从半开状态到打开
boolean attemptExecution(); //获取断路器状态,是非幂等的

class Factory{...} //维护Hystrix和断路器的关系集合

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker{...} //接口的一个实现类

这里面我们主要关注下HystrixCircuitBreakerImpl 中计算断路器打开的规则,规则在subscribeToStream()方法中:

public void onNext(HealthCounts hc) {
    // check if we are past the statisticalWindowVolumeThreshold
    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { 
        // we are not past the minimum volume threshold for the stat window,
        // so no change to circuit status.
        // if it was CLOSED, it stays CLOSED
        // if it was half-open, we need to wait for a successful command execution
        // if it was open, we need to wait for sleep window to elapse
    } else {
        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            //we are not past the minimum error threshold for the stat window,
            // so no change to circuit status.
            // if it was CLOSED, it stays CLOSED
            // if it was half-open, we need to wait for a successful command execution
            // if it was open, we need to wait for sleep window to elapse
        } else {
            // our failure rate is too high, we need to set the state to OPEN
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                circuitOpened.set(System.currentTimeMillis());
            }
        }

从上面代码逻辑中我们可以看到,当最大线程数超过阈值并且请求错误百分比也超过阈值的时候,断路器会通过CAS多线程保护的方式打开。

circuitBreakerRequestVolumeThreshold决定了最大线程数阈值,它的默认值由default_circuitBreakerRequestVolumeThreshold = 20决定。
circuitBreakerErrorThresholdPercentage决定了请求错误百分比阈值,它的默认值由default_circuitBreakerErrorThresholdPercentage = 50决定。
(默认值可以参考HystrixCommandProperties类中的定义)

下图是断路器的状态关系图:

当断路器处于打开状态时,如果打开时间超过了我们定义的circuitBreakerSleepWindowInMilliseconds时间(默认5000毫秒),那么断路器会切换到半开状态。
如果此时请求继续失败,断路器又变回成打开状态,等待下个circuitBreakerSleepWindowInMilliseconds时间。若请求成功,则断路器变为闭合状态。

最后附上Hystrix官方文档中断路器详细执行逻辑,大家可以在仔细理解下。

实战详解

首先大家要清楚,Hystrix对于依赖服务调用采用了依赖隔离的方式,隔离方式主要有线程隔离和信号量隔离。

  • 线程隔离:为每一个依赖服务创建一个独立的线程,性能上低于信号量隔离(我还是推荐使用这个,除非你的应用无法忍受9ms级别的延迟)。

  • 信号量隔离:用信号量控制单个依赖服务,开销远小于线程隔离,但是无法异步和设置超时。

创建请求命令

我们这边只介绍以注解的方式来创建请求命令,除了按照注解的方式,还可以以继承HystrixCommand或HystrixObservableCommand类的方式,有兴趣的同学可以参考官方文档。
创建请求命令,按照调动方式,我们可以分为三种:

  • 同步方式:最普通最常见的方式
@HystrixCommand(fallbackMethod = "hiFallback")
public String hiService(String name) {
    return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
}
  • 异步方式:
@HystrixCommand
public Future<String> hiServiceAsync(final String name ){

    return new AsyncResult<String>(name){

        @Override
        public String get() throws ExecutionException {
            return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
        }
    };
}

  • 响应式方式:
public Observable<String> hiServiceObs(final String name){

    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {

            if(!subscriber.isUnsubscribed()){

                String forObject = restTemplate.getForObject("http://SERVICE-HI/hi?name=" + name, String.class);

                subscriber.onNext(forObject);
                subscriber.onCompleted();
            }
        }
    });
}
定义服务降级

从上面可以看出,当Hystrix命令执行失败,fallback是实现服务降级处理的后备方法。使用相对比较简单,只要在HystrixCommand注解的fallbackMethod属性指定对应的方法就行。
唯一需要注意的就是一点:fallbackMethod属性指定的方法必须定义在同一个类中,并且参数保持一致。

并不是所有的服务都要去实现降级逻辑,比如一些写操作的命令,批处理的命令,离线计算的命令等。不论Hystrix命令是否实现了服务降级,命令的状态和断路器状态都会更新。

异常处理
  • 异常忽略:对某个异常,不调用fallback操作,而是抛出。实现例子如下:
@HystrixCommand(ignoreExceptions = {需要忽略的异常类.class})
  • 异常分类降级:根据不同异常,采用不同的降级处理。
    实现也很简单,只要在fallbackMethod的实现方法参数上增加Throwable对象,这样在方法内部就可以获取触发服务降级的具体异常内容了。比如:
User fallbackl{String id, Throwable e){  获取e的异常类型,采取对应的降级处理 ... }
请求缓存

微服务架构的目的之一就是应对不断增长的业务,随着每个微服务需要承受的并发压力也越来越大。都说要提高系统性能,可以采用缓存技术。那么Hystrix当仁不让的提供了。

我们可以通过注解的方式,简单的实现请求缓存。请求缓存注解有:

  • @CacheResult:标记请命令返回的结果应该被缓存,必须和@HystrixCommand注解结合使用,所用属性有:cacheKeyMethod
@CacheResult(cacheKeyMethod = "getNameByidCacheKey")
@HystrixCommand(fallbackMethod = "hiFallback")
public String hiService(String name) {
    return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
}

private Long getNameByidCacheKey(Long id) {
    return id;
}

上述代码中@CacheResult代表开启缓存功能,当调用结果返回后将被Hystrix缓存,缓存的Key值不指定时就会使用该方法中的所有参数(name),当然我们可以自定义缓存Key的生成规则,上面就使用了cacheKeyMethod指定了具体的生成函数。

  • @CacheRemove:标记请求命令的缓存失效,失效的缓存根据定义的Key决定,常用属性:command、cacheKeyMethod
@CacheResult
@HystrixCommand(fallbackMethod = "hiFallback")
public String hiService(@CacheKey("name") String name) {
    return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
}

@CacheRemove(commandKey = "hiService")
public void update(@CacheKey("name")User user){

    restTemplate.getForObject("http://USER-SERVICE/users",user,User.class);

}

注意,commandKey属性是必须要指定的,它用来指明需要使用请求缓存的请求命令。

  • @CacheKey:标记在请求命令的参数上,使其作为缓存的Key值。如果没有标注,则会使用所有参数。常用属性:value
@CacheResult
@HystrixCommand(fallbackMethod = "hiFallback")
public String hiService(@CacheKey("name") String name) {
    return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
}

注意它的优先级最低,使用了cacheKeyMethod的话,它就不生效了。

请求合并

把一个单体应用拆分为微服务应用后,最明显的变化就是增加了通信消耗和连接数占用。在高并发的情况下,随着通信次数的增加,总的通信时间消耗将会变得不理想。

Hystrix也考虑到了这个问题,提供了HystrixCollapser来实现请求的合并,以减少通信消耗和线程数的占用。

HystrixCollapser实现的基本思想就是在HystrixCommand之前放置一个合并处理器,将处于很短时间内(默认10ms)对同一依赖服务的多个请求进行整合并以批量方式发起请求。批量的发起请求需要开发人员自己实现,并且服务的提供方也要提供相应的批量实现接口才行。

可以通过下面的图来直观了解下:

实现步骤简单如下:

  • 1.调用方需要准备2个方法,一个单个调用,一个合并调用
  • 2.在单个调用的方法上,加一个合并器

注意,请求合并会有额外的开销,因为合并的时候会有个延迟时间10ms。一般在高并发的时候才会启用,所以需要考虑请求命令本身的延迟和延迟时间窗内的并发量来统筹考虑。

配置属性

属性配置优先级

4种属性配置的优先级,由低到高分别是:
- 全局默认值:其他三个都没设时
- 全局配置属性:通过配置文件定义,可以配合动态刷新在运行期动态调整
- 实例默认值:通过代码为实例定义默认值
- 实例配置属性:通过配置文件定义,可以配合动态刷新在运行期动态调整

Command属性

Command属性主要用来控制HystrixCommand命令的行为。
- execution配置,主要功能是实现隔离,超时

execution.isolation.strategy 设置执行隔离策略,有两个值:THREAD和SEMAPHORE。

THREAD: 通过线程池隔离的策略。它在独立的线程上执行, 并且它的并发限制
受线程池中线程数量的限制。
SEMAPHORE: 通过信号量隔离的策略。它在调用线程上执行, 并且它的并发限
制受信号量计数的限制。

execution.isolation.thread.timeoutinMilliseconds :配置HystrixCommand执行的超时时间,单位为毫秒。
当HystrixCommand执行时间超过该配置值之后,Hystrix会将该执行命令标记为TIMEOUT并进入服务降级处理逻辑。

execution.timeout.enabled: 该属性用来配置HystrixCommand.run()的执行是否启用超时时间,默认为true。

execution.isolation.thread.interruptOnTimeout: 该属性用来配置当HystrixCommand.run()执行超时的时候是否要将它中断。

execution.isolation. thread.interruptOnCancel: 该属性用来配置当HystrixCommand.run()执行被取消的时候是否要将它中断。

execution.isolation.semaphore.maxConcurrentRequests: 当HystrixCommand的隔离策略使用信号量的时候,该属性用来配置信号量的大小(并发请求数)

  • fallback属性配置,主要实现并发数控制和是否降级

fallback.isolation.semaphore.maxConcurrentRequests: 该属性用来设置从调用线程允许HystrixComrnand.getFallback()方法执行的最大并发请求数。

fallback.enabled: 该属性用来设置服务降级策略是否启用

  • circuitBreaker配置,主要实现休眠时间,断路器打开条件(请求并发数,错误百分比),强制(开关)

circuitBreaker.enabled: 该属性用来确定当服务请求命令失败时,是否使用断路器来跟踪其健康指标和熔断请求,默认是true。

circuitBreaker.requestVolumeThreshold: 该属性用来设置在滚动时间窗中,断路器熔断的最小请求数,默认20。

circuitBreaker.sleepWindowinMilliseconds: 该属性用来设置当断路器打开之后的休眠时间窗,默认5000毫秒。

circuitBreaker.errorThresholdPercentage: 该属性用来设置断路器打开的错误百分比条件,默认50%。

circuitBreaker.forceOpen: 如果将该属性设置为true, 断路器将强制进入“ 打开”状态,它会拒绝所有请求,默认false。

circuitBreaker.forceClosed: 如果将该属性设置为true,断路器将强制进入“关闭”状态,它会接收所有请求,默认false。

  • metrics配置,主要实现滚动时间窗相关配置

metrics.rollingStats.timeinMillseconds: 该属性用来设置滚动时间窗的长度,单位为毫秒,默认10000。

metrics.rollingstats.numBuckets: 该属性用来设置滚动时间窗统计指标信息时划分“桶”的数量,默认10。

metrics.rollingPercentile.enabled: 该属性用来设置对命令执行的延迟是否使用百分位数来跟踪和计算,默认true。

metrics.rollingPercentile.timeinMilliseconds: 该属性用来设置百分位统计的滚动窗口的持续时间,单位为毫秒,默认60000。
注意:该值通过动态刷新不会有效。

metics.rollingPercentile.numBuckets: 该属性用来设置百分位统计滚动窗口中使用“ 桶”的数量,默认6。
注意:该值通过动态刷新不会有效。

metrics.rollingPercentile.bucketSize: 该属性用来设置在执行过程中每个“桶”中保留的最大执行次数,默认100。
注意:该值通过动态刷新不会有效。

metrics.healthSnapshot.intervalinMilliseconds: 该属性用来设置采集影响断路器状态的健康快照(请求的成功、错误百分比)的间隔等待时间,默认500。
注意:该值通过动态刷新不会有效。

  • requestContext配置,主要实现缓存和日志相关

requestCache.enabled: 此属性用来配置是否开启请求缓存,默认true。

requestLog.enabled: 该属性用来设置Hys立ixCommand的执行和事件是否打印日志到HystrixRequestLog中,默认true。

collapser属性

合并请求相关设置
maxRequestsinBatch: 该参数用来设置一次请求合并批处理中允许的最大请求数,默认Integer.MAX VALUE。

timerDelayinMillseconds: 该参数用来设置批处理过程中每个命令延迟的时间,单位为毫秒,默认10。

requestCache.enabled: 该参数用来设置批处理过程中是否开启请求缓存,默认true。

threadPool属性

线程池的先关配置
coreSize: 该参数用来设置执行命令线程池的核心线程数,该值也就是命令执行的最大并发量,默认10。

maxQueueSize: 该参数用来设置线程池的最大队列大小,默认-1。
当设置为-1时,线程池将使用SynchronousQueue实现的队列,否则将使用LinkedBlockingQueue实现的队列。
注意:该值通过动态刷新不会有效。

queueSizeRejectionThreshold: 该参数用来为队列设置拒绝阈值,默认5。
注意:当maxQueueSize属性为-1 的时候,该属性不会生效。

metrics.rollingstats.timeinMilliseconds: 该参数用来设置滚动时间窗的长度,单位为毫秒,默认10000。

metrics.rollingStats.numBuckets: 该参数用来设置滚动时间窗被划分成“桶”的数量,默认10。

上面属性对应全局默认值,全局配置属性,实例默认值,实例配置属性值请在Hystrix的番外篇中查阅。

参考资料推荐

https://baijiahao.baidu.com/s?id=1593211109840459044&wfr=spider&for=pc
https://www.ebayinc.com/stories/blogs/tech/application-resiliency-using-netflix-hystrix/
https://github.com/Netflix/Hystrix/wiki
https://github.com/Netflix/Hystrix/wiki/How-it-Works

SpringCloud微服务系列(3)

hzqiuxm阅读(330)评论(0)

客户端负载Ribbon

Ribbon的基本介绍

什么是客户端负载

我们在做服务集群的时候,经常会听到负载均衡这个词,比如下图的一个架构:

客户端在访问服务器的时候,中间一般会用一些硬件(F5)或软件(nginx)来作负载均衡,从而实现后端各服务器分摊请求压力,达到均衡的目的。图中的nginx就是负载均衡器,我们通常称之为服务端的负载均衡,客户端不用关心自己调用的是哪个服务,只要统一访问某一个地址,负载均衡器会根据某个负载策略(权重,可用性,线性轮询等)路由到某个具体的服务器上。

那么客户端负载均衡和服务端负载均衡有什么区别呢?如果我们把上图架构改造成下图所示:

我们在客户端后面加了一个服务器清单(里面维护着后端可以用的服务),客户端访问后端服务的时候,自己去选择一个服务去访问。所以它们二者最大的不用点就是谁来维护服务端清单,就称之为谁的负载均衡。

SpringCloud中的Ribbon

Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载工具,它基于Netflix实现。它不用像注册中心、配置中心、API网关那样独立部署,而且几乎存在与于每一个SpringCloud构建的微服务和基础设施中。我们要使用它也很简单,只要添加spring-cloud-starter-ribbon的依赖即可。

简单示例

在上一节注册中心的讲解中我们没有使用客户端的负载均衡,服务消费者调用服务生产者的架构如下所示:

消费者调用的时候,输入的URL是指向具体的生产者的,接下来我们新建一个项目service-ribbon,然后把Ribbon加进去。
新建的项目添加依赖:spring-cloud-starter-ribbon。

通过Spring Cloud Ribbon来使用客户端负载均衡调用,需要二个步骤:
- 第一步,服务提供者的多个实例注册到注册中心
- 第二步,服务消费者通过调用被@LoadBalanced注解修饰过的RestTemplate来实现

第一步的话,只要仿造上节的启动注册中心(单中心和多中心都可以)和生产者多个实例即可。
第二步的话,需要编写一些代码,我们会有3个类:
ServiceRibbonApplication,启动类并负责生成RestTemplate实例

@EnableDiscoveryClient
@SpringBootApplication
public class ServiceRibbonApplication {

   public static void main(String[] args) {
      SpringApplication.run(ServiceRibbonApplication.class, args);
   }


   @Bean
   @LoadBalanced
   RestTemplate restTemplate() {
      return new RestTemplate();
   }
}

HelloControler,提供一个请求入口,目的是调用本身的服务

@RestController
public class HelloControler {

    @Autowired
    HelloService helloService;
    @RequestMapping(value = "/hi")
    public String hi(@RequestParam String name){
        return helloService.hiService(name);
    }
}

HelloService,提供给HelloControler调动,服务本身不提供具体的服务,而是去调用服务生产者的服务

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    /**
     * 负载方法
     * @param name
     * @return
     * SERVICE-HI :虚拟主机名
     */
    public String hiService(String name) {
        return restTemplate.getForObject("http://SERVICE-HI/hi?name="+name,String.class);
    }
}

配置文件如下:

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8764
spring:
  application:
    name: service-ribbon

启动service-ribbon,eureka-server,eureka-client(2个实例),在注册中心看到注册的服务如下:

通过url:http://localhost:8764?name=hziquxm 来访问下服务, 多次访问后发现以下返回结果轮询出现:

hi hzqiuxm, i am from port8762
hi hzqiuxm, i am from port8763

证明ribbon已经发挥了作用,默认采用了轮询访问后台服务的方式。
最终我们的架构变成如下所示:

Ribbon深入详解

RestTemplate

可以从简单示例得知RestTemplate的作用非常关键,该对象会使用Ribbon的自动化配置,通过注解@LoadBalanced开启客户端负载。从名字上我们得知RestTemplate和REST请求是很有关系的,它就是针对REST几种不同请求类型调用实现工具类。我们接下来就看看这个工具类的增删改查。

GET请求

RestTemplate中对GET请求,通过如下两个方法进行调用实现:
第一种getForEntity(),它有三种不同的重载实现

<T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType)

url:请求的地址,注意类型是是URL(包含了路径和参数等信息),不是String
responseType:请求响应体包装类型
ResponseEntity:返回结构,是Spring对HTTP请求响应的封装,该对象中body中内容类型会根据第二个参数类型进行转换,比如第二个参数是String.class代表返回对象body的内容会转换为String类型

<T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)
  • url:字符串类型的url,通常这种用的比较多
  • uriVariables:url中的参数绑定,数组方式,顺序和url占位符按顺序对应
<T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables)
  • uriVariables:url中的参数绑定,Map类型,需要提供一个key,value的map作为参数

第二种getForObject(),功能类似对getForEntity()进行进一步封装,它通过HttpMessageConverterExtractor对请求响应体body中的内容进行对象转换,实现了请求直接返回包装好的对象内容,不用再去body中去取了。它也提供三种不同的重载实现:

<T> T getForObject(URI url, Class<T> responseType)
<T> T getForObject(String url, Class<T> responseType, Object... uriVariables)
<T> T getForObject(String url, Class<T> responseType, Map<String, ?> uriVariables)

参数和 getForEntity类似,就不赘述了。

POST请求

RestTemplate中对POST请求,通过如下三个方法进行调用实现:
第一种,postForEntity(),该方法和GET中的getForEntity类型,返回值也是一个ResponseEntity对象,它有三种不同重载方法:

<T> ResponseEntity<T> postForEntity(URI url, Object request, Class<T> responseType)
<T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables)
<T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Object... uriVariables)
  • request:可以是普通类型(比如自定义的某个实体Bean)或者是HttpEntity类型,普通类型不包含header,HttpEntity类型包含header。

第二种,postForObject(),也和getForObject()有些类似,包含了三种重载方法:

<T> T postForObject(URI url, Object request, Class<T> responseType)
<T> T postForObject(String url, Object request, Class<T> responseType, Object... uriVariables)
<T> T postForObject(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables)

除了返回类型,参数都和postForEntity()一样,就不赘述了。

第三种,postForLocation(),该方法实现了以POST请求提交资源,并返回新资源URI,该URI就相同于指定了返回类型。
它也有三种不同的重载方法:

URI postForLocation(URI url, Object request)
URI postForLocation(String url, Object request, Object... uriVariables)
URI postForLocation(String url, Object request, Map<String, ?> uriVariables)

参数都和前面二种一样,就不赘述了。

PUT请求

RestTemplate中对PUT请求,就是通过put方法调用实现的,它有三种不同的重载方法:

void put(URI url, Object request)
void put(String url, Object request, Object... uriVariables)
void put(String url, Object request, Map<String, ?> uriVariables)

请求参数之前GET或POST中都有出现,就不赘述了。

PATCH请求

RestTemplate中对PATCH请求,是通过patchForObject方法调用实现的,它有三种不同的重载方法:

<T> T patchForObject(URI url, Object request, Class<T> responseType)
<T> T patchForObject(String url, Object request, Class<T> responseType,Object... uriVariables)
<T> T patchForObject(String url, Object request, Class<T> responseType,Map<String, ?> uriVariables)

请求参数之前GET或POST中都有出现,就不赘述了。

DELETE请求

RestTemplate中对DELETE请求,就是通过delete方法调用实现的,它有三种不同的重载方法:

void delete(URI url)
void delete(String url, Object... uriVariables)
void delete(String url, Map<String, ?> uriVariables)

方法参数非常简单,一般我们会把请求唯一标示拼接在url中。

源码分析

熟悉Spring的同学肯定知道RestTemplate是Spring自己提供的,那么和客户端负载均衡有关的,貌似就剩下之前没有见过的注解@LoadBalancer了,接下来我们就从它开始,看看Ribbon到底是怎么实现客户端负载均衡的。

首先我们会情不自禁的看下@LoadBalancerClinet注解的源码:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

看完后好像并没有发生什么,就是一个自定义的注解,怎么办?别慌来读一遍上面的官方注释:该注解是用来标记RestTemplate,使其使用LoadBalancerClient(负载均衡的客户端)来配置它。看来LoadBalancerClient有蹊跷,赶紧进去看下。

public interface LoadBalancerClient extends ServiceInstanceChooser {
    <T> T execute(String var1, LoadBalancerRequest<T> var2) throws IOException;

    <T> T execute(String var1, ServiceInstance var2, LoadBalancerRequest<T> var3) throws IOException;

    URI reconstructURI(ServiceInstance var1, URI var2);
}

顺便也看下它继承的接口ServiceInstanceChooser

public interface ServiceInstanceChooser {

    /**
     * Choose a ServiceInstance from the LoadBalancer for the specified service
     * @param serviceId the service id to look up the LoadBalancer
     * @return a ServiceInstance that matches the serviceId
     */
    ServiceInstance choose(String serviceId);
}
  • choose()方法,根据传入的服务名serviceId,从负载均衡器挑选一个对应的服务实例
  • execute()方法,2个重载方法都是使用从负载均衡器中挑选出来服务实例来执行请求的内容
  • reconstructURI(),把服务名称的URI(后一个参数),转换成host+port形式的请求地址(前一个参数)

我们按照习惯,以LoadBalancerClient为引线,整理下和它有关的类图如下:

类图中最关键的类就是LoadBalancerInterceptor,它是由LoadBalancerAutoConfiguration自动化配置类生成的。
它的作用就是对加了@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时拦截客户端的请求时进行拦截,获取需要的真正实例,发起实际请求。
它的源码主要部分如下:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}

intercept函数负责拦截客户端请求,然后通过LoadBalancerClient的execute方法获取具体实例发起实际请求。
由上面类图得知LoadBalancerClient是一个接口,我们要看下它的实现类RibbonLoadBalancerClient的execute方法寻找关键。

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
   Server server = getServer(loadBalancer);
   if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
   }
   RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
         serviceId), serverIntrospector(serviceId).getMetadata(server));

   return execute(serviceId, ribbonServer, request);
}
protected Server getServer(ILoadBalancer loadBalancer) {
   if (loadBalancer == null) {
      return null;
   }
   return loadBalancer.chooseServer("default"); // TODO: better handling of key
}

从RibbonLoadBalancerClient的源码中,我们可以发现execute方法一开就就通过getServer()方法,根据传入的服务名来获取具体的服务实例,而获取实例又是依赖ILoadBalancer对象的chooseServer方法。
在获取了服务实例之后,会将server对象包装成一个RibbonServer对象,该对象额外增加了是否使用HTTPS协议,服务名等其它信息。然后使用该对象回调LoadBalancerRequest的apply方法,向一个实际的服务实例发起请求,从而实现以服务名为host的URI请求到host:port形式的实际地址转换。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
   Server server = null;
   if(serviceInstance instanceof RibbonServer) {
      server = ((RibbonServer)serviceInstance).getServer();
   }
   if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
   }

   RibbonLoadBalancerContext context = this.clientFactory
         .getLoadBalancerContext(serviceId);
   RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

   try {
      T returnVal = request.apply(serviceInstance);
      statsRecorder.recordStats(returnVal);
      return returnVal;
   }
... ...

再简单看下ILoadBalancer接口,它其实是我们下面要介绍的一系列各种负载均衡器的接口,我们先简单看下接口中定义的一些抽象操作,具体的在负载均衡器中进行讲解。

public interface ILoadBalancer {

   public void addServers(List<Server> newServers);//向负载均衡器维护的实例清单中增加服务实例
   public Server chooseServer(Object key);//通过某种策略,选择一个具体的服务实例
   public void markServerDown(Server server);//标识出某个服务已经停止服务
   public List<Server> getReachableServers();//获取正常服务的实例清单
   public List<Server> getAllServers();//获取所有服务的实例清单

负载均衡

负载均衡器

我们先看下ILoadBalancer接口的类图:

AbstractLoadBalancer:是接口的抽象实现,主要功能是把实例进行分组并提供获取实例方法。

public enum ServerGroup{
    ALL,//所有实例
    STATUS_UP,//正常服务实例
    STATUS_NOT_UP //停止服务实例       
}

BaseLoadBalancer:是Ribbon负载均衡器的基础实现类,包含的主要是一些基础内容,比如:存储服务实例的列表,检查服务实例是否正常服务的IPing对象,定义负载均衡的处理规则IRule(这也是所有负载均衡策略的接口)对象,选择一个具体服务实例(默认使用线性轮询的方式)等等。

NoOpLoadBalancer:一个什么也不做的负载均衡器。

DynamicServerListLoadBalancer:对BaseLoadBalancer的扩展,在基础功能上增加了动态更新服务实例清单和过滤的功能,仍然使用线性轮询方式进行具体服务实例的选择。

ZoneAwareLoadBalancer:对DynamicServerListLoadBalancer的扩展,增加了按区域的概念。它父类的负载均衡器都是把所有实例视为一个Zone下的节点进行轮询的,所以当我们有多个区域的情况下,势必会造成周期性的跨区域访问问题。使用了该负载均衡器,就可以避免这种问题。我们可以在RibbonClientConfiguration类中查看:

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
      ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
      IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
   if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
      return this.propertiesFactory.get(ILoadBalancer.class, config, name);
   }
   return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
         serverListFilter, serverListUpdater);
}

ZoneAwareLoadBalancer是默认整合时采用的负载均衡器,它内部使用的是ZoneAvoidanceRule策略来实现的,各种负载均衡策略的具体介绍,参考下节内容。

负载均衡策略

整个负载均衡策略相关类的关系图如下:

IRule:所有负载均衡器的统一接口,提供选择负载均衡器功能

AbstractLoadBalancerRule: 定义了负载均衡器ILoadBalancer

RandomRule:随机选择策略,从服务实例清单中随机取一个。

RoundRobinRule:按照线性轮询的方式依次选择每个服务实例的策略。

RetryRule:内部使用了RoundRobinRule负载策略,增加了重试机制,通过时间来控制。

WeightedResponseTimeRule:增加了权重的线性轮询机制,主要由定时任务(默认30秒计算一次)、权重计算(根据实例的响应时间按规则计算)、实例选择三部分功能组成。

ClientConfigEnabledRoundRobinRule:本身也是通过RoundRobinRule来实现策略的,它的主要作用是做为一个父类,使得它所有子类具备线性轮询的功能同时可以和扩展自己的特性。

BestAvailableRule:最空闲实例负载策略,选择一个目前负载量最小的实例。

PredicateBasedRule:具备过滤机制的线性轮询负载策略。

AvailabilityFilteringRule:可用的最大空闲负载策略,这里的可用指的是:1 断路器处于关闭状态 2 并发数是小于阈值(默认2的32次减1)的

ZoneAvoidanceRule:区域选择负载策略,这个策略的逻辑稍微有点复杂,大致规则如下:

  • 为所有Zone区域创建快照
  • 计算出可用区域:首先剔除符合下面这些规则的Zone区域:实例数为0的区域,平均负载小于0的区域,故障了大于阈值(默认0.99999)的区域 ;然后根据平均负载计算出最差的Zone区域;如果上面过程没有符合剔除条件的区域并且平均负载小于阈值(20%),就直接返回所有可用的区域;否则从最坏的区域集合中随机选择一个剔除
  • 当区域数集合不为空,随机选择一个
  • 确定某个区域后,获取区域的服务实例清单轮询选择

在上面的类图中,我把所有策略中最为重要的两个策略用绿色标记了下,为什么我认为他们最重要呢?RoundRobinRule是其他所有负责策略的基础,也是我们通常情况下使用最多的策略;ZoneAvoidanceRule是我们需要对所有微服务实例进行有效管理和最优化实施的关键策略,特别是有跨区域的实例时。

配置详解

自动化配置

由于Ribbon中定义的每一个接口都有多种不同的策略实现,同时这些接口之间又有一定的依赖关系,这使得第一次使用Ribbon的开发者很难上手,不知道如何选择具体的实现策略以及如何组织它们的关系。SpringCloudRibbon中的自动化配置恰恰能够解决这样的痛点,在引入Spring Cloud Ribbon的依赖之后,就能够自动化构建下面这些接口的实现。

  • IClientConfig:Ribbon 的客户端配置, 默认采用com.netflix.client.config.DefaultClientConfigimpl实现
  • IRule:Ribbon 的负载均衡策略, 默认采用com.netflix.loadbalancer.ZoneAvoidanceRule实现,该策略能够在多区域环境下选出最佳区域的实例进行访问
  • IPing:Ribbon的实例检查策略,默认采用com.netflix.loadbalancer.NoOping实现, 该检查策略是一个特殊的实现,实际上它并不会检查实例是否可用, 而是始终返回true, 默认认为所有服务实例都是可用的。
  • ServerList:服务实例清单的维护机制, 默认采用com.netflix.loadbalancer.ConfigurationBasedServerList实现
  • ServerListFilter:服务实例清单过滤机制, 默认采用org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter实现, 该策略能够优先过滤出与请求调用方处于同区域的服务实例。
  • ILoadBalancer:负载均衡器, 默认采用com.netflix.loadbalancer.ZoneAwareLoadBalancer实现, 它具备了区域感知的能力。

上面这些自动化配置内容仅在没有引入Spring Cloud Eureka等服务治理框架时如此,在同时引入Eureka和Ribbon依赖时,自动化配置会有一些不同。如果要看这些类具体的配置了些什么,只要查看对应的类具体代码就可以详细了解。

我们也可以使用配置类方便地替换上面的这些默认实现,比如:

@Configuration
public class MyRibbonConfiguration {

@Bean //默认的NoOping 就不会被创建
public IPing ribbonPing(IClientConfig config) {
return new PingUrl();
    }


@RibbonClient(name = "hello-service", configura七ion = HelloServiceConfiguration.
class) //为服务指定配置
public class RibbonConfiguration {
    }
}

参数配置

支持二种方式的配置:

  • 全局配置
    ribbon.< key>=< value>格式进行配置即可。代表了Ribbon 客户端配置的参数名, < value>则代表了对应参数的
    值。例如:ribbon.ConnectTimeout=250,配置了全局的Ribbon创建连接时间。
    一般建议全局配置可以作为系统的默认配置,客户端配置可以覆盖全局配置,从而实现自定义的一些配置

  • 指定客户端配置
    配置方式采用< client> .ribbon.< key>=< value>的格式进行配置。其中, < key>和< value>的含义同全局配置相同, 而< client>代表了客户端的名称, 比如使用@RibbonClient指定的名称, 也可以将它理解为是一个服务名。

与Eureka结合

当在Spring Cloud的应用中同时引入Spring Cloudribbon和Spring Cloud Eureka依赖时, 会触发Eureka中实现的对ribbon的自动化配置。ServerList的维护机制,IPing的实现都将被覆盖。

在与Spring Cloud Eureka结合使用的时候, 我们的配置将会变得更加简单。不再需要通过类似hello-service.ribbon.listOfServers的参数来指定具体的服务实例清单, 因为Eureka将会为我们维护所有服务的实例清单。而对于Ribbon 的参数配置, 我们依然可以采用之前的两种配置方式来实现, 而指定客户端的配置方式可以直接使用Eureka中的服务名作为来完成针对各个微服务的个性化配置。

SpringCloudRibbon默认实现了区域亲和策略,所以, 我们可以通过Eureka实例的元数据配置来实现区域化的实例配置方案。
将处于不同机房的实例配置成不同的区域值, 以作为跨区域的容错机制实现。例如:

eureka.instance.metadataMap.zone=hangzhou

最后,如果你喜欢或有需要使用Ribbon来维护服务实例,也可以通过参数配置的方式来禁用Eureka对Ribbon服务实例的维护实现。例如:

ribbon.eureka.enabled=false

重试机制

Spring Cloud Eureka实现的服务智力机制强调了CAP原理中的AP(可用性和分区容错性),不同于类似ZK这类强调CP(一致性和分区容错性) 的服务治理框架。Eureka为了可用性,牺牲了一定的一致性。

在极端情况下,它宁愿接收故障的服务实例,也不要丢掉健康实例。比如在注册中心的网络发生故障时,CP优先的服务治理将会把所有的服务实例全部剔除,而Eureka则会因为超过85%的实例丢掉心跳触发保护机制,注册中心会保留此时的所有节点。

所以当服务调用到故障实例的时候,我们希望能够增强对这类问题的容错,这时可以使用Spring Retry(SpringCloud已经做了整合)来增强RestTemplate的重试能力。

要实现上面所说的重试机制我们只要增加相关配置就行:

spring.cloud.loadbalancer.retry.enabled = true //开启重试机制

然后是对具体服务进行一些超时时间的配置:

hystrix.command.default.execution.isolation.thread.timeoutinMilliseconds=l0000 //断路器超时时间
xxxservice.ribbon.ConnectTimeout=250 //请求连接超时时间
xxxservice.ribbon.ReadTimeout= l000 //请求处理的超时时间
xxxservice.ribbon.OkToRetryOnAllOperations=true //对所有操作请求都进行重试
xxxservice.ribbon.MaxAutoReriesNextServer=2 //切换实例的重试次数
xxxservice.ribbon.MaxAutoRetries=1   //对当前实例的重试次数

注意断路器的超时时间需要大于Ribbon的超时时间,不然不会触发重试,原理很简单,因为先触发断路器的逻辑了。

SpringCloud微服务系列(2)

hzqiuxm阅读(256)评论(0)

注册中心Eureka

起源

Spring Cloud Eureka是Spring Cloud Netflix微服务套件中的一部分,它基于Netflix Eureka做了二次封装,主要负责完成微服务架构中的服务治理功能。

基本介绍

为什么需要注册中心

微服务的早期,我们可以通过一些静态配置或者软件负载来完成服务之间的负载均衡调用,但随着微服务的不断增加,静态配置就会暴露出一些问题:

  • 静态配置维护越来越复杂
  • 集群规模越来越大,人工维护成本高
  • 服务的位置可能会发生变化,灵活性差,调整成本高
  • 服务的名称都有可能发生变化,难以维护

所以为了有效解决以上的问题,我们需要引入服务治理。服务治理一般包含三个角色:

  • 服务注册中心:每个服务都要在注册中心进行注册登记自己的服务(主机,端口,版本,协议等),服务中心会提供心跳维护。
  • 服务提供者:提供服务的一方,就是服务的被调用方
  • 服务消费者:服务的调用方,当然本身也可以是其他服务的提供者

也就是说,在服务治理的框架下,服务间的调用不再通过指定具体的实例地址来实现,而是通过服务名来实现。
服务的调用方在注册中心查询到可用的服务清单后,可以采用不同的负载均衡方式进行调用。

Spring Cloud Eureka能做什么

  • 既包含了服务端组件(作为注册中心),又包含了客户端组件(作为服务提供者,处理服务的注册与发现)
  • 服务端和客户端均以java实现,非常适合通过java实现的分布式系统或与JVM兼容的其他语言构成的系统
  • 提供了完备RESTful API,支持非java语言构建的微服务纳入进来,不过其他语言要实现自己的客户端程序(很多语言都已经有实现)

核心知识

搭建服务中心Eureka Server

依赖于:spring-cloud-starter-eureka-server (默认是服务端和客户端为一体的)

关键注解
  • 在启动类上加上@EnableEurekaServer注解,使其成为注册中心
关键配置
  • 注册中心服务端口
  • 注册中心服务实例名称
  • 默认客户端配置是打开的,注册中心也会将自己作为客户端来尝试注册自己,所以我们需要禁用它的客户端注册行为
  • 如果不是高可用的多中心配置,也不需要和其他注册中心进行同步,检索其他注册中心服务也要禁用

一个单中心注册服务配置示例:

server:
  port: 8761

eureka:
  instance:
    hostname: localhost
  # 每个Eureka server 也是一个client 所以把client相关配置关闭掉,只作为服务使用
  client:
    # 只作为服务端
    registerWithEureka: false
    # 不需要同其他的注册中心同步信息
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
  # 关闭注册中心的保护模式,如果90秒收不到心跳信息,将销毁某个注册者信息,
  # 开启保护模式时,即使服务提供者宕机或无法提供服务,注册中心仍然会保留注册信息
  server:
    enable-self-preservation: false

客户端注册到注册中心

  • 客户端的依赖和服务端一样
  • 客户端可以使用@EnableEurekaClient注解,也可以使用@EnableDiscoveryClient注解注册为服务的客户端
  • 二者区别:当Eureka在classpath下的话,二者没有区别。@EnableDiscoveryClient可以支持其他的服务发现组件,比如zk
  • 最后客户端只要添加以下配置就可以注册到服务端注册中心了
eureka:
  client:
    service-url:
      #注册中心地址
      defaultZone: http://localhost:8761/eureka/

  # 以IP地址方式注册,默认是hostname
  instance:
    ip-address: true
server:
  port: 8762
# service name
spring:
  application:
    name: service-hi

我们给客户端添加一个简单的hello world服务

@RestController
@EnableEurekaClient
@SpringBootApplication
public class EurekaclientApplication {

   public static void main(String[] args) {
      SpringApplication.run(EurekaclientApplication.class, args);
   }

   @Value("${server.port}")
   String port;

   @RequestMapping("/hi")
   public String home(@RequestParam String name){

      return "hi " + name +", i am from port" + port;
   }
}

#### 简单测试

分别启动服务端和客户端

服务端,端口:8761
java -jar eureka-server-1.0.0.jar 

客户端,端口:8762
java -jar eureka-client-1.0.0.jar 

访问:localhost:8761可以查看服务端注册中心的监控页面,看到我们的客户端服务已经注册到注册中心了

尝试通过url,http://localhost:8762?name=hziquxm 来访问下服务,客户端的服务则会返回

hi hzqiuxm, i am from port8762

Eureka Server 的高可用

在分布式服务应用中,高可用是必须考虑的事情,我们注册中心也需要具备高可用才行,所以下面简单介绍下注册中心的高可用

基本思想
  • 实现思想:所有节点是服务提供方,也是服务消费方,服务中心也一样,多个Eureka之间相互注册实现高可用

  • 建议按照优先级命名方式比如:profile:primary ,secondary, tertiary,当然也可以采用自己喜欢的简单命名方式,比如:ha1,ha2......

高可用注册中心架构图:

注意:Eureka微服务客户端只要注册到其中一个服务端即可

主要配置
  • 配置关键点:每个Eureka名字相同, 实例名称不同,端口号只要不在一台主机上建议都设置相同,便于统一管理
  • 不同中心启动的时候指定对应的配置文件中不同的段,例如:java -jar eureka-server-1.0.0.jar -- spring.profiles.active=ha1

一个高可用注册中心服务配置示例:

# ha service name
spring:
  application:
    name: service-hi-ha
---
spring:
  profiles: ha1
server:
  port: 8761
eureka:
  instance:
    # profiles = ha1
    hostname: ha1
  client:
    serviceUrl:
      # 将自己注册到ha2
      defaultZone: http://ha2:8771/eureka/

---
spring:
  profiles: ha2
server:
  port: 8771
eureka:
  instance:
    # profiles = ha2
    hostname: ha2
  client:
    serviceUrl:
      # 将自己注册到ha1
      defaultZone: http://ha1:8761/eureka/

简单示例

我们分别启动二个注册中心(其实从数学理论角度看,3个是最佳的,3个的配置也很简单,就是在其中一个中心注册地址defaultZone后面加上另外2个中心地址即可,用逗号分开):
为了演示方便,我这里就举2个中心的例子

注册中心1,端口:8761
java -jar eureka-server-1.0.0.jar -- spring.profiles.active=ha1

注册中心2,端口:8771
java -jar eureka-server-1.0.0.jar -- spring.profiles.active=ha2

客户端服务,端口:8762,只注册到8761
java -jar eureka-client-1.0.0.jar 

我们访问8761和8771的注册中心监视界面:
下面是8761

数字1:此列出了注册到8761上的实例,发现除了客户端服务外,注册中心本身
数字2:此处指出了8761注册中心注册到了8771端口的注册中心
数字3:此处指出了8761的备份节点是8771

下面是8771

数字1:可以发现我们之前客户端只是注册到了8761端口的注册中心,但在此处,也可以发现此实例
数字2:此处指出了8771注册中心注册到了8761端口的注册中心
数字3:此处指出了8771的备份节点是8761

组件详解

基础架构与通信行为

Eureka作为服务治理框架,其基础架构主要包含了三个核心要素

  • 服务注册中心:就是本节中的Eureka服务端,又称之为注册中心
  • 服务生产者:就是本节中的Eureka客户端,扮演作用是服务的提供者
  • 服务消费者:本节上面没有演示,其实也是Eureka客户端,扮演的作用是服务的消费者

注意:上面的服务提供者和服务消费者在实际应用中并不是单一职责的,服务B可能是服务A的提供者,同时也可能是服务C的消费者,是一个相对的概念。

下图是三者关系调用图:

服务生产者:主要有三种操作,服务注册、服务续约、服务下线

服务注册,服务的提供者在启动的时候通过发送REST请求将自己注册到Eureka Server上。Eureka Server接收到这个信息后,
会把发送请求中关于服务提供者的元数据信息存放在一个双层的Map中。

类似下面的结构:第一层key是服务名,value是这个服务下的所有实例;第二层key是实例名,value是具体实例元数据信息

因为上图中的架构是高可用的架构,所以注册中心之间还会有个服务同步的操作,在一方注册的服务提供者信息会被同步到另一方的注册中心。

通过服务同步,服务提供者的服务就可以从这两台注册中心中的任意一台上获得,从而实现了高可用。

服务续约,在注册完服务后,服务提供者会维护一个心跳来持续告诉注册中心它还活着,以防止注册中心从服务列表中剔除没有心跳的服务实例。

服务续约的两个重要属性是

EurekaInstanceConfigBean类
private int leaseRenewalIntervalInSeconds = 30; //续约服务调用间隔时间
private int leaseExpirationDurationInSeconds = 90;//定义服务失效时间

服务下线,在服务关闭时候,会触发一个服务下线的REST请求给注册中心,注册中心收到请求后,将该服务状态置为下线,
并把该下线事件传播出去(同步给其他注册中心或以及通知服务消费者)。

服务的消费者:主要操作二个,获取服务、服务调用。

获取服务,服务消费者在启动的时候会发送一个REST请求给服务注册中心,获取在上面注册的服务清单。
出于性能考虑,注册中心Eureka Server只会维护一份只读的清单缓存用来返回给客户端使用,清单默认每隔30秒刷新一次

EurekaClientConfigBean类
private int registryFetchIntervalSeconds = 30; //刷新时间配置的属性与默认值

服务调用,服务消费者在获取了服务清单后,通过服务名可以获得具体提供服务的实例名和元数据信息(参考上面的双层Map图)。
客户端可以根据自己的需要来决定具体调用哪个,所以一般我们会在服务消费者端集成类似Ribbon,Feign这样的负载工具。

这里需要补充的是对于访问实例的选择,Eureka中有Region和Zone概念,它们的关系如下图所示:

一个Region中可以包含多个Zone,Zone中包含了服务的实例。细心的读者会发现我们之前的配置文件中有这么一段

  client:
    serviceUrl:
      # 将自己注册到ha1
      defaultZone: http://ha1:8761/eureka/

这里的defaultZone就是服务默认注册的Zone,我们也可以自己设置Region和Zone

EurekaClientConfigBean类
private String region = "us-east-1";//默认的region名字
private Map<String, String> availabilityZones = new HashMap<>();//多个Zone用逗号分开

消费者在进行服务调用时,优先访问同一个Zone的服务,若访问不到就会访问其他Zone。

最佳实践提醒:利用上面这个特点我们可以用一个Zone代表一个物理区域(物理主机或集群),设计出具备区域故障容错的微服务集群。

关键源码分析

我们把一个普通的SpringBoot应用注册到Eureka Server注册中心时,主要做了两件事:

  • 在应用主类中配置了@EnableDiscoveryClient或者@EnableEurekaClient注解
  • 在配置文件中用eureka.client.serviceUrl.defaultZone参数指定了服务中心的位置

那这一切是如何发生的呢?我们顺着这两个线索,一起去看看它们背后的实现原理。

首先我们看下@EnableDiscoveryClient注解的源码:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

   /**
    * If true, the ServiceRegistry will automatically register the local server.
    */
   boolean autoRegister() default true;
}

从上面的代码中我们可以看出来,它主要用来开启DiscoveryClient实例。由这个类我们梳理下和它相关的类,以及他们之间的关系如下:

为了让大家看清楚它们之间的关系我用两种不同的颜色区分了Netflix包下的类和SpringCloud包下面的类。真正实现发现服务的
则是Netflix 包中的com.netflix.discovery.DiscoveryClient 类,我们就来详细看看DiscoveryClient 类功能吧,它的主要作用就是与注册中心Eureka Server进行交互,上一节中我们说了它的功能主要有:向注册中心注册服务实例、向注册中心服务租约、服务关闭时取消租约、查询注册中心服务实例列表。

DiscoveryClient 中提供下非常多的方法,在这里就不一一说明,举上面的注册来说说吧,希望大家可以举一反三。

通过查看它的构造类, 可以找到它调用了下面这个函数:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        ...

    if (clientConfig.shouldRegisterWithEureka()) { //注释一
        ...

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
    ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

大家可以看到在注释一处判断了是否要注册到注册中心,条件为真后创建了一个InstanceInfoReplicator实例,它实现了Runnable接口,所以会启动一个线程来处理。

果然在后面的代码中InstanceInfoReplicator实例启动了start方法,我们赶紧去它的Run方法里看下,它启动后干了点什么:

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();//注释二
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

不负所望,在它的run方法里注释二处,我们看到了discoveryClient.register(),这一行真正触发了注册的动作。我们再进入注册方法中看看:

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

相信一切都真相大白了,注册操作通过REST请求方式进行,注册时传入的instanceInfo就是客户端给服务端的元数据,如果你对元数据看兴趣的话,就进入InstanceInfo中去看看吧。

配置详解

服务注册类

这部分主要负责配置注册中心的地址、服务获取的间隔时间、可用区域等。
服务注册类的配置,我们可以查看源码中的org.springframemwork.cloud.netflix.eureka.EurekaClientConfigBean类,该类中的属性基本都是可以进行配置的,比看官方的文档还要全。
比如上面我们提到的属性:registryFetchIntervalSeconds = 30,用来设置缓存中服务清单刷新时间,30表示默认值。我们如果配置成100秒的话可以这么配置:

eureka.client.registry-fetch-interval-seconds=100
服务实例类

这部分主要负责配置服务实例的名称、IP地址、端口号、健康检查路径等。
服务实例类配置我们可以查看源码中的org.springframemwork.cloud.netflix.eureka.EurekaInstanceConfigBean类。
比如我们上面提到的属性:leaseRenewalIntervalInSeconds = 30,用来设置续约时间,30表示默认值。我们如果要配置成90秒的话,可以:

eureka.client.lease-renewal-interval-in-seconds=90

跨平台支持

其他语言客户端

因为采用了HTTP的REST接口方式,使得Eureka Server下注册的微服务不限于使用Java开发。
除了Java实现了Eureka的客户端外,有JS的实现eureka-js-client,Python的实现python-eureka,即使是你自己来为某门语言来开一个客户端,
也并不是十分复杂,只需要根据上面提到的那些用户服务协调的通信请求实现就能实现服务的注册与发现,有兴趣的同学可以参考官方的API。

通信协议

默认情况下,Eureka使用Jersey和XStream配合JSON作为Server和Clinet之间的通信协议。
Jersey是JAX-RS规范的参考实现,主要包含:核心服务器,核心客户端,集成三个部分。
XStream是用来将对象序列化或反序列化操作一个Java类库。

Keepalived简明介绍

hzqiuxm阅读(272)评论(0)

介绍

一款用于保障服务高可用的软件,自动侦测服务器状态,移除故障服务器,切换到正常服务器,添加恢复后的服务器到集群

实现的基本思路

基于VRRP协议的实现,主要用在IP层,TCP层,应用层

VRRP协议解决问题

在现实网路中,两台服务器之间通常不会直连,那A和B如何路由呢?
1. 在A上使用动态路由协议(问题:管理维护成本大,设备需要支持动态路由协议)不推荐
2. 在A上配置静态路由(问题:路由器或默认的网关成单点,需重启网络)

VRRP(Virtual Router Redundancy Protocol 虚拟路由冗余协议)解决你的单点故障问题

1.它是一个选择协议,把一个虚拟路由器的职责,动态转移给Master进行处理
2.是一种路由容错协议或备份路由协议。当Master宕机后,虚拟路由将启用备份路由器

名词解释

1.VRRP路由器 物理路由器,上面运行着实现VRRP协议的程序
2.VRRP虚拟路由器 逻辑上的路由器,通常由多台物理路由器组成,可以看成一个路由器池 由VRID来标示,范围0-255
3.Master和Backup: 一个虚拟路由里的多个路由器并不是同时工作的,工作的叫Master,其他的为Backup

工作机制

1.路由器开启VRRP功能后,会根据优先级确认Master
2.Master会通过IP多播包的形式来发送公告报文,Backup会接收到这些报文
3.如果是抢占式:Backup会跟Master比较优先级,如果大于Master,互换身份
4.如果是非抢占式,只要Master能正常工作,不会出现新Master (一般使用该种)
5.如果备份服务器在连续三个公告间隔内收不到VRRP公告,或收到优先级为0的公告,会按照竞选协议选出新的Master,以保证服务可用

VRRP负载分担

多台路由器同时承担业务,避免设备闲置
让同一台路由器可以加入到备份组,在不同组中优先级不同。使其在不同组担任不同角色。

安装演示Keepalived

一:下载并安装IPVS

下载的版本要和自己主机内核一致
内核查看 cat /proc/version

下载地址: http://www.linuxvirtualserver.org/software/ipvs.html

安装之前要创建一个软连接 In -sv /usr/src/kernels/3.10.0-123.9.3.el7.x86_64 /usr/src/linux

然后make && make install
ipvsadm 命令被安装到/sbin下面
输入ipvsadm 命令 检查是否安装成功

二.下载并安装Keepalived

下载地址:http://www.keepalived.org/

下载源码来安装
1. ./configure --prefix=/usr/java/keepalived
如果出现NO SO_MARK in headers 这样的错误提示,可以在命令上添加 --disable-fwmark
如果要使用LVS,还需要指定内核的目录,添加 --with-kernel-dir=具体内核路径,以指定使用内核源码里的文件

2.make && make install
3.验证安装
a.到sbin下,执行keepalived 命令
b.查看进程 ps -aux|grep keepalived 应该有三个进程
c.查看内核模块ip_vs是否被加载到内核空间, lsmod |grep ip_vs
d.执行 tail -f /var/log/messages来查看日志
e.执行pkill keepalived来关闭

注意:keepalived 配置文件是没有语法检查的,修改的时候千万要小心

Keepalived 体系结构图

keepalived配置

分为:全局配置、VRRPD配置、LVS配置

全局配置:全局定义与静态路由 关键是routerid配置,其他一般很少配置
VRRPD配置:同步组,实例配置
LVS配置:如果启用LVS就需要配置,不启用就无需配置。

keepalived+nginx的HA

让Keepalived监控Nginx的状态,当某台Nginx宕机后切换到其他的Nginx上
需要使用到shell脚本,监听本地端口的80端口,如果本地80端口不通,那么keepalived就自杀,这样master自动就会切换到其他Bankup上,升级为Master

上面脚本尝试了一次重启

需要在keepalived.conf配置中增加上面脚本的调用,在VRRP实例配置前 ,在实例配置之后,增加脚本检查的配置,这样Keepalived在启动的时候就会检查一次

keepalived 做HA思路

和具体的应用和服务没有关系,仿造keepalived和nginx的思路,可以实现:
1.keepalived + varnish
2.keepalived + tomcat
3.keepalived + redis
4.keepalived + mysql
.... ...

示例varnish的检测脚本

LVS+ Keepalived介绍
LVS可用来实现LINUX下的简单负载均衡
工作在四层,其转发依赖于四层协议的特征进行转发的, 需要内核的TCP/IP协议栈进行过滤筛选,而这样的过滤转发规则可由管理员对内核进行定义

三种负载均衡转发机制

1.NAT (Network Address Translation)网络地址翻译技术
类似IP的负载均衡技术

2.TUN (IP Tunneling) IP隧道技术
类似数据链路负载均衡策略

3.DR(Direct Routing) 直接路由技术
通过改写请求报文的Mac地址,一般采用该种,要求:调度器与真实服务器在同一物理网段上

负载均衡调度算法

1.RR (Round Robin) 轮询
2.wrr (Weight Round Robin) 加权轮询
3.lc(Least Connections)最少连接
4.wlc(weight Least Connections)加权最小连接
5.dh(Destination hashing) 目标地址hash
6.sh(Source hashing) 源地址hash
7.sed (Shortest Expected Delay)最短期望的延迟
8.NQ(Never Queue) 最少排队
9.LBLC(Location-Based Least-Connection) 基于局部的最少连接,优先使用上一次的
10.LBLCR(Location-Based Least-Connection with replication Scheduling) 使用上次使用的服务器同一组中的

注意点

KVS+Keepalived其实就是把LVS看做是Nginx或者varnish
配置的时候只需要在keepalived中去配置,无需单独配置LVS
全局配置和VRRP配置是一样的,增加关于LVS配置
主、备服务器配置是一样的
LVS+DR模式中,只支持IP的转发,不支持端口转发。virtual_server和real_server配置节点中端口必须一致

AMQ简明教程(12)

hzqiuxm阅读(203)评论(0)

AMQ集群

Queue consumer clusters

ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。
如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。
因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接

Broker clusters

大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。实现以上行为需要用failover协议作为Client。

如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。

这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。

特别注意:ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在netWorkConnector配置的时候,设置duplex=true 就可以了。

消息会较为平均的分配给2个集群,而不是每个消费者。即使某个消费者集群的消费者比其他集群中多,它获得的消息总数仍然差不多。不适合机器性能不均等的架构。

原因:networkConnector配置的可用属性conduitSubscriptions :默认true,标示是否把同一个broker的多个consumer当做一个来处理

负载均衡的时候一般设置为false, 设置为false后,会按照消费者个数来分配。

Master Slave

在5.9的版本里面,废除了Pure Master Slave的方式,目前支持:

1:Shared File System Master Slave:基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了,某个抢到文件锁的slave变成master

2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master

3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的
具体的可以到官方察看: http://activemq.apache.org/masterslave.html

JDBC Master Slave的方式

利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获
得独有锁,其它Slaves Broker则等待获取独有锁。
推荐客户端使用Failover来链接Brokers。
具体如下图所示:

Master失败
如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。同时,Client将停止链接之
前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示

Master重启
任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示

JDBC Master Slave的配置

使用来配置消息的持久化,自动就会使用JDBC MasterSlave的方式。

AMQ简明教程(11)

hzqiuxm阅读(227)评论(0)

AMQ动态网络链接

多播协议multicast

ActiveMQ使用Multicast 协议将一个Service和其他的Broker的Service连接起来。IPmulticast是一个被用于网络中传输数据到其它一组接收者的技术。

Ip multicast传统的概念称为组地址。组地址是ip地址在224.0.0.0到239.255.255.255之间的ip地址。

ActiveMQ broker使用multicast协议去建立服务与远程的broker的服务的网络链接。

基本的格式配置

multicast://ipadaddress:port?transportOptions

transportOptions如下:
1:group:表示唯一的组名称,缺省值default
2:minmumWireFormatVersion:被允许的最小的wireformat版本,缺省为0
3:trace:是否追踪记录日志,默认false
4:useLocalHost:表示本地机器的名称是否为localhost,默认true
5:datagramSize:特定的数据大小,默认值4 * 1024
6:timeToLive:消息的生命周期,默认值-1
7:loopBackMode:是否启用loopback模式,默认false
8:wireFormat:默认用wireFormat命名
9:wireFormat.*:前缀是wireFormat

配置示例

1:默认配置,请注意,默认情况下是不可靠的多播,数据包可能会丢失
multicast://default
2:特定的ip和端口
multicast://224.1.2.3:6255
3:特定的ip和端口以及组名
multicast://224.1.2.3:6255?group=mygroupname

Activemq使用multicast协议的配置格式如下

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data">
<networkConnectors>
<networkConnector name="default-nc" uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
</transportConnectors>
</broker>

上面的配置说明
1:uri=“multicast://default”中的default是activemq默认的ip,默认动态的寻找地址
2:“discoveryUri”是指在transport中用multicast的default的地址传递
3:“uri”指动态寻找可利用的地址
4:如何防止自动的寻找地址?
(1)名称为openwire的transport,移除discoveryUri=”multicast://default”即可。传输链接用
默认的名称openwire来配置broker的tcp多点链接,这将允许其它broker能够自动发现和链接到可用的broker中。
(2)名称为“default-nc”的networkConnector,注释掉或者删除即可。ActiveMQ默认的networkConnector基于multicast协议的链接的默认名称是default-nc,
而且自动的去发现其他broker。去停止这种行为,只需要注销或者删除掉default-nc网络链接。
(3)使brokerName的名字唯一,可以唯一识别Broker的实例,默认是localhost

Multicast 协议和普通的tcp协议
它们是差不多的,不同的是Multicast能够自动的发现其他broker,从而替代了使用static功能列表brokers。

用multicast协议可以在网络中频繁的添加和删除ip不会有影响。
multicast协议的好处是:能够适应动态变化的地址。缺点:自动的链接地址和过度的销耗网络资源。

Discovery协议

Discovery是在multicast协议的功能上定义的。功能类似与failover功能。它将动态的发现multicast 协议的broker的链接并且随机的链接其中一个broker。

基本配置格式如下:

discovery:(discoveryAgentURI)?transportOptions

transportOptions如下:
1:reconnectDelay:再次寻址等待时间,缺省值10
2:initialReconnectDelay:初始化设定再次寻址等待时间,缺省值10
3:maxReconnectDelay:最大寻址等待时间, 缺省值30000
4:useExponentialBackOff:是否尝试BackOff重链接,默认是true
5:backOffMultiplier:尝试Backoff的次数,默认是2
6:maxReconnectAttempts:如果异常,最大的重新链接个数,默认是0
7:group:组唯一的地址,默认是default

示例:

discovery:(multicast://default)?initialReconnectDelay=100

Discovery协议的配置示例

<broker name="foo">
<transportConnectors>
<transportConnector uri="tcp://localhost:0"
discoveryUri="multicast://default"/>
</transportConnectors>
</broker>

Peer协议(适用内嵌的方式)
ActiveMQ提出了peer transport connector 以让你更加容易的去嵌入broker中网络中。
它将创建一个优于vm链接的p2p网络链接。
默认格式如下:peer://peergroup/brokerName?key=value

Peer协议基本使用
当我们启动了用peer协议时,应用将自动的启动内嵌broker,也将会自动的去配置
其它broker来建立链接,当然了,前提是必须属于一个组。 配置如下:

peer://groupa/broker1?persistent=false

另外,生产者和消费者都各自链接到嵌入到自己应用的broker,并且在在本地的同一个组名中相互访问数据。

Peer协议的基本原理示意图

在本地机器断网的情况下,本地的client访问本地brokerA将任然正常。
在断网的情况下发送消息到本地brokerA,然后网路链接正常后,所有的消息将重新发送并链接到brokerB

Fanout协议

Fanout协议是同时链接多个broker,默认的格式如下:

fanout:(fanoutURI)?key=value
示例:fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616))

表示client将试图链接到三个static列表中定义的三个URI

Fanout协议的配置方式如下:
fanout:(discoveryURI)?transportOptions
transportOptions如下:
1:initialReconnectDelay:重新链接的等待时间,默认是10
2:maxReconnectDelay:最大重新链接的等待时间,默认是30000
3:useExponentialBackOff:是否尝试BackOff重链接,默认是true
4:backOffMultiplier:尝试Backoff的次数,默认是2
5:maxReconnectAttempts:如果异常,最大的重新链接个数,默认是0
6:fanOutQueues:是否将topic消息转换queue消息,默认false
7:minAckCount:Broker链接的最小数,默认是2

配置示例:

fanout:(static:(tcp://localhost:61616,tcp://remotehost:61616))?initialReconnectDelay=100

特别提醒
Activemq不推荐使Consumer使用fanout协议。当Provider发送消息到多个broker中,测试Consumer可能收到重复的消息

AMQ简明教程(10)

hzqiuxm阅读(305)评论(0)

AMQ的静态网络链接与容错链接

一台机器上启动多个broker步骤如下:

1:把整个conf文件夹复制一份,比如叫做conf2
2:修改里面的activemq.xml文件
(1)里面的brokerName 不能跟原来的重复
(2)数据存放的文件名称不能重复,比如:

<kahaDB directory="${activemq.data}/kahadb_2"/>

(3)所有涉及的transportConnectors 的端口,都要跟前面的不一样
3:修改jetty.xml,主要就是修改端口,比如:

<property name=“port” value=“8181”/> 端口必须和前面的不一样

4:到bin下面,复制一个activemq,比如叫做activemq2:
(1)修改程序的id,不能和前面的重复ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2-hostname.pid"
(2)修改配置文件路径ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"
(3)修改端口,里面有个tcp的61616的端口,要改成不一样的,最好跟activemq.xml里面的tcp的端口一致
(4)然后就可以执行了,如果执行没有权限的话,就授权:chmod 751 activemq2

ActiveMQ的networkConnector是什么

在某些场景下,需要多个ActiveMQ的Broker做集群,那么就涉及到Broker到Broker的通信,这个
被称为ActiveMQ的networkConnector。
ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一Broker在另一
端接收消息。这就是所谓的“桥接”。 ActiveMQ也支持双向链接,创建一个双向的通道对于两个
Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下:

“discovery“的概念
一般情况下,discovery是被用来发现远程的服务,客户端通常想去发现所有可利用的brokers;另一层意思,它是基于现有的网络Broker去发现其他可用的Brokers。

有两种配置Client到Broker的链接方式,一种方式:Client通过Statically配置的方式去连接Broker,一种方式:Client通过discovery agents来dynamically的发现Brokers

Static networks

Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker。这
种协议用于复合url,一个复合url包括多个url地址。格式如下:
static:(uri1,uri2,uri3,...)?key=value
1:配置示例如下:

<networkConnectors>
<networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/>
</networkConnectors>

Static networkConnector的基本原理示意图:

上图中,两个Brokers是通过一个static的协议来网络链接的。一个Consumer链接到brokerB的一个地址上 ,当Producer在brokerA上以相同的地址
发送消息时,此时它将被转移到brokerB上。也就是,BrokerA会转发消息到BrokerB上。

networkConnector配置的可用属性:
1:name:默认是bridge
2:dynamicOnly:默认是false,如果为true, 持久订时才创建对应的网路持久订阅。默认是启动时激活
3:decreaseNetworkConsumerPriority:默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0

一般的静态桥接的消费者消费的比本地消费者来的更快(桥接方的消费者被偏爱了),这一点要注意

4:networkTTL :默认是1 ,网络中用于消息和订阅消费的broker数量
5:messageTTL :默认是1 ,网络中用于消息的broker数量
6:consumerTTL:默认是1 ,网络中用于消费的broker数量
7:conduitSubscriptions :默认true,是否把同一个broker的多个consumer当做一个来处理

负载均衡的时候一般设置为false,否则负载均衡可能失效,因为所有的消费者被看成一个后,可能所有消息都给某一个消费者处理了

8:dynamicallyIncludedDestinations :默认为空,要包括的动态消息地址,类似于excludedDestinations,过滤一部分消息转发给桥接的broker
过滤要转移的消息如:

<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>

9:staticallyIncludedDestinations :默认为空,要包括的静态消息地址。类似于excludedDestinations,如:

<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
</staticallyIncludedDestinations>

10:excludedDestinations :默认为空,指定排除的地址,示例如下:

<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)"
name="bridge" dynamicOnly="false" conduitSubscriptions="true"
decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
<topic physicalName="always.include.topic"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>

11:duplex :默认false,设置是否能双向通信

两个broker消息互通

12:prefetchSize :默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息
13:suppressDuplicateQueueSubscriptions:默认false,如果为true, 重复的订阅关系一产生即被阻止
14:bridgeTempDestinations :默认true,是否广播advisory messages来创建临时destination
15:alwaysSyncSend :默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker。
16:staticBridge :默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。

多线程consumer访问集群

static静态桥接的consumer具有的优先更高, 例子:A--->B 消息发送到A上,A上的消费者与B上的消费者相比,B上的优先级更高

集群下的消息回流功能

“丢失”的消息
有这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,
消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转
发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2
上去了,但是有一部分他们还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好
像是消失了,除非有消费者重新连接到broker1上来消费。(即时配置成双向连接,也是取不到的)

怎么办呢?
从5.6版起,在destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1
上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为
false,为了防止消息回流后被当做重复消息而不被分发,示例如下:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

注意:需要回流的broker中双方都要配置

容错的连接Failover Protocol

前面讲述的都是Client配置链接到指定的broker上。但是,如果Broker的链接失败怎么办呢?此时,Client有两个选项:要么立刻死掉,要么去连接到其它的broker上。

Failover协议实现了自动重新链接的逻辑。
这里有两种方式提供了稳定的brokers列表对于Client链接。
第一种方式:提供一个static的可用的Brokers列表。第二种方式:提供一个dynamic 发现的可用Brokers。
Failover Protocol 的配置方式

failover:(uri1,...,uriN)?key=value 或者 failover:uri1,...,uriN

Failover Protocol 的默认配置
默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其他的Broker上。

默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。当然所有的重新链接参数都可以根据应用的需要而配置。

Failover Protocol 的使用示例,在客户端程序里面:

ConnectionFactory connectionFactory = newActiveMQConnectionFactory("failover:(tcp://192.168.1.106:61679,tcp://192.168.1.106:61819)?randomize=false");

如果randomize = true 比较平均的交给可以用的broker,但是在false的情况下,只要第一个broker有效,就会一直往第一个发

Failover Protocol 可用的配置参数

1:initialReconnectDelay:在第一次尝试重连之前等待的时间长度(毫秒),默认10
2:maxReconnectDelay:最长重连的时间间隔(毫秒),默认30000
3:useExponentialBackOff:重连时间间隔是否以指数形式增长,默认true
4:backOffMultiplier:递增倍数,默认2.0
5:maxReconnectAttempts: 默认-1|0,自版本5.6起:-1为默认值,代表不限重试次数;0代表从不重试
(只尝试连接一次,并不重连),5.6以前的版本:0为默认值,代表不限重试次数所有版本:如果设置为大于0的数,代表最大重试次数
6:startupMaxReconnectAttempts:初始化时的最大重连次数。一旦连接上,将使用maxReconnectAttempts的配置,默认0
7:randomize:使用随机链接,以达到负载均衡的目的,默认true
8:backup:提前初始化一个未使用连接,以便进行快速失败转移,默认false
9:timeout:设置发送操作的超时时间(毫秒),默认-1
10:trackMessages:设置是否缓存[故障发生时]尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺利传送,默认false
11:maxCacheSize:当trackMessages启用时,缓存的最大字节,默认为128*1024bytes
12:updateURIsSupported:设定是否可以动态修改broker uri(自版本5.4起),默认true

技术架构指导原则

hzqiuxm阅读(222)评论(0)

技术架构设计原则

1 大道至简

避免过度设计

  • 努力把代码写得通俗易懂
  • 一个工程师好不好就看他能多快简化一个复杂问题,并构建易于理解的解决方案
  • 方案好不好的一个检验标准就是看其他人理解的快不快

DID设计方法

  • Design设计20倍的容量
  • Implement 实施3倍的容量
  • Deploy部署1.5倍的容量

三步简化方案 how to DiD?

  • 采用帕累托原则(2-8)简化范围(如何简化方案)

    先做那能产生80%效益的20%产品功能

  • 结合成本与可扩展性简化设计(如何简化设计)

    例如不需要的数据列就不要查询出来

  • 依靠专家经验简化部署(如何简化部署)

    使用成熟的开源解决方案

减少域名解析

  • 网站域名资源不要太多,合理利用各级缓存

减少页面目标

  • 合并CSS JS 图片
  • 压缩各种资源
  • 不同资源分配在不同的域名上

采用同构网络

  • 确保交换机和路由器源于同一供应商

2 分轴扩展

三维坐标扩展法,X,Y,X轴

X轴扩展

X轴:水平扩展:WEB集群,读写分离

实施速度快,研发成本低,事务处理扩展效果好,运营成本高

Y轴拆分

Y轴:垂直扩展:模块化,分库

从两个维度去分析扩展:功能或资源,一般团队逐渐变大后,业务比重增加,技术比重下降,生产力下降,所以采用专人负责专门模块可以减少业务比重投入,提升生产力

Z轴拆分

Z轴:又称分片:分表,SOA,微服务

一般根据一些独特属性:ID,位置,基础服务等进行拆分

水平扩展(向外扩展)

  • 拆分或复制服务与数据库来扩展系统,而不是向上扩展(利用更好的服务器性能)
  • 摩尔定律VS安迪比尔定律
  • 利用三维坐标扩展法为指导

金鱼而非汗血宝马

  • 使用普通PC服务器即可,不要使用性能成本很高的商业服务器
  • 金鱼便宜,可以轻易丢弃不用修复;宝马太贵维护成本过高

托管方案扩展

  • 系统部署到三个或更多的数据中心
  • 三个数据中心是最成本最优的方案
  • 避免拥有自己的数据中心,直到公司的规模大到可以通过建设和运行自己的数据中心来节省成本变为可能

利用云服务

  • 有效利用云服务,提升系统可伸缩性,节省成本

3 先对其器

不要陷入马斯洛锤子陷阱中:当你只有一个锤子时,任何东西看起来都像是钉子

适当使用数据库

  • 当需要ACID属性来保持数据之间的关系和一致性时才选择关系型数据库
  • 不要用关系数据库存储所有数据,利用不同NoSql综合存储

慎重使用防火墙

  • 不要让防火墙成为系统瓶颈
  • 对值得使用防火墙的内容上使用

积极使用日志文件

  • 打造完善的日志系统(比如ELK),使其成为追踪,预防,监控,自动化修复重要工具
  • 一个好的日志体系,生产环境问题将会越来越少,且越快解决

4 奥卡姆剃刀

避免画蛇添足

  • 不要在无需同步的地方一定采用同步
  • 利用好缓存技术,不要去查询刚刚入库的数据

停止重定向

  • 尽量避免重定向
  • 使用服务器配置来实现正确的重定向

放宽时间约束

  • 不要过分考虑响应及时性,系统无法扩展而停止服务比让用户有些失望相比更严重

5 缓存为王

利用CDN缓存

  • 利用CDN缓存解决区域化响应速度问题
  • 注意成本与效率之间的平衡

正确管理缓存

  • 正确使用HTTP头来管理缓存(什么头在什么场景下适用,不要生搬硬套)
  • 使用HTTP响应头确保Ajax请求响应可缓存

静态资源缓存

  • 利用nginx,varnish组合对静态资源,甚至某些动态内容进行缓存

应用数据缓存

  • 对热点业务数据采用memcached,redis等技术进行缓存,其可扩展性也可以参考三维坐标法
  • 如果是和用户有关的数据,注意分布式缓存扩展性问题(通过哈希一致性算法或数据预热方式来防止雪崩或穿透)

DAO缓存

  • 利用数据库持久化工具(如:mybatis)或本地缓存工具(Ecached)做数据持久层的一级缓存和二级缓存

6 前车之鉴

组织必须在深度和广度上学习
有些知识可以培训获得,有些知识必须从外部获得

失败乃成功之母

  • 拥有学习文化的组织更易于实现病毒式增长
  • 从客户中学习(出的问题,解决的需求)
  • 从业务技术的运营中学习
  • 关注失败,拒绝简化,牛人心态,谦卑态度

不要依赖QA发现错误

  • 做好自动化测试
  • 当测试活动获得超过一个工程师的价值时,应该招聘一个QA人员

不能回滚注定失败

  • 让每一次上线都具备快速的回滚能力

7 重中之重

从事务处理中清除商务智能

  • 尽早使用Nosql
  • 不要在关系数据库中使用存储过程,触发器,外键等约束与业务逻辑绑定

设计之初就考虑扩展性

  • 特别是关系性数据库结构的设计,提前考虑到其扩展性
  • 不要局限于数据库范式设计

正确使用数据库锁

  • 注意暗锁 、明锁、行锁、页锁、区间锁、表锁、数据库锁使用正确姿势

禁用分阶段提交

  • 最好的方式就是不要使用

慎用Select for Update

  • 避免在业务中使用

避免选择所有列

  • 遵循用到什么拿什么的原则

8 有备无患

用分布式(泳道)来隔离故障

  • 道理就和不要把鸡蛋放在一个篮子里一样
  • 不同的服务不要互相影响

拒绝单点故障

  • 在分布式系统的各层都要考虑单点故障问题

避免系统串联

  • 不要因为某个模块或服务问题,影响整个系统的运行

启用和禁用功能

  • 通过缓存刷新,可配置化等方式启用和禁用产品功能

9 超然物外

力求无状态

  • 无状态是一些系统可扩展性的基石

在浏览器中保存会话

  • 使用H5本地存储,JWT,cookie技术

用分布式缓存处理状态

  • 用集中的分布式缓存来保存,使得应用和web服务无状态化

10 意犹未尽

警惕第三方方案

  • 不要依赖供应商解决方案来实现可扩展性
  • 自己的命运自己来掌控

梯级存储策略

  • 做到存储成本与数据价值相匹配

异步通信

  • 非实时性响应需求建议采用异步通信方式

分类处理不同负载

  • 确保解决方案支持四种基本类型工作负载:归纳(数据形成),演绎(数据使用),批处理,用户交互
  • 彼此隔离
  • 不要因为非关键性业务影响到关键性业务

保持竞争力

  • 保持公司所有岗位人员的竞争力,不仅仅某一个

注:(本文根据《架构真经》内容结合作者本身经验整理修改而得)更加详细的解释会在本人《架构之道》课程里讲述

Gradle简明教程(5)

hzqiuxm阅读(255)评论(0)

模块化项目构建

规划好自己项目的模块
  • 根据高内聚低耦合的设计思想,设计符合自己项目的模块
  • 一般会有web层,控制层,服务层,数据层
  • 每个层下面的路径可以自定义,只后在构建文件build.gradle中说明就行

    建议采用默认的就行

大概的结构图如下:
项目结构示意图
每个模块下都有一个build.gradle文件,项目根目录下有一个 setting.gradle文件

理解setting文件
  • 对项目根目录下的setting.gradle文件进行配置多模块项目
rootProject.name = 'ziniuxiaozhu'
include 'common'
include 'service'
include 'webapp'
  • setting文件的执行是在初始化阶段
    回顾下gradle的三个生命周期
    构建生命周期

  • 初始化的时候先从本模块下的setting文件中寻找,再到根目录下去寻找

    也可以通过命令行参数控制setting搜索行为

  • 支持分层布局和扁平布局,当然个人推荐分层的,能够更细粒度控制组件的建模
配置build文件
  • build文件中包含了项目的插件配置,版本控制,依赖管理等
    一个根目录下build.gradle文件例子:
plugins{
    id 'org.springframework.boot' version '1.5.9.RELEASE'
}

allprojects{
    group = 'com.znxz'
    version = '1.0'

    apply plugin: 'org.springframework.boot'
}

subprojects{
    apply plugin: 'java'
    apply plugin: 'idea'

    // JVM 版本号要求
    sourceCompatibility = 1.8
    targetCompatibility = 1.8

    ext {
        springBootVersion = '1.5.10.RELEASE'
        springloadedVersion = '1.2.8.RELEASE'
    }

    idea {
        module {
            inheritOutputDirs = false
            outputDir = file("$buildDir/classes/main/")
            testOutputDir = file("$buildDir/classes/test/")
        }
    }


    [compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8'

    repositories {
        mavenLocal()
        maven{ url 'http://maven.aliyun.com/nexus/content/groups/public' }
        maven{ url 'http://www.ziniuxiaozhu.com:8081/nexus/content/groups/public' }
        mavenCentral()
        jcenter()
    }
    dependencies{
        compile ("org.hibernate.validator:hibernate-validator:6.0.7.Final")
        compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'

    }

}
  • 子模块的build.gradle文件一般是指名该模块的名称以及依赖的其他模块或第三方jar包
    例子如下:
project(':control'){

    dependencies{
        compile project(':service')
        //WEB服务
        compile("org.springframework.boot:spring-boot-starter-data-rest"){
            exclude group : 'org.hibernate', module :'hibernate-validator'
            exclude group : 'javax.validation', module :'validation-api'
        }
        //JWT
        compile("io.jsonwebtoken:jjwt:0.9.0")
        //SpringSecurity
        compile("org.springframework.boot:spring-boot-starter-security")
    }
}

Gradle简明教程(4)

hzqiuxm阅读(178)评论(0)

依赖管理概述

Gradle摒弃了Ivy和Maven依赖管理工具的一切缺点,更注重性能、构建可靠性以及可重复性

  • 通常开发都需要依赖一些流行的开源框架包,避免自己重复发明轮子
  • 随着项目的增大,依赖的模块和第三方类库会越来越多,如何组织与管理就显得尤为重要
  • 自动化依赖管理可以解决传递性依赖,版本管理等问题
  • 搭建内部仓库解决中央仓库的单点依赖 Sonatype Nexus, JFrog, Artifactory
  • Gradle提供了很有价值的依赖报告

一个自动化依赖结构图
自动化依赖管理

依赖配置

  • 通过project实例添加和访问配置
  • 每个项目都有一个ConfigurationContainer类的容器来管理相应的配置
  • 定义一个Cargio类配置的例子
configurations{
        cagro {
            description = 'classpath for cargo Ant tasks'
            visible = false
        }
    }

声明依赖

  • 外部模块依赖
  • 项目依赖
  • 文件依赖
  • 客户端模块依赖
  • Gradle运行时依赖

每个Gradle项目都有依赖处理器实例,由DependencyHandler接口来表示

外部依赖
依赖属性
  • group 通常用来标示一个组织,公司或者项目
  • name 唯一标识
  • version 版本号 一般都包含主版本和次版本
  • classifier 用来区分相同group,name,version工件,但使用环境上有所区别

例子:
依赖属性例子

依赖标记

有两种标识方式
- 使用map结构形式,显示标记出group,name,version

cargo group: cargoGroup,name:' cargo-core-uberjar', vewrsion: cargoVersion
  • 使用字符串简写形式,类似上一节Hibernate的例子

使用gradle dependencise 命令来查看详细依赖报告

排除传递依赖

由于Gradle会自动管理传递依赖,如果你需要可以主动排除一些自动的依赖

  • 排除一个传递依赖
dependencies{
    cargo('org.codehaus.cargo:cargo-ant:1.3.1'){
        exclude(group:'xml-apis', module:'xml-apis')
    }
    cargo 'xml-apis:xml-apis:2.0.2'
}
  • 排除所有传递依赖
dependencies{
    cargo('org.codehaus.cargo:cargo-ant:1.3.1'){
        transitive = false
    }
}

Gradle还支持最新版本依赖,又叫动态版本依赖,当然正式环境下还是不要用这个功能了

文件依赖

适用于从现有Ant或者Maven管理方式转变到Gradle时,或者本地稳定的开发包依赖

dependencies{
    cargo fileTree(dir: "lib路径" ,include: ' *.jar ')
}

使用和配置仓库

  • 定义仓库的接口类 RepositoryHandler
  • 支持Maven仓库,Ivy仓库,扁平的目录仓库
Maven仓库
  • 中央仓库 mavenCentral()
  • 本地仓库 mavenLocal() 慎用
  • 自定义仓库 maven() 和mavenRepo()
dependencies{
    mavenCentral()
    maven{
        name  'Custom Maven Repository'
        url 'http://ziniuxiaozhu.com/nexus/public'
    }
}
Ivy仓库
  • 相对Maven来说可以自定义布局
  • 仓库依赖元数据存储在ivy.xml中

具体语法可参考官方文档

扁平目录仓库
  • 只有JAR文件,没有元数据
  • 适合经常手动维护项目中的类库,或者项目迁移的时候
  • 声明依赖智能使用name和version这两个属性

一个从falt目录仓库取Cargo依赖声明例子:


repositories{ flatDir(dir: " ${System.properties['user.home']}/libs/cargo", name: 'Local libs directory') } dependencies{ cargo name : 'activetion', version:'1.1' catgo name: 'ant', version:'1.7.1' cargo ':xml-apis:1.3.1', ' : jaxen:1.0-FCS' }

本地依赖缓存

  • Gradle会缓存从仓库下载的二进制文件,该目录根据不同的版本,路径可能不同
  • 存储依赖来源,当来源发生变化,能够保证构建的可靠
  • 减少到远程仓库的传输
  • 比较本地仓库和远程仓库,减少工件的下载
  • 支持离线模式,在你无法联网时采用本地缓存来构建

常见依赖问题

如果你项目有很多依赖,而且你选择自动解决传递性依赖,那么版本冲突几乎是不可避免的
- 应对版本冲突
设置当遇到版本冲突时,构建失败

 configuration.cargo.resolutionStrategy{
    failOnVersionConfilct()
}
  • 强制制定一个版本
 configuration.cargo.resolutionStrategy{
   force ' org.codehaus.cargo:cargo:cargo-ant:1.3.0'
}
  • 使用依赖观察报告
  • 刷新缓存

可以手动刷新,可以不缓存快照版本的依赖包

欢迎加入紫牛小筑

进入小筑关于作者