Commit df366823 authored by Amos Feng's avatar Amos Feng
Browse files

update to add the wrapDataSource in the XATransactionManager

parent d7f2b185
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.proxy.backend.BackendDataSource;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
@@ -36,6 +37,7 @@ import java.util.Map.Entry;
 * @author zhangliang
 */
@Getter
@Slf4j
public final class JDBCBackendDataSource implements BackendDataSource {
    
    private final Map<String, DataSource> dataSourceMap;
@@ -47,7 +49,12 @@ public final class JDBCBackendDataSource implements BackendDataSource {
    private Map<String, DataSource> createDataSourceMap(final TransactionType transactionType, final Map<String, DataSourceParameter> dataSourceParameters) {
        Map<String, DataSource> result = new LinkedHashMap<>(dataSourceParameters.size());
        for (Entry<String, DataSourceParameter> entry : dataSourceParameters.entrySet()) {
            try {
                result.put(entry.getKey(), getBackendDataSourceFactory(transactionType).build(entry.getKey(), entry.getValue()));
            } catch (Exception e) {
                // we just ignore the data source which can not be build.
                log.warn("can not build " + entry.getKey() + " failed with " + e);
            }
        }
        return result;
    }
+2 −1
Original line number Diff line number Diff line
@@ -33,7 +33,8 @@ public interface JDBCBackendDataSourceFactory {
     *
     * @param dataSourceName data source name
     * @param dataSourceParameter data source connection parameter
     * @throws Exception when the data source can not be build
     * @return data source for connect backend databases
     */
    DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter);
    DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception;
}
+10 −31
Original line number Diff line number Diff line
@@ -17,12 +17,13 @@

package io.shardingsphere.proxy.backend.jdbc.datasource;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.google.common.base.Optional;
import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry;
import io.shardingsphere.transaction.manager.xa.XATransactionManager;

import javax.sql.DataSource;
import java.util.Properties;
import javax.sql.XADataSource;

/**
 * Backend data source factory using {@code AtomikosDataSourceBean} for JDBC and XA protocol.
@@ -33,34 +34,12 @@ import java.util.Properties;
public final class JDBCXABackendDataSourceFactory implements JDBCBackendDataSourceFactory {
    
    @Override
    public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) {
        AtomikosDataSourceBean result = new AtomikosDataSourceBean();
        result.setUniqueResourceName(dataSourceName);
        result.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize());
        result.setTestQuery("SELECT 1");
        result.setXaProperties(getProperties(dataSourceParameter));
        return result;
    public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) throws Exception {
        String xaDataSourceClassName = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
        XADataSource dataSource = (XADataSource) Class.forName(xaDataSourceClassName).newInstance();
        XATransactionManager xaTransactionManager
                = (XATransactionManager) ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA);
        return xaTransactionManager.wrapDataSource(dataSource, dataSourceName, dataSourceParameter);
    }
    
    private Properties getProperties(final DataSourceParameter dataSourceParameter) {
        Properties result = new Properties();
        result.setProperty("user", dataSourceParameter.getUsername());
        result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or(""));
        result.setProperty("URL", dataSourceParameter.getUrl());
        result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString());
        result.setProperty("autoReconnect", Boolean.TRUE.toString());
        result.setProperty("useServerPrepStmts", Boolean.TRUE.toString());
        result.setProperty("cachePrepStmts", Boolean.TRUE.toString());
        result.setProperty("prepStmtCacheSize", "250");
        result.setProperty("prepStmtCacheSqlLimit", "2048");
        result.setProperty("useLocalSessionState", Boolean.TRUE.toString());
        result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString());
        result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString());
        result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString());
        result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString());
        result.setProperty("maintainTimeStats", Boolean.FALSE.toString());
        result.setProperty("netTimeoutForStreamingResults", "0");
        return result;
    }
}
+14 −0
Original line number Diff line number Diff line
@@ -18,12 +18,26 @@
package io.shardingsphere.transaction.manager.xa;

import io.shardingsphere.transaction.event.xa.XATransactionEvent;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.manager.ShardingTransactionManager;

import javax.sql.DataSource;
import javax.sql.XADataSource;

/**
 * XA transaction manager.
 *
 * @author zhangliang
 */
public interface XATransactionManager extends ShardingTransactionManager<XATransactionEvent> {
    /**
     * Wrap the specific {@link XADataSource} and enroll it with a JTA.
     *
     * @param dataSource the data source to wrap
     * @param dataSourceName the data source name
     * @param dataSourceParameter the data source parameter
     * @throws Exception if can not wrap the data source
     * @return the wrapped data source
     */
    DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception;
}
+38 −0
Original line number Diff line number Diff line
@@ -18,16 +18,22 @@
package io.shardingsphere.transaction.manager.xa.atomikos;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.google.common.base.Optional;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.transaction.event.xa.XATransactionEvent;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.transaction.manager.xa.XATransactionManager;

import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import java.sql.SQLException;
import java.util.Properties;

/**
 * Atomikos XA transaction manager.
@@ -81,4 +87,36 @@ public final class AtomikosTransactionManager implements XATransactionManager {
            throw new SQLException(ex);
        }
    }

    @Override
    public DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) {
        AtomikosDataSourceBean result = new AtomikosDataSourceBean();
        result.setUniqueResourceName(dataSourceName);
        result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize());
        result.setTestQuery("SELECT 1");
        result.setXaProperties(getProperties(dataSourceParameter));
        result.setXaDataSource(dataSource);
        return result;
    }

    private Properties getProperties(final DataSourceParameter dataSourceParameter) {
        Properties result = new Properties();
        result.setProperty("user", dataSourceParameter.getUsername());
        result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or(""));
        result.setProperty("URL", dataSourceParameter.getUrl());
        result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString());
        result.setProperty("autoReconnect", Boolean.TRUE.toString());
        result.setProperty("useServerPrepStmts", Boolean.TRUE.toString());
        result.setProperty("cachePrepStmts", Boolean.TRUE.toString());
        result.setProperty("prepStmtCacheSize", "250");
        result.setProperty("prepStmtCacheSqlLimit", "2048");
        result.setProperty("useLocalSessionState", Boolean.TRUE.toString());
        result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString());
        result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString());
        result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString());
        result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString());
        result.setProperty("maintainTimeStats", Boolean.FALSE.toString());
        result.setProperty("netTimeoutForStreamingResults", "0");
        return result;
    }
}