前言
接上一篇动态创建自定义Bean,这篇我们来创建MysQL多数据源。
- 自定义Bean实现我用的是BeanDefinitionRegistryPostProcessor。
- 连接池用的是alibaba的DruidDataSource。
- 支持全局的数据源统一启动。
Bean定义
基本上和上一篇的定义差不多就不再赘述,只看关键:
- 给每一个数据源起名
dataName
。 - 全局设置一个为主库
primary
,因为在动态切换数据源功能会需要指定,也避免无主数据源会出一些启动问题。
[{
"mysqlDruids": {
"druidCommon": {
"initialSize": 1,
"maxActive": 20,
"maxWait": 60000,
"minIdle": 1
},
"druidConnectConfigs": [{
"dataName": "dataSource",
"driverClassName": "com.mysql.jdbc.Driver",
"password": "123456",
"publicKey": "xxx",
"url": "jdbc:mysql://localhost:3006/mysql?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&autoReconnect=true&failOverReadOnly=false",
"username": "root",
"primary": true
}, {
"dataName": "db2",
"driverClassName": "com.mysql.jdbc.Driver",
"password": "123456",
"publicKey": "xxx",
"url": "jdbc:mysql://localhost:3006/mysql?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&autoReconnect=true&failOverReadOnly=false",
"username": "root"
}]
}
}]
Bean定义示例
@Configuration
public class InitMysqlDruid implements EnvironmentAware, BeanDefinitionRegistryPostProcessor, ApplicationContextAware {
@Override
public void postProcessBeanDefinitionRegistry(@NonNull BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
for (Tenants tenant : tenants) {
MysqlDruids temp = tenant.getMysqlDruids();
if (temp == null || temp.getDruidConnectConfigs() == null) {
continue;
}
DruidCommon common = temp.getDruidCommon();
for (DruidConnectConfigs config : temp.getDruidConnectConfigs()) {
//如果有重名的则提示,直接按覆盖处理
if (beanDefinitionRegistry.containsBeanDefinition(config.getDataName())) {
log.warn("The Same Data Name By Mysql : {}, Overwrite!", config.getDataName());
}
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder
.genericBeanDefinition(DruidDataSource.class);
beanDefinitionBuilder.setInitMethodName("init");
beanDefinitionBuilder.setDestroyMethodName("close");
beanDefinitionBuilder.addPropertyValue("url", config.getUrl());
beanDefinitionBuilder.addPropertyValue("username", config.getUsername());
beanDefinitionBuilder.addPropertyValue("password", config.getPassword());
beanDefinitionBuilder.addPropertyValue("driverClassName", config.getDriverClassName());
beanDefinitionBuilder.addPropertyValue("initialSize", Objects.nonNull(config.getInitialSize()) ? config.getInitialSize() : config.getInitialSize());
beanDefinitionBuilder.addPropertyValue("minIdle", Objects.nonNull(config.getMinIdle()) ? config.getMinIdle() : common.getMinIdle());
beanDefinitionBuilder.addPropertyValue("maxActive", Objects.nonNull(config.getMaxActive()) ? config.getMaxActive() : common.getMaxActive());
beanDefinitionBuilder.addPropertyValue("maxWait", Objects.nonNull(config.getMaxWait()) ? config.getMaxWait() : common.getMaxWait());
beanDefinitionBuilder.addPropertyValue("timeBetweenEvictionRunsMillis", Objects.nonNull(config.getTimeBetweenEvictionRunsMillis()) ? config.getTimeBetweenEvictionRunsMillis() : common.getTimeBetweenEvictionRunsMillis());
beanDefinitionBuilder.addPropertyValue("minEvictableIdleTimeMillis", Objects.nonNull(config.getMinEvictableIdleTimeMillis()) ? config.getMinEvictableIdleTimeMillis() : common.getMinEvictableIdleTimeMillis());
beanDefinitionBuilder.addPropertyValue("maxEvictableIdleTimeMillis", Objects.nonNull(config.getMaxEvictableIdleTimeMillis()) ? config.getMaxEvictableIdleTimeMillis() : common.getMaxEvictableIdleTimeMillis());
if (config.getFilterConfigEnabled() && Objects.nonNull(config.getPublicKey())) {
beanDefinitionBuilder.addPropertyValue("filters", Objects.nonNull(config.getFilters()) ? config.getFilters() : common.getFilters());
beanDefinitionBuilder.addPropertyValue("connectionProperties", common.getConnectionProperties().replace("${publicKey}", config.getPublicKey()));
}
BeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition();
if (config.isPrimary()) {
primaryName = config.getDataName();
beanDefinition.setPrimary(true);
}
beanDefinitionRegistry.registerBeanDefinition(config.getDataName(), beanDefinition);
}
}
}
}
到这里数据库连接池已经定义完毕,启动也是OK的
2023-01-19 17:23:04.884 INFO [ main] [TID: N/A|] c.a.d.p.DruidDataSource : {dataSource-1} inited
2023-01-19 17:23:05.427 INFO [ main] [TID: N/A|] c.a.d.p.DruidDataSource : {dataSource-2} inited
2023-01-19 17:23:06.054 INFO [ main] [TID: N/A|] c.a.d.p.DruidDataSource : {dataSource-3} inited
2023-01-19 17:23:06.623 INFO [ main] [TID: N/A|] c.a.d.p.DruidDataSource : {dataSource-4} inited
数据源路由
我们上面定义的多数据源肯定是需要一个动态切换的功能,如果只是用
@Resource(“db1”)
@Resource(“db2”)
这种方式去if - else判断那就low爆了是吧
SpringBoot给我们提供了一个MySQL动态数据源的路由类org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource
极其方便我们的开发工作,o( ̄▽ ̄)dniubility。
public class MysqlDynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return MysqlDynamicDataSourceContextHolder.getContextKey();
}
/**
* 1)设置目标所有数据源
* 2)设置默认住数据源
* 3)这里为什么要手工设置呢,因为后面全局都是给了这个MysqlDynamicDataSource做为主数据源,所以这里等于把数据源路由给刷新到此类,我是这么理解的,大家先品一品。
* 可以不用配置,默认也可以让AbstractRoutingDataSource去查找,都可以
*/
public void initDataByGroup(ApplicationContext applicationContext, String primaryName) {
Map<Object, Object> targetDataSources = Maps.newConcurrentMap();
Set<String> dataNames = MysqlDataNameMappingUtils.getAllDataNames();
for (String dataName : dataNames) {
DataSource defaultTargetDataSource = applicationContext.getBean(dataName, DruidDataSource.class);
targetDataSources.put(dataName, defaultTargetDataSource);
if (StringUtils.equals(dataName, primaryName)) {
this.setDefaultTargetDataSource(defaultTargetDataSource);
}
}
this.setTargetDataSources(targetDataSources);
}
}
AbstractRoutingDataSource原理简析
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
@Nullable
private Map<Object, Object> targetDataSources;// 目标数据源map
@Nullable
private Object defaultTargetDataSource;// 默认数据源
private boolean lenientFallback = true;
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
@Nullable
private Map<Object, DataSource> resolvedDataSources;
@Nullable
private DataSource resolvedDefaultDataSource;
public AbstractRoutingDataSource() {
}
public void setTargetDataSources(Map<Object, Object> targetDataSources) {
this.targetDataSources = targetDataSources;
}
public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
this.defaultTargetDataSource = defaultTargetDataSource;
}
// 初始化 Bean 时执行
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property 'targetDataSources' is required");
} else {
// 将targetDataSources属性的值赋值给resolvedDataSources,后续需要用到resolvedDataSources
this.resolvedDataSources = new HashMap(this.targetDataSources.size());
this.targetDataSources.forEach((key, value) -> {
Object lookupKey = this.resolveSpecifiedLookupKey(key);
DataSource dataSource = this.resolveSpecifiedDataSource(value);
// 存放数据源唯一标识和数据源对象
this.resolvedDataSources.put(lookupKey, dataSource);
});
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
}
// 重写了 getConnection 方法,ORM 框架执行语句前会调用该处
@Override
public Connection getConnection() throws SQLException {
return this.determineTargetDataSource().getConnection();
}
// 同上
@Override
public Connection getConnection(String username, String password) throws SQLException {
return this.determineTargetDataSource().getConnection(username, password);
}
// 👇 重点
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
// 调用我们重写的determineCurrentLookupKey方法,返回的是数据源的唯一标识
Object lookupKey = this.determineCurrentLookupKey();
// 从map中查询该标识对应的数据源,然后方法返回该数据源,调用 getConnection 打开对应连接
DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
} else {
return dataSource;
}
}
// 钩子方法,供我们重写
@Nullable
protected abstract Object determineCurrentLookupKey();
}
1 自定义类继承 AbstractRoutingDataSource
,重写 determineCurrentLookupKey()
,返回数据源的唯一标识;
2 将数据源名称和数据源封装为 map,调用 AbstractRoutingDataSource
类的 setTargetDataSources()
设置目标数据源。AbstractRoutingDataSource
类实现了 InitializingBean
接口,重写了 afterPropertySet()
(对该方法不熟悉的话请回顾一下 Bean 的生命周期,该方法在 Bean 的属性注入后执行),该方法内部对 resolvedDataSources 属性赋值(将 targetDataSources
的值放进去),后续会用到 resolvedDataSources ;
3 AbstractRoutingDataSource
实现了 DataSource 接口,重写了 getConnection()
,当 ORM 框架执行 sql 语句前总是执行 getConnection(),然后就调用到了重写后的 getConnection(),该方法内部调用了 AbstractRoutingDataSource
类的 determineTargetDataSource()
;
4 determineTargetDataSource() 内部调用了自定义类重写的 determineCurrentLookupKey()
,返回数据源的映射,然后从 resolvedDataSources(map) 属性获取到数据源,进行后续的操作。
让Spring加载这个主数据源路由Bean,后面Spring都会自动去使用这个数据源类了。
// 我是在InitMysqlDruid里面定义的,这里拖出来写
@Bean("dynamicDataSource")
public DataSource dataSource() {
MysqlDynamicDataSource dynamicDataSource = new MysqlDynamicDataSource();
dynamicDataSource.initDataByGroup(applicationContext, primaryName);
return dynamicDataSource;
}
数据源动态切换
我们知道每一个方法都是一个线程执行,那么需要在线程中去判断该线程是用哪个数据源的,所以用到了ThreadLocal本地线程对象,这个大家应该不是陌生的类了。
String就是数据源名称,json里面定义的dataName了。
public class MysqlDynamicDataSourceContextHolder {
private static final ThreadLocal<String> DATASOURCE_CONTEXT_KEY_HOLDER = new ThreadLocal<>();
/**
* 设置数据源
*
* @param key
*/
public static void setContextKey(String key) {
DATASOURCE_CONTEXT_KEY_HOLDER.set(key);
}
public static String getContextKey() {
return DATASOURCE_CONTEXT_KEY_HOLDER.get();
}
public static void removeContextKey() {
DATASOURCE_CONTEXT_KEY_HOLDER.remove();
}
}
在定义一个注解@MysqlDynamicData
如果加了注解则用注解的切面去切换数据源,这里需要加一个value如果想指定某个数据源就可以用到了,如果只有一个数据源,那么就不指定就会拿第一个数据源。
但是这里需要说明的业务场景,比如我做这个场景是多租户不同数据源的场景,如:租户A有db1、db2两个数据源;租户B有db3、db4两个数据源。当一个租户进来的时候他有两个数据源,我们提供工具的也是不知道他要哪一个数据源,那么必须由用户在value中指定数据源名称。
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MysqlDynamicData {
//可设置数据源名称
String value() default "";
}
AOP切面
@Aspect
@Component
public class MysqlDynamicDataSourceAspect {
@Pointcut("@annotation(com.dynamic.datasource.mysql.annotation.MysqlDynamicData)")
public void dataSourcePointCut() {
}
@Around("dataSourcePointCut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
//逻辑省略各种实现MysqlDynamicDataSourceContextHolder.setContextKey(dataName);功能
switchDataSource(joinPoint);
try {
return joinPoint.proceed();
} finally {
MysqlDynamicDataSourceContextHolder.removeContextKey();
}
}
}
使用方式
1 在原系统中有配置过xml的数据源则修改点如下
- 删除 bean配置的dataSource数据源连接,统一使用多数据源初始化去加载。
- 替换 xml中SqlSessionFactoryBean中的ref为dynamicDataSource
- 替换 xml中DataSourceTransactionManager中的ref为dynamicDataSource
这里要替换是由于主数据源已经变成了我们维护的这个动态数据源类了,所以必须交由它去路由,否则你怎么换都不生效。
2 在Service层使用动态数据源注解
- 第一种是该租户数据源只有一个,这无需填写value值,默认就会只选第一个数据源。如果配置定义了有多个数据源但这里不指定则使用该租户的第一个数据源。
- 第二种是有该租户有主从数据源,那么需要指定该方法要用哪一个数据源,value值就是配置数据源的dataName。如果找不到数据源则抛异常。
@Override
@MysqlDynamicData("dataSource")
public Object selectPageByMap(Map map) {
return myMapper.selectPageByMap(map);
}
@Override
@MysqlDynamicData
public Object selectById(Long id) {
return myMapper.selectById(id);
}