MongoDB线程安全批量处理

摘要:
=null)if(args.length!

Mongo批处理工具类:

package com.saike.solr.server.util;

import java.net.UnknownHostException;
import java.util.ArrayList;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;

/**
 * 批处理工具类
 * @author xieyong
 *
 */
public class UtileMongDB {
    
    UtilThreadLocal<ArrayList<DBObject>> localBatch;
    /**mongo单例对象  根据官方文档mongojava是线程安全的*/
    private static Mongo mongo;
    private static DBCollection coll;
    //private static Log log = LogFactory.getLog(UtileMongDB.class);
    private static DB db;
    
    static{
           /** 实例化db*/
           MongoOptions options = new MongoOptions();
                      options.autoConnectRetry = true;
                      options.connectionsPerHost = 1000;
                      options.maxWaitTime = 5000;
                      options.socketTimeout = 0;
                      options.connectTimeout = 15000;
                      options.threadsAllowedToBlockForConnectionMultiplier = 5000;
            try {
                mongo = new Mongo(MongoDBConstant.MONGO_HOST,MongoDBConstant.MONGO_PORT);
            } catch (UnknownHostException | MongoException e) {
                e.printStackTrace();
            }
            // boolean auth = db.authenticate(myUserName, myPassword);
    }
    
    public UtileMongDB(){
        try {
            localBatch = new UtilThreadLocal<ArrayList<DBObject>>(ArrayList.class);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
    /**
     * 返回db对象
     * @return db
     */
    public static DB getDB(){
        if(db==null){
            db = mongo.getDB(MongoDBConstant.MONGO_DB);
        }
        return db;
    }
    
    /**
     * 返回mongo
     * @return mongo连接池
     */
    public static Mongo getMong(){
        return mongo;
    }
    
    /**
     * 读取集合
     * @return mongo集合
     * */
    public static DBCollection getColl(String collname){
        return getDB().getCollection(collname);
    }
    
    public static DBCollection getColl(){
        return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION);
    }
    
    /**  crud操作 */
    public void addBatch(String key,String value){
        BasicDBObject basicDB = new BasicDBObject();
        basicDB.put(key, value);
        /** 这里用线程本地变量,不用会存在竞技条件*/
        localBatch.newGet().add(basicDB);
    }
    
    /**
     * 执行批处理
     * */
    public void executeInsertBatch(){
        getColl().insert(localBatch.get());
        localBatch.get().clear();
    }
    /**
     * 执行批量删除
     */
    public void executeDeleteBatch(){
        ArrayList<DBObject> array = localBatch.get();    
        for(DBObject obj:array){
            getColl().remove(obj);
        }
        localBatch.get().clear();
    }
    
    
    
    
    
    public DBCursor query(String key,String value){
        BasicDBObject basicDBObject = new BasicDBObject();
        basicDBObject.put(key,value);
        return getColl().find(basicDBObject);
    }
        
}

 

ThreadLocal的封装:

package com.saike.solr.server.util;

import java.lang.reflect.Constructor;

/**
 * 
 * @author xieyong
 *
 * @param <T> 本地线程变量对象了类型
 */
public class UtilThreadLocal<T> extends ThreadLocal<T> {
    /**参数集合*/
    Object[] obj;
    /**实例化构造函数*/
    Constructor<T> construct;
    
    /**
     * 
     * @param clazz        本地变量的class
     * @param args        构造函数的参数
     * @throws NoSuchMethodException
     * @throws SecurityException
     */
    public UtilThreadLocal(Class clazz,Object... args) throws NoSuchMethodException, SecurityException{
        this.obj = obj;
        Class[] clazzs = null;
        /** new 获取参数class供获取构造函数用*/
        if(args != null)
            if(args.length !=0){
                clazzs = new Class[args.length];
                for(int i = 0;i<args.length;i++){
                    clazzs[i] = args[i].getClass();
                }
            }
        this.construct = clazz.getConstructor(clazzs);
    }
    
    /**
     * 如果当前线程没有对象创建一个新对象
     * @return
     */
    public T newGet(){
        T tar = super.get() ;
        if(tar == null){
            try {
                tar = construct.newInstance(obj);
                super.set(tar);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        return tar;
    }
}

免责声明:文章转载自《MongoDB线程安全批量处理》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Flask----目录结构C++选择文件打开方式的函数下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

mongodb模糊查询包含特殊字符

mongodb中的待特殊字符的模糊查询需要转义下才能查到 (name: /wo*2hjf/ 查不到 name: /wo*2hjf/ 查得到) let str=wo*2hjf; let filter1={ name: new RegExp(str) }; let filter2={ name: new RegExp(str.replace(/*/g,'\*...

MongoDB地理空间(2d)索引创建与查询

LBS(Location Based Services)定位服务,即根据用户位置查询用户附近相关信息,这一功能在很多应用上都有所使用。基于用户位置进行查询时,需要提供用户位置的经纬度。为了提高查询速度,MongoDB为坐标平面查询提供了专门的索引,称作地理空间(2d)索引。 1. 创建地理空间索引 地理空间索引又称为2d索引。创建其它形式的索引,我们会按升...

Mongodb学习笔记五(C#操作mongodb)

mongodb c# driver(驱动)介绍 目前基于C#的mongodb驱动有两种,分别是官方驱动(下载地址)和samus驱动(下载地址)。本次我们只演示官方驱动的使用方法。官方驱动文档查看 第一步:引用驱动dll 引用驱动有两种方式:1. 根据上面的下载地址下载对应的版本,然后引用到项目中。2. 在项目的引用上右击->管理NuGet程序包(首先...

Java中生成随机数的4种方式!

在 Java 中,生成随机数的场景有很多,所以本文我们就来盘点一下 4 种生成随机数的方式,以及它们之间的区别和每种生成方式所对应的场景。 1.Random Random 类诞生于 JDK 1.0,它产生的随机数是伪随机数,也就是有规则的随机数。Random 使用的随机算法为 linear congruential pseudorandom number...

mongodb 索引的基本命令

mongodb的索引: 在数据量超大的时候,能够极大的增快查询速率,但是会降低更新效率。建立索引: db.集合.ensureIndex({属性:1}) //1代表升序 -1代表降序 db.集合.ensureIndex({属性1:1,属性2:1}) //联合索引查看文档所有索引: db.集合.getIndexes()删除索引:...

Sqlite多线程相关整理

Sqlite多线程相关整理 Sqlite With MultiThreads 什么是线程安全? 当多个线程访问某个方法时,不管你通过怎样的调用方式、或者说这些线程如何交替地执行,我们在主程序中不需要去做任何的同步,这个类的结果行为都是我们设想的正确行为,那么我们就可以说这个类是线程安全的。 一 来自官方FAQ https://www.sqlite.org/...