Mongodb到mysql数据库的数据迁移(Java,Windows)

摘要:
mysqlimport-h<hostname>-P<port>-u<username>-P<password>--local<databasename><importfile>-c<columns>--以=结尾的字段,--用=括起来的字段“--以=--ignore lines=1-h hostname-P端口u用户名-P密码结尾的行,密码字符串和-P之间没有空格--local使用本地文件导入数据库名数据库名importfile导入文件路径,文件名被视为表名。在下面的示例Student-c列中,文件中的字段用逗号分隔--以逗号结尾的字段by=string将字符串设置为字段之间的分隔符分隔符可以是单个字符或多个字符--忽略行=n表示可以忽略前n行。

运行环境为windows

测试过260万的数据表,迁移大概要10分钟左右,当然肯定和网络,字段大小什么的有关系。

遇到的坑和注意点都用紫色标记了

PS:第一次写这么长的东西

一、Mongodb导出命令mongoexport

本地安装Mongodb,在安装目录的/bin下按住shift并右键“在此处打开命令窗口”,可执行以下语句进行导出。

mongoexport -h <ip:port> -d <database> -c <collection> -u <username> -p <password> --type <json/csv> -f <fileds> -o <outputfile> --limit %d --skip %d --noHeaderLine

  -h  host,主机ip+port

  -d  database,数据库名

  -c  collection,集合名(表名)

  -u  username,用户名

  -p  password,密码

  --type 导出类型 json/csv

  -f  当type为csv时必选,导出字段名,逗号分隔

  -o  outputfile,输出文件名

  -q  query,查询参数,为json字符串

  --sort 排序参数,为json字符串

  --limit 返回结果数,和skip分页时使用

  --skip 跳过的记录数

  --noHeaderLine 导出文件不包含首行字段名

示例:

mongoexport -h 10.10.10.10:27027 -d test -c Student -u mydb -p mydb --type csv -f "_id,stuno,stuname,age,sex" -o D:/Student.csv --limit 1000 --skip 0 -q {'stuno':'stu_11123'} --sort {age:1} --noHeaderLine

二、MySQL导入命令mysqlimport

本地安装MySQL,在安装目录的/bin下按住shift并右键“在此处打开命令窗口”,可执行以下语句进行导入。

mysqlimport -h <hostname> -P <port> -u <username> -p<password> --local <databasename> <importfile> -c <colums> --fields-terminated-by=, --fields-enclosed-by=" --lines-terminated-by=
 --ignore-lines=1

-h  hostname
-P  port
-u  username
-p   password, 密码字符串和-p之间没有空格
--local   使用本地的文件导入
databasename  数据库名
importfile  导入文件路径,文件名被认为是表名,如下例中的Student
-c  colums,文件中字段分割顺序,用逗号分割
--fields-terminated-by=字符串  设置字符串为字段之间的分隔符,可以为单个或多个字符。默认值为制表符“ ”。
--fields-enclosed-by=字符  设置字符来括住字段的值,只能为单个字符。
--fields-optionally-enclosed-by=字符  设置字符括住CHAR、VARCHAR和TEXT等字符型字段,只能为单个字符。
--fields-escaped-by=字符  设置转义字符,默认值为反斜线“”。
--lines-terminated-by=字符串  设置每行数据结尾的字符,可以为单个或多个字符,默认值为“ ”。
--ignore-lines=n  表示可以忽略前n行。可以用来跳过首行的字段名。

示例:

mysqlimport -h 10.10.10.10 -P 3306 -u dbtest -pdbtest --local mydb D:/Student.txt -c  "id,stuno,stuname,age,sex" --fields-terminated-by=, --fields-enclosed-by=" --lines-terminated-by=

三、Java运行cmd命令工具类

package util;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;

/**
 * 
 * Cmd命令執行工具
 *
 * @author  2018.03.14
 */
public class CmdUtil {
    private final static CmdUtil instance = new CmdUtil();

    private final static Runtime cmd = Runtime.getRuntime();
    
    public static CmdUtil getInstance() {
        return instance;
    }
    
    private CmdUtil() {
        
    }
    
    /**
     * 
     * 用于执行cmd命令并返回执行结果
     *
     * @param commondStr 命令字符串
     * @param dir 执行目录
     * @return 执行结果
     */
    public String exec(String commondStr, File dir) {
        //System.out.println(">" + commondStr);
        StringBuilder sb = new StringBuilder();
        try {
            // "/c"代表程序执行有参数,如果不加上就直接运行了cmd.exe; dir是程序执行目录
            Process process = cmd.exec("cmd.exe /c" + commondStr, null, dir);
            //接收执行结果字符串
            String temp = "";
            BufferedReader errBr = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            while (StringUtils.isNotEmpty(temp = errBr.readLine())) {
                sb.append(temp).append("
");
            }
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            while (StringUtils.isNotEmpty(temp = br.readLine())) {
                sb.append(temp).append("
");
            }
            br.close();
            errBr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } 
        //System.out.println(sb.toString());
        return sb.toString();
    }
}

四、文本文件处理工具类

在导入mysql数据库前,需要对文件进行一些操作,这里提供了对文件每一行字符进行处理并生成指定文件的类

package util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

/**
 * 
 * 文本操作工具
 *
 * @author  2018.03.14
 */
public class TextFileUtil {
    private static final TextFileUtil instance = new TextFileUtil();

    private TextFileUtil() {

    }

    public static TextFileUtil getInstance() {
        return instance;
    }

    /**
     * 复制文件并处理每一行的字符串
     *
     * @param srcFile 源文件
     * @param target 目标文件
     * @param opertor 每行的字符串处理类
     */
    public void transferLine(String srcFile, String target, OpertorInter opertor) {
        if (srcFile.equals(target)) {
            System.out.println("Warning : src file is same to target file");
            return;
        }
        FileReader fr = null;
        BufferedReader br = null;

        FileWriter fw = null;
        BufferedWriter bw = null;

        String temp = null;
        try {
            fr = new FileReader(srcFile);
            br = new BufferedReader(fr);

            fw = new FileWriter(target);
            bw = new BufferedWriter(fw);

            while (null != (temp = br.readLine())) {
                bw.write(opertor.transferLine(temp));
                bw.newLine();
            }

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != br && null != fr) {
                try {
                    br.close();
                    fr.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    br = null;
                    fr = null;
                }
            }
            if (null != bw && null != fw) {
                try {
                    bw.close();
                    fw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    bw = null;
                    fw = null;
                }
            }
        }

    }

}

其中OpertorInter接口方法transferLine(str)提供了对每行数据的处理操作,这里是接口和其实现类:

package util;

public interface OpertorInter {
    String transferLine(String before);
}
package util;

/**
 * 对每条记录进行处理
 *
 * @author  2018.03.14
 */
public class ObjectIdOpertor implements OpertorInter {

    public String transferLine(String before) {
        // 将空值设置为N,否则在mysqlimport时会将空值的日期字段设置为0000-00-00 00:00:00
        //首先是行尾的空值
        before = before.replaceFirst(",$", ",\\N");
        //再循环替换所有空值
        String temp = "";
        while (!temp.equals(before)) {
            temp = before;
            before = temp.replaceFirst(",,", ",\\N,");
        }
        
        //mongodb导出文件中_id字段转换为字符串,只有ObjectId类型时才需要转换
        if (before.startsWith("ObjectId")) {
            return before.substring(9, 33) + before.substring(34);
        }
        return before;
    }
}

五、Main方法

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import util.CmdUtil;
import util.ObjectIdOpertor;
import util.OpertorInter;
import util.TextFileUtil;

/**
 * 用于将mongodb中的表数据迁移到mysql数据库中
 * @author  2018.03.14
 */
public class DBTransferTest {
    public static void main(String[] args) {
        // mongodb中集合字段,导出的文件以此为准
        Map<String, String> fileds = new HashMap<String, String>();
        fileds.put("Student", "_id,stuno,stuname,age,sex");

        // mongo数据库信息
        String host = "10.10.10.10:27027";
        String database = "test";
        String userName = "mydb";
        String password = "mydb";
        // mysql数据库信息
        String hostForMysql = "10.10.10.10";
        int portForMysql = 3306;
        String userForMysql = "dbtest";
        String passForMysql = "dbtest";
        String databaseForMysql = "mydb";
        
        // cmd命令運行
        // mongodb运行目录
        File dir = new File("D:/Program Files/MongoDB/Server/3.6/bin");
        // mysql运行目录
        File mySqlDir = new File("D:/Program Files/mysql-5.6.39-winx64/bin");
        // mongodb导出命令 mongoexport -h <ip:port> -d <database> -c <collection> -u <username> -p <password> --type <json/csv> -f <fileds> -o <outputfile> --limit %d --skip %d
        // --noHeaderLine 不输出列名
        String exportSrcCmd = "mongoexport -h %s -d %s -c %s -u %s -p %s --type csv -f %s -o %s --limit %d --skip %d --noHeaderLine";
        // mysql导入命令 
     // mysqlimport -h <ip> -P <port> -u <username> -p<password> --local <database> <inputfile> -c <columes> --fields-terminated-by=\, --fields-enclosed-by=\" --lines-terminated-by=\n --ignore-lines=1
// --ignore-lines=1 忽略首行的列名,上面导出时已忽略,这里就不再跳过第一行了 String importSrcCmd = "mysqlimport -h %s -P %d -u %s -p%s --local %s %s -c %s --fields-terminated-by=, --fields-enclosed-by=\" --lines-terminated-by=\r\n"; // 匹配命令执行结果中的导入导出数 String regForExport = "(\d+) record"; String regForImport = "Records: (\d+)"; // 分页导出的每页数据量大小 int limit = 10000; // 分页参数 int skip = 0; // 由于mongodb导出的文件中_id字段 OpertorInter opertor = new ObjectIdOpertor(); long st = System.currentTimeMillis(); for (Map.Entry<String, String> en : fileds.entrySet()) { // 表名 String collection = en.getKey(); // 获取两数据库表的字段名 当前只有 _id --> id 不同 String filed = fileds.get(collection); String filedForMysql = filed.substring(1); // mongodb导出文件名 String output = "D:/" + collection + ".csv"; // mysql要导入的文件名,此处文件名决定了mysqlimport要导入的表名 String importFile = "D:/" + collection + ".txt"; long startTime = System.currentTimeMillis(); System.out.println("***********" + collection + "***********"); for (int pageNo = 0;; pageNo++) { long pageSt = System.currentTimeMillis(); // pageNo这里从0开始 skip = pageNo * limit; // mongoexport System.out.print("----" + (skip + 1) + "~" + (skip + limit) + ":[Exporting:"); String exportCmd = String.format(exportSrcCmd, host, database, collection, userName, password, filed, output, limit, skip); int exportNum = parseOperateNum(CmdUtil.getInstance().exec(exportCmd, dir), regForExport); System.out.print(exportNum); // 处理文件将_id字段的Object()去除,只保留string类型的id TextFileUtil.getInstance().transferLine(output, importFile, opertor); // mysqlimport System.out.print("][Importing:"); String importCmd = String.format(importSrcCmd, hostForMysql, portForMysql, userForMysql, passForMysql, databaseForMysql, importFile, filedForMysql); String reString = CmdUtil.getInstance().exec(importCmd, mySqlDir); int importNum = parseOperateNum(reString, regForImport); System.out.print(importNum + "]"); long pageEd = System.currentTimeMillis(); System.out.println("[Cost time:" + (pageEd-pageSt) + "ms]"); if (exportNum != importNum) { System.out.println("Error and Break: importNum is not same to exportNum--" + reString); break; } // 如果导出数量为0或者导出数量还不到一页,则没有下一页 if (limit > exportNum || 0 == exportNum) { System.out.println("----No records, export end."); break; } } long endTime = System.currentTimeMillis(); System.out.println("Cost time : " + (endTime - startTime) + "ms "); } long et = System.currentTimeMillis(); System.out.println("***********TOTAL COST TIME : " + (et - st) / 60000f + "min***********"); } /** * * 获取匹配到的字符并转为int * 这里用于获取cmd处理结果中的导入导出记录数,以判断是否最后一页数据 * * @param re * @param reg * @return */ public static int parseOperateNum(String re, String reg) { Pattern p = Pattern.compile(reg); Matcher m = p.matcher(re); if (m.find()) { return Integer.parseInt(m.group(1)); } return 0; } }

免责声明:文章转载自《Mongodb到mysql数据库的数据迁移(Java,Windows)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇oracle的定时任务linux下redis4.0.2集群部署(利用Ruby脚本命令)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

mysql创建触发器

首先,我们来了解一下什么是触发器,触发器,就是在对一张表数据进行增(insert),删(delete),改(update)的时候,为了保持数据的一致性,对别的表也要进行相应的数据修改。 我们都知道mysql最后事务提交后,数据是会保存到磁盘上的,那么每次在insert,delete,update时候旧数据和新数据,会在内存中生成临时的行数据,分别叫old和...

Mysql系列(三)—— Mysql主从复制配置

一.前言 主从复制是Mysql知识体系中非常重的要一个模块。学习主从复制和后续的读写分离是完善只是知识体系的重要环节。且主从复制读写分离的思想并不仅仅局限于Mysql,在很多存储系统中都有该方案,如:redis。 从应用的角度思考,主从复制有如下优点: 可以备份数据,容灾 可以做读写分离,分担单机Mysql节点的压力。master只做write,slav...

centos lamp/lnmp阶段复习 以后搬迁discuz论坛不需要重新安装,只需修改配置文件即可 安装wordpress 安装phpmyadmin 定时备份mysql两种方法 第二十五节课

centos  lamp/lnmp阶段复习 以后搬迁discuz论坛不需要重新安装,只需修改配置文件即可 安装wordpress  安装phpmyadmin  定时备份mysql两种方法  第二十五节课 wordpress下载地址:https://cn.wordpress.org/ 架构:nginx代理 +LAMP 修改Windows的hosts文件 ap...

java连接数据库时的报错

//java连接数据库时的报错1 package Java数据库编程; 2 3 import java.sql.DriverManager; 4 import java.sql.SQLException; 5 6 import com.mysql.jdbc.Connection; 7 8 public class ConnectionD...

MySQL Cluster

参考:http://www.cnblogs.com/linkstar/p/6510713.html 如果没有特殊声明,则所有命令都在node1上执行 192.168.1.201 node1  1核/448M  管理节点192.168.1.202 node2  1核/448M  数据节点和mysql节点在同一台机器上192.168.1.203 node3  ...

MongoDB数据创建与使用

MongoDB数据创建与使用 创建数据库代码功能:读取本地文本文件,并保存到数据库中 import pymongo #连接mongo数据库 client = pymongo.MongoClient('localhost',27017) #创建数据库 walden = client['walden'] #创建表 sheet_tab = walden['s...