Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java +8 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.proxy.backend.BackendDataSource; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import javax.sql.DataSource; import java.sql.Connection; Loading @@ -36,6 +37,7 @@ import java.util.Map.Entry; * @author zhangliang */ @Getter @Slf4j public final class JDBCBackendDataSource implements BackendDataSource { private final Map<String, DataSource> dataSourceMap; Loading @@ -47,7 +49,12 @@ public final class JDBCBackendDataSource implements BackendDataSource { private Map<String, DataSource> createDataSourceMap(final TransactionType transactionType, final Map<String, DataSourceParameter> dataSourceParameters) { Map<String, DataSource> result = new LinkedHashMap<>(dataSourceParameters.size()); for (Entry<String, DataSourceParameter> entry : dataSourceParameters.entrySet()) { try { result.put(entry.getKey(), getBackendDataSourceFactory(transactionType).build(entry.getKey(), entry.getValue())); } catch (Exception e) { // we just ignore the data source which can not be build. log.warn("can not build " + entry.getKey() + " failed with " + e); } } return result; } Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java +2 −1 Original line number Diff line number Diff line Loading @@ -33,7 +33,8 @@ public interface JDBCBackendDataSourceFactory { * * @param dataSourceName data source name * @param dataSourceParameter data source connection parameter * @throws Exception when the data source can not be build * @return data source for connect backend databases */ DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter); DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java +10 −31 Original line number Diff line number Diff line Loading @@ -17,12 +17,13 @@ package io.shardingsphere.proxy.backend.jdbc.datasource; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.google.common.base.Optional; import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; import io.shardingsphere.transaction.manager.xa.XATransactionManager; import javax.sql.DataSource; import java.util.Properties; import javax.sql.XADataSource; /** * Backend data source factory using {@code AtomikosDataSourceBean} for JDBC and XA protocol. Loading @@ -33,34 +34,12 @@ import java.util.Properties; public final class JDBCXABackendDataSourceFactory implements JDBCBackendDataSourceFactory { @Override public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) { AtomikosDataSourceBean result = new AtomikosDataSourceBean(); result.setUniqueResourceName(dataSourceName); result.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"); result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); result.setTestQuery("SELECT 1"); result.setXaProperties(getProperties(dataSourceParameter)); return result; public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) throws Exception { String xaDataSourceClassName = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"; XADataSource dataSource = (XADataSource) Class.forName(xaDataSourceClassName).newInstance(); XATransactionManager xaTransactionManager = (XATransactionManager) ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA); return xaTransactionManager.wrapDataSource(dataSource, dataSourceName, dataSourceParameter); } private Properties getProperties(final DataSourceParameter dataSourceParameter) { Properties result = new Properties(); result.setProperty("user", dataSourceParameter.getUsername()); result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); result.setProperty("URL", dataSourceParameter.getUrl()); result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); result.setProperty("autoReconnect", Boolean.TRUE.toString()); result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); result.setProperty("prepStmtCacheSize", "250"); result.setProperty("prepStmtCacheSqlLimit", "2048"); result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); result.setProperty("netTimeoutForStreamingResults", "0"); return result; } } sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManager.java +14 −0 Original line number Diff line number Diff line Loading @@ -18,12 +18,26 @@ package io.shardingsphere.transaction.manager.xa; import io.shardingsphere.transaction.event.xa.XATransactionEvent; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.ShardingTransactionManager; import javax.sql.DataSource; import javax.sql.XADataSource; /** * XA transaction manager. * * @author zhangliang */ public interface XATransactionManager extends ShardingTransactionManager<XATransactionEvent> { /** * Wrap the specific {@link XADataSource} and enroll it with a JTA. * * @param dataSource the data source to wrap * @param dataSourceName the data source name * @param dataSourceParameter the data source parameter * @throws Exception if can not wrap the data source * @return the wrapped data source */ DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; } sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/atomikos/AtomikosTransactionManager.java +38 −0 Original line number Diff line number Diff line Loading @@ -18,16 +18,22 @@ package io.shardingsphere.transaction.manager.xa.atomikos; import com.atomikos.icatch.jta.UserTransactionManager; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.google.common.base.Optional; import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.transaction.event.xa.XATransactionEvent; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.xa.XATransactionManager; import javax.sql.DataSource; import javax.sql.XADataSource; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import java.sql.SQLException; import java.util.Properties; /** * Atomikos XA transaction manager. Loading Loading @@ -81,4 +87,36 @@ public final class AtomikosTransactionManager implements XATransactionManager { throw new SQLException(ex); } } @Override public DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) { AtomikosDataSourceBean result = new AtomikosDataSourceBean(); result.setUniqueResourceName(dataSourceName); result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); result.setTestQuery("SELECT 1"); result.setXaProperties(getProperties(dataSourceParameter)); result.setXaDataSource(dataSource); return result; } private Properties getProperties(final DataSourceParameter dataSourceParameter) { Properties result = new Properties(); result.setProperty("user", dataSourceParameter.getUsername()); result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); result.setProperty("URL", dataSourceParameter.getUrl()); result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); result.setProperty("autoReconnect", Boolean.TRUE.toString()); result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); result.setProperty("prepStmtCacheSize", "250"); result.setProperty("prepStmtCacheSqlLimit", "2048"); result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); result.setProperty("netTimeoutForStreamingResults", "0"); return result; } } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSource.java +8 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.proxy.backend.BackendDataSource; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import javax.sql.DataSource; import java.sql.Connection; Loading @@ -36,6 +37,7 @@ import java.util.Map.Entry; * @author zhangliang */ @Getter @Slf4j public final class JDBCBackendDataSource implements BackendDataSource { private final Map<String, DataSource> dataSourceMap; Loading @@ -47,7 +49,12 @@ public final class JDBCBackendDataSource implements BackendDataSource { private Map<String, DataSource> createDataSourceMap(final TransactionType transactionType, final Map<String, DataSourceParameter> dataSourceParameters) { Map<String, DataSource> result = new LinkedHashMap<>(dataSourceParameters.size()); for (Entry<String, DataSourceParameter> entry : dataSourceParameters.entrySet()) { try { result.put(entry.getKey(), getBackendDataSourceFactory(transactionType).build(entry.getKey(), entry.getValue())); } catch (Exception e) { // we just ignore the data source which can not be build. log.warn("can not build " + entry.getKey() + " failed with " + e); } } return result; } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCBackendDataSourceFactory.java +2 −1 Original line number Diff line number Diff line Loading @@ -33,7 +33,8 @@ public interface JDBCBackendDataSourceFactory { * * @param dataSourceName data source name * @param dataSourceParameter data source connection parameter * @throws Exception when the data source can not be build * @return data source for connect backend databases */ DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter); DataSource build(String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/datasource/JDBCXABackendDataSourceFactory.java +10 −31 Original line number Diff line number Diff line Loading @@ -17,12 +17,13 @@ package io.shardingsphere.proxy.backend.jdbc.datasource; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.google.common.base.Optional; import io.shardingsphere.core.constant.transaction.TransactionType; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.ShardingTransactionManagerRegistry; import io.shardingsphere.transaction.manager.xa.XATransactionManager; import javax.sql.DataSource; import java.util.Properties; import javax.sql.XADataSource; /** * Backend data source factory using {@code AtomikosDataSourceBean} for JDBC and XA protocol. Loading @@ -33,34 +34,12 @@ import java.util.Properties; public final class JDBCXABackendDataSourceFactory implements JDBCBackendDataSourceFactory { @Override public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) { AtomikosDataSourceBean result = new AtomikosDataSourceBean(); result.setUniqueResourceName(dataSourceName); result.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"); result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); result.setTestQuery("SELECT 1"); result.setXaProperties(getProperties(dataSourceParameter)); return result; public DataSource build(final String dataSourceName, final DataSourceParameter dataSourceParameter) throws Exception { String xaDataSourceClassName = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"; XADataSource dataSource = (XADataSource) Class.forName(xaDataSourceClassName).newInstance(); XATransactionManager xaTransactionManager = (XATransactionManager) ShardingTransactionManagerRegistry.getInstance().getShardingTransactionManager(TransactionType.XA); return xaTransactionManager.wrapDataSource(dataSource, dataSourceName, dataSourceParameter); } private Properties getProperties(final DataSourceParameter dataSourceParameter) { Properties result = new Properties(); result.setProperty("user", dataSourceParameter.getUsername()); result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); result.setProperty("URL", dataSourceParameter.getUrl()); result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); result.setProperty("autoReconnect", Boolean.TRUE.toString()); result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); result.setProperty("prepStmtCacheSize", "250"); result.setProperty("prepStmtCacheSqlLimit", "2048"); result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); result.setProperty("netTimeoutForStreamingResults", "0"); return result; } }
sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/XATransactionManager.java +14 −0 Original line number Diff line number Diff line Loading @@ -18,12 +18,26 @@ package io.shardingsphere.transaction.manager.xa; import io.shardingsphere.transaction.event.xa.XATransactionEvent; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.ShardingTransactionManager; import javax.sql.DataSource; import javax.sql.XADataSource; /** * XA transaction manager. * * @author zhangliang */ public interface XATransactionManager extends ShardingTransactionManager<XATransactionEvent> { /** * Wrap the specific {@link XADataSource} and enroll it with a JTA. * * @param dataSource the data source to wrap * @param dataSourceName the data source name * @param dataSourceParameter the data source parameter * @throws Exception if can not wrap the data source * @return the wrapped data source */ DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) throws Exception; }
sharding-transaction/src/main/java/io/shardingsphere/transaction/manager/xa/atomikos/AtomikosTransactionManager.java +38 −0 Original line number Diff line number Diff line Loading @@ -18,16 +18,22 @@ package io.shardingsphere.transaction.manager.xa.atomikos; import com.atomikos.icatch.jta.UserTransactionManager; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.google.common.base.Optional; import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.transaction.event.xa.XATransactionEvent; import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.transaction.manager.xa.XATransactionManager; import javax.sql.DataSource; import javax.sql.XADataSource; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import java.sql.SQLException; import java.util.Properties; /** * Atomikos XA transaction manager. Loading Loading @@ -81,4 +87,36 @@ public final class AtomikosTransactionManager implements XATransactionManager { throw new SQLException(ex); } } @Override public DataSource wrapDataSource(XADataSource dataSource, String dataSourceName, DataSourceParameter dataSourceParameter) { AtomikosDataSourceBean result = new AtomikosDataSourceBean(); result.setUniqueResourceName(dataSourceName); result.setMaxPoolSize(dataSourceParameter.getMaximumPoolSize()); result.setTestQuery("SELECT 1"); result.setXaProperties(getProperties(dataSourceParameter)); result.setXaDataSource(dataSource); return result; } private Properties getProperties(final DataSourceParameter dataSourceParameter) { Properties result = new Properties(); result.setProperty("user", dataSourceParameter.getUsername()); result.setProperty("password", Optional.fromNullable(dataSourceParameter.getPassword()).or("")); result.setProperty("URL", dataSourceParameter.getUrl()); result.setProperty("pinGlobalTxToPhysicalConnection", Boolean.TRUE.toString()); result.setProperty("autoReconnect", Boolean.TRUE.toString()); result.setProperty("useServerPrepStmts", Boolean.TRUE.toString()); result.setProperty("cachePrepStmts", Boolean.TRUE.toString()); result.setProperty("prepStmtCacheSize", "250"); result.setProperty("prepStmtCacheSqlLimit", "2048"); result.setProperty("useLocalSessionState", Boolean.TRUE.toString()); result.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString()); result.setProperty("cacheResultSetMetadata", Boolean.TRUE.toString()); result.setProperty("cacheServerConfiguration", Boolean.TRUE.toString()); result.setProperty("elideSetAutoCommits", Boolean.TRUE.toString()); result.setProperty("maintainTimeStats", Boolean.FALSE.toString()); result.setProperty("netTimeoutForStreamingResults", "0"); return result; } }