博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java Nio 多线程网络下载
阅读量:6292 次
发布时间:2019-06-22

本文共 10382 字,大约阅读时间需要 34 分钟。

--> 默认最多50个线程 同一文件下载失败延迟超过30秒就结束下载

--> 下载5分钟超时时间,假设5分钟内未下载完就结束下载

--> 依赖 commons-httpclient 与 commons-io 包

package com.leunpha;import org.apache.commons.httpclient.HttpClient;import org.apache.commons.httpclient.methods.GetMethod;import org.apache.commons.httpclient.params.HttpClientParams;import org.apache.commons.io.IOUtils;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.*;import java.lang.reflect.Method;import java.nio.MappedByteBuffer;import java.nio.channels.FileChannel;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.Observable;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;import java.util.zip.ZipFile;/** * User: zhoujingjie * Date: 14-4-18 * Time: 下午12:52 */public class Downloader extends Observable {    protected String url, savePath;             //下载地址与保存路径    protected FileChannel channel;              //保存文件的通道    protected long size, perSize;              //文件大小与每一个小文件的大小    protected volatile long downloaded;       // 已下载的    protected int connectCount;                 //连接数    protected Connection[] connections;         //连接对象    protected boolean isSupportRange;         //是否支持断点下载    protected long timeout;                     //超时    protected boolean exists;                   //是否存在    private RandomAccessFile randomAccessFile;    protected volatile boolean stop;            //停止    private static volatile  boolean exception; //是否异常    private AtomicLong prevDownloaded = new AtomicLong(0); //上一次的下载结果    private static Log log = LogFactory.getLog(Downloader.class);    private AtomicInteger loseNum = new AtomicInteger(0);    private int maxThread;    public Downloader(String url, String savePath) throws IOException {        //超时一小时        this(url, savePath, 1000 * 60*5,50);    }    public Downloader(String url, String savePath, long timeout,int maxThread) throws FileNotFoundException {        this.timeout = timeout;        this.url = url;        this.maxThread = maxThread;        File file = new File(savePath);        if (!file.exists()) file.mkdirs();        this.savePath= file.getAbsolutePath() + "/" + url.substring(url.lastIndexOf("/"));        exists = new File(this.savePath).exists();        if(!exists){            randomAccessFile=   new RandomAccessFile(this.savePath+".temp", "rw");            channel =randomAccessFile.getChannel();        }    }    public GetMethod method(long start, long end) throws IOException {        GetMethod method = new GetMethod(Downloader.this.url);        method.setRequestHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36");        if (end > 0) {            method.setRequestHeader("Range", "bytes=" + start + "-" + (end - 1));        } else {            method.setRequestHeader("Range", "bytes=" + start + "-");        }        HttpClientParams clientParams = new HttpClientParams();        //5秒超时        clientParams.setConnectionManagerTimeout(5000);        HttpClient client = new HttpClient(clientParams);        client.executeMethod(method);        int statusCode = method.getStatusCode();        if (statusCode >= 200 && statusCode < 300) {            isSupportRange = (statusCode == 206) ? true : false;        }        return method;    }    public void init() throws IOException {        size = method(0, -1).getResponseContentLength();        if (isSupportRange) {            if (size < 4 * 1024 * 1024) {  //假设小于4M                connectCount = 1;            } else if (size < 10 * 1024 * 1024) { //假设文件小于10M 则两个连接                connectCount = 2;            } else if (size < 30 * 1024 * 1024) { //假设文件小于80M 则使用6个连接                connectCount = 3;            } else if (size < 60 * 1024 * 1024) {          //假设小于60M 则使用10个连接                connectCount = 4;            } else {                //否则为10个连接                connectCount = 5;            }        } else {            connectCount = 1;        }        log.debug(String.format("%s size:%s connectCount:%s", this.url, this.size, this.connectCount));        perSize = size / connectCount;        connections = new Connection[connectCount];        long offset = 0;        for (int i = 0; i < connectCount - 1; i++) {            connections[i] = new Connection(offset, offset + perSize);            offset += perSize;        }        connections[connectCount - 1] = new Connection(offset, size);    }    /**     * 强制释放内存映射     *     * @param mappedByteBuffer     */    static void unmapFileChannel(final MappedByteBuffer mappedByteBuffer) {        try {            if (mappedByteBuffer == null) {                return;            }            mappedByteBuffer.force();            AccessController.doPrivileged(new PrivilegedAction() {                @Override                public Object run() {                    try {                        Method getCleanerMethod = mappedByteBuffer.getClass().getMethod("cleaner", new Class[0]);                        getCleanerMethod.setAccessible(true);                        sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedByteBuffer, new Object[0]);                        cleaner.clean();                    } catch (Exception e) {                        //LOG.error("unmapFileChannel." + e.getMessage());                    }                    return null;                }            });        } catch (Exception e) {            log.debug("异常->exception=true");            exception = true;            log.error(e);        }    }    private void timer() {        Timer timer = new Timer();        //延迟3秒,3秒执行一次        timer.schedule(new TimerTask() {            @Override            public void run() {                log.debug(String.format("已下载-->%s -> %s",(((double) downloaded) / size * 100) + "%", url));                //假设上一次的下载大小与当前的一样就退出                if(prevDownloaded.get() ==downloaded && downloaded
=10){ log.debug(String.format("上次下载%s与当前下载%s一致,exception->true url:%s ",prevDownloaded.get(),downloaded,url)); exception = true; } } //假设下载完毕或者异常就退出 if(downloaded>=size || exception){ stop = true; cancel(); } //设置上次下载的大小等于如今的大小 prevDownloaded.set(downloaded); } },3000,3000); } public void start() throws IOException { if (exists) { log.info("文件已存在." + this.url); Thread.currentThread().interrupt(); return; } while (Thread.activeCount()>maxThread){ try { Thread.sleep(1000); } catch (InterruptedException e) {} } init(); timer(); CountDownLatch countDownLatch = new CountDownLatch(connections.length); log.debug("開始下载:" + url); for (int i = 0; i < connections.length; i++) { new DownloadPart(countDownLatch, i).start(); } end(countDownLatch); } private boolean rename(File tempFile){ File file = new File(this.savePath); boolean isRename=tempFile.renameTo(file); if(!isRename){ try { IOUtils.copy(new FileInputStream(tempFile),new FileOutputStream(file)); } catch (IOException e) { log.error(e); } } return true; } public void end(CountDownLatch countDownLatch){ try { //超过指定时间就直接结束 countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { exception = true; log.error(e); log.info("下载失败:"+this.url); } finally { try { channel.force(true); channel.close(); randomAccessFile.close(); } catch (IOException e) { log.error(e); } File temp = new File(this.savePath+".temp"); log.debug(String.format("%s %s", exception, this.url)); //假设有异常则删除已下载的暂时文件 if(exception){ if(!temp.delete()){ if(temp!=null)temp.delete(); } }else{ try { Thread.sleep(100); } catch (InterruptedException e) {} rename(temp); setChanged(); notifyObservers(this.url); log.info("下载成功:"+this.url); } } } private class Connection { long start, end; public Connection(long start, long end) { this.start = start; this.end = end; } public InputStream getInputStream() throws IOException { return method(start, end).getResponseBodyAsStream(); } } private class DownloadPart implements Runnable { CountDownLatch countDownLatch; int i; public DownloadPart(CountDownLatch countDownLatch, int i) { this.countDownLatch = countDownLatch; this.i = i; } public void start() { new Thread(this).start(); } @Override public void run() { MappedByteBuffer buffer = null; InputStream is = null; try { is = connections[i].getInputStream(); buffer = channel.map(FileChannel.MapMode.READ_WRITE, connections[i].start, connections[i].end - connections[i].start); byte[] bytes = new byte[4 * 1024]; int len; while ((len = is.read(bytes)) != -1 && !exception && !stop) { buffer.put(bytes, 0, len); downloaded+= len; } log.debug(String.format("file block had downloaded.%s %s",i,url)); } catch (IOException e) { log.error(e); } finally { unmapFileChannel(buffer); if(buffer != null)buffer.clear(); if (is != null) try { is.close(); } catch (IOException e) { } countDownLatch.countDown(); } } }}

转载地址:http://epcta.baihongyu.com/

你可能感兴趣的文章
用CSS画一个带阴影的三角形
查看>>
前端Vue:函数式组件
查看>>
程鑫峰:1.26特朗.普力挺美元力挽狂澜,伦敦金行情分析
查看>>
safari下video标签无法播放视频的问题
查看>>
01 iOS中UISearchBar 如何更改背景颜色,如何去掉两条黑线
查看>>
对象的继承及对象相关内容探究
查看>>
Spring: IOC容器的实现
查看>>
Serverless五大优势,成本和规模不是最重要的,这点才是
查看>>
Nginx 极简入门教程!
查看>>
iOS BLE 开发小记[4] 如何实现 CoreBluetooth 后台运行模式
查看>>
Item 23 不要在代码中使用新的原生态类型(raw type)
查看>>
为网页添加留言功能
查看>>
JavaScript—数组(17)
查看>>
Android 密钥保护和 C/S 网络传输安全理论指南
查看>>
以太坊ERC20代币合约优化版
查看>>
Why I Began
查看>>
同一台电脑上Windows 7和Ubuntu 14.04的CPU温度和GPU温度对比
查看>>
js数组的操作
查看>>
springmvc Could not write content: No serializer
查看>>
Python系语言发展综述
查看>>