转自 http://www.blogjava.net/canvas/articles/bandwidthlimiter.html
这里简单的讨论一下java设计网络程序中如何控制上传和下载速度,我们常见的FTP,HTTP,BT等协议都是TCP的,但是现在流行的utorrent却基于UDP实现了自己UTP协议(UDP+拥塞控制),不管使用什么协议,站在I/O的角度来说,限速的控制思路都是一样的。
思路很简单,如下:
1.假设下载或者上传速度上限是m (KB/s),那么发送一个固定的字节数据(假设是n字节)的时间花费是:n/m;
2.假设现在要发送n字节的数据,那么理论所需的时间应该是n/m,而在实际情况下,发送n字节的数据只花费了t秒,那么发送该发送线程就应该睡眠n/m-t秒,这样就基本实现了速度的控制。
代码以TCP为例
速度控制
思路很简单,如下:
1.假设下载或者上传速度上限是m (KB/s),那么发送一个固定的字节数据(假设是n字节)的时间花费是:n/m;
2.假设现在要发送n字节的数据,那么理论所需的时间应该是n/m,而在实际情况下,发送n字节的数据只花费了t秒,那么发送该发送线程就应该睡眠n/m-t秒,这样就基本实现了速度的控制。
代码以TCP为例
速度控制
1
package
com.actiontec.net.bandwidth;
2
3 /**
4 *
5 * @author Le
6 *
7 */
8 public class BandwidthLimiter {
9
10 /* KB */
11 private static Long KB = 1024l ;
12
13 /* The smallest count chunk length in bytes */
14 private static Long CHUNK_LENGTH = 1024l ;
15
16 /* How many bytes will be sent or receive */
17 private int bytesWillBeSentOrReceive = 0 ;
18
19 /* When the last piece was sent or receive */
20 private long lastPieceSentOrReceiveTick = System.nanoTime();
21
22 /* Default rate is 1024KB/s */
23 private int maxRate = 1024 ;
24
25 /* Time cost for sending CHUNK_LENGTH bytes in nanoseconds */
26 private long timeCostPerChunk = ( 1000000000l * CHUNK_LENGTH)
27 / ( this .maxRate * KB);
28
29 /**
30 * Initialize a BandwidthLimiter object with a certain rate.
31 *
32 * @param maxRate
33 * the download or upload speed in KBytes
34 */
35 public BandwidthLimiter( int maxRate) {
36 this .setMaxRate(maxRate);
37 }
38
39 /**
40 * Set the max upload or download rate in KB/s. maxRate must be grater than
41 * 0. If maxRate is zero, it means there is no bandwidth limit.
42 *
43 * @param maxRate
44 * If maxRate is zero, it means there is no bandwidth limit.
45 * @throws IllegalArgumentException
46 */
47 public synchronized void setMaxRate( int maxRate)
48 throws IllegalArgumentException {
49 if (maxRate < 0 ) {
50 throw new IllegalArgumentException( " maxRate can not less than 0 " );
51 }
52 this .maxRate = maxRate < 0 ? 0 : maxRate;
53 if (maxRate == 0 )
54 this .timeCostPerChunk = 0 ;
55 else
56 this .timeCostPerChunk = ( 1000000000l * CHUNK_LENGTH)
57 / ( this .maxRate * KB);
58 }
59
60 /**
61 * Next 1 byte should do bandwidth limit.
62 */
63 public synchronized void limitNextBytes() {
64 this .limitNextBytes( 1 );
65 }
66
67 /**
68 * Next len bytes should do bandwidth limit
69 *
70 * @param len
71 */
72 public synchronized void limitNextBytes( int len) {
73 this .bytesWillBeSentOrReceive += len;
74
75 /* We have sent CHUNK_LENGTH bytes */
76 while ( this .bytesWillBeSentOrReceive > CHUNK_LENGTH) {
77 long nowTick = System.nanoTime();
78 long missedTime = this .timeCostPerChunk
79 - (nowTick - this .lastPieceSentOrReceiveTick);
80 if (missedTime > 0 ) {
81 try {
82 Thread.sleep(missedTime / 1000000 ,
83 ( int ) (missedTime % 1000000 ));
84 } catch (InterruptedException e) {
85 e.printStackTrace();
86 }
87 }
88 this .bytesWillBeSentOrReceive -= CHUNK_LENGTH;
89 this .lastPieceSentOrReceiveTick = nowTick
90 + (missedTime > 0 ? missedTime : 0 );
91 }
92 }
93 }
94
2
3 /**
4 *
5 * @author Le
6 *
7 */
8 public class BandwidthLimiter {
9
10 /* KB */
11 private static Long KB = 1024l ;
12
13 /* The smallest count chunk length in bytes */
14 private static Long CHUNK_LENGTH = 1024l ;
15
16 /* How many bytes will be sent or receive */
17 private int bytesWillBeSentOrReceive = 0 ;
18
19 /* When the last piece was sent or receive */
20 private long lastPieceSentOrReceiveTick = System.nanoTime();
21
22 /* Default rate is 1024KB/s */
23 private int maxRate = 1024 ;
24
25 /* Time cost for sending CHUNK_LENGTH bytes in nanoseconds */
26 private long timeCostPerChunk = ( 1000000000l * CHUNK_LENGTH)
27 / ( this .maxRate * KB);
28
29 /**
30 * Initialize a BandwidthLimiter object with a certain rate.
31 *
32 * @param maxRate
33 * the download or upload speed in KBytes
34 */
35 public BandwidthLimiter( int maxRate) {
36 this .setMaxRate(maxRate);
37 }
38
39 /**
40 * Set the max upload or download rate in KB/s. maxRate must be grater than
41 * 0. If maxRate is zero, it means there is no bandwidth limit.
42 *
43 * @param maxRate
44 * If maxRate is zero, it means there is no bandwidth limit.
45 * @throws IllegalArgumentException
46 */
47 public synchronized void setMaxRate( int maxRate)
48 throws IllegalArgumentException {
49 if (maxRate < 0 ) {
50 throw new IllegalArgumentException( " maxRate can not less than 0 " );
51 }
52 this .maxRate = maxRate < 0 ? 0 : maxRate;
53 if (maxRate == 0 )
54 this .timeCostPerChunk = 0 ;
55 else
56 this .timeCostPerChunk = ( 1000000000l * CHUNK_LENGTH)
57 / ( this .maxRate * KB);
58 }
59
60 /**
61 * Next 1 byte should do bandwidth limit.
62 */
63 public synchronized void limitNextBytes() {
64 this .limitNextBytes( 1 );
65 }
66
67 /**
68 * Next len bytes should do bandwidth limit
69 *
70 * @param len
71 */
72 public synchronized void limitNextBytes( int len) {
73 this .bytesWillBeSentOrReceive += len;
74
75 /* We have sent CHUNK_LENGTH bytes */
76 while ( this .bytesWillBeSentOrReceive > CHUNK_LENGTH) {
77 long nowTick = System.nanoTime();
78 long missedTime = this .timeCostPerChunk
79 - (nowTick - this .lastPieceSentOrReceiveTick);
80 if (missedTime > 0 ) {
81 try {
82 Thread.sleep(missedTime / 1000000 ,
83 ( int ) (missedTime % 1000000 ));
84 } catch (InterruptedException e) {
85 e.printStackTrace();
86 }
87 }
88 this .bytesWillBeSentOrReceive -= CHUNK_LENGTH;
89 this .lastPieceSentOrReceiveTick = nowTick
90 + (missedTime > 0 ? missedTime : 0 );
91 }
92 }
93 }
94
下载控制
1
package
com.actiontec.net.bandwidth;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5
6 /**
7 * @author Le
8 *
9 */
10 public class DownloadLimiter extends InputStream {
11 private InputStream is = null ;
12 private BandwidthLimiter bandwidthLimiter = null ;
13
14 public DownloadLimiter(InputStream is, BandwidthLimiter bandwidthLimiter)
15 {
16 this .is = is;
17 this .bandwidthLimiter = bandwidthLimiter;
18 }
19 @Override
20 public int read() throws IOException {
21 if ( this .bandwidthLimiter != null )
22 this .bandwidthLimiter.limitNextBytes();
23 return this .is.read();
24 }
25
26 public int read( byte b[], int off, int len) throws IOException
27 {
28 if (bandwidthLimiter != null )
29 bandwidthLimiter.limitNextBytes(len);
30 return this .is.read(b, off, len);
31 }
32 }
2
3 import java.io.IOException;
4 import java.io.InputStream;
5
6 /**
7 * @author Le
8 *
9 */
10 public class DownloadLimiter extends InputStream {
11 private InputStream is = null ;
12 private BandwidthLimiter bandwidthLimiter = null ;
13
14 public DownloadLimiter(InputStream is, BandwidthLimiter bandwidthLimiter)
15 {
16 this .is = is;
17 this .bandwidthLimiter = bandwidthLimiter;
18 }
19 @Override
20 public int read() throws IOException {
21 if ( this .bandwidthLimiter != null )
22 this .bandwidthLimiter.limitNextBytes();
23 return this .is.read();
24 }
25
26 public int read( byte b[], int off, int len) throws IOException
27 {
28 if (bandwidthLimiter != null )
29 bandwidthLimiter.limitNextBytes(len);
30 return this .is.read(b, off, len);
31 }
32 }
同样,上传控制
1
package
com.actiontec.net.bandwidth;
2
3 import java.io.IOException;
4 import java.io.OutputStream;
5
6 /**
7 * @author Le
8 *
9 */
10 public class UploadLimiter extends OutputStream {
11 private OutputStream os = null ;
12 private BandwidthLimiter bandwidthLimiter = null ;
13
14 public UploadLimiter(OutputStream os, BandwidthLimiter bandwidthLimiter)
15 {
16 this .os = os;
17 this .bandwidthLimiter = bandwidthLimiter;
18 }
19
20 @Override
21 public void write( int b) throws IOException {
22 if (bandwidthLimiter != null )
23 bandwidthLimiter.limitNextBytes();
24 this .os.write(b);
25 }
26
27 public void write( byte [] b, int off, int len) throws IOException {
28 if (bandwidthLimiter != null )
29 bandwidthLimiter.limitNextBytes(len);
30 this .os.write(b, off, len);
31 }
32
33 }
2
3 import java.io.IOException;
4 import java.io.OutputStream;
5
6 /**
7 * @author Le
8 *
9 */
10 public class UploadLimiter extends OutputStream {
11 private OutputStream os = null ;
12 private BandwidthLimiter bandwidthLimiter = null ;
13
14 public UploadLimiter(OutputStream os, BandwidthLimiter bandwidthLimiter)
15 {
16 this .os = os;
17 this .bandwidthLimiter = bandwidthLimiter;
18 }
19
20 @Override
21 public void write( int b) throws IOException {
22 if (bandwidthLimiter != null )
23 bandwidthLimiter.limitNextBytes();
24 this .os.write(b);
25 }
26
27 public void write( byte [] b, int off, int len) throws IOException {
28 if (bandwidthLimiter != null )
29 bandwidthLimiter.limitNextBytes(len);
30 this .os.write(b, off, len);
31 }
32
33 }
对于一个TCP socket
1
ServerSocket socket
=
new
ServerSocket();
2 // 其它初始化略
2 // 其它初始化略
1
//
从socket中以一定的速率读数据
2 // ```java
3 DownloadLimiter dl = new DownloadLimiter(socket.getInputStream(), new BandwidthLimiter( 6250 ));
4 is = new DataInputStream(dl);
5
6 // 读数据
7 int len = is.readInt();
8 ByteBuffer buffer = ByteBuffer.allocate( 4 + len);
9 buffer.putInt(len);
10 is.readFully(buffer.array(), 4 , buffer.remaining());
11 // ```
12
13 // 以一定的速率写数据到socket
14 // ```java
15 UploadLimiter ul = new UploadLimiter(socket.getOutputStream(), new BandwidthLimiter( 6250 ));
16 ul.write(
);
17 // ```
2 // ```java
3 DownloadLimiter dl = new DownloadLimiter(socket.getInputStream(), new BandwidthLimiter( 6250 ));
4 is = new DataInputStream(dl);
5
6 // 读数据
7 int len = is.readInt();
8 ByteBuffer buffer = ByteBuffer.allocate( 4 + len);
9 buffer.putInt(len);
10 is.readFully(buffer.array(), 4 , buffer.remaining());
11 // ```
12
13 // 以一定的速率写数据到socket
14 // ```java
15 UploadLimiter ul = new UploadLimiter(socket.getOutputStream(), new BandwidthLimiter( 6250 ));
16 ul.write(

17 // ```