博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
翻译:Hystrix - How To Use
阅读量:7230 次
发布时间:2019-06-29

本文共 35792 字,大约阅读时间需要 119 分钟。

转载请注明出处:

Hello World!

下面的代码展示了HystrixCommand版的Hello World:

public class CommandHelloWorld extends HystrixCommand
{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return "Hello " + name + "!"; }}

HystrixObservableCommand的同等实现如下:

public class CommandHelloWorld extends HystrixObservableCommand
{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable
construct() { return Observable.create(new Observable.OnSubscribe
() { @Override public void call(Subscriber
observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); }}

Synchronous Execution

可以通过调用HystrixCommand.execute()方法实现同步执行, 示例如下:

String s = new CommandHelloWorld("World").execute();

测试如下:

@Test        public void testSynchronous() {            assertEquals("Hello World!", new CommandHelloWorld("World").execute());            assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute());        }

HystrixObservableCommand不提供同步执行方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:

  1. HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
  2. HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()

如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.

Asynchronous Execution

可以通过调用HystrixCommand.queue()方法实现异步执行, 示例如下:

Future
fs = new CommandHelloWorld("World").queue();

此时可以通过Future.get()方法获取command执行结果:

String s = fs.get();

测试代码如下:

@Test        public void testAsynchronous1() throws Exception {            assertEquals("Hello World!", new CommandHelloWorld("World").queue().get());            assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get());        }        @Test        public void testAsynchronous2() throws Exception {            Future
fWorld = new CommandHelloWorld("World").queue(); Future
fBob = new CommandHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }

下面的两种实现是等价的:

String s1 = new CommandHelloWorld("World").execute();String s2 = new CommandHelloWorld("World").queue().get();

HystrixObservableCommand不提供queue方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:

  1. HystrixObservableCommand.observe().observe().toBlocking().toFuture()
  2. HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()

如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.

Reactive Execution

你也可以将HystrixCommand当做一个可观察对象(Observable)来观察(Observe)其产生的结果, 可以使用以下任意一个方法实现:

  1. observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.
  2. toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.
Observable
ho = new CommandHelloWorld("World").observe();// or Observable
co = new CommandHelloWorld("World").toObservable();

然后你可以通过订阅到这个Observable来取得command产生的结果:

ho.subscribe(new Action1
() { @Override public void call(String s) { // value emitted here }});

测试如下:

@Testpublic void testObservable() throws Exception {    Observable
fWorld = new CommandHelloWorld("World").observe(); Observable
fBob = new CommandHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlockingObservable().single()); assertEquals("Hello Bob!", fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn't do assertions fWorld.subscribe(new Observer
() { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1
() { @Override public void call(String v) { System.out.println("onNext: " + v); } });}

使用Java 8的Lambda表达式可以使代码更简洁:

fWorld.subscribe((v) -> {        System.out.println("onNext: " + v);    })        // - or while also including error handling        fWorld.subscribe((v) -> {        System.out.println("onNext: " + v);    }, (exception) -> {        exception.printStackTrace();    })

关于Observable的信息可以

Reactive Commands

相比将HystrixCommand使用上述方法转换成一个Observable, 你也可以选择创建一个HystrixObservableCommand对象. HystrixObservableCommand包装的Observable允许产生多个结果(译者注: Subscriber.onNext可以调用多次), 而HystrixCommand即使转换成了Observable也只能产生一个结果.

使用HystrixObservableCommnad时, 你需要重载construct方法来实现你的业务逻辑, 而不是重载run方法, contruct方法将会返回你需要包装的Observable.

使用下面任意一个方法可以从HystrixObservableCommand中获取Observable对象:

  1. observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.
  2. toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.

Fallback

大多数情况下, 我们都希望command在执行失败时能够有一个候选方法来处理, 如: 返回一个默认值或执行其他失败处理逻辑, 除了以下几个情况:

  1. 执行写操作的command: 当command的目标是执行写操作而不是读操作, 那么通常需要将写操作失败的错误交给调用者处理.
  2. 批处理系统/离线计算: 如果command的目标是做一些离线计算、生成报表、填充缓存等, 那么同样应该将失败交给调用者处理.

无论command是否实现了getFallback()方法, command执行失败时, Hystrix的状态和断路器(circuit-breaker)的状态/指标都会进行更新.

HystrixCommand可以通过实现getFallback()方法来实现降级处理, run()方法异常、执行超时、线程池或信号量已满拒绝提供服务、断路器短路时, 都会调用getFallback():

public class CommandHelloFailure extends HystrixCommand
{ private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { throw new RuntimeException("this command always fails"); } @Override protected String getFallback() { return "Hello Failure " + name + "!"; }}

这个命令的run()方法总是会执行失败, 但是调用者总是能收到getFallback()方法返回的值, 而不是收到一个异常:

@Test    public void testSynchronous() {        assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute());        assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute());    }

HystrixObservableCommand可以通过重载resumeWithFallback方法实现原Observable执行失败时返回回另一个Observable, 需要注意的是, 原Observable有可能在发出多个结果之后才出现错误, 因此在fallback实现的逻辑中不应该假设订阅者只会收到失败逻辑中发出的结果.

Hystrix内部使用了RxJava的操作符来实现Observable之间的无缝转移.

Error Propagation

HystrixBadRequestException异常外, run方法中抛出的所有异常都会被认为是执行失败且会触发getFallback()方法和断路器的逻辑.

你可以在HystrixBadRequestException中包装想要抛出的异常, 然后通过getCause()方法获取. HystrixBadRequestException使用在不应该被错误指标(failure metrics)统计和不应该触发getFallback()方法的场景, 例如报告参数不合法或者非系统异常等.

对于HystrixObservableCommand, 不可恢复的错误都会在通过onError方法通知, 并通过获取用户实现的resumeWithFallback()方法返回的Observable来完成回退机制.

执行异常类型

Failure Type Exception class Exception.cause
FAILURE HystrixRuntimeException underlying exception(user-controlled)
TIMEOUT HystrixRuntimeException j.u.c.TimeoutException
SHORT_CIRCUITED HystrixRuntimeException j.l.RuntimeException
THREAD_POOL_REJECTED HystrixRuntimeException j.u.c.RejectedExecutionException
SEMAPHORE_REJECTED HystrixRuntimeException j.l.RuntimeException
BAD_REQUEST HystrixBadRequestException underlying exception(user-controller)

Command Name

默认的command name是从类名中派生的:

getClass().getSimpleName()

可以通过HystrixCommandHystrixObservableCommand的构造器来指定command name:

public CommandHelloWorld(String name) {        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));        this.name = name;    }

可以通过如下方式来重用Setter:

private static final Setter cachedSetter =         Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));        public CommandHelloWorld(String name) {        super(cachedSetter);        this.name = name;    }

HystrixCommandKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:

HystrixCommandKey.Factory.asKey("Hello World");

Command Group

Hystrix使用command group来为分组, 分组信息主要用于报告、警报、仪表盘上显示, 或者是标识团队/库的拥有者.

默认情况下, 除非已经用这个名字定义了一个信号量, 否则 Hystrix将使用这个名称来定义command的线程池.

HystrixCommandGroupKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:

HystrixCommandGroupKey.Factory.asKey("Example Group")

Command Thread-pool

thread-pool key主要用于在监控、指标发布、缓存等类似场景中标识一个HystrixThreadPool, 一个HystrixCommand于其构造函数中传入的HystrixThreadPoolKey指定的HystrixThreadPool相关联, 如果未指定的话, 则使用HystrixCommandGroupKey来获取/创建HystrixThreadPool.

可以通过HystrixCommandHystrixObservableCommand的构造器来指定其值:

public CommandHelloWorld(String name) {        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));        this.name = name;    }

HystrixCommandThreadPoolKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:

HystrixThreadPoolKey.Factory.asKey("Hello World Pool")

使用HystrixThreadPoolKey而不是使用不同的HystrixCommandGroupKey的原因是: 可能会有多条command在逻辑功能上属于同一个组(group), 但是其中的某些command需要和其他command隔离开, 例如:

  1. 两条用于访问视频元数据的command
  2. 两条commandgroup name都是VideoMetadata
  3. command A与资源#1互斥
  4. command B与资源#2互斥

如果command A由于延迟等原因导致其所在的线程池资源耗尽, 不应该影响command B#2的执行, 因为他们访问的是不同的后端资源.

因此, 从逻辑上来说, 我们希望这两条command应该被分到同一个分组, 但是我们同样系统将这两条命令的执行隔离开来, 因此我们使用HystrixThreadPoolKey将其分配到不同的线程池.

Request Cache

可以通过实现HystrixCommandHystrixObservableCommandgetCacheKey()方法开启用对请求的缓存功能:

public class CommandUsingRequestCache extends HystrixCommand
{ private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); }}

由于该功能依赖于请求的上下文信息, 因此我们必须初始化一个HystrixRequestContext, 使用方式如下:

@Test        public void testWithoutCacheHits() {            HystrixRequestContext context = HystrixRequestContext.initializeContext();            try {                assertTrue(new CommandUsingRequestCache(2).execute());                assertFalse(new CommandUsingRequestCache(1).execute());                assertTrue(new CommandUsingRequestCache(0).execute());                assertTrue(new CommandUsingRequestCache(58672).execute());            } finally {                context.shutdown();            }        }

通常情况下, 上下文信息(HystrixRequestContext)应该在持有用户请求的ServletFilter或者其他拥有生命周期管理功能的类来初始化和关闭.

下面的例子展示了command如何从缓存中获取数据, 以及如何查询一个数据是否是从缓存中获取到的:

@Test        public void testWithCacheHits() {            HystrixRequestContext context = HystrixRequestContext.initializeContext();            try {                CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);                CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);                assertTrue(command2a.execute());                // this is the first time we've executed this command with                // the value of "2" so it should not be from cache                assertFalse(command2a.isResponseFromCache());                assertTrue(command2b.execute());                // this is the second time we've executed this command with                // the same value so it should return from cache                assertTrue(command2b.isResponseFromCache());            } finally {                context.shutdown();            }            // start a new request context            context = HystrixRequestContext.initializeContext();            try {                CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);                assertTrue(command3b.execute());                // this is a new request context so this                 // should not come from cache                assertFalse(command3b.isResponseFromCache());            } finally {                context.shutdown();            }        }

Request Collapsing

请求合并可以用于将多条请求绑定到一起, 由同一个HystrixCommand实例执行.

collapser可以通过batch sizebatch创建以来的耗时来自动将请求合并执行.

Hystrix支持两个请求合并方式: 请求级的合并和全局级的合并. 默认是请求范围的合并, 可以在构造collapser时指定值.

请求级(request-scoped)的collapser只会合并每一个HystrixRequestContext中的请求, 而全局级(globally-scoped)的collapser则可以跨HystrixRequestContext合并请求. 因此, 如果你下游的依赖者无法再一个command中处理多个HystrixRequestContext的话, 那么你应该使用请求级的合并.

在Netflix, 我们只会使用请求级的合并, 因为我们当前所有的系统都是基于一个command对应一个HystrixRequestContext的设想下构建的. 因此, 当一个command使用不同的参数在一个请求中并发执行时, 合并是有效的.

下面的代码展示了如何实现请求级的HystrixCollapser:

public class CommandCollapserGetValueForKey extends HystrixCollapser
, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand
> createCommand(final Collection
> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List
batchResponse, Collection
> requests) { int count = 0; for (CollapsedRequest
request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand
> { private final Collection
> requests; private BatchCommand(Collection
> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List
run() { ArrayList
response = new ArrayList
(); for (CollapsedRequest
request : requests) { // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } }}

下面的代码展示了如果使用collapser自动合并4个CommandCollapserGetValueForKey到一个HystrixCommand中执行:

@Testpublic void testCollapser() throws Exception {    HystrixRequestContext context = HystrixRequestContext.initializeContext();    try {        Future
f1 = new CommandCollapserGetValueForKey(1).queue(); Future
f2 = new CommandCollapserGetValueForKey(2).queue(); Future
f3 = new CommandCollapserGetValueForKey(3).queue(); Future
f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); // assert that the batch command 'GetValueForKey' was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand
command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand
[1])[0]; // assert the command is the one we're expecting assertEquals("GetValueForKey", command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); }}

Request Context Setup

使用请求级的特性时(如: 请求缓存、请求合并、请求日志)你必须管理HystrixRequestContext的生命周期(或者实现HystrixConcurrencyStategy).

这意味着你必须在请求之前执行如下代码:

HystrixRequestContext context = HystrixRequestContext.initializeContext();

并在请求结束后执行如下代码:

context.shutdown();

在标准的Java web应用中, 你可以使用Setvlet Filter实现的如下的过滤器来管理:

public class HystrixRequestContextServletFilter implements Filter {    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)      throws IOException, ServletException {        HystrixRequestContext context = HystrixRequestContext.initializeContext();        try {            chain.doFilter(request, response);        } finally {            context.shutdown();        }    }}

可以在web.xml中加入如下代码实现对所有的请求都使用该过滤器:

HystrixRequestContextServletFilter
HystrixRequestContextServletFilter
com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter
HystrixRequestContextServletFilter
/*

Common Patterns

以下是HystrixCommandHystrixObservableCommand的一般用法和使用模式.

Fail Fast

最基本的使用是执行一条只做一件事情且没有实现回退方法的command, 这样的command在发生任何错误时都会抛出异常:

public class CommandThatFailsFast extends HystrixCommand
{ private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } }

下面的代码演示了上述行为:

@Testpublic void testSuccess() {    assertEquals("success", new CommandThatFailsFast(false).execute());}@Testpublic void testFailure() {    try {        new CommandThatFailsFast(true).execute();        fail("we should have thrown an exception");    } catch (HystrixRuntimeException e) {        assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage());        e.printStackTrace();    }}

HystrixObservableCommand需要重载resumeWithFallback()方法来实现同样的行为:

@Override    protected Observable
resumeWithFallback() { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast")); } else { return Observable.just("success"); } }

Fail Silent

静默失败等同于返回一个空的响应或者移除功能. 可以是返回null、空Map、空List, 或者其他类似的响应.

可以通过实现HystrixCommand.getFallback()方法实现该功能:

public class CommandThatFailsSilently extends HystrixCommand
{ private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } @Override protected String getFallback() { return null; }}
@Testpublic void testSuccess() {    assertEquals("success", new CommandThatFailsSilently(false).execute());}@Testpublic void testFailure() {    try {        assertEquals(null, new CommandThatFailsSilently(true).execute());    } catch (HystrixRuntimeException e) {        fail("we should not get an exception as we fail silently with a fallback");    }}

或者返回一个空List的实现如下:

@Override    protected List
getFallback() { return Collections.emptyList(); }

HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:

@Override    protected Observable
resumeWithFallback() { return Observable.empty(); }

Fallback: Static

Fallback可以返回代码里设定的默认值, 这种方式可以通过默认行为来有效避免于静默失败带来影响.

例如, 如果一个应返回true/false的用户认证的command执行失败了, 那么其默认行为可以如下:

@Override    protected Boolean getFallback() {        return true;    }

对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:

@Override    protected Observable
resumeWithFallback() { return Observable.just( true ); }

Fallback: Stubbed

command返回的是一个包含多个字段的复合对象, 且该对象的一部分字段值可以通过其他请求状态获得, 另一部分状态可以通过设置默认值获得时, 你通常需要使用存根(stubbed)模式.

你可能可以从存根值(stubbed values)中得到适当的值的情况如下:

  1. cookies
  2. 请求参数和请求头
  3. 当前失败请求的前一个服务请求的响应

fallback代码块内可以静态地获取请求范围内的存根(stubbed)值, 但是通常我们更推荐在构建command实例时注入这些值, 就像下面实例的代码中的countryCodeFromGeoLookup一样:

public class CommandWithStubbedFallback extends HystrixCommand
{ private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. */ protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value 'countryCodeFromGeoLookup' that we'll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } }}

下面的代码演示了上述行为:

@Test    public void test() {        CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca");        UserAccount account = command.execute();        assertTrue(command.isFailedExecution());        assertTrue(command.isResponseFromFallback());        assertEquals(1234, account.customerId);        assertEquals("ca", account.countryCode);        assertEquals(true, account.isFeatureXPermitted);        assertEquals(true, account.isFeatureYPermitted);        assertEquals(false, account.isFeatureZPermitted);    }

对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:

@Overrideprotected Observable
resumeWithFallback() { return Observable.just( new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false) );}

如果你想要从Observable中发出多个值, 那么当失败发生时, 原本的Observable可能已经发出的一部分值, 此时你或许更希望能够只从fallback逻辑中发出另一部分未被发出的值, 下面的例子就展示了如何实现这一个目的: 它通过追踪原Observable发出的最后一个值来实现fallback逻辑中的Observable应该从什么地方继续发出存根值(stubbed value) :

@Overrideprotected Observable
construct() { return Observable.just(1, 2, 3) .concatWith(Observable.
error(new RuntimeException("forced error"))) .doOnNext(new Action1
() { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation());}@Overrideprotected Observable
resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); }}

Fallback: Cache via Network

有时后端的服务异常也会引起command执行失败, 此时我们也可以从缓存中(如: memcached)取得相关的数据.

由于在fallback的逻辑代码中访问网络可能会再次失败, 因此必须构建新的HystrixCommandHystrixObservableCommand来执行:

translation-Hystrix-How-To-Use-1

很重要的一点是执行fallback逻辑的command需要在一个不同的线程池中执行, 否则如果原command的延迟变高且其所在线程池已经满了的话, 执行fallback逻辑的command将无法在同一个线程池中执行.

下面的代码展示了CommandWithFallbackViaNetwork如何在getFallback()方法中执行FallbackViaNetwork.

注意, FallbackViaNetwork同样也具有回退机制, 这里通过返回null来实现fail silent.

FallbackViaNetwork默认会从HystrixCommandGroupKey中继承线程池的配置RemoteServiceX, 因此需要在其构造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")来使其在不同的线程池中执行.

这样, CommandWithFallbackViaNetwork会在名为RemoteServiceX的线程池中执行, 而FallbackViaNetwork会在名为RemoteServiceXFallback的线程池中执行.

public class CommandWithFallbackViaNetwork extends HystrixCommand
{ private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand
{ private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won't prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } }}

Primary + Secondary with Fallback

有些系统可能具有是以双系统模式搭建的 — 主从模式或主备模式.

有时从系统或备用系统会被认为是失败状态的一种, 仅在执行fallback逻辑是才使用它;这种场景和Cache via Network一节中描述的场景是一样的.

然而, 如果切换到从系统是一个很正常时, 例如发布新代码时(这是有状态的系统发布代码的一种方式), 此时每当切换到从系统使用时, 主系统都是处于不可用状态,断路器将会打开且发出警报.

这并不是我们期望发生的事, 这种狼来了式的警报可能会导致真正发生问题的时候我们却把它当成正常的误报而忽略了.

因此, 我们可以通过在其前面放置一个门面HystrixCommand(见下文), 将主/从系统的切换视为正常的、健康的状态.

translation-Hystrix-How-To-Use-2

主从HystrixCommand都是需要访问网络且实现了特定的业务逻辑, 因此其实现上应该是线程隔离的. 它们可能具有显著的性能差距(通常从系统是一个静态缓存), 因此将两个command隔离的另一个好处是可以针对性地调优.

你不需要将这两个command都公开发布, 只需要将它们隐藏在另一个由信号量隔离的HystrixCommand中(称之为门面HystrixCommand), 在这个command中去实现主系统还是从系统的调用选择. 只有当主从系统都失败了, 才会去执行这个门面commandfallback逻辑.

门面HystrixCommand可以使用信号量隔离的, 因为其业务逻辑仅仅是调用另外两个线程隔离的HystrixCommand, 它不涉及任何的网络访问、重试等容易出错的事, 因此没必要将这部分代码放到其他线程去执行.

public class CommandFacadeWithPrimarySecondary extends HystrixCommand
{ private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand
{ private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive 'primary' service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand
{ private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast 'secondary' service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } }}

Client Doesn't Perform Network Access

当你使用HystrixCommand实现的业务逻辑不涉及到网络访问、对延迟敏感且无法接受多线程带来的开销时, 你需要设置)属性的值为.SEMAPHORE, 此时Hystrix会使用信号量隔离代替线程隔离.

下面的代码展示了如何为command设置该属性(也可以在运行时动态改变这个属性的值):

public class CommandUsingSemaphoreIsolation extends HystrixCommand
{ private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return "ValueFromHashMap_" + id; }}

Get-Set-Get with Request Cache Invalidation

Get-Set-Get是指: Get请求的结果被缓存下来后, 另一个command对同一个资源发出了Set请求, 此时由Get请求缓存的结果应该失效, 避免随后的Get请求获取到过时的缓存结果, 此时可以通过调用)方法来使缓存失效.

public class CommandUsingRequestCacheInvalidation {    /* represents a remote data store */    private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_";    public static class GetterCommand extends HystrixCommand
{ private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand"); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /** * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand
{ private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey("GetSetGet")); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } }}
@Test        public void getGetSetGet() {            HystrixRequestContext context = HystrixRequestContext.initializeContext();            try {                assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute());                GetterCommand commandAgainstCache = new GetterCommand(1);                assertEquals("ValueBeforeSet_1", commandAgainstCache.execute());                // confirm it executed against cache the second time                assertTrue(commandAgainstCache.isResponseFromCache());                // set the new value                new SetterCommand(1, "ValueAfterSet_").execute();                // fetch it again                GetterCommand commandAfterSet = new GetterCommand(1);                // the getter should return with the new prefix, not the value from cache                assertFalse(commandAfterSet.isResponseFromCache());                assertEquals("ValueAfterSet_1", commandAfterSet.execute());            } finally {                context.shutdown();            }        }    }

Migrating a Library to Hystrix

如果你要迁移一个已有的客户端库到Hystrix, 你应该将所有的服务方法(service methods)替换成HystrixCommand.

服务方法(service methods)转而调用HystrixCommand且不在包含任何额外的业务逻辑.

因此, 在迁移之前, 一个服务库可能是这样的:

translation-Hystrix-How-To-Use-3

迁移完成之后, 服务库的用户要能直接访问到HystrixCommand, 或者通过服务门面(service facade)的代理间接访问到HystrixCommand.

translation-Hystrix-How-To-Use-4

你可能感兴趣的文章
无标题
查看>>
我的友情链接
查看>>
Web前端入门学习(3)——CSS选择器
查看>>
DNS的搭建
查看>>
Apache/Nginx 访问日志分析脚本
查看>>
Curator的使用
查看>>
第五章 集合类型
查看>>
我的友情链接
查看>>
nagios监控服务出现FLAPPING状态时无法发出邮件报警信息
查看>>
数据库链接字符串方法
查看>>
The DCI Architecture: A New Vision of Object-Oriented Programming(一篇具有里程碑式意义的论文)...
查看>>
RIP路由配置实例V2
查看>>
Bytescout Spreadsheet SDK for.NET
查看>>
我的友情链接
查看>>
Haproxy的三种保持客户端会话保持方式
查看>>
iOS的数学函数
查看>>
python 模块 chardet下载及介绍(转)
查看>>
能力工场--关于在JavaScript中使用EL表达式的问题
查看>>
NFS服务器设置
查看>>
s:iterator 中的status 使用方法
查看>>