.net 分布式架构之分布式锁实现

摘要:
分布式锁经常用于在解决分布式环境下的业务一致性和协调分布式环境。开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock开源相关群:.net开源基础服务238543768这里整理了C#.net关于redis分布式锁和zookeeper分布式锁的实现,仅用于研究。采用ServiceStack.Redis实现Redis分布式锁/**Redis分布式锁*采用ServiceStack.Redis实现的Redis分布式锁*详情可阅读其开源代码*备注:不同版本的ServiceStack.Redis实现reidslock机制不同xxf里面默认使用2.2版本*/publicclassRedisDistributedLock:BaseRedisDistributedLock{privateServiceStack.Redis.RedisLock_lock;privateRedisClient_client;publicRedisDistributedLock:base{}publicoverrideLockResultTryGetDistributedLock(TimeSpan?=null)this._client.Dispose();}catch{XXF.Log.ErrorLog.Write;}}}来自网络的java实现Redis分布式锁(C#版)/**Redis分布式锁*采用网络上java实现的Redis分布式锁*参考http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html*详情可阅读其开源代码*/publicclassRedisDistributedLockFromJava:BaseRedisDistributedLock{publicRedisDistributedLockFromJava:base{}publicoverrideLockResultTryGetDistributedLock(TimeSpan?

分布式锁

经常用于在解决分布式环境下的业务一致性和协调分布式环境。

实际业务场景中,比如说解决并发一瞬间的重复下单,重复确认收货,重复发现金券等。

使用分布式锁的场景一般不能太多。

开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock

开源相关群: .net 开源基础服务 238543768

这里整理了C#.net关于redis分布式锁和zookeeper分布式锁的实现,仅用于研究。(可能有bug)

采用ServiceStack.Redis实现Redis分布式锁

/** Redis分布式锁     
 * 采用ServiceStack.Redis实现的Redis分布式锁     
 * 详情可阅读其开源代码     
 * 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同 xxf里面默认使用2.2版本     
 */    public classRedisDistributedLock : BaseRedisDistributedLock
    {
        privateServiceStack.Redis.RedisLock _lock;
        privateRedisClient _client;
        public RedisDistributedLock(string redisserver, stringkey)
            : base(redisserver, key)
        {
 
        }
 
        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan?taskrunTimeOut)
        {
            if (lockresult ==LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            _client =DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient();
            /** 阅读源码发现当其获取锁后,redis连接资源会一直占用,知道获取锁的资源释放后,连接才会跳出,可能会导致连接池资源的浪费。             */            
try{ this._lock = newServiceStack.Redis.RedisLock(_client, key, getlockTimeOut); lockresult =LockResult.Success; } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult =LockResult.LockSystemExceptionFailure; } returnlockresult; } public override voidDispose() { try{ if (this._lock != null) this._lock.Dispose(); if (_client != null) this._client.Dispose(); } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } }

来自网络的java实现Redis分布式锁(C#版)

/** Redis分布式锁     
* 采用网络上java实现的Redis分布式锁     
* 参考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html* 详情可阅读其开源代码     
*/    public classRedisDistributedLockFromJava : BaseRedisDistributedLock
    {
 
 
        public RedisDistributedLockFromJava(string redisserver, stringkey)
            : base(redisserver, key)
        {
 
 
        }
 
        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan?taskrunTimeOut)
        {
            if (lockresult ==LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            try{
                //1. 通过SETNX试图获取一个lock               
string @lock = key;
long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime); long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : 0); long hassleepMilliseconds = 0; while (true) { using (var redisclient =DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + 1; /*Java以前版本都是用SetNX,但是这种是无法设置超时时间的,不是很理解为什么,
* 可能是因为原来的redis命令比较少导致的?现在用Add不知道效果如何.
因对redis细节不了解,但个人怀疑若异常未释放锁经常发生,可能会导致内存逐步溢出
*/
bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds +DistributedLockConfig.TaskLockDelayCleepUpTime)); //SETNX成功,则成功获取一个锁 if (acquired == true) { lockresult =LockResult.Success; } //SETNX失败,说明锁仍然被其他对象保持,检查其是否已经超时
else
{
var oldValueBytes =redisclient.Get(@lock); //超时 if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, 0) <CurrentUnixTimeMillis()) { /*此处虽然重设并获取锁,但是超时时间可能被覆盖,故重设超时时间;若有进程一直在尝试获取锁,那么锁存活时间应该被延迟*/
var getValueBytes =redisclient.GetSet(@lock, BitConverter.GetBytes(value)); var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//这里如果程序异常终止,依然会有部分锁未释放的情况。 //获取锁成功 if (getValueBytes == oldValueBytes) { lockresult =LockResult.Success; } //已被其他进程捷足先登了 else{ lockresult =LockResult.GetLockTimeOutFailure; } } //未超时,则直接返回失败 else{ lockresult =LockResult.GetLockTimeOutFailure; } } } //成功拿到锁 if (lockresult ==LockResult.Success) break; //获取锁超时 if (hassleepMilliseconds >=getlockexpiredMilliseconds) { lockresult =LockResult.GetLockTimeOutFailure; break; } //继续等待 System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime); hassleepMilliseconds +=DistributedLockConfig.GetLockFailSleepTime; } } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult =LockResult.LockSystemExceptionFailure; } returnlockresult; } private longCurrentUnixTimeMillis() { return (long)(System.DateTime.UtcNow - new System.DateTime(1970, 1, 1, 0, 0, 0, System.DateTimeKind.Utc)).TotalMilliseconds; } public override voidDispose() { if (lockresult == LockResult.Success || lockresult ==LockResult.LockSystemExceptionFailure) { try{ long current =CurrentUnixTimeMillis(); using (var redisclient =DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { var v =redisclient.Get(key); if (v != null) { //避免删除非自己获取得到的锁 if (current < BitConverter.ToInt64(v, 0)) { redisclient.Del(key); } } } } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } } }

ServiceStack.Redis内部实现版本(较旧)

/** Redis分布式锁    
 * 采用ServiceStack.Redis实现的Redis分布式锁    
 * 详情可阅读其开源代码    
 * 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同     
  * 拷贝自网络开源代码 较旧的实现版本    
  */    public classRedisDistributedLockFromServiceStack : BaseRedisDistributedLock
    {
        public RedisDistributedLockFromServiceStack(string redisserver, stringkey)
            : base(redisserver, key)
        {
 
 
        }
        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan?taskrunTimeOut)
        {
            if (lockresult ==LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            try{
                using (var redisClient =DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
                {
                    ExecExtensions.RetryUntilTrue(
                             () =>{
                                 //This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
                                  //Calculate a unix time for when the lock should expire                                 
TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime);
//new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year DateTime expireTime = DateTime.UtcNow.Add(realSpan); string lockString = (expireTime.ToUnixTimeMs() + 1).ToString(); //Try to set the lock, if it does not exist this will succeed and the lock is obtained
var nx = redisClient.SetEntryIfNotExists(key, lockString);
if(nx) { lockresult =LockResult.Success; return true; } //If we've gotten here then a key for the lock is present. This could be because the lock is
//correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
//Therefore we need to get the value of the lock to see when it should expire redisClient.Watch(key); string lockExpireString = redisClient.Get<string>(key); longlockExpireTime; if (!long.TryParse(lockExpireString, outlockExpireTime)) { redisClient.UnWatch(); //since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure;
return false; } //If the expire time is greater than the current time then we can't let the lock go yet
if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
{ redisClient.UnWatch(); //since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure;
return false; } //If the expire time is less than the current time then it wasn't released properly and we can attempt to //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes //before we call Commit() below, the Commit will fail and return false, which means that another thread //was able to acquire the lock before we finished processing. using (var trans = redisClient.CreateTransaction()) //we started the "Watch" above; this tx will succeed if the value has not moved { trans.QueueCommand(r =>r.Set(key, lockString)); //return trans.Commit(); //returns false if Transaction failed var t = trans.Commit(); if (t == false) lockresult =LockResult.GetLockTimeOutFailure; elselockresult =LockResult.Success; returnt; } }, getlockTimeOut ); } } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); lockresult =LockResult.LockSystemExceptionFailure; } returnlockresult; } public override voidDispose() { try{ using (var redisClient =DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient()) { redisClient.Remove(key); } } catch(Exception exp) { XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp); } } }

Zookeeper 版本实现分布式锁

/** 来源java网络源码的zookeeper分布式锁实现(目前仅翻译并简单测试ok,未来集成入sdk)  
* 备注:    共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,   
然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,
如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,
从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
*/ public classZooKeeprDistributedLockFromJava : IWatcher { privateZooKeeper zk; private string root = "/locks"; // private string lockName; //竞争资源的标志 private string waitNode; //等待前一个锁 private string myZnode; //当前锁 //private CountDownLatch latch; //计数器 privateAutoResetEvent autoevent; private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(30000); private IList<Exception> exception = new List<Exception>(); /// <summary> ///创建分布式锁,使用前请确认config配置的zookeeper服务可用 </summary> /// <param name="config">127.0.0.1:2181 </param> /// <param name="lockName">竞争资源标志,lockName中不能包含单词lock </param> public ZooKeeprDistributedLockFromJava(string config, stringlockName) { this.lockName =lockName; //创建一个与服务器的连接 try
{ zk
= new ZooKeeper(config, sessionTimeout, this); var stat = zk.Exists(root, false); if (stat == null) { //创建根节点 zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); } } catch(KeeperException e) { throwe; } } /// <summary> ///zookeeper节点的监视器 /// </summary> public virtual voidProcess(WatchedEvent @event) { if (this.autoevent != null) { this.autoevent.Set(); } } public virtual booltryLock() { try{ string splitStr = "_lock_"; if(lockName.Contains(splitStr)) { //throw new LockException("lockName can not contains \u000B");
}
//创建临时子节点 myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential); Console.WriteLine(myZnode + "is created "); //取出所有子节点 IList<string> subNodes = zk.GetChildren(root, false); //取出所有lockName的锁 IList<string> lockObjNodes = new List<string>(); foreach (string node insubNodes) { if(node.StartsWith(lockName)) { lockObjNodes.Add(node); } } Array alockObjNodes =lockObjNodes.ToArray(); Array.Sort(alockObjNodes); Console.WriteLine(myZnode + "==" + lockObjNodes[0]); if (myZnode.Equals(root + "/" + lockObjNodes[0])) { //如果是最小的节点,则表示取得锁 return true; } //如果不是最小的节点,找到比自己小1的节点 string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1); waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1]; } catch(KeeperException e) { throwe; } return false; } public virtual booltryLock(TimeSpan time) { try{ if (this.tryLock()) { return true; } returnwaitForLock(waitNode, time); } catch(KeeperException e) { throwe; } return false; } private bool waitForLock(stringlower, TimeSpan waitTime) { var stat = zk.Exists(root + "/" + lower, true); //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if (stat != null) { Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + "waiting for " + root + "/" +lower); autoevent = new AutoResetEvent(false); bool r =autoevent.WaitOne(waitTime); autoevent.Dispose(); autoevent = null; returnr; } else return true; } public virtual voidunlock() { try{ Console.WriteLine("unlock " +myZnode); zk.Delete(myZnode, -1); myZnode = null; zk.Dispose(); } catch(KeeperException e) { throwe; } } }

以上代码仅做参考,未压测。

代码粘贴有些问题,详细请下载开源包运行研究。

免责声明:文章转载自《.net 分布式架构之分布式锁实现》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇string.Format出现异常"输入的字符串格式有误"的解决方法Centos7 k8s v1.5.2二进制部署安装-kube-proxy下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

OHC Java堆外缓存详解与应用

1、背景   在当前微服务体系架构中,有很多服务例如,在 特征组装 与 排序等场景都需要有大量的数据支撑,快速读取这些数据对提升整个服务于的性能起着至关重要的作用。   缓存在各大系统中应用非常广泛。尤其是业务程序所依赖的数据可能在各种类型的数据库上(mysql、hive 等),那么如果想要获取到这些数据需要通过网络来访问。再加上往往数据量又很庞大,网络传...

上手七牛云存储

早就听说过七牛云存储,终于有时间上手实践。 1、第一步,注册七牛账号,由于是测试,首先申请的是个人账号 2、注册成功之后,默认是体验账号,每月只有1G的空间容量及1G的下载流量       3、账号认证,认证成功之后将升级为标准账号,每月有10G的空间容量及20G的下载流量       虽然认证麻烦了些,但看得出来,七牛还是很良心的,这种免费套餐对于一...

Golang的高级数据类型-切片(slice)实战篇

          Golang的高级数据类型-切片(slice)实战篇                              作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任。        切片(slice)是Go中一种比较特殊的数据结构,这种数据结构更便于使用和管理数据集合,切片是围绕动态数组的概念构建的,可以按需自动增长。   ...

图表行为用于触发能够改变图表显示的相关动态功能,event事件用于接收action触发的行为,所以action行为要配合event事件一块学习

//触发图表行为(更改变图表显示的相关动态),例如图例开关legendToggleSelect, 数据区域缩放dataZoom,显示提示框showTip等等 //通过不同的type触发不同的行为 myChart.dispatchAction({ type: 'highlight', //高亮指定的数据图形。通过seriesNam...

使用jquery刷新当前页面

如何使用jquery刷新当前页面 下面介绍全页面刷新方法:有时候可能会用到 window.location.reload()刷新当前页面. parent.location.reload()刷新父亲对象(用于框架) opener.location.reload()刷新父窗口对象(用于单开窗口) top.location.reload()刷新最顶端对象(用于...

System.Web.Mvc 找到的程序集清单定义与程序集引用不匹配

System.IO.FileLoadException: 未能加载文件或程序集“System.Web.Mvc, Version=5.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35”或它的某一个依赖项。找到的程序集清单定义与程序集引用不匹配。 (异常来自 HRESULT:0x80131040)文...