本文共 8251 字,大约阅读时间需要 27 分钟。
本文主要讲解 RM 端负责执行回滚的 Executor 对象,即 undoExecutor:回滚日志(undo log)正是由 undoExecutor 来执行的。
/**
 * Factory that picks the undo executor matching the SQL type recorded in an undo log.
 */
public class UndoExecutorFactory {

    /**
     * Creates the executor that rolls back the statement described by {@code sqlUndoLog}.
     *
     * @param dbType     database type identifier; only {@code JdbcConstants.MYSQL} is accepted
     * @param sqlUndoLog undo log entry whose SQL type selects the executor
     * @return a new executor for an INSERT, UPDATE or DELETE undo log
     * @throws NotSupportYetException     if {@code dbType} is {@code null} or is not MySQL
     * @throws ShouldNeverHappenException if the SQL type is none of INSERT, UPDATE, DELETE
     */
    public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
        // A null dbType is reported as "not supported" instead of surfacing as a NullPointerException.
        if (dbType == null || !dbType.equals(JdbcConstants.MYSQL)) {
            throw new NotSupportYetException(dbType);
        }
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}
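说明:UndoExecutorFactory 先校验数据库类型(目前只支持 MySQL,否则抛出 NotSupportYetException),再根据 undo log 中的 SQL 类型(INSERT / UPDATE / DELETE)创建对应的回滚执行器。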
说明:
public abstract class AbstractUndoExecutor { protected SQLUndoLog sqlUndoLog; protected abstract String buildUndoSQL(); public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) { this.sqlUndoLog = sqlUndoLog; } public void executeOn(Connection conn) throws SQLException { dataValidation(conn); try { // 拼接undoSql的模板 String undoSQL = buildUndoSQL(); // 获取PreparedStatement对象 PreparedStatement undoPST = conn.prepareStatement(undoSQL); // 获取回滚的记录 TableRecords undoRows = getUndoRows(); // 遍历所有待回滚的记录然后一条条的拼接字段 for (Row undoRow : undoRows.getRows()) { ArrayListundoValues = new ArrayList<>(); Field pkValue = null; for (Field field : undoRow.getFields()) { if (field.getKeyType() == KeyType.PrimaryKey) { pkValue = field; } else { undoValues.add(field); } } // 针对每一条回滚记录进行准备 undoPrepare(undoPST, undoValues, pkValue); // 执行回滚操作 undoPST.executeUpdate(); } } catch (Exception ex) { if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } } protected void undoPrepare(PreparedStatement undoPST, ArrayList undoValues, Field pkValue) throws SQLException { int undoIndex = 0; for (Field undoValue : undoValues) { undoIndex++; undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType()); } // PK is at last one. // INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?) // UPDATE a SET x=?, y=?, z=? WHERE pk = ? // DELETE FROM a WHERE pk = ? undoIndex++; undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType()); } protected abstract TableRecords getUndoRows(); protected void dataValidation(Connection conn) throws SQLException { // Validate if data is dirty. }}
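说明:AbstractUndoExecutor 是模板类:executeOn 先调用 buildUndoSQL 得到回滚 SQL 模板,再遍历 getUndoRows 返回的每一行,由 undoPrepare 绑定参数(主键放在最后一个占位符)后执行 executeUpdate。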
/**
 * Undoes an INSERT by deleting the inserted rows, identified by primary key from the after image.
 */
public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {

    /**
     * @param sqlUndoLog undo log entry of the INSERT to roll back
     */
    public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    /**
     * Builds {@code DELETE FROM <table> WHERE <pk> = ? } from the first row of the after image.
     *
     * @return the DELETE template with one placeholder per primary key field found
     * @throws ShouldNeverHappenException if the after image contains no rows
     */
    @Override
    protected String buildUndoSQL() {
        TableRecords afterImage = sqlUndoLog.getAfterImage();
        List<Row> afterImageRows = afterImage.getRows();
        if (afterImageRows == null || afterImageRows.isEmpty()) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = afterImageRows.get(0);
        StringBuilder undoSQL = new StringBuilder("DELETE FROM " + sqlUndoLog.getTableName());
        undoSQL.append(" WHERE ");
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                undoSQL.append(field.getName() + " = ? ");
            }
        }
        return undoSQL.toString();
    }

    /**
     * Binds only the primary key, since the DELETE template has no other placeholders.
     *
     * @param undoPST    statement prepared from {@link #buildUndoSQL()}
     * @param undoValues non-key fields of the row; not used here
     * @param pkValue    primary key field, bound to parameter 1
     * @throws SQLException if the parameter cannot be set
     */
    @Override
    protected void undoPrepare(PreparedStatement undoPST, ArrayList<Field> undoValues, Field pkValue)
            throws SQLException {
        undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
    }

    /**
     * @return the after image, i.e. the rows the INSERT created
     */
    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getAfterImage();
    }
}
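说明:MySQLUndoInsertExecutor 用于回滚 INSERT:根据 afterImage 中的主键生成 "DELETE FROM 表 WHERE 主键 = ?",并重写 undoPrepare,只绑定主键这一个参数。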
/**
 * Undoes a DELETE by re-inserting the rows stored in the before image.
 */
public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {

    /**
     * @param sqlUndoLog undo log entry of the DELETE to roll back
     */
    public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    /**
     * Builds {@code INSERT INTO <table>(<cols>, <pk>) VALUES (?, ..., ?)} from the first row of
     * the before image. The primary key column is listed last so that it lines up with the
     * bind order used by the inherited {@code undoPrepare}.
     *
     * @return the INSERT template
     * @throws ShouldNeverHappenException if the before image has no rows, or the row has no
     *                                    primary key field
     */
    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.isEmpty()) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = beforeImageRows.get(0);

        StringBuilder insertColumns = new StringBuilder();
        StringBuilder insertValues = new StringBuilder();
        Field pkField = null;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                // Remembered and appended after all other columns.
                pkField = field;
            } else {
                appendColumn(insertColumns, insertValues, field.getName());
            }
        }
        if (pkField == null) {
            // Without a primary key the row cannot be bound; fail explicitly rather than with an NPE.
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        appendColumn(insertColumns, insertValues, pkField.getName());

        return "INSERT INTO " + sqlUndoLog.getTableName() + "(" + insertColumns.toString()
            + ") VALUES (" + insertValues.toString() + ")";
    }

    /** Appends one column name and its {@code ?} placeholder, comma-separating from earlier entries. */
    private static void appendColumn(StringBuilder columns, StringBuilder placeholders, String columnName) {
        if (placeholders.length() > 0) {
            columns.append(", ");
            placeholders.append(", ");
        }
        columns.append(columnName);
        placeholders.append("?");
    }

    /**
     * @return the before image, i.e. the rows as they were before the DELETE
     */
    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}
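说明:MySQLUndoDeleteExecutor 用于回滚 DELETE:根据 beforeImage 生成 "INSERT INTO 表(普通列..., 主键) VALUES (?, ..., ?)",主键列放在最后,与父类 undoPrepare 的参数绑定顺序保持一致。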
public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor { @Override protected String buildUndoSQL() { TableRecords beforeImage = sqlUndoLog.getBeforeImage(); ListbeforeImageRows = beforeImage.getRows(); if (beforeImageRows == null || beforeImageRows.size() == 0) { throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO } Row row = beforeImageRows.get(0); StringBuffer mainSQL = new StringBuffer( "UPDATE " + sqlUndoLog.getTableName() + " SET "); StringBuffer where = new StringBuffer(" WHERE "); boolean first = true; for (Field field : row.getFields()) { if (field.getKeyType() == KeyType.PrimaryKey) { where.append(field.getName() + " = ?"); } else { if (first) { first = false; } else { mainSQL.append(", "); } mainSQL.append(field.getName() + " = ?"); } } return mainSQL.append(where).toString(); } public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) { super(sqlUndoLog); } @Override protected TableRecords getUndoRows() { return sqlUndoLog.getBeforeImage(); }}
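说明:MySQLUndoUpdateExecutor 用于回滚 UPDATE:根据 beforeImage 生成 "UPDATE 表 SET 列1 = ?, 列2 = ? WHERE 主键 = ?",把各列恢复为更新前的值。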
转载地址:http://xndjx.baihongyu.com/