在上一篇 博客 中,我们一起探索和实现了gRPC
的健康检查。从服务治理的角度来看,健康检查保证的是被调用的服务“健康”或者“可用”。可即使如此,我们依然会遇到,因为网络不稳定等原因而造成的服务调用失败的情形,就如同我们赖以生存的这个真实世界,本身就充满了各种不确定的因素一样,“世间唯一不变的只有变化本身”。不管是面对不稳定的服务,还是面对不确定的人生,任何时候我们都需要有一个 B 计划,甚至我们人生中的一切努力,本质上都是为了多一份自由,一份选择的自由。在微服务的世界里,我们将这种选择称之为“降级(Fallback)”,如果大家有接触过 Hystrix 或者 Polly 这类框架,就会明白我这里的所说的“降级”具体是什么。在众多的“降级”策略中,重试是一种非常朴素的策略,尤其是当你调用一个不稳定的服务的时候。
引言
在此之前,博主曾经介绍过 HttpClient 的重试。所以,今天这篇博客我们来聊聊gRPC
的客户端重试,因为要构建一个高可用的微服务架构,除了需要高可用的服务提供者,同样还需要高可用的服务消费者。下面,博主将由浅入深地为大家分享 4 种重试方案的实现,除了 官方 内置的方案,基本上都需要搭配 Polly 来使用,所以,到这里你可以理解这篇博客的标题,为什么博主会 毁人不倦 地尝试不同的重试方案,因为每一种方案都有它自身的局限性,博主想要的是一种更优雅的方案。具体来讲,主要有:基于 gRPC RetryPolicy、基于 HttpClientFactory、基于 gRPC 拦截器 以及 基于 CallInvoker 4 种方案。如果大家还有更好的思路,欢迎大家在博客评论区积极留言、参与讨论。
基于 gRPC RetryPolicy
所谓的 gRPC RetryPolicy,其实是指 官方 提供的暂时性故障处理方案,它允许我们在创建GrpcChannel
的时候,去指定一个重试策略:
1var defaultMethodConfig = new MethodConfig {
2 Names = { MethodName.Default },
3 RetryPolicy = new RetryPolicy {
4 MaxAttempts = 5,
5 InitialBackoff = TimeSpan.FromSeconds(1),
6 MaxBackoff = TimeSpan.FromSeconds(5),
7 BackoffMultiplier = 1.5,
8 RetryableStatusCodes = { StatusCode.Unavailable }
9 }
10};
11
12var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions {
13 ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
14});
在上面的代码中,MethodConfig
可以为指定的方法配置一个重试策略,当传入的方法名为MethodName.Default
时,它将应用于该通道下的所有 gRPC 方法。如你所见,在重试策略中我们可以指定重试次数、重试间隔等参数。这个方案本身没有太多心智上的负担,唯一的缺点是,它没有预留出可扩展的接口,以至于我们想要验证它到底有没有重试的时候,居然要通过Fiddler
抓包这种方式,换句话讲,我们没有办法自定义整个重试行为,譬如你想在重试过程中记录日志,这种方案就会鸡肋起来,对使用者来说,这完全就是一个黑盒子。
除此之外,官方还提供了一种成为 Hedging
重试策略作为备选方案。类似地,它通过 HedgingPolicy
属性来指定重试策略。对比 RetryPolicy
,它可以同时发送单个 gRPC 请求的多个副本,并使用第一个成功的结果作为返回值,所以,一个显而易见的约束是,它要求这个 gRPC 方法是无副作用的、幂等的函数。其实,这是所有重试方案都应该考虑的一个问题,而不单单是 HedgingPolicy
。由于这两种策略有着本质上的不同,请记住:RetryPolicy 不能与 HedgingPolicy 一起使用。
1var defaultMethodConfig = new MethodConfig {
2 Names = { MethodName.Default },
3 HedgingPolicy = new HedgingPolicy {
4 MaxAttempts = 5,
5 NonFatalStatusCodes = { StatusCode.Unavailable }
6 }
7};
8
9var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions {
10 ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
11});
世间的一切都是双刃剑, HedgingPolicy
同样打不破这铁笼一般的人间真实,虽然它可以一次发送多个gRPC
请求,可毫无疑问的是,这是一种相当浪费的策略,因为不管有多少个请求,它始终都取第一个结果作为返回值,而剩余的结果都将会被直接抛弃。想想每一年的高考状元,大家是不是都只记住了第一名。也许,人生正是如此呢,程序世界固然是由 0 和 1 构成的虚幻世界,可何尝就不是真实世界的某种投影呢?这里请允许博主安利一部动漫《你好世界》,它用视觉化的方式表达了真实世界与程序世界的某种特殊联系。
基于 HttpClientFactory
接下来,我们要介绍的是基于 HttpClentFactory
的重试方案。也许,大家会感到困惑,明明这篇博客说的是 gRPC
,为什么 HttpClientFactory
会出现在这里呢?其实,很多时候,我们看到的只有表面,而出奇制胜的招式往往出自你对于本质的理解。如果大家阅读过 gRPC
客户端部分的源代码,就会意识到这样一件事情,即,gRPC
底层依然用到了 HttpClient
这套所谓“管道式”的体系,你可以理解为,最终传输层还是要交给 HttpClient
来处理,而 HttpClientFactory
本来就支持结合 Polly
进行重试,所以,我们其实是针对同一个问题的不同阶段进行了切入处理。一旦想清楚这一点,下面的代码理解起来就没有难度啦:
1var services = new ServiceCollection();
2services.AddGrpcClient<Greeter.GreeterClient>(opt => {
3 opt.Address = new Uri("https://localhost:8001");
4})
5.ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler {
6 ClientCertificateOptions = ClientCertificateOption.Manual,
7 ServerCertificateCustomValidationCallback = (httpRequestMessage, cert, cetChain, policyErrors) => true
8})
9.AddPolicyHandler(
10 HttpPolicyExtensions.HandleTransientHttpError()
11 .OrResult(res => res.StatusCode != System.Net.HttpStatusCode.OK)
12 .WaitAndRetryAsync(
13 6,
14 retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) + TimeSpan.FromMilliseconds(new Random().Next(0, 100)),
15 (result, timeSpan, current, context)=> {
16 Console.WriteLine($"StatusCode={result.Result?.StatusCode}");
17 Console.WriteLine($"Exception={result.Exception?.Message}");
18 Console.WriteLine($"正在进行第{current}次重试,间隔{timeSpan.TotalMilliseconds}秒");
19 }
20 )
21);
22
23var serviceProvider = services.BuildServiceProvider();
24await serviceProvider.GetService<Greeter.GreeterClient>().SayHelloAsync(new HelloRequest() { Name = "长安书小妆" });
在这里,为了模拟网络不畅的这种场景,我们故意指定了一个错误的终结点信息。此时,我们会得到下面的结果:
不过话又说回来,因为我们选择切入的阶段是“传输层”,所以,相对于整个 RpcException
而言,我们其实是找到了一个问题的子集,这意味着这个方案并不能覆盖到所有的场景,如果是在非“传输层”引发了某种异常,我们就没有办法通过这种方式去做重试处理。所以,我在一开始就说过,没有 100% 完美的解决方案,每一种方案都有它自身的局限性,这句话在这里得到了第一次印证。如果大家再回过头去看第一种方案,是不是就会发现,它里面还是使用了HTTP
状态码作为是否重试的判断依据。所以,大家觉得呢?欢迎大家在评论区留下你的想法。
基于 gRPC 拦截器
关于 gRPC
的拦截器,博主专门写过一篇 博客 来介绍它,所以,在一开始考虑重试方案的时候,拦截器其实是最容易想到的一种方案,主要思路是利用 Polly
中Policy
的Execute()
方法,对拦截器中获取gRPC
调用结果的过程进行包装,我们一起来看下面的例子:
1public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
2 TRequest request,
3 ClientInterceptorContext<TRequest, TResponse> context,
4 AsyncUnaryCallContinuation<TRequest, TResponse> continuation
5)
6{
7 var retryPolicy =
8 Policy<AsyncUnaryCall<TResponse>>
9 .Handle<RpcException>(s => s.StatusCode == StatusCode.Internal)
10 .Or<WebException>()
11 .OrResult(r =>
12 {
13 var awaiter = r.GetAwaiter();
14 if (awaiter.IsCompleted)
15 return r.GetStatus().StatusCode == StatusCode.OK;
16 try {
17 r.ResponseAsync.Wait();
18 } catch (AggregateException) {
19 return true;
20 }
21
22 return false;
23 })
24 .WaitAndRetryAsync(3, x => TimeSpan.FromSeconds(5), (result, timeSpan, current, context) =>
25 {
26 Console.WriteLine($"正在进行第{current}次重试...");
27 });
28
29 return retryPolicy.ExecuteAsync(() => Task.FromResult(continuation(request, context))).Result;
30 }
31}
基于 gRPC
拦截器的这种方案,它最大的问题在于异常的颗粒度太大,这句话是什么意思呢?简单来讲就是在拦截器这个层面上,你能捕捉到的只有RpcException
,这样就使得我们难以捕获更小粒度的异常,譬如网络异常、超时异常等等。其次,gPRC
拦截器中大量使用了,类似AsyncUnaryCall<TResponse>
这样的异步的返回值类型,这让我们在编写 Policy 的时候,多多少少会有一点不自在。综上所述,这个最容易想到的方案,本身是没有太大的问题的,最关键的问题是我们能接受什么样的异常颗粒度。而像异步返回值这种问题,只要写过一次以后,博主以为,它并不会成为我们继续探索的阻碍,这一点大家可以自己去体会。
在尝试基于拦截器的重试方案的过程中,博主发现,指定一个错误的终结点信息,gRPC
会在进入拦截器前就引发异常。这意味着这种基于拦截器的重试方案,在面对“传输层”的异常时略显乏力,所以,从某种程度上来讲,这个方案同样是一个不完美的方案。可这世上人来人往、本无完人,我们实在没有必要耽于技术方案的绝对完美而不可自拔,当求真、莫求执,所谓“大成若缺”,可以欣赏得来缺憾之美,同样是一种幸福。
基于 CallInvoker
如果说,前面的 3 种方案都属于“见招拆招”的外家功夫。那么,接下来我要分享的思路,绝对可以称得上是“打通任督二脉”的玄门内功。
首先,博主想用一张图来讲解 gRPC
客户端的工作原理。从这张图中,我们可以看出,初始化一个gRPC
的客户端,主要有GrpcChannel
和CallInvoker
两种构造形式,而GrpcChannel
中的CreateCallInvoker()
方法会返回HttpClientCallInvoker
的一个实例。此时,我们就会发现,HttpClientCallInvoker
是CallInvoker
的一个子类。所以,我们基本可以判定CallInvoker
是一个扮演着重要角色的类。继续探索,我们就会发现,GrpcCallInvokerFactory
内部通过构造GrpcChannel
,进而实现了CreateCallInvoker()
方法,换句话说,本质上依然是调用了GrpcChannel
中的CreateCallInvoker()
方法。最终,这个CallInvoker
实例会作为参数,传递给DefaultClientActivator
的CreateClient()
方法,至此我们就完成了整个gRPC
客户端的创建工作。
好了,相信现在大家都有一个疑问,这个CallInvoke
到底是个什么东西呢?为什么它在整个gRPC
的底层中是如此的重要呢?其实,它就是一个平平无奇的抽象类啦,可是一旦配合着gRPC
中的Calls
类来使用,这个CallInvoker
简直就是扩展gRPC
的一个重要的桥梁,因为我们不用关心底层是如何处理gRPC
请求/响应的,而这丝毫不影响我们对这个过程进行自定义重写。因此,按照这样的思路,我们有了下面的实现:
1class GrpcCallInvoker : CallInvoker
2{
3 private readonly Channel _channel;
4 private readonly GrpcPollyPolicyOptions _pollyOptions;
5 public GrpcCallInvoker(
6 Channel channel,
7 GrpcPollyPolicyOptions pollyOptions
8 )
9 {
10 _channel = channel;
11 _pollyOptions = pollyOptions;
12 }
13
14 public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
15 Method<TRequest, TResponse> method,
16 string host,
17 CallOptions options
18 )
19 {
20 var policy = CreatePollyPolicy<AsyncClientStreamingCall<TRequest, TResponse>>();
21 return policy.Execute(() => Calls.AsyncClientStreamingCall(CreateCall(method, host, options)));
22 }
23
24 public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
25 Method<TRequest, TResponse> method,
26 string host,
27 CallOptions options
28 )
29 {
30 var policy = CreatePollyPolicy<AsyncDuplexStreamingCall<TRequest, TResponse>>();
31 return policy.Execute(() => Calls.AsyncDuplexStreamingCall(CreateCall(method, host, options)));
32 }
33
34 public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
35 Method<TRequest, TResponse> method,
36 string host, CallOptions options,
37 TRequest request
38 )
39 {
40 var policy = CreatePollyPolicy<AsyncServerStreamingCall<TResponse>>();
41 return policy.Execute(() => Calls.AsyncServerStreamingCall(CreateCall(method, host, options), request));
42 }
43
44 public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
45 Method<TRequest, TResponse> method,
46 string host,
47 CallOptions options,
48 TRequest request
49 )
50 {
51 var policy = CreatePollyPolicy<AsyncUnaryCall<TResponse>>();
52 return policy.Execute(() => Calls.AsyncUnaryCall(CreateCall(method, host, options), request));
53 }
54
55 public override TResponse BlockingUnaryCall<TRequest, TResponse>(
56 Method<TRequest, TResponse> method,
57 string host,
58 CallOptions options,
59 TRequest request
60 )
61 {
62 var policy = CreatePollyPolicy<TResponse>();
63 return policy.Execute(() => Calls.BlockingUnaryCall(CreateCall(method, host, options), request));
64 }
65}
我想,经过连续三篇文章的洗礼,大家对这些方法应该都不陌生了吧!下面我们来着重讲解下CreateCall()
和CreatePollyPolicy()
这两个方法。其中,CreateCall()
这个方法会相对简单一点,因为它完全就是返回gRPC
的内置类型CallInvocationDetails
。
1protected CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(
2 Method<TRequest, TResponse> method,
3 string host,
4 CallOptions options
5)
6 where TRequest : class
7 where TResponse : class
8{
9 return new CallInvocationDetails<TRequest, TResponse>(_channel, method, options);
10}
接下来,CreatePollyPolicy()
这个方法就非常的明确啦,通过注入的GrpcPollyPolicyOptions
来构造一个Policy。考虑到我们要做的是一个通用的方案,这里预留了断路器、重试、超时三种不同策略的参数。如果希望对构建 Policy 的过程进行自定义,则可以通过重写该方法来实现:
1public virtual Policy<TResult> CreatePollyPolicy<TResult>()
2{
3 Policy<TResult> policy = null; ;
4
5 // 构造断路器策略
6 if (_pollyOptions.CircuitBreakerCount > 0)
7 {
8 var policyBreaker = Policy<TResult>
9 .Handle<Exception>()
10 .CircuitBreaker(_pollyOptions.CircuitBreakerCount, _pollyOptions.CircuitBreakerTime);
11
12 policy = policy == null ? policyBreaker :
13 policy.Wrap(policyBreaker) as Policy<TResult>;
14
15 // 断路器降级
16 var policyFallBack = Policy<TResult>
17 .Handle<Polly.CircuitBreaker.BrokenCircuitException>()
18 .Fallback(() =>
19 {
20 return default(TResult);
21 });
22 policy = policyFallBack.Wrap(policy);
23 }
24
25 // 构造超时策略
26 if (_pollyOptions.Timeout > TimeSpan.Zero)
27 {
28 var policyTimeout = Policy.Timeout(() => _pollyOptions.Timeout, Polly.Timeout.TimeoutStrategy.Pessimistic);
29
30 policy = policy == null ? (Policy<TResult>)policyTimeout.AsPolicy<TResult>() :
31 policy.Wrap(policyTimeout);
32
33 // 超时降级
34 var policyFallBack = Policy<TResult>
35 .Handle<Polly.Timeout.TimeoutRejectedException>()
36 .Fallback(() =>
37 {
38 return default(TResult);
39 });
40 policy = policyFallBack.Wrap(policy);
41 }
42
43 // 构造重试策略
44 if (_pollyOptions.RetryCount > 0)
45 {
46 var retryPolicy = Policy<TResult>.Handle<Exception>().WaitAndRetry(
47 _pollyOptions.RetryCount,
48 x => _pollyOptions.RetryInterval,
49 (result, timeSpan, current, context) =>
50 {
51 Console.WriteLine($"正在进行第{current}次重试,间隔{timeSpan.TotalSeconds}秒");
52 });
53
54 policy = policy == null ? retryPolicy :
55 policy.Wrap(retryPolicy) as Policy<TResult>;
56 }
57
58 return policy;
59}
因为我们无法修改DefaultGrpcClientFactory
中关于CallInvoker
这部分的逻辑,所以,我们采取了下面的“迂回战术”:
1services.AddGrpc();
2services.AddTransient<GrpcCallInvoker>();
3services.AddTransient<Channel>(sp => new Channel("localhost", 5001, ChannelCredentials.Insecure));
4services.AddTransient<GrpcPollyPolicyOptions>(sp => {
5 return new GrpcPollyPolicyOptions()
6 {
7 RetryCount = 10,
8 RetryInterval = TimeSpan.FromSeconds(1),
9 CircuitBreakerCount = 5,
10 CircuitBreakerTime = TimeSpan.FromSeconds(6),
11 Timeout = TimeSpan.FromSeconds(10)
12 };
13});
14
15var callInvoker = services.BuildServiceProvider().GetService<GrpcCallInvoker>();
16var client = (Greeter.GreeterClient)Activator.CreateInstance(typeof(Greeter.GreeterClient), callInvoker);
17client.SayHello(new HelloRequest() { Name = "长安书小妆" });
此时,如果我们故意写一个错误的终结点地址,我们将会得到下面的结果:
因为重试 5 次后就会启动断路器,所以,这个接口在重试 5 次后就立即停止了调用,这证明我们设想的这个方案是可以完美工作的!
本文小结
写完以后,突然发现这一篇的信息量有点爆炸,尤其是CallInvoker
这一部分,需要花点时间去阅读 gRPC 的源代码。可对于博主而言,其实更加享受的是,探索 gRPC
重试方案的这个过程。起初,因为对拦截器更熟悉一点,所以,我最先想到的是基于拦截器的重试方案。经过博主一番验证以后,发现这是一个有缺陷的方案。这时候,我意外发现,官方提供了重试策略,可这个重试策略对于使用者来说是一个黑盒子。再后来,发现可以在 HttpClient 上做一点文章,虽然它针对的是“传输层”这个阶段。直到从网上查资料,意识到可以重写CallInvoker
这个抽象类,这个时候终于找到了最完美的方案。所以,通过这个过程,大家可以发现,我这篇博客的写作过程,其实与我思考过程有着明显的不同。思考的过程中带入“先入为主”的意识,这让我的思考过程走了不少的弯路,而写作过程则是一个由浅入深、由表及里的顺序。也许,下一次遇到类似的问题,我会先了解一下官方有没有提供标准方案,这是我在写完这篇博客以后最大的一个感悟。好了,这篇博客就先写到这里啦,如果大家对文中的内容由意见或者建议,欢迎大家在评论区给我留言,谢谢大家!