分布式主键生成算法

摘要:
分布式主键的生成方法分为两类:集中式和分散式。集中式生成算法集中式生成算法经典方案主要包括三种SEQUENCE间隔方案:基于SEQUENCE间隔方案,每个数据库根据特定步长自动递增,基于redis生成自动递增序列。淘宝分布式数据层TDDL使用SEQUENCE方案实现子数据库和子表、Master/Save、动态数据源配置等功能。迁移历史数据的步骤如下:步骤1:创建与序列对应的表。

https://blog.csdn.net/wangpengzhi19891223/article/details/81197078

这篇文章总结了分布式主键或者唯一键的生成算法,文章最后有我们基于snowflow算法的思考和实践。

分布式主键的生成方式分为中心化和去中心化两大类。

中心化生成算法

中心化生成算法经典的方案主要有基于SEQUENCE区间方案、各数据库按特定步长自增和基于redis生成自增序列三种

SEQUENCE区间方案

淘宝分布式数据层TDDL就是采用SEQUENCE方案实现了分库分表、Master/Salve、动态数据源配置等功能。大致原理是:所有应用服务器去同一个库获取可使用的sequence(乐观锁保证一致性),得到(sequence,sequence+步长]个可被这个数据源使用的id,当应用服务器插入“步长”个数据后,再次去争取新的sequence区间。
优势:生成一个 全局唯一 的 连续 数字类型主键,延用单库单表时的主键id。
劣势:无法保证 全局递增 。需要开发各种数据库类型id生成器。扩容历史数据不好迁移

操作步骤如下:
第一步:创建一张sequence对应的表。记录每一个表的当前最大sequence,几张逻辑表需要声明几个sequence;
第二步:配置sequenceDao,定义步长等信息

<bean id="sequenceDao" class="com.taobao.tddl.client.sequence.impl.DefaultSequenceDao">  
        <!-- 数据源 -->  
        <property name="dataSource"  ref="dataSource" />  
        <!-- 步长-->  
        <property name="step" value="1000" />  
        <!-- 重试次数-->  
        <property name="retryTimes" value="1" />  
        <!-- sequence 表名-->  
        <property name="tableName" value="gt_sequence" />  
        <!-- sequence 名称-->  
        <property name="nameColumnName" value="BIZ_NAME" />  
        <!-- sequence 当前值-->  
        <property name="valueColumnName" value="CURRENT_VALUE" />  
        <!-- sequence 更新时间-->  
        <property name="gmtModifiedColumnName" value="gmt_modified" />  
    </bean>  

DefaultSequenceDao获取区间源码如下:

public SequenceRange nextRange(String name) throws SequenceException {
        if (name == null) {
            throw new IllegalArgumentException("序列名称不能为空");
        }

        long oldValue;
        long newValue;

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        for (int i = 0; i < retryTimes + 1; ++i) {
            try {
                conn = dataSource.getConnection();
                stmt = conn.prepareStatement(getSelectSql());
                stmt.setString(1, name);
                rs = stmt.executeQuery();
                rs.next();
                oldValue = rs.getLong(1);

                if (oldValue < 0) {
                    StringBuilder message = new StringBuilder();
                    message.append("Sequence value cannot be less than zero, value = ").append(oldValue);
                    message.append(", please check table ").append(getTableName());

                    throw new SequenceException(message.toString());
                }

                if (oldValue > Long.MAX_VALUE - DELTA) {
                    StringBuilder message = new StringBuilder();
                    message.append("Sequence value overflow, value = ").append(oldValue);
                    message.append(", please check table ").append(getTableName());

                    throw new SequenceException(message.toString());
                }

                newValue = oldValue + getStep();
            } catch (SQLException e) {
                throw new SequenceException(e);
            } finally {
                closeResultSet(rs);
                rs = null;
                closeStatement(stmt);
                stmt = null;
                closeConnection(conn);
                conn = null;
            }

            try {
                conn = dataSource.getConnection();
                stmt = conn.prepareStatement(getUpdateSql());
                stmt.setLong(1, newValue);
                stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                stmt.setString(3, name);
                stmt.setLong(4, oldValue);
                int affectedRows = stmt.executeUpdate();
                if (affectedRows == 0) {
                    // retry
                    continue;
                }

                return new SequenceRange(oldValue + 1, newValue);
            } catch (SQLException e) {
                throw new SequenceException(e);
            } finally {
                closeStatement(stmt);
                stmt = null;
                closeConnection(conn);
                conn = null;
            }
        }

        throw new SequenceException("Retried too many times, retryTimes = " + retryTimes);
    }

第三步:配置sequence生成器,用于获取可使用的sequence区间,使用完后再去sequence库获取。

<bean id="businessSequence"  class="com.taobao.tddl.client.sequence.impl.DefaultSequence">  
    <property name="sequenceDao" ref="sequenceDao"/>  
    <property name="name" value="business_sequence" />  
</bean>  

其中DefaultSequence源码如下:

public class DefaultSequence implements Sequence {
    private final Lock lock = new ReentrantLock();

    private SequenceDao sequenceDao;

    /**
     * 序列名称
     */
    private String name;

    private volatile SequenceRange currentRange;

    public long nextValue() throws SequenceException {
        if (currentRange == null) {
            lock.lock();
            try {
                if (currentRange == null) {
                    currentRange = sequenceDao.nextRange(name);
                }
            } finally {
                lock.unlock();
            }
        }

        long value = currentRange.getAndIncrement();
        if (value == -1) {
            lock.lock();
            try {
                for (;;) {
                    if (currentRange.isOver()) {
                        currentRange = sequenceDao.nextRange(name);
                    }

                    value = currentRange.getAndIncrement();
                    if (value == -1) {
                        continue;
                    }

                    break;
                }
            } finally {
                lock.unlock();
            }
        }

        if (value < 0) {
            throw new SequenceException("Sequence value overflow, value = " + value);
        }

        return value;
    }

    public SequenceDao getSequenceDao() {
        return sequenceDao;
    }

    public void setSequenceDao(SequenceDao sequenceDao) {
        this.sequenceDao = sequenceDao;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

第四步:调用

public class IbatisSmDAO extends SqlMapClientDaoSupport implements SmDAO {

  /**smSequence*/
  private DefaultSequence   businessSequence;
   
    public int insert(SmDO sm) throws DataAccessException {
        if (sm == null) {
            throw new IllegalArgumentException("Can't insert a null data object into db.");
        }
        
        try {
            sm.setId((int)businessSequence.nextValue());
        } catch (SequenceException e) {
            throw new RuntimeException("Can't get primary key.");
        }
        
        getSqlMapClientTemplate().insert("MS-SM-INSERT", sm);

        return sm.getId();
    }
}

优势:生成一个全局唯一的连续数字类型主键,延用单库单表时的主键id。
劣势:无法保证全局递增。需要开发各种数据库类型id生成器。

各数据库按特定步长自增

可以继续采用数据库生成自增主键的方式,为每个不同的分库设置不同的初始值,并按步长设置为分片的个数即可,这种方式对分片个数有依赖,一旦再次水平扩展,原有的分布式主键不易迁移。为了预防后续库表扩容,这边可以采用提前约定最大支持的库表数量,后续扩容为2的指数倍扩容。
比如:我们规定最大支持1024张分表,数据库增长的步长为1024(即使现在的表数量只有64)。
优势:生成一个全局唯一的数字类型主键,延用单库单表时的主键id。当分表数没有达到约定的1024张分表,全局不连续。
劣势:无法保证全局递增,也不保证单机连续。需要开发各种数据库类型id生成器。需要依赖一个中心库表sequence。

基于redis的方案

另一种中心化生成分布式主键的方式是采用Redis在内存中生成自增序列,通过redis的原子自增操作(incr接口)生成一个自增的序列。
优势:生成一个 全局连续递增 的数字类型主键。
劣势:此种方式新增加了一个外部组件的依赖,一旦Redis不可用,则整个数据库将无法在插入,可用性会大大下降,另外Redis的单点问题也需要解决,部署复杂度较高。

去中心化生成算法

去中心化方式无需额外部署,以jar包方式被加载,可扩展性也很好,因此更推荐使用。目前主流的去中心化生成算法有:UUID及其变种、Mongo的ObjectId、snowflake算法及其变种

UUID及其变种

UUID 是 通用唯一识别码(Universally Unique Identifier)的缩写,是一种软件建构的标准,亦为开放软件基金会组织在分布式计算环境领域的一部分。其目的,是让分布式系统中的所有元素,都能有唯一的辨识信息,而不需要通过中央控制端来做辨识信息的指定。UUID有很多变种实现,目前最广泛应用的UUID,是微软公司的全局唯一标识符(GUID)。
UUID是一个由4个连字号(-)将32个字节长的字符串分隔后生成的字符串,总共36个字节长。算法的核心思想是结合机器的网卡、当地时间、一个随即数来生成GUID。从理论上讲,如果一台机器每秒产生10000000个GUID,则可以保证(概率意义上)3240年不重复。
优势:全局唯一,各种语言都有UUID现成实现,Mysql也有UUID实现。
劣势:36个字符组成,按照目前Mysql最常用的编码Utf-8,每一个字符对应的索引成本是3字节,也就是一个UUID需要108个字节的索引存储成本,是最大数字类型(8字节)的13.5倍的存储成本。

mongodb的ObjectId

objectid有12个字节,包含时间信息(4字节、秒为单位)、机器标识(3字节)、进程id(2字节)、计数器(3字节,初始值随机)。其中,时间位精度(秒或者毫秒)与序列位数,二者决定了单位时间内,对于同一个进程最多可产生多少唯一的ObjectId,在MongoDB中,那每秒就是2^24(16777216)。但是机器标识与进程id一定要保证是不重复的,否则极大概率上会产生重复的ObjectId。由于ObjectId生成12个字节的16进制表示,无法用现有基础类型存储,只能转化为字符串存储,对应24个字符。objectid的组成结构如下

4字节3字节2字节3字节
timemachinepid自增

ObjectId生成算法的核心代码如下:

public class ObjectId implements Comparable<ObjectId> , java.io.Serializable {
final int _time;
    final int _machine;
    final int _inc;
boolean _new;

public ObjectId(){
        _time = (int) (System.currentTimeMillis() / 1000);
        _machine = _genmachine;
        _inc = _nextInc.getAndIncrement();
        _new = true;
}
……
}

优势: 全局唯一 。
劣势:非数字类型,24个字符,按照目前Mysql最常用的编码Utf-8,每一个字符对应的索引成本是3字节,也就是一个ObjectId需要72个字节的索引存储成本,是最大数字类型(8字节)的9倍的存储成本。

snowflake算法

Snowflake算法产生是为了满足Twitter每秒上万条消息的请求,每条消息都必须分配一条唯一的id,这些id还需要一些大致的顺序(方便客户端排序),并且在分布式系统中不同机器产生的id必须不同。Snowflake算法把时间戳,工作机器id,序列号组合在一起。生产Id的结构如下:

6362-2221-1211-0
1位:241位:支持69.7年(单位ms)10位:102412位:4096

默认情况下41bit的时间戳可以支持该算法使用到2082年,10bit的工作机器id可以支持1023台机器,序列号支持1毫秒产生4095个自增序列id。

工作机器id可以使用IP+Path来区分工作进程。如果工作机器比较少,可以使用配置文件来设置这个id是一个不错的选择,如果机器过多配置文件的维护是一个灾难性的事情。
实施现状:工作机器id有10位,根据我们公司目前已经未来5-10的业务量,同一个服务机器数超过1024台基本上不太可能。工作机器id推荐使用下面的结构来避免可能的重复。

9-87-0
用户可指定(默认为0)机器ip的后8位

考虑到我们公司的业务级别,同一个机房ip的后8位基本上不可能重复。后2位让用户指定是由于存在以下场景:
1)一个虚拟机下面可能存在两个进程号不同的同样服务(我们不建议,后续也希望通过运维来避免类似的部署)。如果存在这种情况,可以在JVM启动参数中添加HostId参数,为这个这台机器的服务指定一个不同于其他服务的HostId。
2)存在前后台服务部署在同一台机器上,都操作同一个库(建议后台服务通过调用前台的服务来操作库表,保证库表的单一操作)。如果存在这种情况,可以通过为前后台服务指定不同的服务编号serviceNo(只支持0,1,2,3)。
3)不同机房可能存在相同后8位ip尾号,比如兴议机房为10.10.100.123 滨安机房为10.20.100.123。如果存在这种情况,可以通过 a)在其中一台机器的环境变量中重新指定一下HostId;b)不同环境配置不同的服务编号serviceNo;c)服务启动JVM参数中为这个这台机器的服务指定一个不同于其他服务的HostId

变种snowflake算法
结合公司现状,我们在snowflake算法的基础上进行了部分改造,得到变种snowflake算法。我们推荐使用的分布式主键生成算法是变种的snowflake算法。这个算法更加充分利用了ID的位表达,比原生的snowflake算法多出1位使用。产生的ID结构如下:

63-6261-5251-2019-0
2位:410位:102432位:136年(单位为s)19位:1048560
保留位机器码时间戳自增码

时间戳生成: 32位时间戳代表秒的话,可以表示136年,比如我们取2016年11月11日0点0分0秒作为基准,32位时间表示当前时间转换秒数-基准时间转换秒数
自增码:服务数据源 原子自增的long类型变量,最大支持每秒1048560条记录,当一秒产生超过1048560个序号时,再次请求生成序号时,会阻塞等待下一秒到达才生成新的序号。为了避免自增码都是从0开始计数导致数据倾斜,自增码的起始值被设定成一个随机数。
机器码:可以参考上面描述的方案

我们实现变种snowflake算法的核心代码如下

    @PostConstruct
    public void init() {
        this.initHostId();
        this.initTime();
        this.initIncNo();
    }

    /**
     * 初始化{@link #hostId} {@link #shiftedHostId}
     */
    protected void initHostId() {
        if(serviceNo > 3 || serviceNo <0){
            LOG.error("serviceNo只支持0、1、2、3");
            throw new ShardingJdbcException("serviceNo只支持0、1、2、3");
        }
        
        if (hostId != 0) {
            this.shiftedHostId = this.hostId << this.hostTimeRule.getHostOffset();
            LOG.info("属性注入HostId。HostId:{}", hostId);
            LOG.info("初始化Id生成器。HostId:{},ShiftHostId:{}", hostId, shiftedHostId);
            return;
        }
        // 从JVM参数中,获取系统Id
        String host = System.getProperty(HOST_ID);
        if (host != null) {
            this.hostId = Integer.valueOf(host);
            LOG.info("从JVM参数中获取HostId。HostId:{}", hostId);
        } else {
            // 从环境中获取系统ID
            host = System.getenv(HOST_ID);
            if (host != null) {
                this.hostId = Integer.valueOf(host);
                LOG.info("从系统环境中获取HostId。HostId:{}", hostId);
            } else if (useSystemIpAsHostId) {
                //从网卡读取IP地址,转换成HostId。取后8bit位,hostId为service*256+ip后8位
                String ip = IpUtil.getLocalIPv4();
                this.hostId = serviceNo*256 + (int) (IpUtil.convertIPv4(ip) & 255);
                LOG.info("从网卡中获取HostId。serviceNo:{},IP:{}, HostId:{}", serviceNo,ip, hostId);
            } else {
                LOG.error("没有设置HostId,也没有开启useSystemIpAsHostId");
                throw new ShardingJdbcException("必须设置HostId,或者开启useSystemIpAsHostId为true");
            }
        }
        this.shiftedHostId = this.hostId << this.hostTimeRule.getHostOffset();
        LOG.info("初始化Id生成器。HostId:{},ShiftHostId:{}", hostId, shiftedHostId);
    }

    /**
     * 初始化{@link #baseTime }{@link #timeBaseLine } {@link #shiftedTime }
     */
    protected void initTime() {
        if (timeBaseLine > System.currentTimeMillis()) {
            LOG.error("时间戳底线时间timeBaseLine不能大于当前毫秒级时间戳。当前时间:{},timeBaseTime:{}", System.currentTimeMillis(),
                timeBaseLine);
            throw new ShardingJdbcException("时间戳底线时间timeBaseLine不能大于当前毫秒级时间戳。");
        }
        LOG.info("时间底线TimeBaseLine:{},时间跨度TimeGap:{}",  timeBaseLine, timeGap);
        long baseTime = System.currentTimeMillis();
        currentShiftedTime = (((baseTime - timeBaseLine) / timeGap)%(1L << hostTimeRule.getTimeLength()))<< hostTimeRule.getTimeOffset();
    }

    /**
     * 初始化{@link #incNo}
     */
    protected void initIncNo() {
        if (!randomIncNo) {
            LOG.debug("不需要随机IncNo。");
            return;
        }
        //使用随机初始化IncNo
        startIncNo = (int) ((1L << hostTimeRule.getIncNoLength()) * Math.random());
        //这里如果事先设置IncNo属性,就不要开启随机IncNo。
        if (incNo == 0) {
            incNo = startIncNo;
            LOG.info("设置随机IncNo成功。IncNo:{}", randomIncNo);
        } else {
            LOG.info("设置随机IncNo失败。请确认初始化IncNo为0,即不要设置IncNo属性,或者关闭randomIncNo属性!IncNo:{}", incNo);
            throw new ShardingJdbcException("设置随机IncNo失败。请确认初始化IncNo为0,即不要设置IncNo属性,或者关闭randomIncNo属性!");
        }
        maxIncNo = startIncNo + (1 << hostTimeRule.getIncNoLength());
    }

    public synchronized long getAndAdd(){
        return getAndAdd(1);
    }
    
    public synchronized long getAndAdd(int size){
        long currentIncNo = 0;
        //当一秒内请求超过 1 << hostTimeRule.getIncNoLength() 时要等待下一秒
        while(true){
            getShiftTime();
            currentIncNo = incNo;
            if(currentShiftedTime == shiftedTime){
                incNo = incNo+size;
                if(currentIncNo >= maxIncNo){
                    LOG.info("当前时间分片请求分布式主键id的自增值:{}达到算法瓶颈,需要等下一个时间分片才能创建.起始偏移:{},每一个时间分片最大支持生成个数:{}",new Object[]{currentIncNo,startIncNo,(1 << hostTimeRule.getTimeLength())});
                    continue;
                }
            }else{
                currentShiftedTime = shiftedTime;
                incNo = startIncNo; //从头开始计数
            }
            break;
        }
        
       return ((currentIncNo)%(1L << hostTimeRule.getIncNoLength())) << hostTimeRule.getIncNoOffset();
    }
    
    private void getShiftTime(){
        long baseTime = System.currentTimeMillis();
        shiftedTime = (((baseTime - timeBaseLine) / timeGap)%(1L << hostTimeRule.getTimeLength()))<< hostTimeRule.getTimeOffset();
    }
    @Override
    public Number generateId() {
        return shiftedHostId + shiftedTime + getAndAdd();
    }

snowflake算法的优势和劣势如下:
优势:在服务器规模不是很大(不超过1024条件) 全局唯一 ,单机递增 ,是数字类型,存储索引成本低。
劣势:机器规模大于1024无法支持,需要运维配合解决单机部署多个同服务进程问题。

带偏移的snowflake算法
什么是带偏移的snowflake算法?指的是某个变量的后多少位和另一个字段的后多少位有相同的二进制,从而这两个变量具有相同的偏移。也就是一个变量的生成依赖另一个字段,两者具有相同的偏移量。我们也可以用槽位来理解,就是具有相同位偏移,从而保证取模运算之后这两个变量会被分到同一个槽中。
举个栗子,当订单数量非常大时,需要对订单表做分库分表,查询维度分为患者维度(对应买家维度)和医生维度(对应卖家维度)。患者就诊表以及医生接单表的数量和订单表的数量是一样的。同理也需要对患者就诊表以及医生接单表进行分库分表。这样就存在3个分库分表。
通过偏移绑定,让订单的生成id的后多少位(比如后10位)和用户id的后多少位(比如后10位)具有相同的偏移。也就是订单生成部分依赖患者id。这样通过订单id或者患者id进行取模运算(mod 1024)都能定位到同一个分库分表(槽),这样患者就诊表和订单表就是同一个表,从而将3个分库分表减少为2个分库分表。以最大支撑分库分表数量为1024,这样我们后10位用于偏移。带偏移的snowflake算法产生的ID结构如下:

64-6362-5352-2423-109-0
1位:符号位10位:102429位:17年14位:1638410位:1024个slot
保留位机器码时间戳自增码偏移位(槽位)

这样生成的id能够支撑17年;最大支持1024台应用机器同时生产数据;最大支持同一个用户每秒产生16384条记录。

核心代码如下

    @Override
    public Long generate(Long item) {
        return shiftedHostId + shiftedTime + getAndAdd() + ((item
            & hostTimeRule.getBiasedMask()) << hostTimeRule.getBiasedOffset());
    }

    @Override
    public Long batchGenerateMinId(Long item, int size) {
        return shiftedHostId + shiftedTime + getAndAdd(size) + (((item
            & hostTimeRule.getBiasedMask())) << hostTimeRule.getBiasedOffset());
    }

带偏移的snowflake算法的优势和劣势如下:
优势:在服务器规模不是很大(不超过1024条件) 全局唯一,是数字类型,存储索引成本低。通过偏移绑定,能减少一个分库分表。
劣势:不保证单机递增,机器规模大于1024无法支持,需要运维配合解决单机部署多个同服务进程问题。

https://yq.aliyun.com/articles/648038

免责声明:文章转载自《分布式主键生成算法》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇oracle数据库连接字符串一文看懂 MySQL 分区和分表,提高表增删改查效率下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

密码算法详解——AES

0 AES简介   我们知道数据加密标准(Data Encryption Standard: DES)的密钥长度是56比特,因此算法的理论安全强度是256。但二十世纪中后期正是计算机飞速发展的阶段,元器件制造工艺的进步使得计算机的处理能力越来越强,DES将不能提供足够的安全性。1997年1月2号,美国国家标准技术研究所(National Institut...

JAVA编程心得-JAVA实现CRC-CCITT(XMODEM)算法

CRC即循环冗余校验码(Cyclic Redundancy Check):是数据通信领域中最常用的一种差错校验码,其特征是信息字段和校验字段的长度可以任意选定。 1 byte checksum CRC-16 CRC-16 (Modbus)CRC-16 (Sick)CRC-CCITT (XModem)CRC-CCITT (0xFFFF) CRC-CCITT...

Hive基础之Hive数据类型

Hive数据类型    参考:中文博客:http://www.cnblogs.com/ggjucheng/archive/2013/01/03/2843448.html          英文:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types    1、列类型    ...

C# 时间戳与DateTime互转,使用 DateTimeOffset

/// <summary> /// 时间戳与DateTime互转 /// </summary> public class TicksTimeConvert { /* * 时间戳10位的是秒,13位的是毫秒 * * 1秒=1...

DB2 日期时间函数

db2日期时间函数 (DATE(TRIM(CHAR(DT#11Y))||'-'||TRIM(CHAR(DT#11M))||'-'||TRIM(CHAR(DT#11D))) BETWEEN DATE('" & strDate1 & "') AND DATE('" & strDate2 & "')) (Y > y) OR...

linux运维、架构之路-分布式存储Ceph

一、Ceph介绍        Ceph是一个Linux PB级分布式文件系统,能够在维护POSIX兼容性的同时加入了复制和容错功能。Ceph号称高可用的分布式存储系统,通过多个MON节点(通常为3个)维护集群的状态及元数据信息,而真正存储数据的OSD节点通过向MON节点汇报状态,并通过CRUSH算法将数据副本布局到相应OSD的所在磁盘上,完成数据的持久化...