Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

asp.net core microservices 架构之Task 事务一致性 事件源 详解

无为有道 2019-01-28 22:14:00 阅读数:217 评论数:0 点赞数:0 收藏数:0

一 aspnetcore之task的任务状态-CancellationToken                   

     我有一篇文章讲解了asp.net的线程方面的知识。我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作也比较简化,人为控制比较弱。那这一节为什么要说线程尼?大家有没有遇到过,部署或者人为的去重启,往往会造成一些不必要的业务中断,web api有这样的情况,后台程序也有这样的情况。异常和系统硬件的故障已经让我们防不胜防了,那么就尽量的人为的情况少那么一点点,系统的健壮性也就高那么一点点。

 目前有两个技巧可以处理这一类事情,第一是让主机graceful方式关闭,并且超时时间设置长一点,这样就有足够的时间,让运行的请求执行完毕,看代码:

    

public static async Task Main(string[] args)
{
var host = new HostBuilder()
.Build();
await host.RunAsync();
}

这是官方上的一段话:IHostedService 是执行代码的入口点。 每个 IHostedService 实现都按照 ConfigureServices 中服务注册的顺序执行。 主机启动时,每个 IHostedService 上都会调用 StartAsync,主机正常关闭时,以反向注册顺序调用 StopAsync

//关闭超时值
ShutdownTimeout 设置 StopAsync 的超时值。 默认值为 5 秒。
Program.Main 中的以下选项配置将默认值为 5 秒的关闭超时值增加至 20 秒:
C#
//复制
var host = new HostBuilder()
.ConfigureServices((hostContext, services) =>
{
services.Configure<HostOptions>(option =>
{
option.ShutdownTimeout = System.TimeSpan.FromSeconds(20);
});
})
.Build();

而我们看看源码中StopAsync方法:

/// <summary>
/// Attempts to gracefully stop the host with the given timeout.
/// </summary>
/// <param name="host"></param>
/// <param name="timeout">The timeout for stopping gracefully. Once expired the
/// server may terminate any remaining active connections.</param>
/// <returns></returns>
public static Task StopAsync(this IHost host, TimeSpan timeout)
{
return host.StopAsync(new CancellationTokenSource(timeout).Token);
}

系统接受到Ctrl+c和sign,就会调用这个方法,以比较礼貌的方式关闭。

那么看源码,这两个都是具有阻塞功能的异步方法,对应的非异步方法,都是同步调用的这两个方法:

/// <summary>
/// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered.
/// </summary>
/// <param name="host">The <see cref="IHost"/> to run.</param>
/// <param name="token">The token to trigger shutdown.</param>
public static async Task RunAsync(this IHost host, CancellationToken token = default)
{
using (host)
{
await host.StartAsync(token);
await host.WaitForShutdownAsync(token);
}
}
/// <summary>
/// Returns a Task that completes when shutdown is triggered via the given token.
/// </summary>
/// <param name="host">The running <see cref="IHost"/>.</param>
/// <param name="token">The token to trigger shutdown.</param>
public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default)
{
var applicationLifetime = host.Services.GetService<IApplicationLifetime>();
//当前token执行取消的时候,激发这个委托。
token.Register(state =>
{
((IApplicationLifetime)state).StopApplication(); //当进程取消的时候,通知注册IApplicationLifetime的进程也取消。
 },
applicationLifetime);
var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
//应用程序生命周期中的停止应用token激发的时候,执行这个委托,去释放阻塞,执行host的停止方法。
applicationLifetime.ApplicationStopping.Register(obj =>
{
var tcs = (TaskCompletionSource<object>)obj;
tcs.TrySetResult(null);
}, waitForStop);
await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);执行完毕。
// Host will use its default ShutdownTimeout if none is specified.

await host.StopAsync(); //调用关闭 }

具体原理就是Host使用这个applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource这个类,使用这个类是可以控制task的取消执行的。

所以,两个解决方案,如果是webapi,就将将超时时间设置大一点。

第二,如果在非webapi中,使用了超长执行的Task,就使用CancellationTokenSource吧,将它的Token传进去,在外边判断是否执行中,如果不在执行中,就执行Cancel方法,当然在task内部,也可以

判断token,是否自己主动取消掉。

这是官方的一个例子,了解CancellationTokenSource这个类,那么就会明白如何去处理Task

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class Example
{
public static void Main()
{
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
Random rnd = new Random();
Object lockObj = new Object();
List<Task<int[]>> tasks = new List<Task<int[]>>();
TaskFactory factory = new TaskFactory(token);
for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
int iteration = taskCtr + 1;
tasks.Add(factory.StartNew( () => {
int value;
int[] values = new int[10];
for (int ctr = 1; ctr <= 10; ctr++) {
lock (lockObj) {
value = rnd.Next(0,101);
}
if (value == 0) {
source.Cancel();
Console.WriteLine("Cancelling at task {0}", iteration);
break;
}
values[ctr-1] = value;
}
return values;
}, token));
}
try {
Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(),
(results) => {
Console.WriteLine("Calculating overall mean...");
long sum = 0;
int n = 0;
foreach (var t in results) {
foreach (var r in t.Result) {
sum += r;
n++;
}
}
return sum/(double) n;
} , token);
Console.WriteLine("The mean is {0}.", fTask.Result);
}
catch (AggregateException ae) {
foreach (Exception e in ae.InnerExceptions) {
if (e is TaskCanceledException)
Console.WriteLine("Unable to compute mean: {0}",
((TaskCanceledException) e).Message);
else
Console.WriteLine("Exception: " + e.GetType().Name);
}
}
finally {
source.Dispose();
}
}
}
// Repeated execution of the example produces output like the following:
// Cancelling at task 5
// Unable to compute mean: A task was canceled.
//
// Cancelling at task 10
// Unable to compute mean: A task was canceled.
//
// Calculating overall mean...
// The mean is 5.29545454545455.
//
// Cancelling at task 4
// Unable to compute mean: A task was canceled.
//
// Cancelling at task 5
// Unable to compute mean: A task was canceled.
//
// Cancelling at task 6
// Unable to compute mean: A task was canceled.
//
// Calculating overall mean...
// The mean is 4.97363636363636.
//
// Cancelling at task 4
// Unable to compute mean: A task was canceled.
//
// Cancelling at task 5
// Unable to compute mean: A task was canceled.
//
// Cancelling at task 4
// Unable to compute mean: A task was canceled.
//
// Calculating overall mean...
// The mean is 4.86545454545455.

 

二   业务的事务一致性                                                                

       因为微服务的理念中是牺牲了系统业务的一致性,我们知道事务的一致性都是靠的数据库的本地事务,或者分布式事务来实现的,但是微服务是严禁使用分布式事务。那么如何保证整个系统的事务完整性尼?举个例子:比如订单服务中,新接受一个订单,这个订单需要同步到库房的订单子系统,那么在订单服务中的这个订单在最后更新自己订单状态的时候,是需要同时发送异步消息给库房消息服务器的,如果这时候网络断了,本地订单更新成功了,但是异步消息没有发送过去,这样就会引起业务的缺失,目前有两个方法可以实现:

      第一:为本地数据库创建事件源表,记录下消息和本地数据更新的全部状态,比如订单在更新前就可以添加事件,事件状态可以有,准备更新订单,订单已更新,发送消息队列,消息发送成功等。

这样的好处就是最后跟踪这个事务处理的时候,每个步骤都可以找到,而且完全不用事务。最后job去跟踪失败情况,然后根据情况处理。

      第二:只是用本地事务,就是在订单更新的时候,同时给事件源表添加消息内容,然后让后台job去发送消息,这样是最简单和最稳定的方式。

      当然,最合适的还是第一种方法,虽然代码能复杂点,但是最后的效果是一样的,而且效率是比第二种方法更高效,但是考虑打事件源表并不是并发频繁操作的表,所以这个看自己的喜好了。

针对一个系统,业务的一致性,也并不是全部,针对于一些关键业务做好一致性,但是很多其实可以设计成为在用户ui层面去补偿操作,唯一的坏处就是一部分数据需要重新填写。

三     事件源                                                                                                           

     这个事件源并不是为了解决业务的一致性,而是为了应对大数据量的请求,比如,客户管理,一个分类下有上万条记录需要处理,那么往往我们需要对性能和实时反馈上有个折衷。

     系统设计如下:

                                   

这样看来,会增加1个api服务和一个后台服务,但是对于系统的问题,却得到了一个缓冲,或许这个设计不是最好的,但是却可以做一个抛砖引玉的案例,现实中案例非常多变,所以设计也会有很多方案。

因为目前我们看到的大部分app,请求的时候,某些功能确实会有少许等待事件,这个都是为了折衷,当然这一篇内容并不是讨论云或者分布式计算,但是在后台这块处理越快,反馈也越快。

这套方案的设计理念其实就是异步处理,可以有自己的优化空间,而并不会消耗api这个轻量级服务,后台分布式计算越快,app反应也越快,到一定程度,就并不会感觉到有延迟,这就是大师比喻的耳朵与眼睛的关系。

版权声明
本文为[无为有道]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/ck0074451665/p/10332044.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;