×
文章路径: Java

java实现多线程、断点下载

发表于3年前(Jun 8, 2015 11:58:42 AM)  阅读 504  评论 0

分类: Java 类库工具

标签: 多线程下载 断点下载 Content-Disposition RANGE Content-Range

1、知识预备

平时我们用下载工具用的很多,下载工具具有多线程下载,断点续传等优点。多线程下载简单来说就是向服务器发起多个连接,一般来说服务器会给当前连接的每个客户端平均分配带宽,如果你的连接多,自然而然的速度就比别人快。不过现在很多服务器都有安全机制,一般会限制每个ip的连接数,碰到这种情况的话,就要靠代理ip来解决了。

使用多线程下载主要是发起连接时,向服务器说明此次连接需要获取的数据段,通过URLConnection类的setRequestProperty("RANGE","bytes=0-23245")来设定,如果服务器支持的话,在response的header里面将会带入Content-Range属性,里面标示了这次返回的字节范围。

断点续传的话就比较简单,无非是记录本次下载的进度,下次连接时,还是通过指定数据字节范围,继续未完成的下载。麻烦的地方主要在于如何记录进度、记录的频率,以及异常情况的处理。这里的异常情况指的是用户强行结束进程、电脑宕机断电等,关键在于这些情况发生后如何进行恢复。

2、如何记录下载进度

本文实现的下载采用文件记录下载进度,每个线程都会生成一个相应的临时文件,以及该线程的下载进度记录文件。进度记录文件里面记录两个信息,一个是临时文件的字节范围,另一个则是对应服务器的字节范围,如"0,10902;0,10902&",分号前的一组数字是临时文件的字节范围,后面的一组数字是服务器的字节范围,&是结束符,如果不存在&,表示该记录文件是不可用的(因为频繁读写记录文件,存在正要读的时候,该文件新的记录信息并未写完的情况)。考虑了下载任务改变线程数的情况,所以这样的数字组可能有多个,多组之间用"|"分隔;本文虽没有具体实现线程数的动态改变,但还是考虑了这种情况。记录文件的写入频率是以下载的缓存大小为依据的,即每写满一次缓存就更新一次记录。

同时为了应对异常情况的发生,对于下载进度记录文件的写入方式采取的是备份写入的方式。简单的来说是,先写入备份文件,然后删除原文件,再将备份文件重命名为原文件。这时如果发生异常情况,存在备份文件,检查备份文件信息是否正确,如正确,以备份文件为准进行恢复,否则使用前一次的原文件,这样重复下载的数据最多只是缓存的数据大小,影响最小。最终下载完文件合并时也采取类似的机制。

3、实现

该代码现在只是简单实现了基本功能,笔者也是学习,大概搭了下框架,并没有针对众多的下载协议进行完善,很多异常也没有处理,也没有加入重试机制。如果以后有兴趣的话,再慢慢完善。

package com.cangzhitao.download;

/**
 * Transfer protocols a download task can be configured with.
 *
 * @author Administrator
 */
public enum DownloadProtocol {
	/** Download over HTTP. */
	HTTP,
	/** Download over FTP. */
	FTP;
}

package com.cangzhitao.download;

/**
 * Lifecycle states of a download task.
 *
 * <p>This listing replaces an accidental second copy of {@code DownloadProtocol}.
 * The constants are reconstructed from how {@code DownloadTask} uses them;
 * their declaration order is an assumption.
 *
 * @author Administrator
 */
public enum DownloadStatus {
	/** Initial state of a new task; nothing has been prepared yet. */
	READY,
	/** Progress records exist and the task can launch (or relaunch) its worker threads. */
	PAUSE,
	/** Worker threads have been launched. */
	WORKING,
	/** Every worker thread has finished; the temporary files still have to be merged. */
	PREDOWN,
	/** The target file is complete. */
	DOWN
}

package com.cangzhitao.download;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
 * 下载任务
 * @author Administrator
 *
 */
public class DownloadTask implements Serializable {

	/**
	 * Serialization version identifier of this {@link Serializable} class.
	 */
	private static final long serialVersionUID = 4173642312135619019L;

	/**
	 * URL of the file to download.
	 */
	private String url;
	
	/**
	 * Name of the downloaded file; also the base name of the per-thread
	 * temporary files and progress records.
	 */
	private String fileName;
	
	/**
	 * Extension of the downloaded file.
	 * NOTE(review): only reachable through its getter/setter in the visible code.
	 */
	private String ext;
	
	/**
	 * Directory in which the downloaded file is stored.
	 */
	private String filePath;
	
	/**
	 * Number of download threads.
	 */
	private int threadNum;
	
	/**
	 * Protocol used for the download; defaults to HTTP.
	 */
	private DownloadProtocol downloadProtocol = DownloadProtocol.HTTP;
	
	/**
	 * State of the download task; a new task starts as READY.
	 */
	private DownloadStatus downloadStatus = DownloadStatus.READY;
	
	/**
	 * Directory holding the temporary files and progress records.
	 */
	private String tempDir;
	
	/**
	 * Buffer size in bytes used when copying data; defaults to 1024.
	 */
	private int buffSize = 1024;
	
	/**
	 * Size of the file in bytes. Some download protocols may not be able to
	 * report it when the download starts.
	 */
	private long fileSize;
	
	
	/**
	 * Creates an unconfigured task; every property has to be supplied through
	 * the setters.
	 */
	public DownloadTask() {
	}

	/**
	 * Creates a task for the given URL.
	 *
	 * @param url       address of the file to download
	 * @param fileName  name to store the file under
	 * @param filePath  directory for the finished file; it is also used as the
	 *                  temporary directory
	 * @param threadNum number of download threads
	 */
	public DownloadTask(String url, String fileName, String filePath,
			int threadNum) {
		// The temporary directory defaults to the target directory.
		this.tempDir = filePath;
		this.filePath = filePath;
		this.threadNum = threadNum;
		this.fileName = fileName;
		this.url = url;
	}
	
	/** @return the URL of the file to download */
	public String getUrl() {
		return url;
	}

	/** @param url the URL of the file to download */
	public void setUrl(String url) {
		this.url = url;
	}

	/** @return the name of the downloaded file */
	public String getFileName() {
		return fileName;
	}

	/** @param fileName the name of the downloaded file */
	public void setFileName(String fileName) {
		this.fileName = fileName;
	}

	/** @return the extension of the downloaded file */
	public String getExt() {
		return ext;
	}

	/** @param ext the extension of the downloaded file */
	public void setExt(String ext) {
		this.ext = ext;
	}

	/** @return the directory in which the downloaded file is stored */
	public String getFilePath() {
		return filePath;
	}

	/** @param filePath the directory in which the downloaded file is stored */
	public void setFilePath(String filePath) {
		this.filePath = filePath;
	}

	/** @return the number of download threads */
	public int getThreadNum() {
		return threadNum;
	}

	/** @param threadNum the number of download threads */
	public void setThreadNum(int threadNum) {
		this.threadNum = threadNum;
	}



	/** @return the protocol used for the download */
	public DownloadProtocol getDownloadProtocol() {
		return downloadProtocol;
	}



	/** @param downloadProtocol the protocol used for the download */
	public void setDownloadProtocol(DownloadProtocol downloadProtocol) {
		this.downloadProtocol = downloadProtocol;
	}



	/** @return the directory holding temporary files and progress records */
	public String getTempDir() {
		return tempDir;
	}



	/** @param tempDir the directory holding temporary files and progress records */
	public void setTempDir(String tempDir) {
		this.tempDir = tempDir;
	}

	private List<DownloadThread> threadList;


	
	/**
	 * Worker thread that downloads the byte ranges recorded for one thread
	 * number into that thread's temporary file, persisting its progress after
	 * every filled buffer so the download can be resumed.
	 *
	 * <p>Each progress segment is a four-element array
	 * {@code {localStart, localEnd, remoteStart, remoteEnd}}: the first pair is
	 * the range inside the temporary file, the second pair the matching range
	 * on the server. Start offsets advance as data arrives; end offsets are
	 * exclusive.
	 */
	public class DownloadThread extends Thread {
		
		/**
		 * Thread number; selects the temporary file and the progress record.
		 */
		private int no;
		
		/**
		 * Progress segments as read from the record when the thread was created.
		 */
		private List<Long[]> pos;
		
		/**
		 * Whether this thread has finished its work successfully.
		 * Volatile because it is polled from other threads.
		 */
		private volatile boolean down = false;
		
		/**
		 * Whether this thread stopped because of an error.
		 * Volatile because it is polled from other threads.
		 */
		private volatile boolean failed = false;
		
		/**
		 * Number of bytes this thread still has to download.
		 * Volatile because it is polled from other threads.
		 */
		private volatile long left;
		
		
		/**
		 * Creates the worker and loads its progress record.
		 *
		 * @param no thread number
		 */
		public DownloadThread(int no) {
			this.no = no;
			pos = getPosList();
			long remaining = 0;
			for(int i=0;i<pos.size();i++) {
				Long[] p = pos.get(i);
				remaining = remaining+(p[1]-p[0]);
			}
			left = remaining;
		}
		
		/**
		 * Reads the current progress segments of this thread.
		 *
		 * @return the parsed segments; empty when the record is missing or
		 *         cannot be parsed completely
		 */
		private List<Long[]> getPosList() {
			List<Long[]> list = new ArrayList<Long[]>();
			try {
				String dataInfo = readDownloadDataInfo(no);
				if(dataInfo==null || dataInfo.trim().length()==0) {
					return list;
				}
				String[] groups = dataInfo.trim().split("\\|");
				for(int i=0;i<groups.length;i++) {
					String[] ranges = groups[i].split(";");
					String[] local = ranges[0].split(",");
					String[] remote = ranges[1].split(",");
					Long[] p = new Long[4];
					p[0] = Long.parseLong(local[0]);
					p[1] = Long.parseLong(local[1]);
					p[2] = Long.parseLong(remote[0]);
					p[3] = Long.parseLong(remote[1]);
					list.add(p);
				}
			} catch(Exception e) {
				e.printStackTrace();
				// A partially parsed record would silently drop segments.
				list.clear();
			}
			return list;
		}
		
		/**
		 * @return the number of bytes this thread still has to download
		 */
		public long getLeft() {
			return left;
		}
		
		/**
		 * @return {@code true} when the thread stopped because of an error; the
		 *         progress record is kept so the download can be resumed
		 */
		public boolean isFailed() {
			return failed;
		}
		
		
		/**
		 * Downloads every segment in order. The thread is marked as done only
		 * when all segments were received completely; on any error it is
		 * marked as failed instead, so an incomplete temporary file is never
		 * reported as finished.
		 */
		@Override
		public void run() {
			if(pos.isEmpty()) {
				// No readable progress record: there is nothing that could be downloaded.
				failed = true;
				return;
			}
			File tempFile = new File(tempDir+File.separator+getFileName()+"."+no);
			RandomAccessFile file = null;
			boolean success = false;
			try {
				file = new RandomAccessFile(tempFile.getAbsolutePath(), "rw");
				// Work on a copy so the offsets loaded at construction stay untouched.
				List<Long[]> progress = new ArrayList<Long[]>(pos.size());
				for(int i=0;i<pos.size();i++) {
					progress.add(pos.get(i).clone());
				}
				byte[] buff = new byte[Math.max(1, getBuffSize())];
				for(int i=0;i<pos.size();i++) {
					downloadSegment(file, i, progress, buff);
				}
				success = true;
			} catch(Exception e) {
				e.printStackTrace();
			} finally {
				if(file!=null) {
					try {
						file.close();
					} catch (IOException e) {
						e.printStackTrace();
						success = false;
					}
				}
			}
			failed = !success;
			down = success;
		}
		
		/**
		 * Downloads one segment into the temporary file.
		 *
		 * @param file     temporary file of this thread
		 * @param index    index of the segment inside {@code pos}
		 * @param progress mutable copy of the segments, updated and persisted
		 *                 while data arrives
		 * @param buff     reusable transfer buffer
		 * @throws IOException when the server does not deliver the requested
		 *                     range or the stream ends early
		 */
		private void downloadSegment(RandomAccessFile file, int index, List<Long[]> progress, byte[] buff) throws IOException {
			Long[] segment = pos.get(index);
			long localStart = segment[0];
			long remoteStart = segment[2];
			long remoteEnd = segment[3];
			long size = remoteEnd - remoteStart;
			if(size<=0) {
				// Already complete; requesting an empty range would be rejected by the server.
				return;
			}
			
			URLConnection uc = new URL(getUrl()).openConnection();
			// HTTP byte ranges are inclusive on both ends.
			uc.setRequestProperty("RANGE", "bytes="+remoteStart+"-"+(remoteEnd-1));
			InputStream is = null;
			try {
				is = uc.getInputStream();
				if(uc instanceof HttpURLConnection) {
					int code = ((HttpURLConnection) uc).getResponseCode();
					// A plain 200 means the whole file is sent from offset 0,
					// which is only usable when this segment starts there.
					boolean usable = code==HttpURLConnection.HTTP_PARTIAL
							|| (code==HttpURLConnection.HTTP_OK && remoteStart==0);
					if(!usable) {
						throw new IOException("Server did not honour the range request, HTTP status "+code);
					}
				}
				
				file.seek(localStart);
				Long[] current = progress.get(index);
				long sum = 0;
				while(sum<size) {
					int chunk = (int) Math.min((long) buff.length, size-sum);
					int filled = 0;
					// A single read may deliver fewer bytes than requested.
					while(filled<chunk) {
						int hasRead = is.read(buff, filled, chunk-filled);
						if(hasRead<0) {
							throw new IOException("Stream ended "+(size-sum-filled)+" bytes before the end of the requested range");
						}
						filled += hasRead;
					}
					file.write(buff, 0, chunk);
					sum += chunk;
					current[0] = current[0]+chunk;
					current[2] = current[2]+chunk;
					saveProgress(progress);
				}
			} finally {
				if(is!=null) {
					try {
						is.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}
		
		/**
		 * Persists the given segments and refreshes the remaining byte count.
		 *
		 * @param progress current state of every segment
		 */
		private void saveProgress(List<Long[]> progress) {
			StringBuilder dataInfo = new StringBuilder();
			long remaining = 0;
			for(int j=0;j<progress.size();j++) {
				Long[] k = progress.get(j);
				if(j>0) {
					dataInfo.append('|');
				}
				dataInfo.append(k[0]).append(',').append(k[1]).append(';').append(k[2]).append(',').append(k[3]);
				remaining = remaining+(k[1]-k[0]);
			}
			// Published in one step so readers never observe a partial sum.
			left = remaining;
			writeDownloadDataInfo(no, dataInfo.toString());
		}
		
		/**
		 * Writes a progress record by delegating to the enclosing task, so both
		 * share one implementation.
		 *
		 * @param no   thread number
		 * @param info record content without the terminating {@code &}
		 */
		public synchronized void writeDownloadDataInfo(int no, String info) {
			DownloadTask.this.writeDownloadDataInfo(no, info);
		}
		
		/**
		 * Reads the progress record of a thread.
		 *
		 * @param no thread number
		 * @return the first line of the record with every {@code &} removed, or
		 *         an empty string when the record is missing, empty or unreadable
		 */
		public synchronized String readDownloadDataInfo(int no) {
			String info = "";
			RandomAccessFile file = null;
			try {
				file = new RandomAccessFile(tempDir+File.separator+getFileName()+".data."+no,"r");
				String line = file.readLine();
				if(line!=null) {
					info = line.replace("&", "");
				}
			} catch(Exception e) {
				e.printStackTrace();
			} finally {
				if(file!=null) {
					try {
						file.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
			return info;
		}

		/**
		 * @return whether this thread has finished its work successfully
		 */
		public boolean isDown() {
			return down;
		}

		/**
		 * @param down whether this thread should be reported as finished
		 */
		public void setDown(boolean down) {
			this.down = down;
		}
	}
	
	/**
	 * Checks whether every download thread has finished.
	 * When all of them have, the task moves to the PREDOWN state.
	 *
	 * @return {@code true} when the task is already DOWN or every thread
	 *         reports completion; {@code false} otherwise, including when no
	 *         threads have been created yet
	 */
	public boolean checkDown() {
		if(downloadStatus == DownloadStatus.DOWN) {
			return true;
		}
		// Before start() has launched the workers there is no thread list.
		if(threadList == null) {
			return false;
		}
		for(int i=0;i<threadList.size();i++) {
			DownloadThread thread = threadList.get(i);
			if(!thread.isDown()) {
				return false;
			}
		}
		downloadStatus = DownloadStatus.PREDOWN;
		return true;
	}
	
	/**
	 * Returns the number of bytes the whole task still has to download.
	 *
	 * @return 0 when the task is DOWN; the full file size when no threads
	 *         have been created yet; otherwise the sum of what each thread
	 *         still has to download
	 */
	public long getLeftSize() {
		if(downloadStatus==DownloadStatus.DOWN) {
			return 0;
		}
		// Nothing has been downloaded by this task object before its threads exist.
		if(threadList==null) {
			return fileSize;
		}
		long left = 0;
		for(int i=0;i<threadList.size();i++) {
			DownloadThread thread = threadList.get(i);
			left += thread.getLeft();
		}
		return left;
	}
	
	/**
	 * Returns the number of bytes the whole task has downloaded so far,
	 * computed as the file size minus the bytes still outstanding.
	 *
	 * @return downloaded byte count
	 */
	public long getDownSize() {
		long remaining = getLeftSize();
		return fileSize - remaining;
	}
	
	/**
	 * Merges the per-thread temporary files into the target file.
	 *
	 * <p>The data is first concatenated into a {@code .tmp} file, which is then
	 * renamed to the target name. The per-thread files are deleted only
	 * afterwards, so an interrupted merge does not lose data. The task becomes
	 * DOWN only when the merge succeeded; on failure it stays PREDOWN so the
	 * merge can be retried. Does nothing unless the task is in the PREDOWN
	 * state.
	 */
	public void merge() {
		if(downloadStatus!=DownloadStatus.PREDOWN) {
			return;
		}
		File tempFilePre = new File(filePath+File.separator+getFileName()+".tmp");
		File targetFile = new File(filePath+File.separator+getFileName());
		
		boolean merged = false;
		FileOutputStream out = null;
		try {
			out = new FileOutputStream(tempFilePre);
			byte[] buff = new byte[Math.max(1, getBuffSize())];
			for(int i=0;i<threadNum;i++) {
				appendFile(new File(tempDir+File.separator+getFileName()+"."+i), out, buff);
			}
			out.flush();
			merged = true;
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(out!=null) {
				try {
					out.close();
				} catch (IOException e) {
					e.printStackTrace();
					// Buffered data may not have reached the disk.
					merged = false;
				}
			}
		}
		if(!merged) {
			return;
		}
		
		if(targetFile.exists() && !targetFile.delete()) {
			System.err.println("Could not replace existing file "+targetFile.getAbsolutePath());
			return;
		}
		if(!tempFilePre.renameTo(targetFile)) {
			System.err.println("Could not rename "+tempFilePre.getAbsolutePath()+" to "+targetFile.getAbsolutePath());
			return;
		}
		
		delTempFile();
		downloadStatus = DownloadStatus.DOWN;
	}
	
	/**
	 * Appends the whole content of one file to an open output stream.
	 *
	 * @param source file to read
	 * @param target stream to append to; left open
	 * @param buff   reusable transfer buffer
	 * @throws IOException when reading or writing fails
	 */
	private void appendFile(File source, FileOutputStream target, byte[] buff) throws IOException {
		FileInputStream fis = new FileInputStream(source);
		try {
			int hasRead;
			while((hasRead = fis.read(buff))!=-1) {
				target.write(buff, 0, hasRead);
			}
		} finally {
			fis.close();
		}
	}
	
	/**
	 * Deletes the temporary data file and the progress record of every thread.
	 */
	private void delTempFile() {
		final String prefix = tempDir+File.separator+getFileName();
		int index = 0;
		while(index<threadNum) {
			// Data file first, then the matching progress record.
			new File(prefix+"."+index).delete();
			new File(prefix+".data."+index).delete();
			index++;
		}
	}
	
	/**
	 * Starts the download task.
	 *
	 * <p>Resolves file size and name, returns early when the file turns out
	 * to be complete already, prepares the progress records for a fresh task
	 * and finally launches one worker thread per configured thread number.
	 *
	 * @throws IllegalStateException when fewer than one thread is configured
	 */
	public void start() {
		parserDownloadInfo();
		if(checkHasDown()) {
			return;
		}
		if(threadNum<1) {
			throw new IllegalStateException("threadNum must be at least 1, but was "+threadNum);
		}
		if(downloadStatus==DownloadStatus.READY) {
			preStart();
		}
		if(downloadStatus==DownloadStatus.PAUSE) {
			// Recover progress records left behind by an interrupted write.
			checkDataInfoBack();
			downloadStatus = DownloadStatus.WORKING;
			threadList = new ArrayList<DownloadTask.DownloadThread>();
			// Each worker reads its own byte ranges from its progress record,
			// so no sizes have to be computed here.
			for(int i=0;i<threadNum;i++) {
				DownloadThread thread = new DownloadThread(i);
				threadList.add(thread);
				thread.start();
			}
		}
	}
	
	/**
	 * Checks the backup progress records of every thread.
	 *
	 * <p>A backup that ends with the {@code &} terminator was written
	 * completely and replaces the regular record. Any other backup, including
	 * an empty one, is discarded so the previous regular record stays in
	 * effect.
	 */
	private void checkDataInfoBack() {
		for(int i=0;i<threadNum;i++) {
			File file = new File(tempDir+File.separator+getFileName()+".data."+i+".bak");
			if(file.exists()) {
				String info = readFileContent(file);
				// An empty backup can yield no content at all.
				if(info!=null && info.endsWith("&")) {
					File sfile = new File(tempDir+File.separator+getFileName()+".data."+i);
					if(sfile.exists()) {
						sfile.delete();
					}
					file.renameTo(sfile);
				} else {
					file.delete();
				}
			}
		}
	}
	
	/**
	 * Checks whether the download has already been completed.
	 *
	 * <ul>
	 * <li>Target file exists with the expected size: the task is DOWN and the
	 *     leftover temporary files are removed.</li>
	 * <li>Target file exists with a different size: the temporary files are
	 *     merged again.</li>
	 * <li>Only a complete {@code .tmp} file exists: it is renamed to the
	 *     target name and the task is DOWN.</li>
	 * </ul>
	 *
	 * @return {@code true} when nothing is left to download
	 */
	private boolean checkHasDown() {
		File targetFile = new File(getFilePath()+File.separator+getFileName());
		File tmpFile = new File(getFilePath()+File.separator+getFileName()+".tmp");
		if(targetFile.exists()) {
			if(targetFile.length()==getFileSize()) {
				downloadStatus = DownloadStatus.DOWN;
				delTempFile();
				return true;
			}
			downloadStatus = DownloadStatus.PREDOWN;
			merge();
			return true;
		}
		// Report completion only when the rename actually happened, and record
		// the final state so later status checks do not look for worker threads.
		if(tmpFile.exists() && tmpFile.length()==getFileSize() && tmpFile.renameTo(targetFile)) {
			downloadStatus = DownloadStatus.DOWN;
			delTempFile();
			return true;
		}
		return false;
	}
	
	/**
	 * Resolves the file size and, when no name was configured, the file name.
	 *
	 * <p>The size is taken from the {@code Content-Length} header when present.
	 * The name is taken from a quoted {@code filename} in the
	 * {@code Content-Disposition} header of an
	 * {@code application/octet-stream} response. Otherwise it falls back to
	 * the last path segment of the URL, without the leading slash, query
	 * string or fragment.
	 */
	private void parserDownloadInfo() {
		HttpURLConnection conn = null;
		try {
			URL url = new URL(getUrl());
			conn = (HttpURLConnection) url.openConnection();
			String contentLength =  conn.getHeaderField("Content-Length");
			if(contentLength!=null) {
				try {
					fileSize = Long.parseLong(contentLength.trim());
				} catch(NumberFormatException e) {
					// An unusable size must not prevent the name from being resolved.
					e.printStackTrace();
				}
			}
			if(fileName==null) {
				String contentType = conn.getHeaderField("Content-Type");
				if("application/octet-stream".equals(contentType)) {
					String cd = conn.getHeaderField("Content-Disposition");
					if(cd==null) {
						cd = "";
					}
					String regex = "filename\\s*=\\s*\"(.*)\"";
					Pattern pattern = Pattern.compile(regex);
					Matcher matcher = pattern.matcher(cd);
					if(matcher.find()) {
						fileName = matcher.group(1);
						fileName = URLDecoder.decode(fileName,"UTF-8");
					}
				}
			}
			if(fileName==null) {
				String path = getUrl();
				int cut = path.indexOf('#');
				if(cut>=0) {
					path = path.substring(0, cut);
				}
				cut = path.indexOf('?');
				if(cut>=0) {
					path = path.substring(0, cut);
				}
				// +1 skips the slash itself; also covers a URL without any slash.
				fileName = path.substring(path.lastIndexOf("/")+1);
				fileName = URLDecoder.decode(fileName,"UTF-8");
			}
		} catch(Exception e) {
			e.printStackTrace();
		} finally {
			if(conn!=null) {
				conn.disconnect();
			}
		}
	}
	
	/**
	 * Prepares a fresh download: splits the file into one byte range per
	 * thread, writes the initial progress record of every thread and creates
	 * the empty temporary files. Afterwards the task is in the PAUSE state.
	 * Does nothing unless the task is in the READY state.
	 *
	 * <p>Every range is limited to the bytes that are actually left. A file
	 * that is small compared to the thread count therefore yields empty
	 * ranges for the surplus threads instead of negative ones.
	 */
	public void preStart() {
		if(downloadStatus!=DownloadStatus.READY) {
			return;
		}
		try {
			// Ceiling division in integer arithmetic.
			long eachSize = threadNum>0 ? (fileSize+threadNum-1)/threadNum : 0;
			long sum = 0;
			
			new RandomAccessFile(getFilePath()+File.separator+getFileName()+".tmp", "rw").close();
			
			for(int i=0;i<threadNum;i++) {
				// Never hand out more than what is left of the file.
				long tempSize = Math.max(0, Math.min(eachSize, fileSize-sum));
				String dataInfo = "0"+","+tempSize+";"+sum+","+(sum+tempSize);
				writeDownloadDataInfo(i, dataInfo);
				new RandomAccessFile(tempDir+File.separator+getFileName()+"."+i, "rw").close();
				sum = sum + tempSize;
			}
		} catch(Exception e) {
			e.printStackTrace();
		}
		downloadStatus = DownloadStatus.PAUSE;
	}
	
	/**
	 * Reads the first line of a file.
	 *
	 * @param file file to read
	 * @return the first line, or an empty string when the file does not
	 *         exist, is empty or cannot be read; never {@code null}
	 */
	public String readFileContent(File file) {
		if(!file.exists()) {
			return "";
		}
		String info = "";
		RandomAccessFile rfile = null;
		try {
			rfile = new RandomAccessFile(file.getAbsolutePath(),"r");
			String line = rfile.readLine();
			// readLine() yields null for an empty file.
			if(line!=null) {
				info = line;
			}
		} catch(Exception e) {
			e.printStackTrace();
		} finally {
			if(rfile!=null) {
				try {
					rfile.close();
				} catch(IOException e) {
					e.printStackTrace();
				}
			}
		}
		return info;
	}
	
	
	/**
	 * Writes the progress record of a thread.
	 *
	 * <p>The record is first written to a {@code .bak} file with a trailing
	 * {@code &} as terminator, then the old record is deleted and the backup
	 * renamed over it. The regular record is replaced only after the backup
	 * was written and closed successfully.
	 *
	 * @param no   thread number
	 * @param info record content without the terminator
	 */
	public void writeDownloadDataInfo(int no, String info) {
		String recordPath = tempDir+File.separator+getFileName()+".data."+no;
		File file = new File(recordPath+".bak");
		File sfile = new File(recordPath);
		
		boolean written = false;
		BufferedWriter bw = null;
		try {
			bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)));
			bw.write(info+"&");
			bw.flush();
			bw.close();
			bw = null;
			written = true;
		} catch(Exception e) {
			e.printStackTrace();
		} finally {
			if(bw!=null) {
				try {
					bw.close();
				} catch(IOException e) {
					e.printStackTrace();
				}
			}
		}
		if(!written) {
			// Keep the previous record rather than replacing it with a broken one.
			return;
		}
		
		if(sfile.exists()) {
			sfile.delete();
		}
		if(!file.renameTo(sfile)) {
			System.err.println("Could not rename "+file.getAbsolutePath()+" to "+sfile.getAbsolutePath());
		}
	}
	
	



	/** @return the size of the file in bytes */
	public long getFileSize() {
		return fileSize;
	}



	/** @param fileSize the size of the file in bytes */
	public void setFileSize(long fileSize) {
		this.fileSize = fileSize;
	}



	/** @return the buffer size in bytes */
	public int getBuffSize() {
		return buffSize;
	}



	/** @param buffSize the buffer size in bytes */
	public void setBuffSize(int buffSize) {
		this.buffSize = buffSize;
	}
	
	/**
	 * Demo entry point: downloads a sample file with eight threads into
	 * {@code c:/download}, prints the progress percentage once per second and
	 * finally prints the elapsed time in milliseconds.
	 *
	 * @param args unused
	 * @throws Exception when the polling sleep is interrupted
	 */
	public static void main(String[] args) throws Exception {
		final long startedAt = System.currentTimeMillis();
		
		final String url = "http://cdn2.ime.sogou.com/e64393a612f44c5a68585ea008ead0b2/5575015e/dl/index/1433389035/sogou_pinyin_76b.exe";
		// No file name is given here.
		DownloadTask task = new DownloadTask(url, null, "c:/download", 1);
		task.setThreadNum(8);
		task.start();
		
		// Poll until every worker reports completion.
		for(;;) {
			if(task.checkDown()) {
				break;
			}
			double percent = (double) (task.getDownSize()*100) / (double) task.getFileSize();
			System.out.print(percent+"\r");
			Thread.sleep(1000);
		}
		task.merge();
		
		final long finishedAt = System.currentTimeMillis();
		System.out.println(finishedAt-startedAt);
	}



	/** @return the download threads of this task */
	public List<DownloadThread> getThreadList() {
		return threadList;
	}



	/** @param threadList the download threads of this task */
	public void setThreadList(List<DownloadThread> threadList) {
		this.threadList = threadList;
	}
}

 

发表评论