说下hangfire吧

摘要:
最近因工作需要开发计划任务模块接触到了Hangfire。需要说明一下接触hangfire源码的时间不长,也就几天时间理解不到位,或者说错了的,希望在评论指正。以上服务注入并执行,接下来就是往hangfire里面添加任务。源码分析客户端模式就不用说了,说白了就是往hangfire数据库里面写任务,我们主要是看看服务端的执行原理。
最近因工作需要开发计划任务模块(严格来说应该是修改bug吧,其他同事负责的)接触到了Hangfire。早前听同事说hangfire有点坑,怀着好奇,趁这两天bug改的差不多了,在github上面down了hangfire源码,下面分享一下,自己读hangfire源码的一些理解,和工作中需要注意的地方。介绍大概分为以下几个部分吧。1.准备工作,2.简单使用,3.源码分析,4.避坑。需要说明一下接触hangfire源码的时间不长,也就几天时间理解不到位,或者说错了的,希望在评论指正。
准备工作:hangfire源代码的代码量不多,github地址: https://github.com/HangfireIO/Hangfire,有兴趣的朋友可以自己下载瞅瞅源码。功能上大概可以分为客户端模式和服务端模式。用到的技术大概有Multi Thread、Expression、Dapper、Cron等。可以这么说,它的定时任务完全就是基于多线程协作实现的。因为是多线程环境,所以个人觉得看起来有点费力。
简单使用:.Net&.Net Core环境都可以使用,下面就以.Net Core的使用为例。
1.客户端和服务端独立部署
client端
1 publicIServiceProvider ConfigureServices(IServiceCollection services)
2 {
3             //其他代码
4              services.AddHangfire(config =>
5 {
6 config.UseSqlServerStorage(...);
7 });
8 }
9  
10 public voidConfigure(IApplicationBuilder app, IWebHostEnvironment env)
11 {
12             //其他代码...
13             //启用Dashboard看板
14 app.UseHangfireDashboard();
15         }
server端
1 public voidConfiguration(IAppBuilder app)
2 {
3 GlobalConfiguration.Configuration
4                  .UseSqlServerStorage("连接字符串", newSqlServerStorageOptions
5 {
6                     //options
7 });
8             app.UseHangfireServer(newBackgroundJobServerOptions
9 {
10 });
11 }
12  
13  
或者
1 services.AddHangfireServer(options =>
2 {
3                         //基于IHostedService接口实现
4                     });
PS:server端还有一种实现方式,实现IHostedService接口 其实跟上面的使用方法一样的,注入到服务就ok,在程序启动阶段会自动执行IHostedService接口的两个方法,可以简单看下IHostedService接口的定义。
1   public interfaceIHostedService
2 {
3 Task StartAsync(CancellationToken cancellationToken);
4 Task StopAsync(CancellationToken cancellationToken);
5   }
接口就定义了两个方法,start是在程序启动的时候执行,当然stop就是在程序停止的时候执行。我们用一张图简单描绘一下它的执行时机,图是盗的。
说下hangfire吧第1张
以上就是hangfire的client端和server端分开部署的一个简单应用,下面我们看下第二种,client&server部署在同一台机器上。
2.客户端和服务端统一部署
1 public voidConfiguration(IAppBuilder app)
2 {
3     GlobalConfiguration.Configuration.UseSqlServerStorage(); //配置数据库连接
4     
5     app.UseHangfireServer(); //启用server
6     app.UseHangfireDashboard(); //启用看板
7 }
简单的几行代码,当然我也只会简单的用法。以上服务注入并执行,接下来就是往hangfire里面添加任务。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); //立即执行
2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); //延后执行
3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); //循环执行,支持cron表达式
简单使用就到这吧,我们继续大纲的第三部分,源码分析。
源码分析
客户端模式就不用说了,说白了就是往hangfire数据库里面写任务,我们主要是看看服务端的执行原理。我们先找到入口,也可以看做是NetCore里面的一个中间件吧。看代码
1 app.UseHangfireServer(); //启用server
UseHangfireServer实现
1 public staticIAppBuilder UseHangfireServer(
2             [NotNull] thisIAppBuilder builder,
3 [NotNull] JobStorage storage,
4 [NotNull] BackgroundJobServerOptions options,
5             [NotNull] paramsIBackgroundProcess[] additionalProcesses)
6 {
7             //其他代码...
8             var server = newBackgroundJobServer(options, storage,  additionalProcesses); 
9             
10             returnbuilder;
11         }
UseHangfireServer扩展方法实现里面,比较重要的一行代码就是创建BackgroundJobServer,BackgroundJobServer实现了IBackgroundProcessingServer接口,server的启动也就是间接在它的构造器里面完成的。我们不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer类的定义。
1 //IBackgroundProcessingServer
2 public interfaceIBackgroundProcessingServer : IDisposable
3 {
4         voidSendStop();
5         boolWaitForShutdown(TimeSpan timeout);
6 Task WaitForShutdownAsync(CancellationToken cancellationToken);
7 }
8  
9 //BackgroundJobServer
10 public classBackgroundJobServer : IBackgroundProcessingServer
11 {
12         //其他成员...
13         publicBackgroundJobServer(
14 [NotNull] BackgroundJobServerOptions options,
15 [NotNull] JobStorage storage,
16             [NotNull] IEnumerable<IBackgroundProcess>additionalProcesses,
17 [CanBeNull] IJobFilterProvider filterProvider,
18 [CanBeNull] JobActivator activator,
19 [CanBeNull] IBackgroundJobFactory factory,
20 [CanBeNull] IBackgroundJobPerformer performer,
21 [CanBeNull] IBackgroundJobStateChanger stateChanger)
22 {
23             //其他代码
24             var processes = new List<IBackgroundProcessDispatcherBuilder>();
25 processes.AddRange(GetRequiredProcesses(filterProvider, activator,  factory, performer, stateChanger));
26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1)));
27             var properties = new Dictionary<string, object>
28 {
29                 { "Queues", options.Queues },
30                 { "WorkerCount", options.WorkerCount }
31 };
32             
33             _processingServer = newBackgroundProcessingServer(
34 storage,
35 processes,
36 properties,
37 GetProcessingServerOptions());
38 }
39         public voidSendStop()
40 {
41 }
42         public voidDispose()
43 {
44 }
45         [Obsolete("This method is a stub. There is no need to call the `Start`  method. Will be removed in version 2.0.0.")]
46         public voidStart()
47 {
48 }
49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
50         public voidStop()
51 {
52 }
53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
54         public void Stop(boolforce)
55 {
56 }
57         public boolWaitForShutdown(TimeSpan timeout)
58 {
59 }
60         publicTask WaitForShutdownAsync(CancellationToken cancellationToken)
61 {
62         }
IBackgroundProcessingServer接口里面的这几个方法都是跟停用server,取消任务清理资源相关的。BackgroundJobServer类里面真正完成接口的实现是由BackgroundProcessingServer类型的同名函数实现,这个对象是在构造函数里面初始化的,在初始化BackgroundProcessingServer类型的同时,创建了若干IBackgroundProcessDispatcherBuilder实现类BackgroundProcessDispatcherBuilder的实例,hangfire默认实现了7种dispatcher,我们任务、日志、心跳等等独立线程都是由它的Create方法完成,这个地方不算server启动主线,会在后面细说。我们继续看看BackgroundProcessingServer这个类型。这里需要注意的是里面有几个方法好像是被停用了,start、stop等方法,官方也注释了,被删除了。start方法被停用了,难道我们的server启动是在BackgroundProcessingServer类型里面?继续看BackgroundProcessingServer的定义。
1 public sealed classBackgroundProcessingServer : IBackgroundProcessingServer
2 {
3         //其他成员
4         internalBackgroundProcessingServer(
5 [NotNull] BackgroundServerProcess process,
6 [NotNull] BackgroundProcessingServerOptions options)
7 {
8             _process = process ?? throw newArgumentNullException(nameof(process));
9             _options = options ?? throw newArgumentNullException(nameof(options));
10             _dispatcher =CreateDispatcher();
11 #if !NETSTANDARD1_3
12             AppDomain.CurrentDomain.DomainUnload +=OnCurrentDomainUnload;
13             AppDomain.CurrentDomain.ProcessExit +=OnCurrentDomainUnload;
14 #endif
15 }
16         public voidSendStop()
17 {
18 }
19         public boolWaitForShutdown(TimeSpan timeout)
20 {
21 }
22         public asyncTask WaitForShutdownAsync(CancellationToken cancellationToken)
23 {
24 }
25         public voidDispose()
26 {
27             
28 }
29         private void OnCurrentDomainUnload(objectsender, EventArgs args)
30 {
31             
32 }
33         privateIBackgroundDispatcher CreateDispatcher()
34 {
35             var execution = newBackgroundExecution(
36 _stoppingCts.Token,
37                 newBackgroundExecutionOptions
38 {
39                     Name =nameof(BackgroundServerProcess),
40                     ErrorThreshold =TimeSpan.Zero,
41                     StillErrorThreshold =TimeSpan.Zero,
42                     RetryDelay = retry =>_options.RestartDelay
43 });
44             return newBackgroundDispatcher(
45 execution,
46 RunServer,
47 execution,
48 ThreadFactory);
49 }
50         private void RunServer(Guid executionId, objectstate)
51 {
52 _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
53 }
54         private static IEnumerable<Thread>ThreadFactory(ThreadStart threadStart)
55 {
56             yield return newThread(threadStart)
57 {
58                 IsBackground = true,
59                 Name = $"{nameof(BackgroundServerProcess)}  #{Interlocked.Increment(ref _lastThreadId)}",
60 };
61 }
62     }
果不其然,server的启动快要揭开神秘的面纱了,RunServer?翻译过来应该是启动服务吧,我们暂且不去管他,先记一下这个有个runserver,我们继续跟踪。在构造函数里面调用了一个CreateDispatcher()的方法,我们看下它的实现
1 privateIBackgroundDispatcher CreateDispatcher()
2 {
3             var execution = newBackgroundExecution(
4 _stoppingCts.Token,
5                 newBackgroundExecutionOptions
6 {
7                     Name =nameof(BackgroundServerProcess),
8                     ErrorThreshold =TimeSpan.Zero,
9                     StillErrorThreshold =TimeSpan.Zero,
10                     RetryDelay = retry =>_options.RestartDelay
11 });
12             return newBackgroundDispatcher(
13 execution,
14 RunServer,
15 execution,
16 ThreadFactory);
17         }
在CreateDispatcher方法里面返回了一个BackgroundDispatcher,字面意思好像是后台分发器,并且指定了回调runserver,BackgroundDispatcher实现了IBackgroundDispatcher接口,我们先看下它们的定义。
1 //IBackgroundDispatcher
2 public interfaceIBackgroundDispatcher : IDisposable
3 {
4         boolWait(TimeSpan timeout);
5 Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
6 }
7  
8 //BackgroundDispatcher
9 internal sealed classBackgroundDispatcher : IBackgroundDispatcher
10 {
11         //其他成员
12         publicBackgroundDispatcher(
13 [NotNull] IBackgroundExecution execution,
14             [NotNull] Action<Guid, object>action,
15             [CanBeNull] objectstate,
16             [NotNull] Func<ThreadStart, IEnumerable<Thread>>threadFactory)
17 {
18             if (threadFactory == null) throw newArgumentNullException(nameof(threadFactory));
19             _execution = execution ?? throw newArgumentNullException(nameof(execution));
20             _action = action ?? throw newArgumentNullException(nameof(action));
21             _state =state;
22 #if !NETSTANDARD1_3
23 AppDomainUnloadMonitor.EnsureInitialized();
24 #endif
25             var threads = threadFactory(DispatchLoop)?.ToArray();
26             if (threads == null || threads.Length == 0)
27 {
28                 throw new ArgumentException("At least one unstarted thread should be  created.", nameof(threadFactory));
29 }
30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0))
31 {
32                 throw new ArgumentException("All the threads should be non-null and  in the ThreadState.Unstarted state.", nameof(threadFactory));
33 }
34             _stopped = newCountdownEvent(threads.Length);
35             foreach (var thread inthreads)
36 {
37 thread.Start();
38 }
39 }
40         public boolWait(TimeSpan timeout)
41 {
42             return_stopped.WaitHandle.WaitOne(timeout);
43 }
44         public asyncTask WaitAsync(TimeSpan timeout, CancellationToken  cancellationToken)
45 {
46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false);
47 }
48         public voidDispose()
49 {
50 }
51         public override stringToString()
52 {
53 }
54         private voidDispatchLoop()
55 {
56             try
57 {
58 _execution.Run(_action, _state);
59 }
60             catch(Exception ex)
61 {
62  
63 }
64             finally
65 {
66                 try
67 {
68 _stopped.Signal();
69 }
70                 catch(ObjectDisposedException)
71 {
72  
73 }
74 }
75 }
76     }
从IBackgroundDispatcher接口的定义来看,分发器应该是负责协调资源处理,我们具体看看BackgroundDispatcher的实现。以上代码就是server的启动执行核心代码并且我以加粗,其实就是启动线程Loop执行。在DispatchLoop方法里面间接调用了我上面说的runserver方法。在runserver方法里面实现了整个server端的初始化工作。我们接着看DispatchLoop方法的实现 ,在这个方法里面调用了IBackgroundExecution接口的run方法,继续IBackgroundExecution接口的定义。
1 public interfaceIBackgroundExecution : IDisposable
2 {
3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] objectstate);
4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  objectstate);
5     }
就两方法,run包含同步和异步,看看它的唯一实现类BackgroundExecution。
1   internal sealed classBackgroundExecution : IBackgroundExecution
2 {
3                 //其他成员
4         public void Run(Action<Guid, object> callback, objectstate)
5 {
6             if (callback == null) throw newArgumentNullException(nameof(callback));
7             var executionId =Guid.NewGuid();
8            
9 {
10 #if !NETSTANDARD1_3
11                 try
12 #endif
13 {
14                     HandleStarted(executionId, out varnextDelay);
15                     while (true)
16 {
17                         //Don't place anything here.
18                         try
19 {
20                            
21                             if (StopRequested) break;
22                             if (nextDelay >TimeSpan.Zero)
23 {
24 HandleDelay(executionId, nextDelay);
25 }
26 callback(executionId, state);
27                             HandleSuccess(outnextDelay);
28 }
29 #if !NETSTANDARD1_3
30                         catch(ThreadAbortException) when  (AppDomainUnloadMonitor.IsUnloading)
31 {
32                             //Our thread is aborted due to AppDomain unload. It's  better to give up to
33                             //not to cause the host to be more aggressive.
34                             throw;
35 }
36 #endif
37                         catch(OperationCanceledException) when (StopRequested)
38 {
39                             break;
40 }
41                         catch(Exception ex)
42 {
43                             HandleException(executionId, ex, outnextDelay);
44 }
45 }
46 HandleStop(executionId);
47 }
48 #if !NETSTANDARD1_3
49                 catch(ThreadAbortException ex)
50 {
51 HandleThreadAbort(executionId, ex);
52 }
53 #endif
54 }
55 }
56 }
hangfire里面所有的独立线程都是通过run方法执行,然后回调到自己的实现类Execute方法,自此每个独立的功能线程就循环干着自己独立的工作(这个后面会单独分析RecurringJobScheduler)。继续我们的主线,server启动,我们以run的同步方法为例,第一个线程(我们就叫它主线程吧)启动了一个while循环,在循环里面并且callback调用了我们的runserver方法。
1 private void RunServer(Guid executionId, objectstate)
2 {
3 _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
4         }
在runserver方法里面的实现很简单,直接调用了_process的execute方法,我们简单看下_process类型IBackgroundServerProcess的定义。
1 internal interfaceIBackgroundServerProcess
2 {
3         voidExecute(
4 Guid executionId,
5 BackgroundExecution execution,
6 CancellationToken stoppingToken,
7 CancellationToken stoppedToken,
8 CancellationToken shutdownToken);
9     }
IBackgroundServerProcess的定义就一个execute方法,这个接口的工作其实就是初始化server服务端,我们看看它的唯一实现类BackgroundServerProcess。
1 internal sealed classBackgroundServerProcess : IBackgroundServerProcess
2 {
3         
4         //其他成员
5         publicBackgroundServerProcess(
6 [NotNull] JobStorage storage,
7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder>dispatcherBuilders,
8 [NotNull] BackgroundProcessingServerOptions options,
9             [NotNull] IDictionary<string, object>properties)
10 {
11             if (dispatcherBuilders == null) throw newArgumentNullException(nameof(dispatcherBuilders));
12  
13  
14             _storage = storage ?? throw newArgumentNullException(nameof(storage));
15             _options = options ?? throw newArgumentNullException(nameof(options));
16             _properties = properties ?? throw newArgumentNullException(nameof(properties));
17  
18  
19             var builders = new List<IBackgroundProcessDispatcherBuilder>();
20             builders.AddRange(GetRequiredProcesses()); //添加默认的工作dispatcher也就是独立线程
21 builders.AddRange(GetStorageComponents());
22 builders.AddRange(dispatcherBuilders);
23  
24  
25             _dispatcherBuilders =builders.ToArray();
26 }
27  
28  
29         public voidExecute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
30             CancellationToken stoppedToken, CancellationToken shutdownToken)  //server初始化
31 {
32             var serverId =GetServerId();
33             Stopwatch stoppedAt = null;
34  
35  
36             voidHandleRestartSignal()
37 {
38                 if (!stoppingToken.IsCancellationRequested)
39 {
40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
41 }
42 }
43             using (var restartCts = newCancellationTokenSource())
44             using (var restartStoppingCts =CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
45             using (var restartStoppedCts =CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
46             using (var restartShutdownCts =CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
47             using(restartStoppingCts.Token.Register(HandleStopRestartSignal))
48             using(stoppingToken.Register(HandleStoppingSignal))
49             using(stoppedToken.Register(HandleStoppedSignal))
50             using(shutdownToken.Register(HandleShutdownSignal))
51             using(restartCts.Token.Register(HandleRestartSignal))
52 {
53                 var context = newBackgroundServerContext(
54 serverId,
55 _storage,
56 _properties,
57 restartStoppingCts.Token,
58 restartStoppedCts.Token,
59 restartShutdownCts.Token);
60                 var dispatchers = new List<IBackgroundDispatcher>();
61 CreateServer(context);
62                 try
63 {
64                     //ReSharper disable once AccessToDisposedClosure
65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) //创建守护线程
66 {
67                         StartDispatchers(context, dispatchers); //启动hangfire默认初始化的所有独立任务线程
68 execution.NotifySucceeded();
69 WaitForDispatchers(context, dispatchers);
70  
71  
72 restartCts.Cancel();
73  
74 heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
75 }
76 }
77                 finally
78 {
79 DisposeDispatchers(dispatchers);
80 ServerDelete(context, stoppedAt);
81 }
82 }
83 }
84  
85  
86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) //创建守护线程
87 {
88             return newServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
89                 .UseBackgroundPool(threadCount: 1)
90 .Create(context, _options);
91 }
92  
93  
94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() //初始化日志和任务监控线程
95 {
96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
98 }
99         private string GetServerId() //获取serverid
100 {
101             var serverName =_options.ServerName
102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
103                  ?? Environment.GetEnvironmentVariable("HOSTNAME");
104             var guid =Guid.NewGuid().ToString();
105  
106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}": guid;
107 }
108  
109         
110         private void CreateServer(BackgroundServerContext context) //创建server,写入Server数据表
111 {
112             var stopwatch =Stopwatch.StartNew();
113             using (var connection =_storage.GetConnection())
114 {
115 connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
116 }
117 stopwatch.Stop();
118  
119  
120 ServerJobCancellationToken.AddServer(context.ServerId);
121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
122 }
123  
124  
125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) //启动所有独立的任务线程,包括我们的队列计划、循环计划、日志、守护等等线程
126 {
127  
128             foreach (var dispatcherBuilder in_dispatcherBuilders)
129 {
130 dispatchers.Add(dispatcherBuilder.Create(context, _options));
131 }
132 }
133  
134     }
以上代码我有做精简处理,不要纠结里面的实现,代码注释也比较详细。下面我做一个简单的总结吧,第一个线程(暂时叫主线程吧)从startup里面调用usehangfireserver扩展方法-》启动一个新的worker线程用于初始化&启动server-》主程返回-》启动hangfire所有任务线程-》创建的第一个worker线程挂起(用于处理所有任务线程的资源释放)。server的初始化工作大概就是这些,下面详细看看hangfire的任务线程的执行原理,这里我们以RecurringJobScheduler循环任务为例。
RecurringJobScheduler实现机制
还记得上面提到的7个dispatcher任务线程的创建吗?这7个默认的任务线程初始化就发生在上面加粗的代码里面StartDispatchers方法,我们看代码。
1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher>dispatchers)
2 {
3                //其他代码...
4             foreach (var dispatcherBuilder in_dispatcherBuilders)
5 {
6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); //初始化独立任务线程
7 }
8         }
遍历_dispatcherBuilders数组,7种任务类型,分别调用它们的Create方法。继续看create方法。
1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) //第一步
2 {
3             //其他代码
4             var execution = newBackgroundExecution(
5 context.StoppingToken,
6                 newBackgroundExecutionOptions
7 {
8                     Name =_process.GetType().Name,
9                     RetryDelay =options.RetryDelay
10                 }); //定义自己的execution
11             return new BackgroundDispatcher( //创建BackgroundDispatcher 
12 execution,
13                 ExecuteProcess, //指定回调
14                 Tuple.Create(_process, context, execution), //创建三元组上下文,注意一下1元组这个对象
15 _threadFactory);
16 }
17  
18 public BackgroundDispatcher(  //第二步
19 [NotNull] IBackgroundExecution execution,
20             [NotNull] Action<Guid, object>action,
21             [CanBeNull] objectstate,
22             [NotNull] Func<ThreadStart, IEnumerable<Thread>>threadFactory)
23 {
24    
25             _state =state;
26  
27             var threads = threadFactory(DispatchLoop)?.ToArray();
28            
29             foreach (var thread inthreads)
30 {
31                 thread.Start(); //执行线程
32 }
33 }
34  
35 private void DispatchLoop() //第三步
36 {
37             try
38 {
39                 _execution.Run(_action, _state);  //在run里面回调_action
40 }
41             catch(Exception ex)
42 {
43 }
44             finally
45 {
46                 try
47 {
48 _stopped.Signal();
49 }
50                 catch(ObjectDisposedException)
51 {
52 }
53 }
54 }
55  
56 private static void ExecuteProcess(Guid executionId, object state) //第四步 回调方法,对应上面的指定回调
57 {
58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state;
59             var serverContext =tuple.Item2;
60             var context = new BackgroundProcessContext( //创建公共上下文
61 serverContext.ServerId,
62 serverContext.Storage,
63                 serverContext.Properties.ToDictionary(x => x.Key, x =>x.Value),
64 executionId,
65 serverContext.StoppingToken,
66 serverContext.StoppedToken,
67 serverContext.ShutdownToken);
68             while (!context.IsStopping)
69 {
70                 tuple.Item1.Execute(context); //执行自己元组对应的实例
71 tuple.Item3.NotifySucceeded();
72 }
73         }
上面有点乱啊,我大概简单串起来说一下。第一步在create方法里面创建了BackgroundDispatcher并指定了元组参数-》第二步绑定线程的执行函数Loop并且执行-》第三步执行Loop并且回调_action委托-》第四步_action参数对应的函数地址就是ExecuteProcess,最后在ExecuteProcess里面通过元组参数调用对应的任务类型,自此7种任务类型启动并开始工作。以上代码还有个细节需要说明一下,Tuple.Create(_process, context, execution)。元组的第一个参数,其类型为IBackgroundProcess,看下定义。
1 public interfaceIBackgroundProcess : IServerProcess
2 {
3         voidExecute([NotNull] BackgroundProcessContext context);
4     }
接口就定义了一个方法,没什么特别的,但是它的几个实现类就是我们单独的任务类,我们下面要说的RecurringJobScheduler循环任务类也实现了这个接口。到此我们的RecurringJobScheduler循环定时任务线程就算开始执行了。
RecurringJobScheduler循环定时任务机制
照旧看下这个类型的定义
1 public classRecurringJobScheduler : IBackgroundProcess
2 {
3         //其他代码
4         publicRecurringJobScheduler(
5 [NotNull] IBackgroundJobFactory factory,
6 TimeSpan pollingDelay,
7 [NotNull] ITimeZoneResolver timeZoneResolver,
8             [NotNull] Func<DateTime>nowFactory)
9 {
10             if (factory == null) throw newArgumentNullException(nameof(factory));
11             if (nowFactory == null) throw newArgumentNullException(nameof(nowFactory));
12             if (timeZoneResolver == null) throw newArgumentNullException(nameof(timeZoneResolver));
13  
14  
15             _factory =factory;
16             _nowFactory =nowFactory;
17             _timeZoneResolver =timeZoneResolver;
18             _pollingDelay =pollingDelay;
19             _profiler = newSlowLogProfiler(_logger);
20 }
21  
22  
23         /// <inheritdoc />
24         public void Execute(BackgroundProcessContext context) //实现方法
25 {
26             if (context == null) throw newArgumentNullException(nameof(context));
27  
28  
29             var jobsEnqueued = 0;
30  
31  
32             while (EnqueueNextRecurringJobs(context)) //从数据库获取定时任务
33 {
34                 jobsEnqueued++;
35  
36  
37                 if(context.IsStopping)
38 {
39                     break;
40 }
41 }
42  
43  
44             if (jobsEnqueued != 0)
45 {
46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
47 }
48  
49  
50             if (_pollingDelay >TimeSpan.Zero)
51 {
52 context.Wait(_pollingDelay);
53 }
54             else
55 {
56                 var now =_nowFactory();
57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) -now);
58 }
59 }
60     }
承上,调用元组的第一个参数的execute方法,RecurringJobScheduler的execute方法得以执行,该方法就干一件事,每隔15秒从数据库获取待执行的计划,每次1000条数据。通过EnqueueNextRecurringJobs方法获取任务。
1 private boolEnqueueNextRecurringJobs(BackgroundProcessContext context)
2 {
3             return UseConnectionDistributedLock(context.Storage, connection => 
4 {
5                 var result = false;
6                 if(IsBatchingAvailable(connection))
7 {
8                     var now =_nowFactory();
9                     var timestamp =JobHelper.ToTimestamp(now);
10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); //从数据库里面查询
11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false;
12                     foreach (var recurringJobId inrecurringJobIds)
13 {
14                         if (context.IsStopping) return false;
15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))//排队执行
16 {
17                             result = true;
18 }
19 }
20 }
21                 else
22 {
23                     for (var i = 0; i < BatchSize; i++)
24 {
25                         if (context.IsStopping) return false;
26                         var now =_nowFactory();
27                         var timestamp =JobHelper.ToTimestamp(now);
28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
29                         if (recurringJobId == null) return false;
30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))
31 {
32                             return false;
33 }
34 }
35 }
36                 returnresult;
37 });
38         }
GetFirstByLowestScoreFromSet方法从数据库Set表里面查询top1000数据,条件是key为recurring-jobs字符串(表示定时任务)并且时间范围是0到当前时间。随后遍历这些jobids,排队执行,往下看TryEnqueueBackgroundJob方法的实现。
1 private boolEnqueueBackgroundJob(
2 BackgroundProcessContext context,
3 IStorageConnection connection,
4             stringrecurringJobId,
5 DateTime now)
6 {
7             //其他代码
8             using(connection.AcquireDistributedRecurringJobLock(recurringJobId,  LockTimeout))
9 {
10                 try
11 {
12                     var recurringJob =connection.GetRecurringJob(recurringJobId,  _timeZoneResolver, now);
13                     if (recurringJob == null)
14 {
15                         using (var transaction =connection.CreateWriteTransaction())
16 {
17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId);
18 transaction.Commit();
19 }
20                         return false;
21 }
22           
23                     BackgroundJob backgroundJob = null;
24                     IReadOnlyDictionary<string, string>changedFields;
25                     if (recurringJob.TrySchedule(out var nextExecution, out varerror))
26 {
27                         if (nextExecution.HasValue && nextExecution <=now)
28 {
29                             backgroundJob =_factory.TriggerRecurringJob(context.Storage,  connection, _profiler, recurringJob, now);
30                             if (String.IsNullOrEmpty(backgroundJob?.Id))
31 {
32                                 _logger.Debug($"Recurring job '{recurringJobId}' execution  at '{nextExecution}' has been canceled.");
33 }
34 }
35                         recurringJob.IsChanged(out changedFields, outnextExecution);
36 }
37                     else if (recurringJob.RetryAttempt <MaxRetryAttemptCount)
38 {
39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1);
40                         
41                         _logger.WarnException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be retried in {delay}.", error);
42                         recurringJob.ScheduleRetry(delay, out changedFields, outnextExecution);
43 }
44                     else
45 {
46                         _logger.ErrorException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be disabled.", error);
47                         recurringJob.Disable(error, out changedFields, outnextExecution);
48 }
49               
50                     using (var transaction =connection.CreateWriteTransaction())
51 {
52                         if (backgroundJob != null)
53 {
54 _factory.StateMachine.EnqueueBackgroundJob(
55 context.Storage,
56 connection,
57 transaction,
58 recurringJob,
59 backgroundJob,
60                                 "Triggered by recurring job scheduler",
61 _profiler);
62 }
63 transaction.UpdateRecurringJob(recurringJob, changedFields,  nextExecution, _logger);
64 transaction.Commit();
65                         return true;
66 }
67 }
68                 catch(TimeZoneNotFoundException ex)
69 {
70                 catch(Exception ex)
71 {
72    
73 }
74                 return false;
75 }
76         }
需要注意的地方我都有加粗,该方法大概流程是:1.GetRecurringJob根据jobid从Hash表里面查询一条完整的定时任务,2.TrySchedule获取该任务的下次执行时间,如果下次执行时间小于当前,执行这条任务(并非真正执行定时任务,只是往job表里面写数据,真正执行任务由worker完成),3.获取下次执行时间&所有任务字段,4.状态机修改任务状态。定时任务就这样周而复始的重复执行以上流程。这里简单说下worker的执行机制,其实际就是轮询检索job表里面的数据执行任务表达式树,worker在hangfire里面默认开启了20个线程。第三部分就到这吧。
避坑
简单说下个人在改bug期间遇到的一些问题啊。
1.时区问题,在添加定时任务时如果不指定时区信息,默认使用的是utc时间,我们中国是东8区,也就是说解析出来的执行时间会晚8个小时执行。解决办法有几种可以通过全局指定options的ITimeZoneResolver属性指定,也可以通过AddorUpdate方法指定,如果是指定时区信息,需要注意看板上面的异常信息,如果有异常会导致任务不执行,时区信息它是从系统里面检索出来的,没有就抛异常。就这样吧。

免责声明:文章转载自《说下hangfire吧》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇layer系列之弹层layer.promptdocker容器的端口映射下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

安卓音频采集播放方法

Android音频收集和播放(一) 一、文章说明(此文章转载自简书) 这篇文章主要讲述的是Android中使用AudioRecord类和AudioTrack类来进行语音采集和播放相关的知识,在这篇文章中首先介绍的是有关声音的一些概念性知识,然后介绍声音的采集,之后再讲述Android上回声消除的相关步骤,最后介绍的是声音的播放。 二、概念性知识点 在这里...

如何用FFmpeg API采集摄像头视频和麦克风音频,并实现录制文件的功能转

之前一直用Directshow技术采集摄像头数据,但是觉得涉及的细节比较多,要开发者比较了解Directshow的框架知识,学习起来有一点点难度。最近发现很多人问怎么用FFmpeg采集摄像头图像,事实上FFmpeg很早就支持通过DShow获取采集设备(摄像头、麦克风)的数据了,只是网上提供的例子比较少。如果能用FFmpeg实现采集、编码和录制(或推流)...

[转]C/C++:构建你自己的插件框架

本文译自GigiSayfan在DDJ上的专栏文章。GigiSayfan是北加州的一个程序员,email:gigi@gmail.com. 本文是一系列讨论架构、开发和部署C/C++跨平台插件框架的文章的 第一篇 第一部分探索了一下现状,调查了许多现有的插件/组件库,深入研究了二进制兼容问题,并展现了一些该方案必要的一些属性。 后续的文章用一个例子展示了可用于...

@PostConstruct和static静态块初始化的区别

static blocks are invoked when the class is being initialized, after it is loaded. The dependencies of your component haven't been initialized yet. That is why you get a NullPoint...

转:程序内存空间(代码段、数据段、堆栈段)

https://blog.csdn.net/ywcpig/article/details/52303745 在冯诺依曼的体系结构中,一个进程必须有:代码段,堆栈段,数据段。 进程的虚拟地址空间图示如下: BSS段:BSS段(bss segment)通常是指用来存放程序中未初始化的全局变量的一块内存区域。BSS是英文Block Started by Sym...

ios 字典没有值的问题

我遇到这样一个问题:初始化一个字典,初始化的时候,给三个key-value;但是,打印字典的时候,只有第一个元素有值,后两个没有值。  NSDictionary *dict = [NSDictionary dictionaryWithObjectsAndKeys:_messageString,@"message",_photoImage,@"photo",...