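I recently ran into Hangfire at work while developing a scheduled-task module (strictly speaking, I was fixing bugs in a module owned by a colleague). I had heard from colleagues that Hangfire has a few pitfalls, so out of curiosity, once most of the bugs were fixed, I pulled the Hangfire source from GitHub. Below I share what I understood from reading it and what to watch out for in day-to-day work. The post has four parts: 1. preparation, 2. basic usage, 3. source code analysis, 4. pitfalls. A caveat: I have only spent a few days with the Hangfire source, so if I have misunderstood or misstated something, please point it out in the comments.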
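Preparation: the Hangfire code base is not large. The GitHub repository is
https://github.com/HangfireIO/Hangfire, so feel free to download it and browse. Functionally it splits into a client mode and a server mode. The techniques involved include multithreading, Expression trees, Dapper, and Cron. Its scheduling is built entirely on cooperating threads, and because everything runs in a multithreaded environment I found the code somewhat hard to follow.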
Basic usage: Hangfire works on both .NET Framework and .NET Core; the examples below use .NET Core.
1. Client and server deployed separately
Client side
public IServiceProvider ConfigureServices(IServiceCollection services)
{
    // other code...
    services.AddHangfire(config =>
    {
        config.UseSqlServerStorage(...);
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // other code...
    // enable the Dashboard
    app.UseHangfireDashboard();
}
Server side
public void Configuration(IAppBuilder app)
{
    GlobalConfiguration.Configuration
        .UseSqlServerStorage("connection string", new SqlServerStorageOptions
        {
            // options
        });
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
    });
}
Or
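services.AddHangfireServer(options =>
{
    // implemented on top of the IHostedService interface
});
PS: the server can also be run through an IHostedService implementation. Usage is the same as above: register it with the service container, and the host then calls the two IHostedService methods automatically. Here is the interface definition.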
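public interface IHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}
The interface defines just two methods: StartAsync runs when the application starts and StopAsync runs when it stops. The diagram, which I borrowed from elsewhere, sketches when each one runs.
That covers a simple setup with the Hangfire client and server deployed separately. Now for the second option: client and server on the same machine.
2. Client and server deployed together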
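public void Configuration(IAppBuilder app)
{
    GlobalConfiguration.Configuration.UseSqlServerStorage(); // configure the database connection (connection string omitted)

    app.UseHangfireServer();    // start the server
    app.UseHangfireDashboard(); // enable the dashboard
}
Just a few lines of code; admittedly, simple usage is all I know. With the services registered and running, the next step is to add jobs to Hangfire.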
BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // run immediately
BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // run after a delay
RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // run repeatedly; cron expressions are supported
That is enough for basic usage. On to the third part of the outline: source code analysis.
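Source code analysis
There is not much to say about client mode: put bluntly, it just writes jobs into the Hangfire database. What we mainly care about is how the server side executes them. Let's find the entry point first, which you can think of as a kind of middleware in .NET Core. Here is the code:
app.UseHangfireServer(); // start the server
The UseHangfireServer implementation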
public static IAppBuilder UseHangfireServer(
    [NotNull] this IAppBuilder builder,
    [NotNull] JobStorage storage,
    [NotNull] BackgroundJobServerOptions options,
    [NotNull] params IBackgroundProcess[] additionalProcesses)
{
    // other code...
    var server = new BackgroundJobServer(options, storage, additionalProcesses);

    return builder;
}
Inside the UseHangfireServer extension method, the important line is the one that creates a BackgroundJobServer. BackgroundJobServer implements IBackgroundProcessingServer, and the server is started, indirectly, from its constructor. Let's first look at the definitions of the IBackgroundProcessingServer interface and the BackgroundJobServer class.
// IBackgroundProcessingServer
public interface IBackgroundProcessingServer : IDisposable
{
    void SendStop();
    bool WaitForShutdown(TimeSpan timeout);
    Task WaitForShutdownAsync(CancellationToken cancellationToken);
}

// BackgroundJobServer
public class BackgroundJobServer : IBackgroundProcessingServer
{
    // other members...
    public BackgroundJobServer(
        [NotNull] BackgroundJobServerOptions options,
        [NotNull] JobStorage storage,
        [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses,
        [CanBeNull] IJobFilterProvider filterProvider,
        [CanBeNull] JobActivator activator,
        [CanBeNull] IBackgroundJobFactory factory,
        [CanBeNull] IBackgroundJobPerformer performer,
        [CanBeNull] IBackgroundJobStateChanger stateChanger)
    {
        // other code...
        var processes = new List<IBackgroundProcessDispatcherBuilder>();
        processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger));
        processes.AddRange(additionalProcesses.Select(x => x.UseBackgroundPool(1)));
        var properties = new Dictionary<string, object>
        {
            { "Queues", options.Queues },
            { "WorkerCount", options.WorkerCount }
        };

        _processingServer = new BackgroundProcessingServer(
            storage,
            processes,
            properties,
            GetProcessingServerOptions());
    }

    public void SendStop()
    {
        // body omitted
    }

    public void Dispose()
    {
        // body omitted
    }

    [Obsolete("This method is a stub. There is no need to call the `Start` method. Will be removed in version 2.0.0.")]
    public void Start()
    {
    }

    [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")]
    public void Stop()
    {
        // body omitted
    }

    [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")]
    public void Stop(bool force)
    {
        // body omitted
    }

    public bool WaitForShutdown(TimeSpan timeout)
    {
        // body omitted
    }

    public Task WaitForShutdownAsync(CancellationToken cancellationToken)
    {
        // body omitted
    }
}
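The methods on IBackgroundProcessingServer all deal with stopping the server, cancelling work, and releasing resources. In BackgroundJobServer the real work is delegated to the same-named methods of a BackgroundProcessingServer instance, which is created in the constructor. While that instance is being set up, the constructor also creates several BackgroundProcessDispatcherBuilder objects (the implementation of IBackgroundProcessDispatcherBuilder). Hangfire ships with seven dispatchers by default, and the dedicated threads for jobs, logging, heartbeats and so on are all created through the builder's Create method. That is not on the main path of server startup, so I will come back to it later. Note that several methods here, such as Start and Stop, are marked [Obsolete] and, according to the attribute text, will be removed in version 2.0.0. If Start is only a stub, does the server actually start inside BackgroundProcessingServer? Let's look at its definition.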
public sealed class BackgroundProcessingServer : IBackgroundProcessingServer
{
    // other members...
    internal BackgroundProcessingServer(
        [NotNull] BackgroundServerProcess process,
        [NotNull] BackgroundProcessingServerOptions options)
    {
        _process = process ?? throw new ArgumentNullException(nameof(process));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _dispatcher = CreateDispatcher();
#if !NETSTANDARD1_3
        AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload;
        AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload;
#endif
    }

    public void SendStop()
    {
        // body omitted
    }

    public bool WaitForShutdown(TimeSpan timeout)
    {
        // body omitted
    }

    public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
    {
        // body omitted
    }

    public void Dispose()
    {
        // body omitted
    }

    private void OnCurrentDomainUnload(object sender, EventArgs args)
    {
        // body omitted
    }

    private IBackgroundDispatcher CreateDispatcher()
    {
        var execution = new BackgroundExecution(
            _stoppingCts.Token,
            new BackgroundExecutionOptions
            {
                Name = nameof(BackgroundServerProcess),
                ErrorThreshold = TimeSpan.Zero,
                StillErrorThreshold = TimeSpan.Zero,
                RetryDelay = retry => _options.RestartDelay
            });
        return new BackgroundDispatcher(
            execution,
            RunServer,
            execution,
            ThreadFactory);
    }

    private void RunServer(Guid executionId, object state)
    {
        _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
    }

    private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart)
    {
        yield return new Thread(threadStart)
        {
            IsBackground = true,
            Name = $"{nameof(BackgroundServerProcess)} #{Interlocked.Increment(ref _lastThreadId)}",
        };
    }
}
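Sure enough, server startup is about to reveal itself. RunServer? The name suggests it runs the server. Let's set it aside for now and just remember that it exists. The constructor calls a method named CreateDispatcher(); here is its implementation.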
private IBackgroundDispatcher CreateDispatcher()
{
    var execution = new BackgroundExecution(
        _stoppingCts.Token,
        new BackgroundExecutionOptions
        {
            Name = nameof(BackgroundServerProcess),
            ErrorThreshold = TimeSpan.Zero,
            StillErrorThreshold = TimeSpan.Zero,
            RetryDelay = retry => _options.RestartDelay
        });
    return new BackgroundDispatcher(
        execution,
        RunServer,
        execution,
        ThreadFactory);
}
CreateDispatcher returns a BackgroundDispatcher (literally a "background dispatcher") and registers RunServer as its callback. BackgroundDispatcher implements the IBackgroundDispatcher interface; here are both definitions.
// IBackgroundDispatcher
public interface IBackgroundDispatcher : IDisposable
{
    bool Wait(TimeSpan timeout);
    Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
}

// BackgroundDispatcher
internal sealed class BackgroundDispatcher : IBackgroundDispatcher
{
    // other members...
    public BackgroundDispatcher(
        [NotNull] IBackgroundExecution execution,
        [NotNull] Action<Guid, object> action,
        [CanBeNull] object state,
        [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
    {
        if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory));
        _execution = execution ?? throw new ArgumentNullException(nameof(execution));
        _action = action ?? throw new ArgumentNullException(nameof(action));
        _state = state;
#if !NETSTANDARD1_3
        AppDomainUnloadMonitor.EnsureInitialized();
#endif
        var threads = threadFactory(DispatchLoop)?.ToArray();
        if (threads == null || threads.Length == 0)
        {
            throw new ArgumentException("At least one unstarted thread should be created.", nameof(threadFactory));
        }
        if (threads.Any(thread => thread == null || (thread.ThreadState & ThreadState.Unstarted) == 0))
        {
            throw new ArgumentException("All the threads should be non-null and in the ThreadState.Unstarted state.", nameof(threadFactory));
        }
        _stopped = new CountdownEvent(threads.Length);
        foreach (var thread in threads)
        {
            thread.Start();
        }
    }

    public bool Wait(TimeSpan timeout)
    {
        return _stopped.WaitHandle.WaitOne(timeout);
    }

    public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        await _stopped.WaitHandle.WaitOneAsync(timeout, cancellationToken).ConfigureAwait(false);
    }

    public void Dispose()
    {
        // body omitted
    }

    public override string ToString()
    {
        // body omitted
    }

    private void DispatchLoop()
    {
        try
        {
            _execution.Run(_action, _state);
        }
        catch (Exception ex)
        {
            // handling omitted
        }
        finally
        {
            try
            {
                _stopped.Signal();
            }
            catch (ObjectDisposedException)
            {
                // ignored
            }
        }
    }
}
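The thread handling in the BackgroundDispatcher constructor can be hard to see among the argument checks, so here is a minimal sketch of just that pattern using only the base class library: create the threads unstarted, start them, and have each one signal a CountdownEvent when it leaves its loop. MiniDispatcher and its members are hypothetical names for illustration; this is not Hangfire's code and it leaves out the execution object, state, and logging.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical, simplified stand-in for BackgroundDispatcher (illustration only).
public sealed class MiniDispatcher : IDisposable
{
    private readonly Action _action;
    private readonly CountdownEvent _stopped;

    public MiniDispatcher(Action action, int threadCount)
    {
        _action = action ?? throw new ArgumentNullException(nameof(action));
        if (threadCount <= 0) throw new ArgumentOutOfRangeException(nameof(threadCount));

        // Create every thread in the unstarted state first, then start them together.
        var threads = Enumerable.Range(0, threadCount)
            .Select(i => new Thread(DispatchLoop) { IsBackground = true, Name = $"Mini #{i + 1}" })
            .ToArray();

        _stopped = new CountdownEvent(threads.Length);
        foreach (var thread in threads) thread.Start();
    }

    // True once every thread has left its loop, false if the timeout expires first.
    public bool Wait(TimeSpan timeout) => _stopped.Wait(timeout);

    public void Dispose() => _stopped.Dispose();

    private void DispatchLoop()
    {
        try { _action(); }
        catch (Exception) { /* a real dispatcher would log here */ }
        finally
        {
            try { _stopped.Signal(); }
            catch (ObjectDisposedException) { /* disposed before this thread finished */ }
        }
    }
}
```

The point of the CountdownEvent is that a caller can wait for all threads with a single Wait call instead of joining each thread individually.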
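Judging by the IBackgroundDispatcher interface, a dispatcher is responsible for coordinating work and resources. In the BackgroundDispatcher implementation, the core of server startup is the end of the constructor: it obtains threads from threadFactory, each bound to DispatchLoop, and starts them. DispatchLoop indirectly calls the RunServer method mentioned earlier, and RunServer performs the whole server-side initialization. DispatchLoop itself calls the Run method of IBackgroundExecution, so let's look at that interface next.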
public interface IBackgroundExecution : IDisposable
{
    void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object state);
    Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull] object state);
}
Just two methods, a synchronous Run and an asynchronous RunAsync. Here is its only implementation, BackgroundExecution.
internal sealed class BackgroundExecution : IBackgroundExecution
{
    // other members...
    public void Run(Action<Guid, object> callback, object state)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        var executionId = Guid.NewGuid();

#if !NETSTANDARD1_3
        try
#endif
        {
            HandleStarted(executionId, out var nextDelay);
            while (true)
            {
                // Don't place anything here.
                try
                {
                    if (StopRequested) break;
                    if (nextDelay > TimeSpan.Zero)
                    {
                        HandleDelay(executionId, nextDelay);
                    }
                    callback(executionId, state);
                    HandleSuccess(out nextDelay);
                }
#if !NETSTANDARD1_3
                catch (ThreadAbortException) when (AppDomainUnloadMonitor.IsUnloading)
                {
                    // Our thread is aborted due to AppDomain unload. It's better to give up to
                    // not to cause the host to be more aggressive.
                    throw;
                }
#endif
                catch (OperationCanceledException) when (StopRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    HandleException(executionId, ex, out nextDelay);
                }
            }
            HandleStop(executionId);
        }
#if !NETSTANDARD1_3
        catch (ThreadAbortException ex)
        {
            HandleThreadAbort(executionId, ex);
        }
#endif
    }
}
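Stripped of the AppDomain handling, the Run method above is a run-forever loop with a delay after failures. The sketch below shows that shape with base class library types only. RetryLoop is a hypothetical name, the fixed retryDelay is an assumption (the listing gets its delay from HandleException and HandleSuccess, whose bodies are not shown), and the failure counter exists only to make the behaviour observable.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified stand-in for BackgroundExecution.Run (illustration only).
public static class RetryLoop
{
    // Invokes callback repeatedly until the token is cancelled.
    // After a failure it waits retryDelay before the next attempt; after a success it does not wait.
    // Returns the number of failed attempts.
    public static int Run(Action callback, TimeSpan retryDelay, CancellationToken stopToken)
    {
        var failures = 0;
        var nextDelay = TimeSpan.Zero;

        while (!stopToken.IsCancellationRequested)
        {
            try
            {
                if (nextDelay > TimeSpan.Zero)
                {
                    // Returns early if a stop is requested during the delay.
                    stopToken.WaitHandle.WaitOne(nextDelay);
                    if (stopToken.IsCancellationRequested) break;
                }

                callback();
                nextDelay = TimeSpan.Zero;   // success: the next iteration runs immediately
            }
            catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
            {
                break;                       // cooperative shutdown, not an error
            }
            catch (Exception)
            {
                failures++;
                nextDelay = retryDelay;      // failure: back off before retrying
            }
        }

        return failures;
    }
}
```

Catching every exception inside the loop is what keeps a long-lived background thread alive: one bad iteration costs a delay rather than the thread.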
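Every dedicated thread in Hangfire goes through BackgroundExecution.Run, which calls back into the Execute method of that thread's own implementation class; from then on each thread loops over its own piece of work (RecurringJobScheduler is analyzed separately below). Back to the main path, server startup, taking the synchronous Run as the example: the first thread started by the dispatcher enters a while loop, and inside the loop the callback invokes our RunServer method.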
private void RunServer(Guid executionId, object state)
{
    _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
}
RunServer is trivial: it calls Execute on _process. Here is the definition of the interface behind _process, IBackgroundServerProcess.
internal interface IBackgroundServerProcess
{
    void Execute(
        Guid executionId,
        BackgroundExecution execution,
        CancellationToken stoppingToken,
        CancellationToken stoppedToken,
        CancellationToken shutdownToken);
}
IBackgroundServerProcess declares a single Execute method, whose job is to initialize the server. Here is its only implementation, BackgroundServerProcess.
internal sealed class BackgroundServerProcess : IBackgroundServerProcess
{
    // other members...
    public BackgroundServerProcess(
        [NotNull] JobStorage storage,
        [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,
        [NotNull] BackgroundProcessingServerOptions options,
        [NotNull] IDictionary<string, object> properties)
    {
        if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));

        _storage = storage ?? throw new ArgumentNullException(nameof(storage));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _properties = properties ?? throw new ArgumentNullException(nameof(properties));

        var builders = new List<IBackgroundProcessDispatcherBuilder>();
        builders.AddRange(GetRequiredProcesses()); // add the default dispatchers, i.e. the dedicated threads
        builders.AddRange(GetStorageComponents());
        builders.AddRange(dispatcherBuilders);

        _dispatcherBuilders = builders.ToArray();
    }

    // server initialization
    public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
        CancellationToken stoppedToken, CancellationToken shutdownToken)
    {
        var serverId = GetServerId();
        Stopwatch stoppedAt = null;

        void HandleRestartSignal()
        {
            if (!stoppingToken.IsCancellationRequested)
            {
                _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
            }
        }
        // the other local signal handlers are omitted

        using (var restartCts = new CancellationTokenSource())
        using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
        using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
        using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
        using (restartStoppingCts.Token.Register(HandleStopRestartSignal))
        using (stoppingToken.Register(HandleStoppingSignal))
        using (stoppedToken.Register(HandleStoppedSignal))
        using (shutdownToken.Register(HandleShutdownSignal))
        using (restartCts.Token.Register(HandleRestartSignal))
        {
            var context = new BackgroundServerContext(
                serverId,
                _storage,
                _properties,
                restartStoppingCts.Token,
                restartStoppedCts.Token,
                restartShutdownCts.Token);
            var dispatchers = new List<IBackgroundDispatcher>();
            CreateServer(context);
            try
            {
                // ReSharper disable once AccessToDisposedClosure
                using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // create the heartbeat (guardian) thread
                {
                    StartDispatchers(context, dispatchers); // start all of Hangfire's default task threads
                    execution.NotifySucceeded();
                    WaitForDispatchers(context, dispatchers);

                    restartCts.Cancel();

                    heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
                }
            }
            finally
            {
                DisposeDispatchers(dispatchers);
                ServerDelete(context, stoppedAt);
            }
        }
    }

    // create the heartbeat (guardian) thread
    private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart)
    {
        return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
            .UseBackgroundPool(threadCount: 1)
            .Create(context, _options);
    }

    // create the server watchdog and job-cancellation watcher threads
    private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses()
    {
        yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
        yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
    }

    // build the server id
    private string GetServerId()
    {
        var serverName = _options.ServerName
            ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
            ?? Environment.GetEnvironmentVariable("HOSTNAME");
        var guid = Guid.NewGuid().ToString();

        return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid;
    }

    // announce the server: writes a row to the Server table
    private void CreateServer(BackgroundServerContext context)
    {
        var stopwatch = Stopwatch.StartNew();
        using (var connection = _storage.GetConnection())
        {
            connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
        }
        stopwatch.Stop();

        ServerJobCancellationToken.AddServer(context.ServerId);
        _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
    }

    // start every dedicated task thread: queue workers, schedulers, recurring jobs, watchdogs and so on
    private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers)
    {
        foreach (var dispatcherBuilder in _dispatcherBuilders)
        {
            dispatchers.Add(dispatcherBuilder.Create(context, _options));
        }
    }
}
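I have trimmed the BackgroundServerProcess listing, so don't get hung up on the details; the comments cover the key points. To summarize: the first thread (call it the main thread) calls the UseHangfireServer extension method from Startup -> a new worker thread is started to initialize and run the server -> the main thread returns -> the worker starts all of Hangfire's task threads -> that first worker thread then blocks, waiting so that it can release the task threads' resources on shutdown. That is roughly all there is to server initialization. Next, let's look in detail at how Hangfire's task threads run, using the recurring-job scheduler, RecurringJobScheduler, as the example.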
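One detail of Execute that is easy to skim past is the set of linked CancellationTokenSource objects: each token handed to the task threads is cancelled either by the server-wide signal or by the heartbeat's restart request. The sketch below shows that behaviour of CancellationTokenSource.CreateLinkedTokenSource in isolation. The class and variable names are placeholders that only echo the roles in the listing; none of this is Hangfire code.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Cancels only the "restart" source and reports what the linked and outer tokens observe.
    public static (bool Linked, bool Outer) CancelRestartOnly()
    {
        using (var stoppingCts = new CancellationTokenSource())   // plays the role of the server-wide stopping signal
        using (var restartCts = new CancellationTokenSource())    // plays the role of restartCts in Execute
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingCts.Token, restartCts.Token))
        {
            restartCts.Cancel();

            // The linked token is cancelled as soon as either source is cancelled;
            // the outer stopping token is unaffected by the restart request.
            return (linked.Token.IsCancellationRequested, stoppingCts.Token.IsCancellationRequested);
        }
    }
}
```

This is why a restart can stop the current set of task threads without signalling that the whole server is shutting down.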
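RecurringJobScheduler implementation mechanism
Remember the seven dispatcher threads mentioned above? Those seven default task threads are created in the StartDispatchers method that Execute calls. Here is the code.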
private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers)
{
    // other code...
    foreach (var dispatcherBuilder in _dispatcherBuilders)
    {
        dispatchers.Add(dispatcherBuilder.Create(context, _options)); // create one dedicated task thread (or pool) per builder
    }
}
It iterates over the _dispatcherBuilders array, one entry per task type (seven in total), and calls Create on each. Here is Create, followed by the code it leads to.
// Step 1
public IBackgroundDispatcher Create(BackgroundServerContext context, BackgroundProcessingServerOptions options)
{
    // other code...
    var execution = new BackgroundExecution(
        context.StoppingToken,
        new BackgroundExecutionOptions
        {
            Name = _process.GetType().Name,
            RetryDelay = options.RetryDelay
        }); // each process gets its own execution
    return new BackgroundDispatcher( // create the BackgroundDispatcher
        execution,
        ExecuteProcess, // the callback
        Tuple.Create(_process, context, execution), // a 3-tuple used as state; note its first item
        _threadFactory);
}

// Step 2
public BackgroundDispatcher(
    [NotNull] IBackgroundExecution execution,
    [NotNull] Action<Guid, object> action,
    [CanBeNull] object state,
    [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
{
    // other code...
    _state = state;

    var threads = threadFactory(DispatchLoop)?.ToArray();

    foreach (var thread in threads)
    {
        thread.Start(); // start the thread
    }
}

// Step 3
private void DispatchLoop()
{
    try
    {
        _execution.Run(_action, _state); // Run calls back into _action
    }
    catch (Exception ex)
    {
        // handling omitted
    }
    finally
    {
        try
        {
            _stopped.Signal();
        }
        catch (ObjectDisposedException)
        {
            // ignored
        }
    }
}

// Step 4: the callback registered in step 1
private static void ExecuteProcess(Guid executionId, object state)
{
    var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext, BackgroundExecution>)state;
    var serverContext = tuple.Item2;
    var context = new BackgroundProcessContext( // build the shared context
        serverContext.ServerId,
        serverContext.Storage,
        serverContext.Properties.ToDictionary(x => x.Key, x => x.Value),
        executionId,
        serverContext.StoppingToken,
        serverContext.StoppedToken,
        serverContext.ShutdownToken);
    while (!context.IsStopping)
    {
        tuple.Item1.Execute(context); // run the process instance carried in the tuple
        tuple.Item3.NotifySucceeded();
    }
}
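That is a bit tangled, so let me string it together. Step 1: Create builds a BackgroundDispatcher and passes a tuple as its state -> step 2: the constructor binds each thread to DispatchLoop and starts it -> step 3: DispatchLoop runs and calls back the _action delegate -> step 4: _action points to ExecuteProcess, which uses the tuple to invoke the matching task type. At this point all seven task types are up and running. One detail deserves a note: Tuple.Create(_process, context, execution). The first item of the tuple is of type IBackgroundProcess; here is its definition.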
public interface IBackgroundProcess : IServerProcess
{
    void Execute([NotNull] BackgroundProcessContext context);
}
The interface defines a single method, nothing special, but its implementations are the individual task classes. RecurringJobScheduler, which we discuss next, implements it too. With that, the RecurringJobScheduler thread is up and running.
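The state-passing trick in Create and ExecuteProcess, where a tuple is handed over as a plain object and cast back inside a static callback, is worth seeing on its own. A minimal sketch follows; CountingProcess and the two-item tuple are hypothetical simplifications (the listing uses a three-item tuple whose first item is an IBackgroundProcess).

```csharp
using System;

public static class StateCallbackDemo
{
    // Hypothetical process type standing in for an IBackgroundProcess implementation.
    public sealed class CountingProcess
    {
        public int Executions { get; private set; }
        public void Execute(string serverId) => Executions++;
    }

    // The callback only receives `object`, so it casts back to the tuple type it expects.
    public static void ExecuteProcess(Guid executionId, object state)
    {
        var tuple = (Tuple<CountingProcess, string>)state;
        tuple.Item1.Execute(tuple.Item2);
    }

    // Runs the callback twice against the same state and returns the execution count.
    public static int Demo()
    {
        var process = new CountingProcess();
        Action<Guid, object> action = ExecuteProcess;
        object state = Tuple.Create(process, "server-1");

        action(Guid.NewGuid(), state);
        action(Guid.NewGuid(), state);
        return process.Executions;
    }
}
```

Because the callback is static and everything it needs travels in the state object, one dispatcher class can drive any process type without knowing about it.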
RecurringJobScheduler recurring-job mechanism
As before, let's start with the type's definition.
public class RecurringJobScheduler : IBackgroundProcess
{
    // other code...
    public RecurringJobScheduler(
        [NotNull] IBackgroundJobFactory factory,
        TimeSpan pollingDelay,
        [NotNull] ITimeZoneResolver timeZoneResolver,
        [NotNull] Func<DateTime> nowFactory)
    {
        if (factory == null) throw new ArgumentNullException(nameof(factory));
        if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory));
        if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver));

        _factory = factory;
        _nowFactory = nowFactory;
        _timeZoneResolver = timeZoneResolver;
        _pollingDelay = pollingDelay;
        _profiler = new SlowLogProfiler(_logger);
    }

    /// <inheritdoc />
    public void Execute(BackgroundProcessContext context) // the interface method
    {
        if (context == null) throw new ArgumentNullException(nameof(context));

        var jobsEnqueued = 0;

        while (EnqueueNextRecurringJobs(context)) // fetch due recurring jobs from the database
        {
            jobsEnqueued++;

            if (context.IsStopping)
            {
                break;
            }
        }

        if (jobsEnqueued != 0)
        {
            _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
        }

        if (_pollingDelay > TimeSpan.Zero)
        {
            context.Wait(_pollingDelay);
        }
        else
        {
            var now = _nowFactory();
            context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now);
        }
    }
}
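The else branch at the end of Execute packs a small piece of date arithmetic into one line: when no polling delay is configured, the thread waits until the start of the next whole minute. Pulled out into a helper (PollingDelay and UntilNextMinute are names chosen here for illustration), the same expression looks like this:

```csharp
using System;

public static class PollingDelay
{
    // Time left until the start of the next whole minute,
    // using the same arithmetic as the else branch of Execute.
    public static TimeSpan UntilNextMinute(DateTime now) =>
        now.AddMilliseconds(-now.Millisecond)   // drop the millisecond part
           .AddSeconds(-now.Second)             // drop the second part
           .AddMinutes(1)                       // move to the next minute boundary
        - now;
}
```

For example, at 10:30:45.250 the helper returns 14.75 seconds, and exactly on a minute boundary it returns a full minute.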
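Following on from above, calling Execute on the first tuple item runs RecurringJobScheduler.Execute. The method does one thing: every 15 seconds (the polling delay) it fetches the recurring jobs that are due from the database, up to 1000 per batch. The fetching is done by EnqueueNextRecurringJobs.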
private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)
{
    return UseConnectionDistributedLock(context.Storage, connection =>
    {
        var result = false;
        if (IsBatchingAvailable(connection))
        {
            var now = _nowFactory();
            var timestamp = JobHelper.ToTimestamp(now);
            var recurringJobIds = ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp, BatchSize); // query the database
            if (recurringJobIds == null || recurringJobIds.Count == 0) return false;
            foreach (var recurringJobId in recurringJobIds)
            {
                if (context.IsStopping) return false;
                if (TryEnqueueBackgroundJob(context, connection, recurringJobId, now)) // enqueue for execution
                {
                    result = true;
                }
            }
        }
        else
        {
            for (var i = 0; i < BatchSize; i++)
            {
                if (context.IsStopping) return false;
                var now = _nowFactory();
                var timestamp = JobHelper.ToTimestamp(now);
                var recurringJobId = connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
                if (recurringJobId == null) return false;
                if (!TryEnqueueBackgroundJob(context, connection, recurringJobId, now))
                {
                    return false;
                }
            }
        }
        return result;
    });
}
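To make the batch query in the listing above easier to picture, here is an in-memory sketch of the semantics that GetFirstByLowestScoreFromSet appears to have: filter by key, keep scores within a range, order by score, and take the first N values. SetRow and FirstByLowestScore are hypothetical names, and the real storage implementation and schema may differ; this only illustrates the idea of treating the score as a due-time timestamp.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SetQueryDemo
{
    // Hypothetical in-memory row; the real storage schema may differ.
    public sealed class SetRow
    {
        public string Key { get; set; }
        public string Value { get; set; }
        public double Score { get; set; }
    }

    // Values under `key` whose score lies in [fromScore, toScore], lowest score first, at most `count`.
    public static List<string> FirstByLowestScore(
        IEnumerable<SetRow> rows, string key, double fromScore, double toScore, int count)
    {
        return rows
            .Where(r => r.Key == key && r.Score >= fromScore && r.Score <= toScore)
            .OrderBy(r => r.Score)
            .Take(count)
            .Select(r => r.Value)
            .ToList();
    }
}
```

With the score holding each job's next execution time as a timestamp, a range of 0 to "now" selects exactly the jobs that are due, earliest first.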
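GetFirstByLowestScoreFromSet queries the Set table for the top 1000 rows whose key is the string recurring-jobs (which marks recurring jobs) and whose score, a timestamp, lies between 0 and the current time. The code then iterates over those job ids and enqueues each one. Next, let's look at how TryEnqueueBackgroundJob is implemented (the condensed listing that follows is labeled EnqueueBackgroundJob).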
private bool EnqueueBackgroundJob(
    BackgroundProcessContext context,
    IStorageConnection connection,
    string recurringJobId,
    DateTime now)
{
    // other code...
    using (connection.AcquireDistributedRecurringJobLock(recurringJobId, LockTimeout))
    {
        try
        {
            var recurringJob = connection.GetRecurringJob(recurringJobId, _timeZoneResolver, now);
            if (recurringJob == null)
            {
                using (var transaction = connection.CreateWriteTransaction())
                {
                    transaction.RemoveFromSet("recurring-jobs", recurringJobId);
                    transaction.Commit();
                }
                return false;
            }

            BackgroundJob backgroundJob = null;
            IReadOnlyDictionary<string, string> changedFields;
            if (recurringJob.TrySchedule(out var nextExecution, out var error))
            {
                if (nextExecution.HasValue && nextExecution <= now)
                {
                    backgroundJob = _factory.TriggerRecurringJob(context.Storage, connection, _profiler, recurringJob, now);
                    if (String.IsNullOrEmpty(backgroundJob?.Id))
                    {
                        _logger.Debug($"Recurring job '{recurringJobId}' execution at '{nextExecution}' has been canceled.");
                    }
                }
                recurringJob.IsChanged(out changedFields, out nextExecution);
            }
            else if (recurringJob.RetryAttempt < MaxRetryAttemptCount)
            {
                var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay : TimeSpan.FromMinutes(1);

                _logger.WarnException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be retried in {delay}.", error);
                recurringJob.ScheduleRetry(delay, out changedFields, out nextExecution);
            }
            else
            {
                _logger.ErrorException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be disabled.", error);
                recurringJob.Disable(error, out changedFields, out nextExecution);
            }

            using (var transaction = connection.CreateWriteTransaction())
            {
                if (backgroundJob != null)
                {
                    _factory.StateMachine.EnqueueBackgroundJob(
                        context.Storage,
                        connection,
                        transaction,
                        recurringJob,
                        backgroundJob,
                        "Triggered by recurring job scheduler",
                        _profiler);
                }
                transaction.UpdateRecurringJob(recurringJob, changedFields, nextExecution, _logger);
                transaction.Commit();
                return true;
            }
        }
        catch (TimeZoneNotFoundException ex)
        {
            // handling omitted
        }
        catch (Exception ex)
        {
            // handling omitted
        }
        return false;
    }
}
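The overall flow of this method is: 1. GetRecurringJob loads the complete recurring job from the Hash table by job id; 2. TrySchedule computes the job's next execution time, and if that time is no later than now, the job is triggered (this does not run the job itself; it only writes a row to the Job table, and a worker does the actual execution); 3. the next execution time and the changed fields are collected; 4. the state machine updates the job's state. Recurring jobs repeat this cycle over and over. A brief word on workers: a worker essentially polls the Job table and executes each job's expression tree. By default Hangfire starts up to 20 worker threads (the default WorkerCount is Math.Min(Environment.ProcessorCount * 5, 20)). That wraps up part three.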
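Pitfalls
A few problems I ran into while fixing bugs.
1. Time zones. If you add a recurring job without specifying a time zone, the cron expression is interpreted in UTC. China is in UTC+8, so a job written with local time in mind runs 8 hours later than intended. There are a couple of fixes: set the ITimeZoneResolver property on the server options globally, or pass the time zone to AddOrUpdate. If you do specify a time zone, watch the dashboard for errors, because an exception there means the job will not run: the time zone is looked up on the host system, and a missing one throws. That's all for now.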
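The 8-hour shift is easy to verify with plain TimeZoneInfo arithmetic. The sketch below uses only the base class library and builds a fixed UTC+8 zone in code, so it does not depend on which time zone ids the host system knows; TimeZoneDemo and its members are hypothetical names, and nothing here calls into Hangfire.

```csharp
using System;

public static class TimeZoneDemo
{
    // A fixed UTC+8 zone built in code so the sketch does not depend on the OS time zone database.
    public static readonly TimeZoneInfo Utc8 =
        TimeZoneInfo.CreateCustomTimeZone("UTC+8 (sketch)", TimeSpan.FromHours(8), "UTC+8", "UTC+8");

    // Wall-clock hour in UTC+8 at which a job scheduled for `hourUtc` (interpreted as UTC) actually fires.
    public static int LocalHourWhenScheduledInUtc(int hourUtc)
    {
        var utc = new DateTime(2024, 1, 1, hourUtc, 0, 0, DateTimeKind.Utc);
        return TimeZoneInfo.ConvertTimeFromUtc(utc, Utc8).Hour;
    }
}
```

A job meant for 09:00 local time but interpreted as 09:00 UTC fires at 17:00 in UTC+8. When looking up a real zone with TimeZoneInfo.FindSystemTimeZoneById, keep in mind that the ids differ between platforms (for example "China Standard Time" on Windows and "Asia/Shanghai" on Linux) and that an unknown id throws TimeZoneNotFoundException, which matches the failure mode described above.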