snowflake 分布式唯一ID生成器

摘要:
当然你也可以使用java实现版.具体详细信息:参考网址Snowflake的使用安装requestspipinstallrequests安装pysnowflakepipinstallpysnowflake启动pysnowflake服务snowflake_start_server--address=192.168.10.145--port=30001--dc=1--worker=1--log_file_prefix=/tmp/pysnowflask.log#--address:本机的IP地址默认localhost这里解释一下参数意思:#--dc:数据中心唯一标识符默认为0#--worker:工作者唯一标识符默认为0#--log_file_prefix:日志文件所在位置使用示例#导入pysnowflake客户端˃˃˃importsnowflake.client#链接服务端并初始化一个pysnowflake客户端˃˃˃host='192.168.10.145'˃˃˃port=30001˃˃˃snowflake.client.setup#生成一个全局唯一的ID˃˃˃snowflake.client.get_guid()3631957913783762945#查看当前状态˃˃˃snowflake.client.get_stats(){'dc':1,'worker':1,'timestamp':1454126885629,#currenttimestampforthisworker'last_timestamp':1454126890928,#thelasttimestampthatgeneratedIDon'sequence':1,#thesequencenumberforlasttimestamp'sequence_overload':1,#thenumberoftimesthatthesequenceisoverflow'errors':1,#thenumberoftimesthatclockwentbackward}数据整理重建ID重建ID是一个很庞大的工程,首先要很了解表的结构。

本文来自我的github pages博客http://galengao.github.io/ 即www.gaohuirong.cn

摘要:

  • 原文参考运维生存和开源中国上的代码整理
  • 我的环境是python3.5,pip8.2的

一、python版本

前言

由于考虑到以后要动态切分数据,防止将不同表切分数据到同一个表中时出现主键相等的冲突情况,这里我们使用一个全局ID生存器。重要的是他是自增的。
这边我使用Snowflake的python实现版(pysnowflake)。当然你也可以使用java实现版.
具体详细信息:参考网址

Snowflake的使用

  • 安装 requests
pip install requests
  • 安装 pysnowflake
pip install pysnowflake
  • 启动pysnowflake服务

snowflake_start_server 
  --address=192.168.10.145--port=30001--dc=1--worker=1--log_file_prefix=/tmp/pysnowflask.log


# --address:本机的IP地址默认localhost这里解释一下参数意思(可以通过--help来获取):
# --dc:数据中心唯一标识符默认为0
# --worker:工作者唯一标识符默认为0
# --log_file_prefix:日志文件所在位置

  • 使用示例(这边引用官网的)
#导入pysnowflake客户端
>>> importsnowflake.client
 
#链接服务端并初始化一个pysnowflake客户端
>>> host = '192.168.10.145'
>>> port = 30001
>>>snowflake.client.setup(host, port)
#生成一个全局唯一的ID(在MySQL中可以用BIGINT UNSIGNED对应)
>>>snowflake.client.get_guid()
3631957913783762945
#查看当前状态
>>>snowflake.client.get_stats()
{
  'dc': 1,
  'worker': 1,
  'timestamp': 1454126885629, #current timestamp for this worker
  'last_timestamp': 1454126890928, #the last timestamp that generated ID on
  'sequence': 1, #the sequence number for last timestamp
  'sequence_overload': 1, #the number of times that the sequence is overflow
  'errors': 1, #the number of times that clock went backward
}
  • 数据整理重建ID

重建ID是一个很庞大的工程,首先要很了解表的结构。不然,如果少更新了某个表的一列都会导致数据的不一致。
当然,如果你的表中有很强的外键以及设置了级联那更新一个主键会更新其他相关联的外键。这里我还是不建议去依赖外键级联更新来投机取巧毕竟如果有数据库的设计在项目的里程碑中经过了n次变化,也不能肯定设置的外键一定是级联更新的。
在这边我强烈建议重建ID时候讲MySQL中的检查外键的参数设置为0。

SET FOREIGN_KEY_CHECKS=0;

小提示:其实理论上我们是没有必要重建ID的因为原来的ID已经是唯一的了而且是整型,他兼容BIGINT。但是这里我还是做了重建,主要是因为以后的数据一致。并且如果有些人的ID不是整型的,而是有一定含义的那时候也肯定需要做ID的重建。

  • 修改相关表ID的数据类型为BIGINT
--修改商品表 goods_id 字段
ALTER TABLEgoods_1
  MODIFY COLUMN goods_id BIGINT UNSIGNED NOT NULLCOMMENT '商品ID';
 
--修改出售订单表 goods_id 字段
ALTER TABLEsell_order_1
  MODIFY COLUMN sell_order_id BIGINT UNSIGNED NOT NULLCOMMENT '出售订单ID';
 
--修改购买订单表 buy_order_id 字段
ALTER TABLEbuy_order_1
  MODIFY COLUMN buy_order_id BIGINT UNSIGNED NOT NULLCOMMENT '出售订单ID与出售订单相等';
 
--修改订单商品表 order_goods_id、orders_id、goods_id 字段
ALTER TABLEorder_goods_1
  MODIFY COLUMN order_goods_id BIGINT UNSIGNED NOT NULLCOMMENT '订单商品表ID';
ALTER TABLEorder_goods_1
  MODIFY COLUMN sell_order_id BIGINT UNSIGNED NOT NULLCOMMENT '订单ID';
ALTER TABLEorder_goods_1
  MODIFY COLUMN goods_id BIGINT UNSIGNED NOT NULLCOMMENT '商品ID';
  • 使用python重建ID

使用的python 模块:

模块名版本备注
pysnowflake0.1.3全局ID生成器
mysql_connector_python2.1.3mysql python API

这边只展示主程序:完整的程序在附件中都有

if __name__=='__main__':
  #设置默认的数据库链接参数
  db_config ={
    'user'    : 'root',
    'password': 'root',
    'host'    : '127.0.0.1',
    'port'    : 3306,
    'database': 'test'}
  #设置snowflake链接默认参数
  snowflake_config ={
    'host': '192.168.137.11',
    'port': 30001}
 
  rebuild =Rebuild()
  #设置数据库配置
rebuild.set_db_config(db_config)
  #设置snowflak配置
rebuild.set_snowflake_config(snowflake_config)
  #链接配置snowflak
rebuild.setup_snowflake()
 
  #生成数据库链接和
rebuild.get_conn_cursor()
 
  ##########################################################################
  ## 修改商品ID
  ##########################################################################
  #获得商品的游标
  goods_sql = '''SELECT goods_id FROM goods
  '''goods_iter =rebuild.execute_select_sql([goods_sql])
  #根据获得的商品ID更新商品表(goods)和订单商品表(order_goods)的商品ID 
  for goods ingoods_iter:
    for (goods_id, ) ingoods:
      rebuild.update_table_id('goods', 'goods_id', goods_id)
      rebuild.update_table_id('order_goods', 'goods_id', goods_id, rebuild.get_current_guid())
    rebuild.commit()
 
  ##########################################################################
  ## 修改订单ID, 这边我们规定出售订单ID和购买订单ID相等
  ##########################################################################
  #获得订单的游标
  orders_sql = '''SELECT sell_order_id FROM sell_order_1
  '''sell_order_iter =rebuild.execute_select_sql([orders_sql])
  #根据出售订单修改 出售订单(sell_order_1)、购买订单(buy_order_1)、订单商品(order_goods)的出售订单ID
  for sell_order_1 insell_order_iter:
    for (sell_order_id, ) insell_order_1:
      rebuild.update_table_id('sell_order_1', 'sell_order_id', sell_order_id)
      rebuild.update_table_id('buy_order_1', 'buy_order_id', sell_order_id, rebuild.get_current_guid())
      rebuild.update_table_id('order_goods', 'sell_order_id', sell_order_id, rebuild.get_current_guid())
    rebuild.commit()
 
  ##########################################################################
  ## 修改订单商品表ID
  ##########################################################################
  #获得订单商品的游标
  order_goods_sql = '''SELECT order_goods_id FROM order_goods
  '''order_goods_iter =rebuild.execute_select_sql([order_goods_sql])
  for order_goods inorder_goods_iter:
    for (order_goods_id, ) inorder_goods:
      rebuild.update_table_id('order_goods', 'order_goods_id', order_goods_id)
    rebuild.commit()
  #关闭游标
  rebuild.close_cursor('select')
  rebuild.close_cursor('dml')
  #关闭连接
  rebuild.close_conn()

完整的python程序:rebuild_id.py
执行程序

python rebuild_id.py
  • 最后查看表的结果
SELECT * FROM goods LIMIT 0, 1;
+---------------------+------------+---------+----------+
| goods_id            | goods_name | price   | store_id |
+---------------------+------------+---------+----------+
| 3791337987775664129 | goods1     | 9369.00 |        1 |
+---------------------+------------+---------+----------+
SELECT * FROM sell_order_1 LIMIT 0, 1;
+---------------------+---------------+---------+---------+--------+
| sell_order_id       | user_guide_id | user_id | price   | status |
+---------------------+---------------+---------+---------+--------+
| 3791337998693437441 |             1 |      10 | 5320.00 |      1 |
+---------------------+---------------+---------+---------+--------+
SELECT * FROM buy_order_1 LIMIT 0, 1;
+---------------------+---------+---------------+
| buy_order_id        | user_id | user_guide_id |
+---------------------+---------+---------------+
| 3791337998693437441 |      10 |             1 |
+---------------------+---------+---------------+
SELECT * FROM order_goods LIMIT 0, 1;
+---------------------+---------------------+---------------------+---------------+---------+------+
| order_goods_id      | sell_order_id       | goods_id            | user_guide_id | price   | num  |
+---------------------+---------------------+---------------------+---------------+---------+------+
| 3792076554839789569 | 3792076377064214529 | 3792076372429508609 |             1 | 9744.00 |    2 |
+---------------------+---------------------+---------------------+---------------+---------+------+

建议:如果在生产上有使用到snowflake请务必要弄一个高可用防止单点故障,具体策略看你们自己定啦。

二、java版本

代码一

/*** @authorzhujuan
 * From: https://github.com/twitter/snowflake* An object that generates IDs.
 * This is broken into a separate class in case
 * we ever want to support multiple worker threads
 * per process
 */
public classIdWorker {
     
    protected static final Logger LOG = LoggerFactory.getLogger(IdWorker.class);
     
    private longworkerId;
    private longdatacenterId;
    private long sequence = 0L;
 
    private long twepoch = 1288834974657L;
 
    private long workerIdBits = 5L;
    private long datacenterIdBits = 5L;
    private long maxWorkerId = -1L ^ (-1L <<workerIdBits);
    private long maxDatacenterId = -1L ^ (-1L <<datacenterIdBits);
    private long sequenceBits = 12L;
 
    private long workerIdShift =sequenceBits;
    private long datacenterIdShift = sequenceBits +workerIdBits;
    private long timestampLeftShift = sequenceBits + workerIdBits +datacenterIdBits;
    private long sequenceMask = -1L ^ (-1L <<sequenceBits);
 
    private long lastTimestamp = -1L;
 
    public IdWorker(long workerId, longdatacenterId) {
        //sanity check for workerId
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId =workerId;
        this.datacenterId =datacenterId;
        LOG.info(String.format("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d", timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId));
    }
 
    public synchronized longnextId() {
        long timestamp =timeGen();
 
        if (timestamp <lastTimestamp) {
            LOG.error(String.format("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp));
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp -timestamp));
        }
 
        if (lastTimestamp ==timestamp) {
            sequence = (sequence + 1) &sequenceMask;
            if (sequence == 0) {
                timestamp =tilNextMillis(lastTimestamp);
            }
        } else{
            sequence = 0L;
        }
 
        lastTimestamp =timestamp;
 
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) |sequence;
    }
 
    protected long tilNextMillis(longlastTimestamp) {
        long timestamp =timeGen();
        while (timestamp <=lastTimestamp) {
            timestamp =timeGen();
        }
        returntimestamp;
    }
 
    protected longtimeGen() {
        returnSystem.currentTimeMillis();
    }
}

代码二

public classIdWorkerTest {
 
    static class IdWorkThread implementsRunnable {
        private Set<Long>set;
        privateIdWorker idWorker;
 
        public IdWorkThread(Set<Long>set, IdWorker idWorker) {
            this.set =set;
            this.idWorker =idWorker;
        }
 
        @Override
        public voidrun() {
            while (true) {
                long id =idWorker.nextId();
                if (!set.add(id)) {
                    System.out.println("duplicate:" +id);
                }
            }
        }
    }
 
    public static voidmain(String[] args) {
        Set<Long> set = new HashSet<Long>();
        final IdWorker idWorker1 = new IdWorker(0, 0);
        final IdWorker idWorker2 = new IdWorker(1, 0);
        Thread t1 = new Thread(newIdWorkThread(set, idWorker1));
        Thread t2 = new Thread(newIdWorkThread(set, idWorker2));
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();
        try{
            Thread.sleep(30000);
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

免责声明:文章转载自《snowflake 分布式唯一ID生成器》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Oracle基础 自定义函数tcpdump使用方法小结下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

sqlserver 查询当天/本周/本月/本季度/本年的数据

当天数据:select * from tableName where datediff(day, 字段名,getdate())=0 本周数据:select * from tableName where datediff(week, 字段名,getdate())=0 本月:select * from tableName where datediff(mont...

SqlServer基础之(触发器)

SqlServer基础之(触发器)  阅读目录 一:触发器的优点 二:触发器的作用 三:触发器的分类 四:触发器的工作原理 五:创建触发器 六:管理触发器  概念:   触发器(trigger)是SQL server 提供给程序员和数据分析员来保证数据完整性的一种方法,它是与表事件相关的特殊的存储过程,它的执行不是由程序调用,也不是手工启动,而是...

分布式系统唯一ID生成方案汇总

系统唯一ID是我们在设计一个系统的时候常常会遇见的问题,也常常为这个问题而纠结。生成ID的方法有很多,适应不同的场景、需求以及性能要求。所以有些比较复杂的系统会有多个ID生成的策略。下面就介绍一些常见的ID生成策略。 1. 数据库自增长序列或字段 最常见的方式。利用数据库,全数据库唯一。 优点: 1)简单,代码方便,性能可以接受。 2)数字ID天然排序,...

数据库三级考试的随笔3.0

1 select name,grade,评价= 2 case 3 when grade >90 then '优秀' 4 when grade between 80 and 90 then '中等' 5 when grade <80 then '垃圾' 6 end 7 from student 这个就是运行结果 1 select...

UNIX时间戳及日期的转换与计算

UNIX时间戳是保存日期和时间的一种紧凑简洁的方法,是大多数UNIX系统中保存当前日期和时间的一种方法,也是在大多数计算机语言中表示日期和时间的一种标准格式。以32位整数表示格林威治标准时间,例如,使用证书11230499325表示当前时间的时间戳。UNIX时间戳是从1970年1月1日零点(UTC/GMT的午夜)开始起到当前时间所经过的秒数。1970年1月...

SQL存储过程实例详解

      本文用3个题目,从建立数据库到创建存储过程,详细讲解数据库的功能。 题目1         学校图书馆借书信息管理系统建立三个表:         学生信息表:student 字段名称 数据类型 说明 stuID char(10) 学生编号,主键 stuName Varchar(10) 学生名称 major Varchar...