
ASP.NET Core Microservices Architecture: Distributed Automatic Computation (Part 2)

无为有道 2019-01-25 18:17:00

 

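I. Introduction

The previous article described how ZooKeeper performs distributed coordination. This one explains how Quartz uses ZooKeeper for distributed computation. The previous article covered only the principles, while this one puts them to practical use, so part of the zookeeperService code has changed; consider it integration and optimization.

The system architecture diagram is as follows: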

 

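The diagram above shows two parts: a failover cluster and a sharding feature. The failover cluster is built into Quartz, so I will not go into it; the sharding feature is my own implementation. A few configuration notes on using Quartz's failover cluster:

In addition, .NET Core does not support remoting. This matters a lot, because it means you need to build a web service interface yourself. This example does not implement one; it only uses the database to configure and track the Quartz service, which is the main part. What an API would add is starting and stopping jobs in real time and monitoring the status of the Quartz hosts and the sharding hosts. Keep in mind that these features are missing for now, but after working through this article you should be able to add them yourself without much trouble.

Thanks go to weizhong1988/Weiz.TaskManager on GitHub, a Quartz management project.

My example runs on Linux and MySQL, whereas that project targets SQL Server, so I replaced all the SQL and changed a few other things. The complete code is published at the end of this article. The interface is shown below:

Now let's look at the code.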

II. Quartz failover and sharding

First, the project structure:

Next, the Program entry method:

    var host = new HostBuilder()
        .UseEnvironment(EnvironmentName.Development)
        .ConfigureAppConfiguration((hostContext, configApp) =>
        {
            configApp.SetBasePath(Directory.GetCurrentDirectory());
            configApp.AddJsonFile(
                $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
                optional: true);
            configApp.AddEnvironmentVariables("PREFIX_");
            configApp.AddCommandLine(args);
            var rootConfiguration = configApp.Build();
            QuartzOpt = new QuartzOption();
            // Bind the "Quartz" section to the Quartz option class.
            rootConfiguration.GetSection("Quartz").Bind(QuartzOpt);
        })
        .ConfigureLogging((hostContext, configBuild) =>
        {
            configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
            configBuild.AddConsole();
            configBuild.AddCustomizationLogger();
        })
        .ConfigureServices((hostContext, service) =>
        {
            service.AddKafka(KafkaBuilder =>
            {
                KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
            });
            service.AddZookeeper(zookeeperBuilder =>
            {
                zookeeperBuilder.AddConfiguration(hostContext.Configuration.GetSection("zookeeperService"));
            });
            // Database service for the task tables. It is separate from the
            // data provider that Quartz itself uses for clustering.
            service.AddDbContext<QuartzDbContext>(
                option => option.UseMySQL(hostContext.Configuration.GetConnectionString("QuartzDatabase")),
                ServiceLifetime.Transient,
                ServiceLifetime.Transient);
        })
        .Build();

    Host = host;
    ILoggerFactory loggerFact = host.Services.GetService<ILoggerFactory>();
    // Hand the framework's logging provider to Quartz's logging interface.
    LogProvider.SetCurrentLogProvider(new ConsoleLogProvider(loggerFact));
    var ischema = RunProgramRunExample(loggerFact); // builds the jobs from the database
    host.WaitForShutdown();                         // the .NET Core generic host
    ischema.Shutdown(true);                         // Quartz's own scheduler

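Quartz's architecture is incompatible with .NET Core's current one. Quartz has no built-in support for .NET Core's pipeline-and-services design, so you cannot pass it any context, whether the host, configuration, or service context. I therefore expose them through properties: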

    private ILoggerFactory loggerFact;
    public static IHost Host { get; set; }
    public static String QUARTZINSTANCEID = "PREFIXQUARTZINSTANCEID";
    public static QuartzOption QuartzOpt { get; set; }

Inside the Quartz context they are used like this:

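    ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
    // The CreateLogger type argument was lost in extraction; the calling class is assumed.
    var _logger = loggerFact.CreateLogger<TriggerUpdateListens>();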
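The static-property approach is essentially a service locator: the scheduler creates jobs and listeners itself, so they pull their dependencies from a well-known static instead of receiving them through a constructor. The following is a minimal sketch of that pattern using only the base class library. `DictionaryServiceProvider`, `AppServices`, `IGreeter`, `Greeter`, and `GreetingJob` are hypothetical names invented for illustration; they stand in for `Program.Host.Services` and a Quartz job and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the container exposed through Program.Host.Services.
public sealed class DictionaryServiceProvider : IServiceProvider
{
    private readonly Dictionary<Type, object> _services = new Dictionary<Type, object>();

    public void Register<T>(T instance) where T : class
    {
        _services[typeof(T)] = instance;
    }

    // Returns null when the service type has not been registered.
    public object GetService(Type serviceType)
    {
        object service;
        return _services.TryGetValue(serviceType, out service) ? service : null;
    }
}

// Hypothetical static holder playing the role of Program.Host.
public static class AppServices
{
    public static IServiceProvider Provider { get; set; }
}

public interface IGreeter
{
    string Greet(string name);
}

public sealed class Greeter : IGreeter
{
    public string Greet(string name)
    {
        return "hello " + name;
    }
}

// Created through its parameterless constructor, so the dependency is
// looked up from the static holder rather than injected.
public sealed class GreetingJob
{
    public string Execute(string name)
    {
        var greeter = (IGreeter)AppServices.Provider.GetService(typeof(IGreeter));
        return greeter.Greet(name);
    }
}
```

The trade-off is that the dependency is hidden from the class signature, which is why the holder has to be assigned before the first job runs, just as `Host = host;` is assigned before the scheduler starts in the entry method.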

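The .NET Core answer to the missing remoting support mentioned above is Kestrel. .NET Core has announced that WebHost will later be merged into the generic host. .NET Core is evolving quickly, and its internal refactoring changes a lot of code. For example, when I extended the log module, almost nothing from the previous version's design was reusable in the current one. For developers this makes little difference, since compatibility is expected to be maintained.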

Now Quartz's logging module, ConsoleLogProvider:

    using System;
    using Microsoft.Extensions.Logging;
    using Quartz.Logging;

    namespace Walt.Framework.Quartz.Host
    {
        public class ConsoleLogProvider : ILogProvider
        {
            private ILoggerFactory _logFactory;

            public ConsoleLogProvider(ILoggerFactory logFactory)
            {
                _logFactory = logFactory;
            }

            public Logger GetLogger(string name)
            {
                return (level, func, exception, parameters) =>
                {
                    if (func != null)
                    {
                        string logInfo = string.Format(func(), parameters);
                        // Swap the provider for the custom distributed log provider.
                        // The type argument was lost in extraction; this class is assumed.
                        var log = _logFactory.CreateLogger<ConsoleLogProvider>();
                        log.LogDebug(logInfo);
                    }
                    return true;
                };
            }

            public IDisposable OpenNestedContext(string message)
            {
                throw new NotImplementedException();
            }

            public IDisposable OpenMappedContext(string key, string value)
            {
                throw new NotImplementedException();
            }
        }
    }

Next, the Quartz configuration class:

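    namespace Walt.Framework.Quartz.Host
    {
        public class QuartzOption
        {
            // Very important: no two instances in a cluster may share the same id.
            public string InsatanceId { get; set; }

            // The Quartz instance name, normally used for display.
            public string InstanceName { get; set; }

            // Whether to clear jobs at startup, because a cluster keeps historical data in the database.
            public bool IsClear { get; set; }

            // Whether this instance is a slave. Reserved; not used yet.
            public bool IsSlave { get; set; }

            // When sharding, the number of records each machine is assigned to process.
            public int CustomerRecordCountForTest { get; set; }
        }
    }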

Then comes the main method, RunProgramRunExample:

    private static IScheduler RunProgramRunExample(ILoggerFactory loggerFact)
    {
        // The CreateLogger type argument was lost in extraction; Program is assumed.
        var log = loggerFact.CreateLogger<Program>();
        try
        {
            var config = Host.Services.GetService<IConfiguration>();
            // Grab the Scheduler instance from the Factory
            NameValueCollection properties = new NameValueCollection
            {
                ["quartz.scheduler.instanceName"] = QuartzOpt.InstanceName,
                ["quartz.scheduler.instanceId"] = QuartzOpt.InsatanceId,
                ["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz",
                ["quartz.threadPool.threadCount"] = "5",
                ["quartz.jobStore.misfireThreshold"] = "60000",
                ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
                ["quartz.jobStore.useProperties"] = "false",
                ["quartz.jobStore.dataSource"] = "default",
                ["quartz.jobStore.tablePrefix"] = "QRTZ_",
                ["quartz.jobStore.clustered"] = "true",
                ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
                ["quartz.dataSource.default.connectionString"] = config.GetConnectionString("QuatrzClustDatabase"),
                ["quartz.dataSource.default.provider"] = "MySql",
                ["quartz.serializer.type"] = "json",
                // The settings above are the cluster configuration: Quartz instances with
                // different instance ids that share one data source run as a cluster
                // automatically. If they are spread over several machines, an NTP time
                // server must be configured to keep the clocks in sync.
                // Everything from here down is not supported on .NET Core.
                ["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz",
                ["quartz.scheduler.exporter.port"] = "555",
                ["quartz.scheduler.exporter.bindName"] = "QuartzScheduler",
                ["quartz.scheduler.exporter.channelType"] = "tcp",
                ["quartz.scheduler.exporter.channelName"] = "httpQuartz",
                ["quartz.scheduler.exporter.rejectRemoteRequests"] = "true"
            };
            StdSchedulerFactory factory = new StdSchedulerFactory(properties);
            IScheduler scheduler = factory.GetScheduler().GetAwaiter().GetResult();
            string machine = Environment.MachineName; // current machine name
            QuartzDbContext db = Host.Services.GetService<QuartzDbContext>();
            // Fetch the jobs that belong to this machine and instance.
            var listQuartzTask = db.QuartzTask
                .Where(w => w.IsDelete == 0 && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
                .ToListAsync().GetAwaiter().GetResult();
            log.LogDebug("Task records fetched from the database: {0}",
                Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));

            // Assembly cache. The dictionary's type arguments were lost in extraction; these are assumed.
            Dictionary<string, byte[]> collAssembly = new Dictionary<string, byte[]>();
            foreach (var item in listQuartzTask) // first pass: load every referenced assembly
            {
                if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
                {
                    try
                    {
                        collAssembly[item.AssemblyName] = AssemblyHelp.GetAssemblyByteByAssemblyName(
                            Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
                    }
                    catch (Exception ep)
                    {
                        log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "Assembly not found.");
                        Task.Delay(10000); // not awaited, so this does not actually pause
                        continue;
                    }
                }
            }

            // and start it off
            scheduler.Start();
            // if (!QuartzOpt.IsSlave)
            // {
            var task = Task.Run(() =>
            {
                bool isClear = QuartzOpt.IsClear;
                log.LogInformation("Job monitor loop started; interval is 15 seconds");
                // Polls the database so that added or modified jobs are re-added and run automatically.
                while (true)
                {
                    try
                    {
                        if (scheduler != null)
                        {
                            log.LogDebug("Checking whether the scheduler has started");
                            if (scheduler.IsStarted)
                            {
                                if (isClear) // clear on startup
                                {
                                    scheduler.Clear().GetAwaiter().GetResult();
                                    isClear = false;
                                }
                                log.LogDebug("Scheduler has started");
                                db = Host.Services.GetService<QuartzDbContext>();
                                // Inside the loop, re-read the job records for this machine and instance.
                                listQuartzTask = db.QuartzTask
                                    .Where(w => w.IsDelete == 0 && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
                                    .ToListAsync().GetAwaiter().GetResult();
                                log.LogDebug("Task records fetched from the database: {0}",
                                    Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
                                foreach (var item in listQuartzTask)
                                {
                                    // Preload the assembly of any newly added job.
                                    if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
                                    {
                                        try
                                        {
                                            collAssembly[item.AssemblyName] = AssemblyHelp.GetAssemblyByteByAssemblyName(
                                                Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
                                        }
                                        catch (Exception ep)
                                        {
                                            log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "Assembly not found.");
                                            Task.Delay(10000); // not awaited, so this does not actually pause
                                            continue;
                                        }
                                    }
                                    log.LogDebug("Checking task: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(item));
                                    var jobKey = new JobKey(item.TaskName, item.GroupName);
                                    var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
                                    if (scheduler.CheckExists(jobKey).Result)
                                    {
                                        // The job exists: act according to its status.
                                        // Note: jobDetai and trigger are the Task objects returned by the
                                        // async calls, so Status, IsFaulted and Exception are Task members.
                                        var jobDetai = scheduler.GetJobDetail(jobKey);
                                        var trigger = scheduler.GetTrigger(triggerKey);
                                        log.LogDebug("Task already exists in the scheduler. database status:{0}, scheduler status:{1}, trigger status:{2}",
                                            ((OperateStatus)item.OperateStatus).ToString(),
                                            jobDetai.Status.ToString(),
                                            trigger.Status.ToString());
                                        if ((OperateStatus)item.OperateStatus == OperateStatus.Stop)
                                        {
                                            // The job was stopped in the database, so delete it. With a remote
                                            // API this could happen in real time and this code would be
                                            // unnecessary, but it is kept as a safeguard for failed remote calls.
                                            log.LogInformation("Deleting job from scheduler: {0}", jobKey.ToString());
                                            if (!scheduler.DeleteJob(jobKey).GetAwaiter().GetResult())
                                            {
                                                log.LogError("Failed to delete job. name:{0}, group:{1}", jobKey.Name, jobKey.Group);
                                            }
                                        }
                                        else
                                        {
                                            if (jobDetai.IsFaulted)
                                            {
                                                // On failure, update the job status in the database. Likewise,
                                                // with a remote API this serves as a safeguard.
                                                if (jobDetai.Exception != null)
                                                {
                                                    log.LogError(10005, jobDetai.Exception, "job faulted");
                                                }
                                                var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                                                    && w.TaskName == jobKey.Name
                                                    && w.GroupName == jobKey.Group
                                                    && w.MachineName == machine
                                                    && w.InstanceId == scheduler.SchedulerInstanceId);
                                                item.Status = (int)TaskStatus.Faulted;
                                                item.OperateStatus = (int)OperateStatus.Stop;
                                                db.Update(jobItem);
                                                db.SaveChanges();
                                            }
                                            else // not in an executing state: interrupt
                                            {
                                                if (jobDetai.Status != TaskStatus.Running
                                                    && jobDetai.Status != TaskStatus.RanToCompletion
                                                    && jobDetai.Status != TaskStatus.WaitingForActivation
                                                    && jobDetai.Status != TaskStatus.WaitingForChildrenToComplete
                                                    && jobDetai.Status != TaskStatus.WaitingToRun)
                                                {
                                                    var interTask = scheduler.Interrupt(jobKey, new CancellationToken(true))
                                                        .GetAwaiter().GetResult();
                                                    jobDetai.Start();
                                                }
                                            }
                                        }
                                        var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
                                        if (triggerListener == null)
                                        {
                                            triggerListener = new TriggerUpdateListens("trigger" + item.TaskName);
                                            IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
                                            scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                                        }
                                        var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
                                        if (jobListener == null)
                                        {
                                            IJobListener jobUpdateListener = new JobUpdateListens("job" + item.TaskName);
                                            IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
                                            scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                        }
                                    }
                                    else // the job does not exist: add it and schedule it
                                    {
                                        log.LogInformation("Adding a new job; checking whether its status is Stop.");
                                        if ((OperateStatus)item.OperateStatus != OperateStatus.Stop)
                                        {
                                            log.LogInformation("Adding new job");
                                            var assemblyName = item.AssemblyName;
                                            var className = item.ClassName;
                                            Type jobTaskType = null;
                                            try
                                            {
                                                jobTaskType = AssemblyHelp.GetTypeByAssemblyNameAndClassName(collAssembly[item.AssemblyName], className);
                                                log.LogInformation("Found type: {0}", className);
                                            }
                                            catch (Exception ep)
                                            {
                                                log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "Type not found.");
                                            }
                                            if (jobTaskType == null)
                                            {
                                                try
                                                {
                                                    jobTaskType = AssemblyHelp
                                                        .GetTypeByCurrentAssemblyNameAndClassName(className, Assembly.GetExecutingAssembly());
                                                    if (jobTaskType == null)
                                                    {
                                                        log.LogInformation("Type not found");
                                                        continue;
                                                    }
                                                    log.LogInformation("Found type: {0}", className);
                                                }
                                                catch (Exception ep)
                                                {
                                                    log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "Type not found.");
                                                    continue;
                                                }
                                            }
                                            IJobDetail job = JobBuilder.Create(jobTaskType)
                                                .WithIdentity(item.TaskName, item.GroupName)
                                                .Build();
                                            ITrigger trigger = TriggerBuilder.Create()
                                                .WithIdentity(item.TaskName, item.GroupName)
                                                .StartNow()
                                                .WithCronSchedule(item.CronExpressionString)
                                                .Build();
                                            scheduler.ScheduleJob(job, trigger).GetAwaiter().GetResult();
                                            log.LogInformation("Added successfully, type: {0}", className);
                                            ITriggerListener triggerListener = new TriggerUpdateListens("trigger" + item.TaskName);
                                            IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(trigger.Key);
                                            scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                                            IJobListener jobUpdateListener = new JobUpdateListens("job" + item.TaskName);
                                            IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(job.Key);
                                            scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                        }
                                    }
                                }
                            }
                            else
                            {
                                log.LogInformation("scheduler is not IsStarted");
                            }
                        }
                        else
                        {
                            log.LogInformation("scheduler is null");
                        }
                    }
                    catch (Exception ep)
                    {
                        log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "Task monitor loop failed.");
                    }
                    Thread.Sleep(15000);
                }
            });
            // }
            // else
            // {
            //     db = Host.Services.GetService<QuartzDbContext>();
            //     listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
            //         && w.MachineName == machine
            //         && w.InstanceId == QuartzOpt.InsatanceId)
            //         .ToListAsync().GetAwaiter().GetResult();
            //     foreach (var item in listQuartzTask)
            //     {
            //         var jobKey = new JobKey(item.TaskName, item.GroupName);
            //         var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
            //         // var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
            //         //     && w.TaskName == jobKey.Name
            //         //     && w.GroupName == jobKey.Group
            //         //     && w.MachineName == machine
            //         //     && w.InstanceId == scheduler.SchedulerInstanceId);
            //         // item.Status = (int)TaskStatus.Faulted;
            //         // item.OperateStatus = (int)OperateStatus.Stop;
            //         // db.Update(jobItem);
            //         // db.SaveChanges();
            //         if (scheduler.CheckExists(jobKey).Result)
            //         {
            //             var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
            //             if (triggerListener == null)
            //             {
            //                 triggerListener = new TriggerUpdateListens();
            //                 IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
            //                 scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
            //             }
            //             var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
            //             if (jobListener == null)
            //             {
            //                 IJobListener jobUpdateListener = new JobUpdateListens();
            //                 IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
            //                 scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
            //             }
            //         }
            //     }
            // }
            return scheduler;
            // Tell quartz to schedule the job using our trigger
            // await scheduler.ScheduleJob(job, trigger);
        }
        catch (SchedulerException sep)
        {
            log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, sep, "Job execution error.");
        }
        return null;
    }
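Stripped of logging and assembly loading, each pass of the monitor loop compares what the database row asks for with what the scheduler currently holds and picks one action. The sketch below restates that decision as a pure function so it can be read and tested in isolation. `ReconcileAction` and `JobReconciler` are hypothetical names, and the three booleans are a simplification assumed here; the article's loop additionally interrupts and restarts jobs in unexpected states and registers listeners.

```csharp
using System;

// Hypothetical summary of the actions the monitor loop can take for one task row.
public enum ReconcileAction
{
    None,
    AddJob,
    DeleteJob,
    MarkFaulted
}

public static class JobReconciler
{
    // existsInScheduler: what scheduler.CheckExists(jobKey) reported
    // stopRequested:     the database row's OperateStatus is Stop
    // faulted:           the job lookup reported a fault
    public static ReconcileAction Decide(bool existsInScheduler, bool stopRequested, bool faulted)
    {
        if (!existsInScheduler)
        {
            // A missing job is scheduled unless the database says it should stay stopped.
            return stopRequested ? ReconcileAction.None : ReconcileAction.AddJob;
        }
        if (stopRequested)
        {
            return ReconcileAction.DeleteJob;
        }
        return faulted ? ReconcileAction.MarkFaulted : ReconcileAction.None;
    }
}
```

For instance, `JobReconciler.Decide(false, false, false)` yields `AddJob`, which corresponds to the branch that builds the job and trigger and calls `ScheduleJob`.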

Now the trigger listener class and the job listener class:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Quartz;
    using Quartz.Logging;
    using Quartz.Impl;
    using Walt.Framework.Service.Zookeeper;
    using Microsoft.Extensions.DependencyInjection;
    using org.apache.zookeeper;
    using System.Linq;
    using System.Collections.Generic;
    using Microsoft.Extensions.Logging;
    using static org.apache.zookeeper.KeeperException;

    namespace Walt.Framework.Quartz.Host
    {
        public class TriggerUpdateListens : ITriggerListener
        {
            public string Name { get; set; }

            public TriggerUpdateListens(string name)
            {
                Name = name;
            }

            // Note: once set to true this flag is never reset in the code shown here.
            private bool VoteJob { get; set; }

            public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context,
                SchedulerInstruction triggerInstructionCode,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                // The CreateLogger type argument was lost in extraction; this class is assumed.
                var logger = loggerFact.CreateLogger<TriggerUpdateListens>();
                logger.LogInformation(0, null, "Executed successfully. name:{0} group:{1}",
                    context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                return Task.FromResult(true);
            }

            // Trigger fired: this is the first callback in a job execution.
            public Task TriggerFired(ITrigger trigger, IJobExecutionContext context,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var logger = loggerFact.CreateLogger<TriggerUpdateListens>();
                logger.LogInformation(0, null, "Starting job. name:{0} group:{1}",
                    context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                string machine = Environment.MachineName; // current machine name
                try
                {
                    var customerAttri = context.JobDetail.JobType.GetCustomAttributes(false);
                    foreach (var customer in customerAttri)
                    {
                        if (customer is DistributingAttributes) // this attribute marks a sharded job
                        {
                            var distri = customer as DistributingAttributes;
                            var zookeeper = Program.Host.Services.GetService<IZookeeperService>();
                            string currentTempNodeName = string.Empty;
                            string fullPath = "/lock/" + context.JobDetail.Key.Name + context.JobDetail.Key.Group;
                            int flag = 0;
                        Repeat: // retried a few times in case the failure is transient
                            string jsonData = zookeeper.GetDataByLockNode(fullPath, "getlock",
                                ZooDefs.Ids.OPEN_ACL_UNSAFE, out currentTempNodeName);
                            if (jsonData == null)
                            {
                                logger.LogError("Failed to acquire lock. node:{0}, lock prefix:{1}, retry:{2}", fullPath, "getlock", flag);
                                if (flag <= 2)
                                {
                                    flag = flag + 1;
                                    goto Repeat;
                                }
                                // Lock not acquired: veto this run. The flag is read in VetoJobExecution below.
                                VoteJob = true;
                                // context.Scheduler.Interrupt(context.JobDetail.Key);

                                // Quartz calls VetoJobExecution after TriggerFired; the value returned here is not inspected.
                                return Task.FromResult(false);
                            }

                            // Lock acquired: process the shard data and build the shard context.
                            QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                            var item = db.QuartzTask.Where(w => w.IsDelete == 0
                                && w.TaskName == context.JobDetail.Key.Name
                                && w.GroupName == context.JobDetail.Key.Group
                                && w.MachineName == machine
                                && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
                            if (item != null)
                            {
                                // TODO: look up the machine name here and build the remote API URL to check
                                // whether the shard host is alive, so that suspended tasks can be reassigned.
                            }
                            string distributeFlag = item.MachineName + item.InstanceId;
                            List<DistributingData> distriData = new List<DistributingData>();
                            DistributingData currentDistriEntity = null;
                            if (string.IsNullOrEmpty(jsonData))
                            {
                                currentDistriEntity = new DistributingData // shard metadata
                                {
                                    DistributeFlag = distributeFlag, // shard flag, built from machine name and instance id
                                    PageIndex = 1,
                                    PageSize = Program.QuartzOpt.CustomerRecordCountForTest // configured record count
                                };
                                distriData.Add(currentDistriEntity);
                            }
                            else
                            {
                                distriData = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DistributingData>>(jsonData);
                                if (distriData == null || distriData.Count() < 1)
                                {
                                    currentDistriEntity = new DistributingData
                                    {
                                        DistributeFlag = distributeFlag,
                                        PageIndex = 1,
                                        PageSize = Program.QuartzOpt.CustomerRecordCountForTest // from configuration
                                    };
                                    distriData.Add(currentDistriEntity);
                                }
                                else
                                {
                                    currentDistriEntity = distriData.Where(w => w.DistributeFlag == distributeFlag).SingleOrDefault();
                                    if (currentDistriEntity == null)
                                    {
                                        // This host has not been sharded yet: add it to the shard cluster.
                                        var maxPageIndex = distriData.Max(w => w.PageIndex);
                                        maxPageIndex = maxPageIndex + 1;
                                        var entity = new DistributingData
                                        {
                                            DistributeFlag = distributeFlag,
                                            PageIndex = maxPageIndex,
                                            PageSize = Program.QuartzOpt.CustomerRecordCountForTest // from configuration
                                        };
                                        distriData.Add(entity);
                                        // Fix: the original left currentDistriEntity null in this branch,
                                        // so the job would have received no shard data.
                                        currentDistriEntity = entity;
                                    }
                                    else
                                    {
                                        var maxPageIndex = distriData.Max(w => w.PageIndex);
                                        maxPageIndex = maxPageIndex + 1;
                                        currentDistriEntity.PageIndex = maxPageIndex;
                                    }
                                }
                            }
                            item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(currentDistriEntity);
                            db.Update(item);
                            db.SaveChanges();
                            string resultData = Newtonsoft.Json.JsonConvert.SerializeObject(distriData);
                            // Put the shard data into the job data map so the job can read it.
                            context.JobDetail.JobDataMap.Put("distriData", currentDistriEntity);
                            zookeeper.SetDataAsync(fullPath, resultData, false).GetAwaiter().GetResult();
                            // Done: delete the current node to release the lock.
                            zookeeper.DeleteNode(currentTempNodeName);
                            logger.LogInformation("Shard executed: {0}", resultData);
                        }
                    }
                }
                catch (ConnectionLossException cle)
                {
                    VoteJob = true;
                    logger.LogError(cle, "Error acquiring the lock: connection lost");
                }
                catch (SessionExpiredException sep)
                {
                    VoteJob = true;
                    logger.LogError(sep, "Error acquiring the lock: session expired");
                }
                catch (KeeperException kep)
                {
                    VoteJob = true;
                    logger.LogError(kep, "Error acquiring the lock: ZooKeeper operation failed");
                }
                catch (Exception ep)
                {
                    try
                    {
                        logger.LogError(0, ep, "Sharding failed.");
                        // context.Scheduler.DeleteJob(context.JobDetail.Key).GetAwaiter().GetResult();
                        VoteJob = true;
                        QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                        var item = db.QuartzTask.Where(w => w.IsDelete == 0
                            && w.TaskName == context.JobDetail.Key.Name
                            && w.GroupName == context.JobDetail.Key.Group
                            && w.MachineName == machine
                            && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
                        if (item == null)
                        {
                            logger.LogError(0, ep, "Sharding failed; could not read the database record.");
                        }
                        else
                        {
                            item.Status = (int)TaskStatus.Canceled;
                            item.OperateStatus = (int)OperateStatus.Stop;
                            item.Remark = ep.ToString();
                            db.Update(item);
                            db.SaveChanges();
                        }
                    }
                    catch (Exception eep)
                    {
                        logger.LogError(0, eep, "Sharding failed; could not update the database.");
                    }
                }
                return Task.FromResult(true);
            }

            public Task TriggerMisfired(ITrigger trigger,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                return Task.FromResult(true);
            }

            // Called by Quartz after TriggerFired.
            public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var logger = loggerFact.CreateLogger<TriggerUpdateListens>();
                if (VoteJob)
                {
                    logger.LogInformation(0, null, "Vetoing job. name:{0} group:{1}",
                        context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                // VoteJob is set in TriggerFired and defaults to false. It becomes true when the
                // lock cannot be acquired. Returning true means only JobUpdateListens.JobExecutionVetoed
                // runs, and the job is skipped for this firing.
                return Task.FromResult(VoteJob);
            }
        }
    }
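The page bookkeeping inside TriggerFired is easier to follow once it is separated from ZooKeeper and the database. The sketch below is a simplified restatement under stated assumptions: `ShardEntry` mirrors the three `DistributingData` fields used above, `ShardAllocator.NextPage` is a hypothetical helper, and the caller is assumed to already hold the distributed lock, since the list is shared state stored on the lock node.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the DistributingData fields used by the trigger listener.
public sealed class ShardEntry
{
    public string DistributeFlag { get; set; }
    public int PageIndex { get; set; }
    public int PageSize { get; set; }
}

public static class ShardAllocator
{
    // Gives the host identified by distributeFlag the next unused page.
    // Must only be called while holding the distributed lock.
    public static ShardEntry NextPage(List<ShardEntry> shards, string distributeFlag, int pageSize)
    {
        // The first host ever starts at page 1; afterwards every request takes max + 1.
        int nextIndex = shards.Count == 0 ? 1 : shards.Max(s => s.PageIndex) + 1;

        var current = shards.SingleOrDefault(s => s.DistributeFlag == distributeFlag);
        if (current == null)
        {
            // A host that has never been sharded joins the shard list.
            current = new ShardEntry { DistributeFlag = distributeFlag, PageSize = pageSize };
            shards.Add(current);
        }
        current.PageIndex = nextIndex;
        return current;
    }
}
```

With two hosts firing alternately, the pages handed out are 1, 2, 3, and so on, and no page is handed out twice as long as every call happens under the lock.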

Next, the methods that run around a job's execution:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Quartz;
    using Quartz.Logging;
    using Quartz.Impl;
    using Microsoft.Extensions.DependencyInjection;
    using System.Linq;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Logging;

    namespace Walt.Framework.Quartz.Host
    {
        public class JobUpdateListens : IJobListener
        {
            public string Name { get; set; }

            public JobUpdateListens(string name)
            {
                Name = name;
            }

            public Task JobExecutionVetoed(IJobExecutionContext context,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                return Task.FromResult(true);
            }

            // Runs before the job executes and writes the status to the database.
            public Task JobToBeExecuted(IJobExecutionContext context,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                try
                {
                    string machine = Environment.MachineName;
                    QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                    var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                        && w.TaskName == context.JobDetail.Key.Name
                        && w.GroupName == context.JobDetail.Key.Group
                        && w.MachineName == machine
                        && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                    item.Status = (int)TaskStatus.WaitingToRun;
                    db.Update(item);
                    db.SaveChanges();
                }
                catch (Exception ep)
                {
                    // context.Scheduler.Interrupt(context.JobDetail.Key);
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    // The CreateLogger type argument was lost in extraction; this class is assumed.
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    log.LogError(0, ep, "JobToBeExecuted: job execution error, name:{0}, Group:{1}",
                        context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                return Task.FromResult(true);
            }

            // Runs after the job executes: stores the result in the database and handles exceptions.
            public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException,
                CancellationToken cancellationToken = default(CancellationToken))
            {
                try
                {
                    QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    string machine = Environment.MachineName;
                    var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                        && w.TaskName == context.JobDetail.Key.Name
                        && w.GroupName == context.JobDetail.Key.Group
                        && w.MachineName == machine
                        && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                    if (jobException != null)
                    {
                        item.Status = (int)TaskStatus.Faulted;
                        item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(jobException);
                        log.LogError("Job execution error, name:{0}, Group:{1}",
                            context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                    }
                    else
                    {
                        item.Status = (int)TaskStatus.RanToCompletion;
                        item.RecentRunTime = context.FireTimeUtc.DateTime;
                        if (context.NextFireTimeUtc.HasValue)
                        {
                            item.NextFireTime = context.NextFireTimeUtc.Value.DateTime;
                        }
                    }
                    db.Update(item);
                    db.SaveChanges();
                }
                catch (Exception ep)
                {
                    // context.Scheduler.Interrupt(context.JobDetail.Key);
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    log.LogError(0, ep, "JobWasExecuted: job execution error, name:{0}, Group:{1}",
                        context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                return Task.FromResult(true);
            }
        }
    }

Some of the zookeeperservice code was optimized for this article:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using org.apache.zookeeper;
    using org.apache.zookeeper.data;
    using static org.apache.zookeeper.ZooKeeper;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Threading;
    using static org.apache.zookeeper.Watcher.Event;
    using Newtonsoft.Json;
    using System.Collections.Concurrent;
    using static org.apache.zookeeper.ZooDefs;
    using static org.apache.zookeeper.KeeperException;

    namespace Walt.Framework.Service.Zookeeper
    {
        internal class WaitLockWatch : Watcher
        {
            private AutoResetEvent _autoResetEvent;
            private ManualResetEvent _mutex;
            private ILogger _logger;
            private string _path;

            public WaitLockWatch(AutoResetEvent autoResetEvent, ILogger logger, string path, ManualResetEvent mutex)
            {
                _autoResetEvent = autoResetEvent;
                _logger = logger;
                _path = path;
                _mutex = mutex;
            }

            public override Task process(WatchedEvent @event)
            {
                _mutex.Set();
                return Task.FromResult(true);
            }
        }

        internal class WaitConnWatch : Watcher
        {
            private AutoResetEvent _autoResetEvent;
            private ILogger _logger;
            private ManualResetEvent _mutex;

            public WaitConnWatch(AutoResetEvent autoResetEvent, ILogger logger, ManualResetEvent mutex)
            {
                _autoResetEvent = autoResetEvent;
                _logger = logger;
                _mutex = mutex;
            }

            public override Task process(WatchedEvent @event)
            {
                _logger.LogInformation("Watch fired, callback state: {0}", @event.getState().ToString());
                if (@event.getState() == KeeperState.SyncConnected
                    || @event.getState() == KeeperState.ConnectedReadOnly)
                {
                    _logger.LogInformation("Releasing the connection wait");
                    _autoResetEvent.Set();
                }
                else
                {
                    _logger.LogInformation("Connection dropped; releasing the distributed lock wait");
                    _mutex.Set();
                }
                return Task.FromResult(0);
            }
        }

        public class ZookeeperService : IZookeeperService
        {
            private ZookeeperOptions _zookeeperOptions;
            private ZooKeeper _zookeeper;
            private static readonly byte[] NO_PASSWORD = new byte[0];
            public Watcher Wathcer { get; set; }
            public ILoggerFactory LoggerFac { get; set; }
            private ILogger _logger;
            internal Thread CurrThread { get; }
            AutoResetEvent[] autoResetEvent = new AutoResetEvent[2]
                { new AutoResetEvent(false), new AutoResetEvent(false) };
            ManualResetEvent _manualReset = new ManualResetEvent(false);

            public ZookeeperService(IOptionsMonitor<ZookeeperOptions> zookeeperOptions, ILoggerFactory loggerFac)
            {
                LoggerFac = loggerFac;
                // The CreateLogger type argument was lost in extraction; this class is assumed.
                _logger = LoggerFac.CreateLogger<ZookeeperService>();
                _zookeeperOptions = zookeeperOptions.CurrentValue;
                _logger.LogInformation("Configuration: {0}", JsonConvert.SerializeObject(_zookeeperOptions));
                zookeeperOptions.OnChange((zookopt, s) =>
                {
                    _zookeeperOptions = zookopt;
                });
                _logger.LogInformation("Connecting");
                Conn(_zookeeperOptions);
                CurrThread = System.Threading.Thread.CurrentThread;
            }

            private void Conn(ZookeeperOptions zookeeperOptions)
            {
                bool isReadOnly = default(Boolean);
                Wathcer = new WaitConnWatch(autoResetEvent[0], _logger, _manualReset);
                if (isReadOnly != zookeeperOptions.IsReadOnly)
                {
                    isReadOnly = zookeeperOptions.IsReadOnly;
                }
                byte[] pwd = new byte[0];
                // No password and no session id
                if (string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
                    && _zookeeperOptions.SessionId == default(int))
                {
                    _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
                        zookeeperOptions.SessionTimeout, Wathcer, isReadOnly);
                }
                else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
                {
                    pwd = System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                    _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
                        zookeeperOptions.SessionTimeout, Wathcer, 0, pwd, isReadOnly);
                }
                else
                {
                    _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
                        zookeeperOptions.SessionTimeout, Wathcer, zookeeperOptions.SessionId, pwd, isReadOnly);
                }
                if (_zookeeper.getState() == States.CONNECTING)
                {
                    _logger.LogInformation("Current state: CONNECTING. Blocking until connected");
                    autoResetEvent[0].WaitOne();
                }
            }

            public Task<string> CreateZNode(string path, string data, CreateMode createMode, List<ACL> aclList)
            {
                ReConn();
                if (string.IsNullOrEmpty(path) || !path.StartsWith('/'))
                {
                    _logger.LogInformation("Invalid path. path:{0}", path);
                    return null;
                }
                byte[] dat = new byte[0];
                // Fix: the original tested IsNullOrEmpty without the negation, unlike SetDataAsync.
                if (!string.IsNullOrEmpty(data))
                {
                    dat = System.Text.Encoding.Default.GetBytes(data);
                }
                if (createMode == null)
                {
                    _logger.LogInformation("createMode is null; defaulting to CreateMode.PERSISTENT");
                    createMode = CreateMode.PERSISTENT;
                }
                return _zookeeper.createAsync(path, dat, aclList, createMode);
            }

            public async void Sync(string path)
            {
                try
                {
                    await _zookeeper.sync(path);
                    _logger.LogInformation("Sync succeeded");
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Sync failed.");
                }
            }

            public async Task<DataResult> GetDataAsync(string path, Watcher watcher, bool isSync)
            {
                ReConn();
                if (await _zookeeper.existsAsync(path) == null)
                {
                    _logger.LogInformation("path does not exist");
                    return null;
                }
                if (isSync)
                {
                    _logger.LogInformation("About to sync.");
                    try
                    {
                        await _zookeeper.sync(path);
                        _logger.LogInformation("Sync succeeded");
                    }
                    catch (Exception ep)
                    {
                        _logger.LogError(ep, "Sync failed.");
                    }
                }
                return await _zookeeper.getDataAsync(path, watcher);
            }

            public async Task<Stat> SetDataAsync(string path, string data, bool isSync)
            {
                ReConn();
                if (await _zookeeper.existsAsync(path) == null)
                {
                    _logger.LogInformation("path does not exist");
                    return null;
                }
                byte[] dat = new byte[0];
                if (!string.IsNullOrEmpty(data))
                {
                    dat = System.Text.Encoding.Default.GetBytes(data);
                }
                return await _zookeeper.setDataAsync(path, dat);
            }

            public async Task<ChildrenResult> GetChildrenAsync(string path, Watcher watcher, bool isSync)
            {
                ReConn();
                if (await _zookeeper.existsAsync(path) == null)
                {
                    _logger.LogInformation("path does not exist");
                    return null;
                }
                if (isSync)
                {
                    _logger.LogInformation("About to sync.");
                    try
                    {
                        _logger.LogInformation("Starting sync");
                        await _zookeeper.sync(path);
                        _logger.LogInformation("Sync succeeded");
                    }
                    catch (Exception ep)
                    {
                        _logger.LogError(ep, "Sync failed.");
                    }
                }
                return await _zookeeper.getChildrenAsync(path, watcher);
            }

            public async Task DeleteNode(string path)
            {
                ReConn();
                if (await _zookeeper.existsAsync(path) == null)
                {
                    _logger.LogDebug("Delete path: path does not exist");
                    return;
                }
                try
                {
                    _logger.LogDebug("Deleting node: {0}", path);
                    await _zookeeper.deleteAsync(path);
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Delete failed");
                    return;
                }
            }

            public async Task<bool> SetWatcher(string path, Watcher watcher)
            {
                ReConn();
                var stat = await _zookeeper.existsAsync(path);
                if (stat == null)
                {
                    _logger.LogDebug("Checking whether path exists: path does not exist");
                    return false;
                }
                try
                {
                    _logger.LogDebug("Setting watch: {0}", path);
                    await _zookeeper.getDataAsync(path, watcher);
                    return true;
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Error setting watch");
                    return false;
                }
            }

            public string GetDataByLockNode(string path, string sequenceName, List<ACL> aclList, out string tempNodeOut)
            {
                _logger.LogInformation("Acquiring the distributed lock.");
                string tempNode = string.Empty;
                tempNodeOut = string.Empty;
                try
                {
                    ReConn();
                    if (_zookeeper.existsAsync(path).Result == null)
                    {
                        _logger.LogDebug("path does not exist; creating it");
                        CreateZNode(path, "", CreateMode.PERSISTENT, aclList).GetAwaiter().GetResult();
                    }
                    tempNode = CreateZNode(path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
                    _logger.LogDebug("Created node: {0}", tempNode);
                    if (tempNode == null)
                    {
                        _logger.LogDebug("Failed to create the ephemeral sequential node. path:{0}, data:{1}, CreateMode:{2}",
                            path + "/squence", "", CreateMode.EPHEMERAL_SEQUENTIAL);
                        return null;
                    }
                    _logger.LogInformation("Created successfully.");

                    // var taskGetData = Task.Run(async () =>
                    // {
                    //     int circleCount = 0;
                    //     while (true)
                    //     {
                    //         Thread.Sleep(200);
                    //         circleCount++;
                    //         _logger.LogInformation("Polling for the lock. Iteration: {0}", circleCount);
                    //         try
                    //         {
                    //             var childList = await GetChildrenAsync(path, null, true);
                    //             if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    //             {
                    //                 _logger.LogWarning("Failed to get children; count is zero. path:{0}", path);
                    //                 return null;
                    //             }
                    //             _logger.LogInformation("Children of path:{0}: {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                    //             var top = childList.Children.OrderBy(or => or).First();
                    //             if (path + "/" + top == tempNode)
                    //             {
                    //                 return tempNode;
                    //             }
                    //         }
                    //         catch (Exception ep)
                    //         {
                    //             _logger.LogError(ep, "Error while polling for the lock.");
                    //             return null;
                    //         }
                    //     }
                    // });
                    // tempNode = taskGetData.GetAwaiter().GetResult();
                    // if (!string.IsNullOrEmpty(tempNode))
                    // {
                    //     byte[] da = null;
                    //     tempNodeOut = tempNode;
                    //     da = GetDataAsync(path, null, true).Result.Data;
                    //     if (da == null || da.Length < 1)
                    //     {
                    //         return string.Empty;
                    //     }
                    //     return System.Text.Encoding.Default.GetString(da);
                    // }

                    int clycleCount = 0;
                GetChild: // guards against errors under concurrency
                    clycleCount++;
                    var childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
                    if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    {
                        _logger.LogWarning("Failed to get children; count is zero. path:{0}", path);
                        return null;
                    }
                    _logger.LogInformation("Children of path:{0}: {1}", path,
                        Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                    var top = childList.Children.OrderBy(or => or).First();
                    if (path + "/" + top == tempNode)
                    {
                        tempNodeOut = tempNode;
                        var da = GetDataAsync(path, null, true).Result.Data;
                        if (da == null || da.Length < 1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                    }
                    // bool isSet =
                    //     SetWatcher(path + "/" + top,).Result;
                    // if (!isSet)
                    // {
                    //     goto GetChild;
                    // }
                    bool isSet = SetWatcher(path + "/" + top,
                        new WaitLockWatch(autoResetEvent[1], _logger, path, _manualReset)).Result;
                    if (!isSet)
                    {
                        _logger.LogWarning("The watcher was not set; running the check again.");
                        goto GetChild;
                    }
                    _manualReset.WaitOne(15000);
                    childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
                    if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    {
                        _logger.LogWarning("Failed to get children again; count is zero. path:{0}", path);
                        return null;
                    }
                    _logger.LogInformation("Children of path:{0} (second read): {1}", path,
                        Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                    top = childList.Children.OrderBy(or => or).First();
                    if (path + "/" + top == tempNode)
                    {
                        _logger.LogDebug("This node now holds the lock.");
                        tempNodeOut = tempNode;
                        var da = GetDataAsync(path, null, true).Result.Data;
                        if (da == null || da.Length < 1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                    }
                    else
                    {
                        _logger.LogDebug("Lock not acquired; looping. Iteration: {0}", clycleCount);
                        Thread.Sleep(1000);
                        goto GetChild;
                        // Sync(path);
                        // DeleteNode(tempNode).GetAwaiter().GetResult();
                        // DeleteNode(tempNode).GetAwaiter().GetResult();
                        // _logger.LogError("Lock not acquired; the watcher has a problem, check the log.");
                        // if (_zookeeper.existsAsync(tempNode).Result == null)
                        // {
                        //     _logger.LogWarning("tempNode:{0} exists but did not get the lock; the wait was released by the thread checker by mistake", tempNode);
                        // }
                        // else
                        // {
                        //     _logger.LogError("Lock not acquired; the watcher has a problem, check the log.");
                        // }
                    }
                }
                catch (ConnectionLossException cle)
                {
                    _logger.LogError(cle, "Error acquiring the lock: connection lost");
                }
                catch (SessionExpiredException sep)
                {
                    _logger.LogError(sep, "Error acquiring the lock: session expired");
                }
                catch (KeeperException kep)
                {
                    _logger.LogError(kep, "Error acquiring the lock: ZooKeeper operation failed");
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Error acquiring the lock.");
                    if (!string.IsNullOrEmpty(tempNode))
                    {
                        try
                        {
                            DeleteNode(tempNode).GetAwaiter().GetResult();
                        }
                        catch (Exception)
                        {
                        }
                    }
                }
                return null;
            }

            private void ReConn()
            {
                _logger.LogInformation("Checking connection state, status:{0}", _zookeeper.getState());
                if (_zookeeper.getState() == States.CLOSED
                    || _zookeeper.getState() == States.NOT_CONNECTED)
                {
                    _logger.LogInformation("Connection is closed; reconnecting.");
                    Conn(_zookeeperOptions);
                }
            }

            public async void Close(string tempNode)
            {
                try
                {
                    await _zookeeper.closeAsync();
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Failed to close zookeeper.");
                }
            }
        }
    }
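The heart of GetDataByLockNode is the election rule: every contender creates an ephemeral sequential child under the lock path, and whoever owns the child with the lowest sequence number holds the lock. The sketch below isolates that rule as a pure function. `LockElection`, `SequenceOf`, and `HoldsLock` are hypothetical names introduced here. Unlike the service above, which sorts the child names as strings, this version compares the numeric suffix that ZooKeeper appends to sequential nodes; the two agree as long as all children share one prefix.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class LockElection
{
    // Parses the zero-padded counter that ZooKeeper appends to a sequential node name,
    // for example "getlock0000000003" with prefix "getlock" gives 3.
    public static long SequenceOf(string childName, string prefix)
    {
        if (!childName.StartsWith(prefix, StringComparison.Ordinal))
        {
            throw new ArgumentException("child does not start with the expected prefix", nameof(childName));
        }
        return long.Parse(childName.Substring(prefix.Length), CultureInfo.InvariantCulture);
    }

    // True when ownNodePath is the child of lockPath with the lowest sequence number.
    public static bool HoldsLock(IEnumerable<string> children, string lockPath, string prefix, string ownNodePath)
    {
        var lowest = children.OrderBy(c => SequenceOf(c, prefix)).FirstOrDefault();
        return lowest != null && lockPath + "/" + lowest == ownNodePath;
    }
}
```

A contender that does not hold the lock waits for a change and then checks again, which is what the watcher and the `goto GetChild` loop in the service do.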

 

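Now the results:

Shut down master2, then look at slave1:

Judging by the timestamps, master2 has not run for 2 minutes. For proper monitoring it is still best to implement the remote interface, which is not hard: change the generic host to a WebHost and write a few APIs. That gives real-time monitoring of the Quartz hosts, and sharding could then drop failed hosts automatically and reassign their tasks. Here is the sharding state:

master2 has failed but has not been removed from the shard list. This does not affect subsequent tasks. However, if master2's task had not finished when it failed, part of its data will remain unprocessed.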

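III. Summary

Two points matter when extending Quartz. First, handle exceptions carefully in job listeners and trigger listeners, trigger listeners especially: if one throws, the job goes out of control and stops firing, its status becomes invalid, and it has to be stopped and started again. Second, when writing business code, the data source to be sharded must either grow in a regular, auto-incrementing way or be static; only then does paging satisfy the sharding requirement. If the keys are not auto-incrementing or rows are added at random, the primary keys to process have to be stored in Quartz, but such requirements are rare.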
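To see why the summary asks for an auto-incrementing or static data source, consider how a sharded job might turn the `PageIndex` and `PageSize` it receives into a concrete slice of rows. The sketch below is one possible mapping, not the article's implementation: `ShardRange.ForPage` is a hypothetical helper, and it assumes dense integer IDs that start at `firstId` and grow by one. Under that assumption each page corresponds to a fixed ID range that never shifts when new rows are appended.

```csharp
using System;

public static class ShardRange
{
    // Returns the inclusive ID range covered by a 1-based page,
    // assuming IDs start at firstId and increase by one without gaps.
    public static (long From, long To) ForPage(int pageIndex, int pageSize, long firstId = 1)
    {
        if (pageIndex < 1) throw new ArgumentOutOfRangeException(nameof(pageIndex));
        if (pageSize < 1) throw new ArgumentOutOfRangeException(nameof(pageSize));

        long from = firstId + (long)(pageIndex - 1) * pageSize;
        return (from, from + pageSize - 1);
    }
}
```

With randomly assigned keys no such stable mapping exists, because inserting a row can move every later row to a different page, which is why the keys themselves would then have to be stored.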

GitHub repository for the microservices series: https://github.com/ck0074451665/Walt.Framework.git

 

Test example: https://github.com/ck0074451665/Walt.MicroServicesTest.git

Management UI: https://pan.baidu.com/s/1gYNDX1j3-XctuPiejV2XPQ

 

Copyright notice
This article was written by [无为有道]. Please include a link to the original when reposting. Thank you.
https://www.cnblogs.com/ck0074451665/p/10321096.html
