Unverified Commit db3f7a30 authored by lgcareer's avatar lgcareer Committed by GitHub
Browse files

Merge pull request #72 from lgcareer/dev-20190415

Dev 20190415 add upgrade function
parents 0e4c8651 3a4b152b
Loading
Loading
Loading
Loading
+56 −0
Original line number Diff line number Diff line
@@ -368,5 +368,61 @@ public class FileUtils {
        org.apache.commons.io.FileUtils.forceDelete(new File(filename));
    }

    /**
     * Gets all the parent subdirectories of the parentDir directory
     * @param parentDir
     * @return
     */
    public static File[] getAllDir(String parentDir){
        if(parentDir == null || "".equals(parentDir)) {
            throw new RuntimeException("parentDir can not be empty");
        }

        File file = new File(parentDir);
        if(!file.exists() || !file.isDirectory()) {
            throw new RuntimeException("parentDir not exist, or is not a directory:"+parentDir);
        }

        File[] schemaDirs = file.listFiles(new FileFilter() {

            @Override
            public boolean accept(File pathname) {
                if (pathname.isDirectory()) {
                    return true;
                }
                else {
                    return false;
                }
            }
        });

        return schemaDirs;
    }

    /**
     * Get Content
     * @param inputStream
     * @return
     * @throws IOException
     */
    public static String readFile2Str(InputStream inputStream) throws IOException{
        String all_content=null;
        try {
            all_content = new String();
            InputStream ins = inputStream;
            ByteArrayOutputStream outputstream = new ByteArrayOutputStream();
            byte[] str_b = new byte[1024];
            int i = -1;
            while ((i=ins.read(str_b)) > 0) {
                outputstream.write(str_b,0,i);
            }
            all_content = outputstream.toString();
            return all_content;
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
            throw new RuntimeException(e);
        }
    }


}
+104 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;

public class MysqlUtil {

	public static final Logger logger = LoggerFactory.getLogger(MysqlUtil.class);

	private static MysqlUtil instance;

	MysqlUtil() {
	}

	public static MysqlUtil getInstance() {
		if (null == instance) {
			syncInit();
		}
		return instance;
	}

	private static synchronized void syncInit() {
		if (instance == null) {
			instance = new MysqlUtil();
		}
	}

	public void release(ResultSet rs, Statement stmt, Connection conn) {
		try {
			if (rs != null) {
				rs.close();
				rs = null;
			}
		} catch (SQLException e) {
			logger.error(e.getMessage(),e);
			throw new RuntimeException(e);
		} finally {
			try {
				if (stmt != null) {
					stmt.close();
					stmt = null;
				}
			} catch (SQLException e) {
				logger.error(e.getMessage(),e);
				throw new RuntimeException(e);
			} finally {
				try {
					if (conn != null) {
						conn.close();
						conn = null;
					}
				} catch (SQLException e) {
					logger.error(e.getMessage(),e);
					throw new RuntimeException(e);
				}
			}
		}
	}

	public static void realeaseResource(ResultSet rs, PreparedStatement ps, Connection conn) {
		MysqlUtil.getInstance().release(rs,ps,conn);
		if (null != rs) {
			try {
				rs.close();
			} catch (SQLException e) {
				logger.error(e.getMessage(),e);
			}
		}

		if (null != ps) {
			try {
				ps.close();
			} catch (SQLException e) {
				logger.error(e.getMessage(),e);
			}
		}

		if (null != conn) {
			try {
				conn.close();
			} catch (SQLException e) {
				logger.error(e.getMessage(),e);
			}
		}
	}
}
+150 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Metadata related common classes
 *
 */
public class SchemaUtils {
	
	private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class);
	private static Pattern p = Pattern.compile("\\s*|\t|\r|\n");

	/**
	 * 获取所有upgrade目录下的可升级的schema
	 * Gets upgradable schemas for all upgrade directories
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static List<String> getAllSchemaList() {
		List<String> schemaDirList = new ArrayList<>();
		File[] schemaDirArr = FileUtils.getAllDir("sql/upgrade");
		if(schemaDirArr == null || schemaDirArr.length == 0) {
			return null;
		}
		
		for(File file : schemaDirArr) {
			schemaDirList.add(file.getName());
		}
		
		Collections.sort(schemaDirList , new Comparator() {
			@Override
			public int compare(Object o1 , Object o2){
				try {
					String dir1 = String.valueOf(o1);
					String dir2 = String.valueOf(o2);
					String version1 = dir1.split("_")[0];
					String version2 = dir2.split("_")[0];
					if(version1.equals(version2)) {
						return 0;
					}
					
					if(SchemaUtils.isAGreatVersion(version1, version2)) {
						return 1;
					}
					
					return -1;
					
				} catch (Exception e) {
					logger.error(e.getMessage(),e);
					throw new RuntimeException(e);
				}
			}
		});
		
		return schemaDirList;
	}
	
	/**
	 * 判断schemaVersion是否比version版本高
	 * Determine whether schemaVersion is higher than version
	 * @param schemaVersion
	 * @param version
	 * @return
	 */
	public static boolean isAGreatVersion(String schemaVersion, String version) {
		if(StringUtils.isEmpty(schemaVersion) || StringUtils.isEmpty(version)) {
			throw new RuntimeException("schemaVersion or version is empty");
		}
		
		String[] schemaVersionArr = schemaVersion.split("\\.");
		String[] versionArr = version.split("\\.");
		int arrLength = schemaVersionArr.length < versionArr.length ? schemaVersionArr.length : versionArr.length;
		for(int i = 0 ; i < arrLength ; i++) {
			if(Integer.valueOf(schemaVersionArr[i]) > Integer.valueOf(versionArr[i])) {
				return true;
			}else if(Integer.valueOf(schemaVersionArr[i]) < Integer.valueOf(versionArr[i])) {
				return false;
			}
		}
		
		// 说明直到第arrLength-1个元素,两个版本号都一样,此时谁的arrLength大,谁的版本号就大
		// If the version and schema version is the same from 0 up to the arrlength-1 element,whoever has a larger arrLength has a larger version number
		return schemaVersionArr.length > versionArr.length;
	}
	
	/**
	 * Gets the current software version number of the system
	 * @return
	 */
	public static String getSoftVersion() {
		String soft_version;
		try {
			soft_version = FileUtils.readFile2Str(new FileInputStream(new File("sql/soft_version")));
			soft_version = replaceBlank(soft_version);
		} catch (FileNotFoundException e) {
			logger.error(e.getMessage(),e);
			throw new RuntimeException("Failed to get the product version description file. The file could not be found", e);
		} catch (IOException e) {
			logger.error(e.getMessage(),e);
			throw new RuntimeException("Failed to get product version number description file, failed to read the file", e);
		}
		return soft_version;
	}
	
	/**
	 * 去掉字符串中的空格回车换行和制表符
	 * Strips the string of space carriage returns and tabs
	 * @param str
	 * @return
	 */
	public static String replaceBlank(String str) {
		String dest = "";
		if (str!=null) {

			Matcher m = p.matcher(str);
			dest = m.replaceAll("");
		}
		return dest;
	}
}
+317 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.LineNumberReader;
import java.io.Reader;
import java.sql.*;

/*
 * Slightly modified version of the com.ibatis.common.jdbc.ScriptRunner class
 * from the iBATIS Apache project. Only removed dependency on Resource class
 * and a constructor
 */
/*
 *  Copyright 2004 Clinton Begin
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

/**
 * Tool to run database scripts
 */
public class ScriptRunner {

	public static final Logger logger = LoggerFactory.getLogger(ScriptRunner.class);

	private static final String DEFAULT_DELIMITER = ";";

	private Connection connection;

	private boolean stopOnError;
	private boolean autoCommit;

	private String delimiter = DEFAULT_DELIMITER;
	private boolean fullLineDelimiter = false;

	/**
	 * Default constructor
	 */
	public ScriptRunner(Connection connection, boolean autoCommit, boolean stopOnError) {
		this.connection = connection;
		this.autoCommit = autoCommit;
		this.stopOnError = stopOnError;
	}

	public static void main(String[] args) {
		String dbName = "db_mmu";
		String appKey = dbName.substring(dbName.lastIndexOf("_")+1, dbName.length());
		System.out.println(appKey);
	}

	public void setDelimiter(String delimiter, boolean fullLineDelimiter) {
		this.delimiter = delimiter;
		this.fullLineDelimiter = fullLineDelimiter;
	}

	/**
	 * Runs an SQL script (read in using the Reader parameter)
	 *
	 * @param reader
	 *            - the source of the script
	 */
	public void runScript(Reader reader) throws IOException, SQLException {
		try {
			boolean originalAutoCommit = connection.getAutoCommit();
			try {
				if (originalAutoCommit != this.autoCommit) {
					connection.setAutoCommit(this.autoCommit);
				}
				runScript(connection, reader);
			} finally {
				connection.setAutoCommit(originalAutoCommit);
			}
		} catch (IOException e) {
			throw e;
		} catch (SQLException e) {
			throw e;
		} catch (Exception e) {
			throw new RuntimeException("Error running script.  Cause: " + e, e);
		}
	}

	public void runScript(Reader reader, String dbName) throws IOException, SQLException {
		try {
			boolean originalAutoCommit = connection.getAutoCommit();
			try {
				if (originalAutoCommit != this.autoCommit) {
					connection.setAutoCommit(this.autoCommit);
				}
				runScript(connection, reader, dbName);
			} finally {
				connection.setAutoCommit(originalAutoCommit);
			}
		} catch (IOException e) {
			throw e;
		} catch (SQLException e) {
			throw e;
		} catch (Exception e) {
			throw new RuntimeException("Error running script.  Cause: " + e, e);
		}
	}

	/**
	 * Runs an SQL script (read in using the Reader parameter) using the connection
	 * passed in
	 *
	 * @param conn
	 *            - the connection to use for the script
	 * @param reader
	 *            - the source of the script
	 * @throws SQLException
	 *             if any SQL errors occur
	 * @throws IOException
	 *             if there is an error reading from the Reader
	 */
	private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
		StringBuffer command = null;
		try {
			LineNumberReader lineReader = new LineNumberReader(reader);
			String line = null;
			while ((line = lineReader.readLine()) != null) {
				if (command == null) {
					command = new StringBuffer();
				}
				String trimmedLine = line.trim();
				if (trimmedLine.startsWith("--")) {
					logger.info(trimmedLine);
				} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) {
					// Do nothing
				} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("--")) {
					// Do nothing

				} else if (trimmedLine.startsWith("delimiter")) {
					String newDelimiter = trimmedLine.split(" ")[1];
					this.setDelimiter(newDelimiter, fullLineDelimiter);

				} else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
						|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
					command.append(line.substring(0, line.lastIndexOf(getDelimiter())));
					command.append(" ");
					Statement statement = conn.createStatement();

					// logger.info(command.toString());

					boolean hasResults = false;
					logger.info("sql:"+command.toString());
					if (stopOnError) {
						hasResults = statement.execute(command.toString());
					} else {
						try {
							statement.execute(command.toString());
						} catch (SQLException e) {
							logger.error(e.getMessage(),e);
							throw e;
						}
					}

					ResultSet rs = statement.getResultSet();
					if (hasResults && rs != null) {
						ResultSetMetaData md = rs.getMetaData();
						int cols = md.getColumnCount();
						for (int i = 0; i < cols; i++) {
							String name = md.getColumnLabel(i);
							logger.info(name + "\t");
						}
						logger.info("");
						while (rs.next()) {
							for (int i = 0; i < cols; i++) {
								String value = rs.getString(i);
								logger.info(value + "\t");
							}
							logger.info("");
						}
					}

					command = null;
					try {
						statement.close();
					} catch (Exception e) {
						// Ignore to workaround a bug in Jakarta DBCP
					}
					Thread.yield();
				} else {
					command.append(line);
					command.append(" ");
				}
			}

		} catch (SQLException e) {
			logger.error("Error executing: " + command.toString());
			throw e;
		} catch (IOException e) {
			e.fillInStackTrace();
			logger.error("Error executing: " + command.toString());
			throw e;
		}
	}

	private void runScript(Connection conn, Reader reader , String dbName) throws IOException, SQLException {
		StringBuffer command = null;
		String sql = "";
		String appKey = dbName.substring(dbName.lastIndexOf("_")+1, dbName.length());
		try {
			LineNumberReader lineReader = new LineNumberReader(reader);
			String line = null;
			while ((line = lineReader.readLine()) != null) {
				if (command == null) {
					command = new StringBuffer();
				}
				String trimmedLine = line.trim();
				if (trimmedLine.startsWith("--")) {
					logger.info(trimmedLine);
				} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) {
					// Do nothing
				} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("--")) {
					// Do nothing

				} else if (trimmedLine.startsWith("delimiter")) {
					String newDelimiter = trimmedLine.split(" ")[1];
					this.setDelimiter(newDelimiter, fullLineDelimiter);

				} else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
						|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
					command.append(line.substring(0, line.lastIndexOf(getDelimiter())));
					command.append(" ");
					Statement statement = conn.createStatement();

					// logger.info(command.toString());

					sql = command.toString().replaceAll("\\{\\{APPDB\\}\\}", dbName);
					boolean hasResults = false;
					logger.info("sql:"+sql);
					if (stopOnError) {
						hasResults = statement.execute(sql);
					} else {
						try {
							statement.execute(sql);
						} catch (SQLException e) {
							logger.error(e.getMessage(),e);
							throw e;
						}
					}

					ResultSet rs = statement.getResultSet();
					if (hasResults && rs != null) {
						ResultSetMetaData md = rs.getMetaData();
						int cols = md.getColumnCount();
						for (int i = 0; i < cols; i++) {
							String name = md.getColumnLabel(i);
							logger.info(name + "\t");
						}
						logger.info("");
						while (rs.next()) {
							for (int i = 0; i < cols; i++) {
								String value = rs.getString(i);
								logger.info(value + "\t");
							}
							logger.info("");
						}
					}

					command = null;
					try {
						statement.close();
					} catch (Exception e) {
						// Ignore to workaround a bug in Jakarta DBCP
					}
					Thread.yield();
				} else {
					command.append(line);
					command.append(" ");
				}
			}

		} catch (SQLException e) {
			logger.error("Error executing: " + sql);
			throw e;
		} catch (IOException e) {
			e.fillInStackTrace();
			logger.error("Error executing: " + sql);
			throw e;
		}
	}

	private String getDelimiter() {
		return delimiter;
	}
	
}
 No newline at end of file
+82 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.dao.upgrade;

import cn.escheduler.common.utils.SchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * upgrade manager
 */
public class EschedulerManager {
    private static final Logger logger = LoggerFactory.getLogger(EschedulerManager.class);
    UpgradeDao upgradeDao = UpgradeDao.getInstance();

    public void initEscheduler() {
        this.initEschedulerSchema();
    }

    public void initEschedulerSchema() {

        logger.info("Start initializing the ark manager mysql table structure");
        upgradeDao.initEschedulerSchema();
    }


    /**
     * upgrade escheduler
     */
    public void upgradeEscheduler() throws Exception{

        // Gets a list of all upgrades
        List<String> schemaList = SchemaUtils.getAllSchemaList();
        if(schemaList == null || schemaList.size() == 0) {
            logger.info("There is no schema to upgrade!");
        }else {

            String version = "";
            // The target version of the upgrade
            String schemaVersion = "";
            for(String schemaDir : schemaList) {
                // Gets the version of the current system
                if (upgradeDao.isExistsTable("t_escheduler_version")) {
                    version = upgradeDao.getCurrentVersion();
                }else {
                    version = "1.0.0";
                }

                schemaVersion = schemaDir.split("_")[0];
                if(SchemaUtils.isAGreatVersion(schemaVersion , version)) {

                    logger.info("upgrade escheduler metadata version from " + version + " to " + schemaVersion);


                    logger.info("Begin upgrading escheduler's mysql table structure");
                    upgradeDao.upgradeEscheduler(schemaDir);

                }

            }
        }

        // Assign the value of the version field in the version table to the version of the product
        upgradeDao.updateVersion(SchemaUtils.getSoftVersion());
    }
}
Loading