Spring配置多数据源进行数据同步
发表于6年前(Jan 20, 2015 4:59:00 PM)  阅读 7277  评论 4
标签: 数据同步 annotation 多数据源 多事务 动态切换 AOP 反射 模型转换 AbstractRoutingDataSource PlatformTransactionManager
1、需求背景:
存在数据中心,使用oracle数据库A,数据中心包含众多逻辑、物理模型。现在业务库B,需要从数据中心获取数据,同步数据改动。业务库使用自己的逻辑、物理模型。一般来说数据中心建立的是通用的,共所有业务库使用的模型,而业务库的模型针对特定业务,相对要简单许多。实际上就本案例来说,同一个对象模型,在数据中心使用多张物理表组成,在业务库,已经简化、合成一张物理表。
2、需要解决的问题:
(1)、模型的转换。完成数据中心模型到业务库模型的转换。
(2)、数据变化的同步。数据中心将对数据中心模型发生改动时,记录日志,业务系统需订阅这些日志,分析日志,同步改动的数据。
3、解决思路:
为了统一理解,以下正文,数据中心称为被同步目标库或源目标库,业务库称为同步目标库。
(1)模型的转换。
对于模型的转换,笔者最先想到的就是直接使用sql。数据变化无法是增删改,删除最简单,只要知道删除哪个对象,直接在将同步目标对象根据主键删除就行了。而增加和修改,只要写一句sql,将同步对象表中所有需要的字段从被同步目标表中查询出来,插入同步对象表中就ok了。
在实际案例中,被同步目标对象模型很复杂,中间既包含了父子关系,又有组合关系,通常一个sql很难搞定,当然你可以使用存储过程来完成。这里考虑到以后维护的方便,决定还是采用建立实体对象映射的方式。即分别建立同步目标库映射实体,以及被同步目标库映射实体,实体与数据库之间的操作由hibernate完成,这样我们只需分别建模,完成两个实体之间的转换就可以了。
(2)数据同步。
数据同步的话如果自己实现可以参考笔者之前写的文章:数据同步引发的一些思考
如果使用ETL工具,可能上面的模型的转换在ETL工具中一起做了。实际案例中,就有业务系统使用Kettle是实现。这两年公司渐渐开始使用Kettle作为ETL工具,在以后工作中很有可能也会使用他,到时再说。
4、实现框架:
整个框架设计采用普通java工程来实现,与业务系统分开部署,定时抽取同步数据,这样对业务系统影响最小。
采用Spring+Hibernate框架,对被同步目标库对象和同步目标库对象分别建立映射实体,这里需要配置两个数据源。对于多个数据源,其实如果表数量不是很多的话,一般我们可以使用dblink的方式解决,这样就能转换成一个数据源的操作,也许会省很多事,在这里我们还是采用多数据源配置实现。
模型转换,使用java的annotation以及反射机制实现。
5、具体实现:
(1)Spring多数据源配置:
采用Spring框架里面的AbstractRoutingDataSource类实现动态数据源的切换。
首先在applicationContext.xml中配置我们要用到的两个数据源:
<bean id="targetDataSource" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${target.jdbc.url}" />
<property name="username" value="${target.jdbc.username}" />
<property name="password" value="${target.jdbc.password}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxActive" value="${jdbc.maxActive}" />
<property name="maxIdle" value="${jdbc.maxIdle}" />
<property name="maxWait" value="1000" />
</bean>
<bean id="sourceDataSource" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${source.jdbc.url}" />
<property name="username" value="${source.jdbc.username}" />
<property name="password" value="${source.jdbc.password}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxActive" value="${jdbc.maxActive}" />
<property name="maxIdle" value="${jdbc.maxIdle}" />
<property name="maxWait" value="1000" />
</bean>
添加DatabaseContextHolder,用来保存当前应该使用的数据源名称:
public class DatabaseContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static void setCustomerType(String customerType) {
contextHolder.set(customerType);
}
public static String getCustomerType() {
return contextHolder.get();
}
public static void clearCustomerType() {
contextHolder.remove();
}
}
添加AbstractRoutingDataSource实现类,实现数据源路由选择:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DatabaseContextHolder.getCustomerType()+"DataSource";
}
}
然后回到applicationContext.xml里面,配置动态数据源:
<bean id="dataSource" class="com.cangzhitao.datasync.datasource.DynamicDataSource">
<property name="targetDataSources">
<map key-type="java.lang.String">
<entry value-ref="targetDataSource" key="targetDataSource"></entry>
<entry value-ref="sourceDataSource" key="sourceDataSource"></entry>
</map>
</property>
<property name="defaultTargetDataSource" ref="targetDataSource">
</property>
</bean>
动态数据源的bean名称为dataSource,为默认名称,别的bean可以使用自动注入完成数据源的注入,跟以往配置一样。在这里,默认约定所有数据源名称都以DataSource结尾,当然这并不是必要的,在下面会再说到。
以上完成了多个数据源的配置,但是还不能动态切换,还需使用aop来自动切换数据源:
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Repository;
@Repository(value="dataSourceInterceptor")
@Aspect
@Order(value=1)
public class DataSourceInterceptor {
@Before("execution(* com.cangzhitao.datasync.target.dao.impl.*.*(..))")
public void settargetDataSource(JoinPoint jp) {
DatabaseContextHolder.setCustomerType("target");
}
@Before("execution(* com.cangzhitao.datasync.source.dao.impl.*.*(..))")
public void setsourceDataSource(JoinPoint jp) {
DatabaseContextHolder.setCustomerType("source");
}
}
因为默认使用了DataSource后缀,这里只需添加前缀即可,现实中用户不必这么做,这里笔者是由别的考虑。
到这里,就完成了多数据源的配置,当使用target包里面的dao进行操作时,将会提前切换成targetDataSource,同样,使用source包里面的dao进行操作时,将提前将数据源切换成sourceDataSource。
但是笔者实现时其实没有这么顺利,注意到切面DataSourceInterceptor中的Order注解,最开始时,笔者是没加Order的,在没加order的情况下,测试两边分别查询,能切换到正确的数据源,是没有问题的,而如果有一边涉及到修改操作,笔者使用的声明式事务,使用注解配置,则出现问题,报表或视图不存在,显然数据源切换错误。笔者dao中的save方法:
@Transactional
public TargetObject save(TargetObject targetObject) {
this.getHibernateTemplate().saveOrUpdate(targetObject);
return targetObject;
}
笔者debug时,发现在执行修改前,DataSourceInterceptor里的数据源上下文切换确实已经进行了,但是就是报错,百思不得其解,笔者发现所有修改操作,带注解事务的都这样,以为问题出在事务身上,于是开始考虑配置多事务,走了一条弯路,其实最后发现不是这个原因,但这里还是顺带讲一下如何配置多事务。
(2)Spring多事务配置:
这里要区分一个概念,分布式事务。本文并没有用到分布式事务,其实每个事务都是只操作了一个单独的数据库,并不存在一个事务跨数据库操作的情况,而且,针对被同步目标数据库,我们只是做读取操作。
Spring配置多事务也很简单,只是要使用注解配置事务比较麻烦,这里同样采用多数据源类似的配置方式。
在多数据源配置的基础上,在applicationContext.xml中增加如下配置:
<bean id="transactionManager" class="com.cangzhitao.datasync.datasource.DynamicTransactionManager">
<property name="targetTransactionManagers">
<map value-type="org.springframework.transaction.PlatformTransactionManager">
<entry key="targetTransactionManager" value-ref="targetTransactionManager" />
<entry key="sourceTransactionManager" value-ref="sourceTransactionManager" />
</map>
</property>
</bean>
每个事务管理器对应一个数据源,然后建立一个DynamicTransactionManager用来实现PlatformTransactionManager:
import java.util.HashMap;
import java.util.Map;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
public class DynamicTransactionManager implements PlatformTransactionManager {
private Map<String, PlatformTransactionManager> targetTransactionManagers = new HashMap<String, PlatformTransactionManager>();
protected PlatformTransactionManager getTargetTransactionManager() {
String context = DatabaseContextHolder.getCustomerType()+"TransactionManager";
return targetTransactionManagers.get(context);
}
public void setTargetTransactionManagers(Map<String, PlatformTransactionManager> targetTransactionManagers) {
this.targetTransactionManagers = targetTransactionManagers;
}
public void commit(TransactionStatus arg0) throws TransactionException {
getTargetTransactionManager().commit(arg0);
}
public TransactionStatus getTransaction(TransactionDefinition arg0)
throws TransactionException {
return getTargetTransactionManager().getTransaction(arg0);
}
public void rollback(TransactionStatus arg0) throws TransactionException {
getTargetTransactionManager().rollback(arg0);
}
}
DynamicTransactionManager将管理多个PlatformTransactionManager,需要事务时从中获取即可。而DynamicTransactionManager使用的数据源是dataSource,即开始配置的动态数据源,从而也可以动态切换数据源。注意,事务的切换同数据的切换使用的是同一个aop实现。
配置完多事务,重新测试,笔者才想到之前报错的执行顺序应该是这样的,先打开事务,然后切换数据源,然后进行修改,这样就导致了报错。而正确的流程应该是先切换数据源,然后打开事务,再进行修改。由此可见,应该是AOP代码执行的顺序问题,注解声明式事务也是通过aop实现。
去掉多事务配置,给DataSourceInterceptor加上@Order(value=1)注解后,发现测试成功,看来笔者的判断是对的。进一步测试Order的值,只要是value小于Integer的MAX_VALUE,都可以成功,由此推断,事务的Order的值不是配置的MAX_VALUE就是没有配置,而没进行配置的话,使用的就是最大值,即Order默认值为Integer.MAX_VALUE。
至此,Spring如何配置多数据源、多事务已经清晰了,通过上面的方法,配置多sessionFactory或者hibernateTemplate应该都是很简单的事。
(3)模型转换:
添加两个注解接口,ConvertObject,在被同步目标实体类名上使用,用来标示该被同步目标实体可以转换成哪个同步目标实体:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ConvertObject {
public String name();
}
ConvertAttribute,在被同步目标实体类属性上使用,用来标示该被同步目标实体属性对应同步目标实体的哪个属性,其中属性分为两种类型,一个是普通字段属性,一个是复合对象属性:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ConvertAttribute {
public String name();
public String type() default "field";
public String target() default "";
}
所有的被同步目标实体都继承了SourceObject对象,所有同步目标实体都继承于TargetObject对象,SourceObject对象里面定义了如何转换成TargetObject的方法。
public TargetObject getTargetObject(String targetObjectName)方法是重点,大量使用了java的反射机制。
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
public class SourceObject implements Serializable {
/**
*
*/
private static final long serialVersionUID = 7123105341708695326L;
private final static String TARGET_PACKAGE = "com.cangzhitao.datasync.target.entities";
public String[] getTargetObjectNames() {
Class<? extends SourceObject> c = this.getClass();
ConvertObject converObject = null;
converObject = c.getAnnotation(ConvertObject.class);
if(converObject==null) {
System.err.println("没有配置需要转换的目标对象!");
return null;
}
String names = converObject.name();
if(names==null||"".equals(names)) {
return null;
}
String[] targetObjectNames = names.split(",");
return targetObjectNames;
}
public TargetObject getTargetObject() {
return getTargetObject(null);
}
@SuppressWarnings("unchecked")
public TargetObject getTargetObject(String targetObjectName) {
TargetObject targetObject = null;
Class<? extends SourceObject> c = this.getClass();
ConvertObject converObject = null;
converObject = c.getAnnotation(ConvertObject.class);
if(converObject==null) {
System.err.println("没有配置需要转换的目标对象!");
return null;
}
String names = converObject.name();
if(names==null||"".equals(names)) {
return null;
}
String[] targetObjectNames = names.split(",");
if(targetObjectNames==null||targetObjectNames.length==0) {
return null;
}
if(targetObjectName==null) {
targetObjectName = targetObjectNames[0];
}
Class<? extends TargetObject> targetObjectC = null;
try {
targetObjectC = (Class<? extends TargetObject>) Class.forName(TARGET_PACKAGE+"."+targetObjectName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
if(targetObjectC==null) {
System.err.println("配置需要转换的目标对象不存在:"+targetObjectName+"!");
return null;
}
try {
targetObject = targetObjectC.newInstance();
} catch (Exception e) {
e.printStackTrace();
System.err.println("配置需要转换的目标对象不存在:"+targetObjectName+"!");
return null;
}
convert(null, this, targetObject);
return targetObject;
}
@SuppressWarnings("unchecked")
public void convert(Map<String, String> sourceMap, SourceObject sourceObject, TargetObject targetObject) {
if(sourceMap==null) {
sourceMap = new HashMap<String, String>();
}
String scName = sourceObject.getClass().getName();
if(sourceMap.get(scName)!=null) {
return;
}
sourceMap.put(scName, "");
String targetObjectName = targetObject.getClass().getSimpleName();
Class<?> supperC = sourceObject.getClass();
while(true) {
if(supperC.getSimpleName().equals(SourceObject.class.getSimpleName())||supperC.getSimpleName().equals(Object.class.getSimpleName())) {
break;
}
for(Field field : supperC.getDeclaredFields()) {
field.setAccessible(true);
Object sourceValue = null;
try {
sourceValue = field.get(sourceObject);
} catch(Exception e) {
e.printStackTrace();
}
if(sourceValue==null) {
continue;
}
ConvertAttribute converAttribute = field.getAnnotation(ConvertAttribute.class);
if(converAttribute!=null) {
String type = converAttribute.type();
String name = converAttribute.name();
if(name.indexOf(".")>=0) {
if(name.startsWith(targetObjectName+".")) {
name = name.replace(targetObjectName+".", "");
} else {
continue;
}
}
Method m = null;
try {
Object value = null;
value = sourceValue;
if("class".equals(type)) {
String targetName = converAttribute.target();
if(value instanceof List) {
List<SourceObject> soList = (List<SourceObject>) value;
List<TargetObject> toList = new ArrayList<TargetObject>();
for(int i=0;i<soList.size();i++) {
toList.add(soList.get(i).getTargetObject(targetName));
}
value = toList;
m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), List.class);
} else {
SourceObject so = (SourceObject) sourceValue;
value = so.getTargetObject(targetName);
m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), value.getClass());
}
} else if("field".equals(type)) {
value = sourceValue;
m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), value.getClass());
}
m.invoke(targetObject, value);
} catch (Exception e) {
e.printStackTrace();
}
} else {
if(sourceValue instanceof SourceObject) {
convert(sourceMap, (SourceObject) sourceValue, targetObject);
} else if(sourceValue instanceof List) {
List<SourceObject> ls = (List<SourceObject>) sourceValue;
if(ls.size()>0) {
convert(sourceMap, ls.get(0), targetObject);
}
}
}
}
supperC = supperC.getSuperclass();
}
}
}
简单使用示例:
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@ConvertObject(name="TargetPerson")
@Entity
@Table(name="person")
public class SourcePerson extends SourceObject {
/**
*
*/
private static final long serialVersionUID = 8443769981018349479L;
@Id
@Column(name="id")
private long id;
@ConvertAttribute(name="pid")
@Column(name="mrid")
private String mrid;
@ConvertAttribute(name="status")
@Column(name="status")
private String stauts;
@ConvertAttribute(name="description")
@Column(name="description")
private String description;
@ConvertAttribute(name="name")
@Column(name="name")
private String name;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMrid() {
return mrid;
}
public void setMrid(String mrid) {
this.mrid = mrid;
}
public String getStauts() {
return stauts;
}
public void setStauts(String stauts) {
this.stauts = stauts;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
其中@ConvertObject中的name可以配多个值,用“,”隔开,一个源对象可以转换成多个目标对象这是合理的,如果配置了多个对象,相应的属性可能需要作出区分,如果属性不带类名,则对所有转换对象都支持,如果带类名,表示只针对特定对象。
6、后记:
以上基本上就是这个案例笔者的思考路线以及解决方式,将中间出现的问题,以及可能的解决方式都做了说明,有点冗繁。其中模型转换部分不够成熟,可能应根据你实际情况作出些调整,主要是关联关系中,一对多,多对多时,需要选择合适的处理方案。写到最后,虽然没用过Kettle,感觉Kettle思路基本应该也是这样了。
该案例在数据库级别完成也是可以的,但是一般来说,单纯的sql很枯燥,而且不好维护,更重要的一点是不够通用,试想一下,如果是异构数据库之间同步,会怎么样。笔者的方案可以很好的解决这个问题。