×

一个简单的缓存框架MemCache

发表于3年前(Dec 24, 2014 2:29:11 PM)  阅读 1142  评论 0

分类: Java 类库工具 MemCache

标签: memcache 缓存框架 自动刷新

1、背景介绍:

缓存框架笔者用得不多,主要是传统行业对使用缓存的概念很薄弱,不像互联网行业那么强烈。笔者曾经用过Ehcache,Memcached,就只有这两个了,而且基本都用得很少。具笔者有限的了解,这里就不百度了,有兴趣的读者自己百度,这两个框架共同点:都支持分布式部署,都支持缓存失效,都能控制缓存大小。不同点:Ehcache支持持久化到硬盘,而Memcached就不行,只能使用内存,Ehcache集群之间数据会保持一致,挂了一台,不影响数据的丢失,而Memcached数据是分成几份存在不同集群机器上,挂了一台,那台机器上的数据就丢失了,所以互联网企业用redis比较多,能弥补Memcached的不足。Memcached主要是更轻量级,笔者上次使用Memcached是用来作为Session服务器。

缓存是一个很简单的东西,初级程序员往往会觉得很高大上,其实很多人都在实际中应用过。举个很简单的例子,项目中经常有些静态常量,我们会写个工具类来引用,其实这也可以算作缓存,缓存的作用就是将经常访问且很少改变的数据保存在内存中(或硬盘一切读取很快的介质),我们将这个静态常量扩展成一个Map集合,里面可以存任意的数据,这就是一个简单的缓存框架了。

2、memcache框架介绍:

笔者写的memcache框架是根据这两年使用自己使用缓存的一些实际经验,需求来实现的。单机版,主要就两个功能吧,首先,基本的数据存取功能,其次就是缓存数据自动刷新功能,自动刷新功能是不同其他缓存框架的地方,也可以说是一个亮点。memcache没有缓存超时失效,根据笔者经验,很少会出现这种要求,一般失效都是因为数据要更新了,而不是这部分数据没了,所以笔者设想到了指定时间,缓存的数据会自动进行更新,不需要你人为干预,当然,如果有事件触发,需要立即刷新,也提供了接口刷新数据。

3、memcache框架实现:

3.1、CachePool 缓存池:

CachePool缓存池是真正存储缓存数据的地方,参考Ehcache的方式,可以有多个CachePool,每个CachePool有唯一的名称标示,不同的缓存内容可以存在不同的CachePool里面,设想以后可以针对每个CachePool做不同的操作,现在这版本使用一个或多个CachePool没有区别。CachePool里面就是使用一个Map来存储缓存数据。

package com.cangzhitao.memcache;

import java.util.HashMap;
import java.util.Map;

/**
 * 缓存池
 * @author Administrator
 *
 */
public class CachePool {

        public static final String DEFAULT_POOL_NAME = "default";
        
        /**
         * 缓存池名,不同的数据可以放在不同的缓冲池里面
         */
        private String poolName = DEFAULT_POOL_NAME;
        
        public CachePool(String poolName) {
                super();
                this.poolName = poolName;
        }

        private Map<String, Object> map = new HashMap<String, Object>();
        
        
        public Object getObject(String key) {
                return map.get(key);
        }
         
        public void removeObject(String key) {
                map.remove(key);
        }
        
        public Object addObject(String key, Object value) {
                map.put(key, value);
                return value;
        }

        public String getPoolName() {
                return poolName;
        }
        
        
        
}

3.2、CacheObject缓存对象:

CacheObject主要是承载每一个缓存数据的一些主体信息,如key,唯一标示缓存对象(同一个缓存池中),还有一些比较重要的属性,invokeObject,invokeMethod,invokeParams,这些参数表明了该缓存数据是怎么得到的,如一个博客首页的热门文章列表,我们一般是通过service查询出来,然后传给view层进行展现,那这个service就对应了invokeObject,查询的方法就对应了invokeMethod,查询参数就对应了invokeParams,该缓存对象会通过反射自动调用该方法将数据查询出来并进行缓存。

那CacheObject什么时候调用查询方法呢?这就是在flushMode里面,现在设计了三种flushMode,第一种INVOKE,被动调用,即不主动刷新,只有当用户发出刷新指定才会调用查询方法,第二种SLEEP,配合SLEEP刷新模式,我们还需指定sleep参数,表示每次刷新完后隔多久再次刷新,sleep为毫秒数,第三种ONTIME,到达指定时间进行刷新,配合ONTIME模式,需要指定刷新时间,这里引用了quartz框架中的CronExpression表达式来指定刷新时间。基本上这三种模式已经够用了。

package com.cangzhitao.memcache;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Date;

import org.apache.log4j.Logger;

import com.cangzhitao.memcache.util.TimeUtil;

/**
 * 缓存对象
 * @author Administrator
 *
 */
public class CacheObject implements Serializable {
        
        private static final Logger log = Logger.getLogger(CacheObject.class);
        
        
        private int flushMode = FlushMode.INVOKE;

        /**
         * 
         */
        private static final long serialVersionUID = -7990547906082845825L;
        
        /**
         * 缓冲池名
         */
        private String cachePoolName = CachePool.DEFAULT_POOL_NAME;

        /**
         * 缓存唯一标示
         */
        private String key;
        
        /**
         * 获取缓存数据的对象引用
         */
        private Object invokeObject;
        
        /**
         * 获取缓存数据的对象引用的方法
         */
        private Method invokeMethod;
        
        /**
         * 获取缓存数据的对象引用的方法参数
         */
        private Object[] invokeParams;
        
        public CacheObject() {
                
        }
        
        public CacheObject(String key, int flushMode, Object invokeObject, Method invokeMethod, Object[] invokeParams) {
                this.key = key;
                this.flushMode = flushMode;
                this.invokeObject = invokeObject;
                this.invokeMethod = invokeMethod;
                this.invokeParams = invokeParams;
        }
        
        public CacheObject(String key, int flushMode, Object invokeObject, Method invokeMethod, Object[] invokeParams, String cronExpr) {
                this(key, flushMode, invokeObject, invokeMethod, invokeParams);
                this.cronExpr = cronExpr;
        }
        
        public CacheObject(String key, int flushMode, Object invokeObject, Method invokeMethod, Object[] invokeParams, int sleep) {
                this(key, flushMode, invokeObject, invokeMethod, invokeParams);
                this.sleep = sleep;
        }
        
        
        /**
         * 刷新数据
         * @return
         */
        public synchronized Object flushData() {
                Object v = null;
                if(invokeObject==null) {
                        log.info("key:"+key+"|invokeObject is null!");
                        return null;
                }
                if(invokeMethod==null) {
                        log.info("key:"+key+"|invokeMethod is null!");
                        return null;
                }
                
                try {
                        long now = new Date().getTime();
                        lastFlushTime = now;
                        v = invokeMethod.invoke(invokeObject, invokeParams);
                } catch (Exception e) {
                        e.printStackTrace();
                        log.error(e.getMessage());
                }
                return v;
        }

        public String getKey() {
                return key;
        }

        public void setKey(String key) {
                this.key = key;
        }

        public Object getInvokeObject() {
                return invokeObject;
        }

        public void setInvokeObject(Object invokeObject) {
                this.invokeObject = invokeObject;
        }

        public Method getInvokeMethod() {
                return invokeMethod;
        }

        public void setInvokeMethod(Method invokeMethod) {
                this.invokeMethod = invokeMethod;
        }

        public String getCachePoolName() {
                return cachePoolName;
        }

        public void setCachePoolName(String cachePoolName) {
                this.cachePoolName = cachePoolName;
        }

        public Object[] getInvokeParams() {
                return invokeParams;
        }

        public void setInvokeParams(Object[] invokeParams) {
                this.invokeParams = invokeParams;
        }

        
        /**
         * 上次执行时间
         */
        private long lastFlushTime = new Date().getTime();
        
        /**
         * 睡眠模式下执行时间间隔
         */
        private long sleep = 3600000l;
        
        /**
         * 
         */
        private String cronExpr = "0 0 0/1 * *  ?";
        
        
        
        /**
         * 检查是否到刷新时间了
         * @return
         */
        public synchronized boolean checkFlush() {
                
                if(flushMode==FlushMode.INVOKE) {
                        return false;
                }
                long now = new Date().getTime();
                if(flushMode==FlushMode.SLEEP) {
                        long flushTime = lastFlushTime+sleep;
                        if(flushTime-now<=0) {
                                lastFlushTime = flushTime;
                                return true;
                        } else {
                                return false;
                        }
                }
                if(flushMode==FlushMode.ONTIME) {
                        Date nextTime = TimeUtil.getCronJobNextFireTime(cronExpr, lastFlushTime);
                        if(nextTime==null) {
                                return false;
                        }
                        long flushTime = nextTime.getTime();
                        if(flushTime-now<=0) {
                                lastFlushTime = flushTime;
                                return true;
                        } else {
                                return false;
                        }
                }
                return false;
        }

        public int getFlushMode() {
                return flushMode;
        }

        public void setFlushMode(int flushMode) {
                this.flushMode = flushMode;
        }

        public long getSleep() {
                return sleep;
        }

        public void setSleep(long sleep) {
                this.sleep = sleep;
        }

        public String getCronExpr() {
                return cronExpr;
        }

        public void setCronExpr(String cronExpr) {
                this.cronExpr = cronExpr;
        }
        
}
package com.cangzhitao.memcache;

public class FlushMode {

        /**
         * 被动调用刷新
         */
        public static final int INVOKE = 0;
        
        /**
         * 每隔多久刷新
         */
        public static final int SLEEP = 1;
        
        /**
         * 按时刷新
         */
        public static final int ONTIME = 2;
        
}
package com.cangzhitao.memcache.util;

import java.util.Date;

import org.quartz.CronExpression;

public class TimeUtil {

        /**
         * 获得cronExpr下次的执行时间
         * @param cronExpr
         * @return
         */
        public static Date getCronJobNextFireTime(String cronExpr, long time) {
                Date fireTime = null;
                try {
                CronExpression cron = new CronExpression(cronExpr);
                fireTime = cron.getNextValidTimeAfter(new Date(time));
                } catch (Exception e) {
                        e.printStackTrace();
                }
        return fireTime;
        }
        
}

3.3、CacheManage缓存管理类:

框架对外打交道的方法主要都是通过CacheManage来完成。CacheManage主要任务:1、完成缓存池的管理;2、完成缓存对象的存取;3、监听缓存对象,适时进行刷新。缓存池的管理不需要用户去管,在没有用户所需要的缓存池的时候,CacheManage会自动新建缓存池。

3.3.1、添加缓存对象:

添加缓存对象有两个方法:addCache(CacheObject cache)与addCache(CacheObject cache, Object value),第一个方法是当不知道缓存具体数据时使用,调用时它会自动执行一次初始化操作,查询数据;第二个方法则是已经知道缓存具体数据,避免再一次重复查询时使用。

3.3.2、获取缓存对象:

获取缓存对象也有两个方法:getCache(String cacheKey)与getCache(String cachePoolName, String cacheKey),第一个方法表示使用默认的缓存池,第二个则是使用指定的缓存池,很多其他功能都重载了使用默认缓冲池的方法。该方法在没找到缓存对象是会跑出自定义的CacheNotFoundException,请注意没找到缓存对象跟缓存对象为null是两回事。

3.3.3、删除缓存对象:

删除缓存对象两个方法:removeCache(String cacheKey)与removeCache(String cachePoolName, String cacheKey)。

3.3.4、刷新缓存对象:

主要讲下flushData(String cachePoolName,final String cacheKey, boolean wait)方法,其中wait为true时,当前线程将阻塞,等待缓存刷新完成,false则立即返回。

package com.cangzhitao.memcache;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.cangzhitao.memcache.exception.CacheNotFoundException;

/**
 * 缓存管理工具
 * @author Administrator
 *
 */
public class CacheManage {

        static {
                init();
        }
        
        /**
         * 所有缓冲池的集合
         */
        private static ConcurrentMap<String, CachePool> poolMap = new ConcurrentHashMap<String, CachePool>();
        
        /**
         * 当前缓冲对象key集合
         */
        private static ConcurrentMap<String, ConcurrentMap<String, CacheObject>> cacheKeyMap = new ConcurrentHashMap<String, ConcurrentMap<String,CacheObject>>();
        
        
        /**
         * 获得缓存数据
         * @param cacheKey 缓存对象标示
         * @return 返回null标示缓存对象存在,值为null
         * @throws CacheNotFoundException 当缓存对象不存在时抛出异常
         */
        public static Object getCache(String cacheKey) throws CacheNotFoundException {
                return getCache(CachePool.DEFAULT_POOL_NAME, cacheKey);
        }
        
        /**
         * 获得缓存数据
         * @param cachePoolName 缓冲池名
         * @param cacheKey 缓存对象标示
         * @return 返回null标示缓存对象存在,值为null
         * @throws CacheNotFoundException 当缓存对象不存在时抛出异常
         */
        public static Object getCache(String cachePoolName, String cacheKey) throws CacheNotFoundException {
                CacheObject exist = existKey(cachePoolName, cacheKey);
                if(exist==null) {
                        throw new CacheNotFoundException(String.format("CachePool(%s) don't have this key(%s)!", cachePoolName, cacheKey));
                }
                CachePool pool = getCachePool(cachePoolName);
                return pool.getObject(cacheKey);
        }
        
        /**
         * 添加缓存数据,相同则会覆盖
         * @param cache
         * @return
         */
        public static Object addCache(CacheObject cache, Object value) {
                CachePool cachePool = getCachePool(cache.getCachePoolName());
                cacheKeyMap.get(cache.getCachePoolName()).put(cache.getKey(), cache);
                cachePool.addObject(cache.getKey(), value);
                return value;
        }
        
        public static Object addCache(CacheObject cache) {
                CachePool cachePool = getCachePool(cache.getCachePoolName());
                cacheKeyMap.get(cache.getCachePoolName()).put(cache.getKey(), cache);
                Object value = cache.flushData();
                cachePool.addObject(cache.getKey(), value);
                return value;
        }
        
        /**
         * 删除默认池中的缓冲
         * @param cacheKey 缓存标示
         */
        public static void removeCache(String cacheKey) {
                removeCache(CachePool.DEFAULT_POOL_NAME, cacheKey);
        }
        
        /**
         * 删除缓存
         * @param cachePoolName 缓存池名
         * @param cacheKey 缓存标示
         */
        public static void removeCache(String cachePoolName, String cacheKey) {
                CacheObject exist = existKey(cachePoolName, cacheKey);
                if(exist==null) {
                        return;
                }
                CachePool pool = getCachePool(cachePoolName);
                pool.removeObject(cacheKey);
                cacheKeyMap.get(cachePoolName).remove(cacheKey);
        }
        
        private static CachePool getCachePool(String poolName) {
                CachePool pool = poolMap.get(poolName);
                if(pool==null) {
                        pool = new CachePool(poolName);
                        poolMap.put(poolName, pool);
                        cacheKeyMap.put(poolName, new ConcurrentHashMap<String, CacheObject>());
                }
                return pool;
        }
        
        /**
         * 刷新缓存数据
         * @param poolName
         * @param cacheKey
         * @param wait true则阻塞等待刷新完成
         * @return
         */
        public static Object flushData(String cachePoolName,final String cacheKey, boolean wait) {
                final CacheObject exist = existKey(cachePoolName, cacheKey);
                if(exist==null) {
                        return null;
                }
                final CachePool pool = getCachePool(cachePoolName);
                if(wait) {
                        Object value = exist.flushData();
                        return pool.addObject(cacheKey, value);
                } else {
                        new Thread() {
                                public void run() {
                                        Object value = exist.flushData();
                                        pool.addObject(cacheKey, value);
                                }
                        }.start();
                        return null;
                }
        }
        
        public static Object flushData(String cacheKey, boolean wait) {
                return flushData(CachePool.DEFAULT_POOL_NAME, cacheKey, wait);
        }
        
        public static Object flushData(String cacheKey) {
                return flushData(cacheKey, false);
        }
        
        
        /**
         * 初始化监听程序,进行数据刷新
         */
        public static void init() {
                Thread thread = new Thread() {
                        public void run() {
                                while(true) {
                                        Iterator<String> poolit = cacheKeyMap.keySet().iterator();
                                        while(poolit.hasNext()) {
                                                ConcurrentMap<String, CacheObject> cacheMap = cacheKeyMap.get(poolit.next());
                                                Iterator<String> cacheit = cacheMap.keySet().iterator();
                                                while(cacheit.hasNext()) {
                                                        final CacheObject o = cacheMap.get(cacheit.next());
                                                        synchronized (o) {
                                                                if(o.checkFlush()==false) {
                                                                        continue;
                                                                }
                                                                new Thread() {
                                                                        public void run() {
                                                                                Object value = o.flushData();
                                                                                CachePool pool = getCachePool(o.getCachePoolName());
                                                                                pool.addObject(o.getKey(), value);
                                                                        }
                                                                        
                                                                }.start();
                                                        }
                                                }
                                        }
                                }
                        }
                };
                thread.setDaemon(true);
                thread.start();
        }
        
        public static CacheObject existKey(String cachePoolName, String cacheKey) {
                CacheObject exist = null;
                try {
                        exist = cacheKeyMap.get(cachePoolName).get(cacheKey);
                } catch (Exception e) {
                        return null;
                }
                return exist;
        }
        
}
package com.cangzhitao.memcache.exception;

public class CacheNotFoundException extends Exception {

        /**
         * 
         */
        private static final long serialVersionUID = -5996791022119850256L;

        public CacheNotFoundException(String message) {
                super(message);
        }

        

}
4、使用例子:

/**
 * 阅读最多文章
 */
private void addMostReadPageListCache() {
            ReadConfig readConfig = (ReadConfig) BlogConfigFactory.getBaseOption(ReadConfig.class, false);
            try {
                    IPostService postService = (IPostService) SpringUtil.getBean("postService");
                    Method method = null;
                    method = IPostService.class.getMethod("findPostMostReadListVO", new Class[]{int.class, int.class});
                    Object[] params = new Object[]{1, Integer.parseInt(readConfig.getMostReadPostSize())};
                    //热门文章,每天凌晨2点刷新一次
                    CacheObject cacheObject = new CacheObject(BlogIndexController.CACHE_POST_MOST_READ_LIST, FlushMode.ONTIME, postService, method, params, "0 0 2 * *  ?");
                    CacheManage.addCache(cacheObject);
            } catch (Exception e) {
                    e.printStackTrace();
            }
}

这是之前举的博客热门文章的例子,这里在web容器初始化的时候统计一次热门文章,然后这个模块数据就不需要你再去管了,他会在你指定的时间进行刷新。

5、注意事项:

如果存在缓存中的数据有些是hibernate延迟加载的,必须在session关闭之前将会要用到的数据加载进来,要不然会报延迟加载的异常。

可能会有些多线程的线程安全问题。一般来说,缓存这块比较难出现线程安全问题,出现了问题也不大,最多那一瞬间的数据没有取到最新的。

如果您发现有什么问题或者建议,欢迎Emali给我。

/upload/2014/11/24/memcache.zip

发表评论