×
文章路径: Java

Spring配置多数据源进行数据同步

发表于3年前(Jan 20, 2015 4:59:00 PM)  阅读 4623  评论 4

分类: Java 数据库 案例

标签: 数据同步 annotation 多数据源 多事务 动态切换 AOP 反射 模型转换 AbstractRoutingDataSource PlatformTransactionManager

1、需求背景:

存在数据中心,使用oracle数据库A,数据中心包含众多逻辑、物理模型。现在业务库B,需要从数据中心获取数据,同步数据改动。业务库使用自己的逻辑、物理模型。一般来说数据中心建立的是通用的,共所有业务库使用的模型,而业务库的模型针对特定业务,相对要简单许多。实际上就本案例来说,同一个对象模型,在数据中心使用多张物理表组成,在业务库,已经简化、合成一张物理表。

2、需要解决的问题:

(1)、模型的转换。完成数据中心模型到业务库模型的转换。

(2)、数据变化的同步。数据中心将对数据中心模型发生改动时,记录日志,业务系统需订阅这些日志,分析日志,同步改动的数据。

3、解决思路:

为了统一理解,以下正文,数据中心称为被同步目标库或源目标库,业务库称为同步目标库。

(1)模型的转换。

对于模型的转换,笔者最先想到的就是直接使用sql。数据变化无法是增删改,删除最简单,只要知道删除哪个对象,直接在将同步目标对象根据主键删除就行了。而增加和修改,只要写一句sql,将同步对象表中所有需要的字段从被同步目标表中查询出来,插入同步对象表中就ok了。

在实际案例中,被同步目标对象模型很复杂,中间既包含了父子关系,又有组合关系,通常一个sql很难搞定,当然你可以使用存储过程来完成。这里考虑到以后维护的方便,决定还是采用建立实体对象映射的方式。即分别建立同步目标库映射实体,以及被同步目标库映射实体,实体与数据库之间的操作由hibernate完成,这样我们只需分别建模,完成两个实体之间的转换就可以了。

(2)数据同步。

数据同步的话如果自己实现可以参考笔者之前写的文章:数据同步引发的一些思考

如果使用ETL工具,可能上面的模型的转换在ETL工具中一起做了。实际案例中,就有业务系统使用Kettle是实现。这两年公司渐渐开始使用Kettle作为ETL工具,在以后工作中很有可能也会使用他,到时再说。

4、实现框架:

整个框架设计采用普通java工程来实现,与业务系统分开部署,定时抽取同步数据,这样对业务系统影响最小。

采用Spring+Hibernate框架,对被同步目标库对象和同步目标库对象分别建立映射实体,这里需要配置两个数据源。对于多个数据源,其实如果表数量不是很多的话,一般我们可以使用dblink的方式解决,这样就能转换成一个数据源的操作,也许会省很多事,在这里我们还是采用多数据源配置实现。

模型转换,使用java的annotation以及反射机制实现。

5、具体实现:

(1)Spring多数据源配置:

采用Spring框架里面的AbstractRoutingDataSource类实现动态数据源的切换。

首先在applicationContext.xml中配置我们要用到的两个数据源:

 

    <bean id="targetDataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="${jdbc.driverClassName}" />
		<property name="url" value="${target.jdbc.url}" />
		<property name="username" value="${target.jdbc.username}" />
		<property name="password" value="${target.jdbc.password}" />
		<property name="initialSize" value="${jdbc.initialSize}" />
		<property name="maxActive" value="${jdbc.maxActive}" />
		<property name="maxIdle" value="${jdbc.maxIdle}" />
		<property name="maxWait" value="1000" />
	</bean>


	<bean id="sourceDataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="${jdbc.driverClassName}" />
		<property name="url" value="${source.jdbc.url}" />
		<property name="username" value="${source.jdbc.username}" />
		<property name="password" value="${source.jdbc.password}" />
		<property name="initialSize" value="${jdbc.initialSize}" />
		<property name="maxActive" value="${jdbc.maxActive}" />
		<property name="maxIdle" value="${jdbc.maxIdle}" />
		<property name="maxWait" value="1000" />
	</bean>

 

添加DatabaseContextHolder,用来保存当前应该使用的数据源名称:

 

public class DatabaseContextHolder {

	private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
	
	public static void setCustomerType(String customerType) {
		contextHolder.set(customerType);
	}
	
	public static String getCustomerType() {
		return contextHolder.get();
	}
	
	public static void clearCustomerType() {
		contextHolder.remove();
	}
	
}

添加AbstractRoutingDataSource实现类,实现数据源路由选择:

 

 

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {

	@Override
	protected Object determineCurrentLookupKey() {
		return DatabaseContextHolder.getCustomerType()+"DataSource";
	}

}

然后回到applicationContext.xml里面,配置动态数据源:

 

 

    <bean id="dataSource" class="com.cangzhitao.datasync.datasource.DynamicDataSource">
		<property name="targetDataSources">
			<map key-type="java.lang.String">
				<entry value-ref="targetDataSource" key="targetDataSource"></entry>
				<entry value-ref="sourceDataSource" key="sourceDataSource"></entry>
			</map>
		</property>
		<property name="defaultTargetDataSource" ref="targetDataSource">
		</property>
	</bean>

 

动态数据源的bean名称为dataSource,为默认名称,别的bean可以使用自动注入完成数据源的注入,跟以往配置一样。在这里,默认约定所有数据源名称都以DataSource结尾,当然这并不是必要的,在下面会再说到。

以上完成了多个数据源的配置,但是还不能动态切换,还需使用aop来自动切换数据源:

 

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Repository;

@Repository(value="dataSourceInterceptor")
@Aspect
@Order(value=1)
public class DataSourceInterceptor {

	@Before("execution(* com.cangzhitao.datasync.target.dao.impl.*.*(..))")
	public void settargetDataSource(JoinPoint jp) {
		DatabaseContextHolder.setCustomerType("target");
	}
	
	@Before("execution(* com.cangzhitao.datasync.source.dao.impl.*.*(..))")
	public void setsourceDataSource(JoinPoint jp) {
		DatabaseContextHolder.setCustomerType("source");
	}
	
}

 

因为默认使用了DataSource后缀,这里只需添加前缀即可,现实中用户不必这么做,这里笔者是由别的考虑。

到这里,就完成了多数据源的配置,当使用target包里面的dao进行操作时,将会提前切换成targetDataSource,同样,使用source包里面的dao进行操作时,将提前将数据源切换成sourceDataSource。

但是笔者实现时其实没有这么顺利,注意到切面DataSourceInterceptor中的Order注解,最开始时,笔者是没加Order的,在没加order的情况下,测试两边分别查询,能切换到正确的数据源,是没有问题的,而如果有一边涉及到修改操作,笔者使用的声明式事务,使用注解配置,则出现问题,报表或视图不存在,显然数据源切换错误。笔者dao中的save方法:

 

    @Transactional
	public TargetObject save(TargetObject targetObject) {
		this.getHibernateTemplate().saveOrUpdate(targetObject);
		return targetObject;
	}

 

笔者debug时,发现在执行修改前,DataSourceInterceptor里的数据源上下文切换确实已经进行了,但是就是报错,百思不得其解,笔者发现所有修改操作,带注解事务的都这样,以为问题出在事务身上,于是开始考虑配置多事务,走了一条弯路,其实最后发现不是这个原因,但这里还是顺带讲一下如何配置多事务。

(2)Spring多事务配置:

这里要区分一个概念,分布式事务。本文并没有用到分布式事务,其实每个事务都是只操作了一个单独的数据库,并不存在一个事务跨数据库操作的情况,而且,针对被同步目标数据库,我们只是做读取操作。

Spring配置多事务也很简单,只是要使用注解配置事务比较麻烦,这里同样采用多数据源类似的配置方式。

在多数据源配置的基础上,在applicationContext.xml中增加如下配置:

 

    <bean id="transactionManager" class="com.cangzhitao.datasync.datasource.DynamicTransactionManager">  
        <property name="targetTransactionManagers">  
            <map value-type="org.springframework.transaction.PlatformTransactionManager">  
                <entry key="targetTransactionManager" value-ref="targetTransactionManager" />  
                <entry key="sourceTransactionManager" value-ref="sourceTransactionManager" />  
            </map>  
        </property>  
    </bean>  

 

每个事务管理器对应一个数据源,然后建立一个DynamicTransactionManager用来实现PlatformTransactionManager:

 


import java.util.HashMap;
import java.util.Map;

import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;

public class DynamicTransactionManager implements PlatformTransactionManager {

	private Map<String, PlatformTransactionManager> targetTransactionManagers = new HashMap<String, PlatformTransactionManager>();
	
	protected PlatformTransactionManager getTargetTransactionManager() {
		String context = DatabaseContextHolder.getCustomerType()+"TransactionManager";
		return targetTransactionManagers.get(context);
	}
	
	public void setTargetTransactionManagers(Map<String, PlatformTransactionManager> targetTransactionManagers) {  
        this.targetTransactionManagers = targetTransactionManagers;  
    }  
	
	public void commit(TransactionStatus arg0) throws TransactionException {
		getTargetTransactionManager().commit(arg0);
	}

	public TransactionStatus getTransaction(TransactionDefinition arg0)
			throws TransactionException {
		return getTargetTransactionManager().getTransaction(arg0);
	}

	public void rollback(TransactionStatus arg0) throws TransactionException {
		getTargetTransactionManager().rollback(arg0);
	}

}

DynamicTransactionManager将管理多个PlatformTransactionManager,需要事务时从中获取即可。而DynamicTransactionManager使用的数据源是dataSource,即开始配置的动态数据源,从而也可以动态切换数据源。注意,事务的切换同数据的切换使用的是同一个aop实现。

 

配置完多事务,重新测试,笔者才想到之前报错的执行顺序应该是这样的,先打开事务,然后切换数据源,然后进行修改,这样就导致了报错。而正确的流程应该是先切换数据源,然后打开事务,再进行修改。由此可见,应该是AOP代码执行的顺序问题,注解声明式事务也是通过aop实现。

去掉多事务配置,给DataSourceInterceptor加上@Order(value=1)注解后,发现测试成功,看来笔者的判断是对的。进一步测试Order的值,只要是value小于Integer的MAX_VALUE,都可以成功,由此推断,事务的Order的值不是配置的MAX_VALUE就是没有配置,而没进行配置的话,使用的就是最大值,即Order默认值为Integer.MAX_VALUE。

至此,Spring如何配置多数据源、多事务已经清晰了,通过上面的方法,配置多sessionFactory或者hibernateTemplate应该都是很简单的事。

(3)模型转换:

添加两个注解接口,ConvertObject,在被同步目标实体类名上使用,用来标示该被同步目标实体可以转换成哪个同步目标实体:

 

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ConvertObject {
	
	public String name();

}

ConvertAttribute,在被同步目标实体类属性上使用,用来标示该被同步目标实体属性对应同步目标实体的哪个属性,其中属性分为两种类型,一个是普通字段属性,一个是复合对象属性:

 

 

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ConvertAttribute {
	
	public String name();

    public String type() default "field";
	
	public String target() default "";

}

 

所有的被同步目标实体都继承了SourceObject对象,所有同步目标实体都继承于TargetObject对象,SourceObject对象里面定义了如何转换成TargetObject的方法。

public TargetObject getTargetObject(String targetObjectName)方法是重点,大量使用了java的反射机制。

 


import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;


public class SourceObject implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 7123105341708695326L;

	private final static String TARGET_PACKAGE = "com.cangzhitao.datasync.target.entities";
	
	
	public String[] getTargetObjectNames() {
		Class<? extends SourceObject> c = this.getClass();
		ConvertObject converObject = null;
		converObject = c.getAnnotation(ConvertObject.class);
		if(converObject==null) {
			System.err.println("没有配置需要转换的目标对象!");
			return null;
		}
		String names = converObject.name();
		if(names==null||"".equals(names)) {
			return null;
		}
		String[] targetObjectNames = names.split(",");
		return targetObjectNames;
	}

	public TargetObject getTargetObject() {
		return getTargetObject(null);
	}
	
	
	@SuppressWarnings("unchecked")
	public TargetObject getTargetObject(String targetObjectName) {
		TargetObject targetObject = null;
		Class<? extends SourceObject> c = this.getClass();
		ConvertObject converObject = null;
		converObject = c.getAnnotation(ConvertObject.class);
		if(converObject==null) {
			System.err.println("没有配置需要转换的目标对象!");
			return null;
		}
		String names = converObject.name();
		if(names==null||"".equals(names)) {
			return null;
		}
		String[] targetObjectNames = names.split(",");
		if(targetObjectNames==null||targetObjectNames.length==0) {
			return null;
		}
		if(targetObjectName==null) {
			targetObjectName = targetObjectNames[0];
		}
		Class<? extends TargetObject> targetObjectC = null;
		try {
			targetObjectC = (Class<? extends TargetObject>) Class.forName(TARGET_PACKAGE+"."+targetObjectName);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
		if(targetObjectC==null) {
			System.err.println("配置需要转换的目标对象不存在:"+targetObjectName+"!");
			return null;
		}
		try {
			targetObject = targetObjectC.newInstance();
		} catch (Exception e) {
			e.printStackTrace();
			System.err.println("配置需要转换的目标对象不存在:"+targetObjectName+"!");
			return null;
		} 
		convert(null, this, targetObject);
		
		return targetObject;
	}
	
	@SuppressWarnings("unchecked")
	public void convert(Map<String, String> sourceMap, SourceObject sourceObject, TargetObject targetObject) {
		if(sourceMap==null) {
			sourceMap = new HashMap<String, String>();
		}
		String scName = sourceObject.getClass().getName();
		if(sourceMap.get(scName)!=null) {
			return;
		}
		sourceMap.put(scName, "");
		
		String targetObjectName = targetObject.getClass().getSimpleName();
		
		Class<?> supperC = sourceObject.getClass();
		while(true) {
			if(supperC.getSimpleName().equals(SourceObject.class.getSimpleName())||supperC.getSimpleName().equals(Object.class.getSimpleName())) {
				break;
			}
			for(Field field : supperC.getDeclaredFields()) {
				field.setAccessible(true);
				Object sourceValue = null;
				try {
					sourceValue = field.get(sourceObject);
				} catch(Exception e) {
					e.printStackTrace();
				}
				if(sourceValue==null) {
					continue;
				}
				ConvertAttribute converAttribute = field.getAnnotation(ConvertAttribute.class);
				if(converAttribute!=null) {
					String type = converAttribute.type();
					
					String name = converAttribute.name();
					if(name.indexOf(".")>=0) {
						if(name.startsWith(targetObjectName+".")) {
							name =  name.replace(targetObjectName+".", "");
						} else {
							continue;
						}
					}					
					Method m = null;
					try {
						Object value = null;
						value = sourceValue;
						if("class".equals(type)) {
							String targetName = converAttribute.target();
							if(value instanceof List) {
								List<SourceObject> soList = (List<SourceObject>) value;
								List<TargetObject> toList = new ArrayList<TargetObject>();
								for(int i=0;i<soList.size();i++) {
									toList.add(soList.get(i).getTargetObject(targetName));
								}
								value = toList;
								m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), List.class);
							} else {
								SourceObject so = (SourceObject) sourceValue;
								value = so.getTargetObject(targetName);
								m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), value.getClass());
							}
						} else if("field".equals(type)) {
							value = sourceValue;
							m = targetObject.getClass().getMethod("set"+StringUtil.initialString(name), value.getClass());
						}
						m.invoke(targetObject, value);
					} catch (Exception e) {
						e.printStackTrace();
					}
					
					
				} else {
					if(sourceValue instanceof SourceObject) {
						convert(sourceMap, (SourceObject) sourceValue, targetObject);
					} else if(sourceValue instanceof List) {
						List<SourceObject> ls = (List<SourceObject>) sourceValue;
						if(ls.size()>0) {
							convert(sourceMap, ls.get(0), targetObject);
						}
					}
				}
				
			}
			
			supperC = supperC.getSuperclass();
			
		}
	}
}

简单使用示例:

 

 



import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;



@ConvertObject(name="TargetPerson")
@Entity
@Table(name="person")
public class SourcePerson extends SourceObject {

	/**
	 * 
	 */
	private static final long serialVersionUID = 8443769981018349479L;

	@Id
	@Column(name="id")
	private long id; 
	
	@ConvertAttribute(name="pid")
	@Column(name="mrid")
	private String mrid;
	
	@ConvertAttribute(name="status")
	@Column(name="status")
	private String stauts;

	@ConvertAttribute(name="description")
	@Column(name="description")
	private String description;
	
	@ConvertAttribute(name="name")
	@Column(name="name")
	private String name;
	
	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getMrid() {
		return mrid;
	}

	public void setMrid(String mrid) {
		this.mrid = mrid;
	}

	public String getStauts() {
		return stauts;
	}

	public void setStauts(String stauts) {
		this.stauts = stauts;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
	
	
}

其中@ConvertObject中的name可以配多个值,用“,”隔开,一个源对象可以转换成多个目标对象这是合理的,如果配置了多个对象,相应的属性可能需要作出区分,如果属性不带类名,则对所有转换对象都支持,如果带类名,表示只针对特定对象。

 

6、后记:

以上基本上就是这个案例笔者的思考路线以及解决方式,将中间出现的问题,以及可能的解决方式都做了说明,有点冗繁。其中模型转换部分不够成熟,可能应根据你实际情况作出些调整,主要是关联关系中,一对多,多对多时,需要选择合适的处理方案。写到最后,虽然没用过Kettle,感觉Kettle思路基本应该也是这样了。

该案例在数据库级别完成也是可以的,但是一般来说,单纯的sql很枯燥,而且不好维护,更重要的一点是不够通用,试想一下,如果是异构数据库之间同步,会怎么样。笔者的方案可以很好的解决这个问题。

发表评论