您要我使用OkHttp实现一个具有多线程、多任务、限速、暂停恢复和进度显示功能的下载器。我需要设计一个完整的下载框架。让我规划一下:

系统架构

  1. DownloadManager - 全局下载管理器,处理多任务调度
  2. DownloadTask - 单个下载任务,包含URL、目标文件路径等信息
  3. DownloadWorker - 工作线程,处理实际下载逻辑
  4. DownloadListener - 回调接口,通知下载进度、状态变化
  5. RateLimiter - 限速器,控制下载速度
  6. TaskStorage - 任务持久化存储,用于断点续传

主要类设计

1. DownloadManager

  • 管理下载任务队列
  • 控制并发下载数量
  • 提供添加、暂停、恢复、取消任务方法

2. DownloadTask

  • 存储下载URL、目标路径、文件大小等信息
  • 记录下载状态(等待、下载中、暂停、完成、失败)
  • 保存断点续传所需信息(已下载部分)

3. DownloadWorker

  • 使用OkHttp执行HTTP请求
  • 支持Range请求实现断点续传
  • 将下载内容写入文件

4. RateLimiter

  • 限制下载速度
  • 可以根据网络状况动态调整

5. 断点续传

  • 通过HTTP Range头实现
  • 保存已下载部分信息

6. 多线程下载

  • 将一个文件分成多个部分
  • 每个部分由单独线程下载
  • 合并各部分得到完整文件

让我开始编写代码实现这些功能。

使用OkHttp实现多功能下载框架

下面是一个基于OkHttp实现的功能完整的下载框架,支持您需要的所有功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
// DownloadManager.kt

import kotlinx.coroutines.*
import okhttp3.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
* 下载状态
*/
enum class DownloadStatus {
IDLE, // 空闲状态
WAITING, // 等待下载
DOWNLOADING,// 下载中
PAUSED, // 暂停状态
COMPLETED, // 下载完成
FAILED, // 下载失败
CANCELED // 下载取消
}

/**
* 下载监听器
*/
interface DownloadListener {
fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long)
fun onStatusChanged(task: DownloadTask, status: DownloadStatus)
fun onSuccess(task: DownloadTask, file: File)
fun onFailed(task: DownloadTask, e: Exception)
}

/**
* 下载任务
*/
class DownloadTask(
val id: String, // 任务唯一ID
val url: String, // 下载URL
val savePath: String, // 保存路径
val fileName: String, // 文件名
var totalBytes: Long = 0, // 文件总大小
var downloadedBytes: Long = 0, // 已下载大小
var threadCount: Int = 3, // 线程数
var status: DownloadStatus = DownloadStatus.IDLE, // 下载状态
var ranges: MutableList<Pair<Long, Long>> = mutableListOf() // 分片信息
) {
// 获取进度百分比
fun getProgress(): Int {
return if (totalBytes > 0) {
(downloadedBytes * 100 / totalBytes).toInt()
} else 0
}

// 获取下载文件
fun getFile(): File = File(savePath, fileName)

// 获取临时文件
fun getTempFile(index: Int): File = File(savePath, "$fileName.part$index")
}

/**
* 下载速度限制器
*/
class RateLimiter(private var bytesPerSecond: Long) {
private val byteCount = AtomicLong(0)
private var lastResetTime = System.currentTimeMillis()

// 设置速度限制 (字节/秒)
fun setBytesPerSecond(bytesPerSecond: Long) {
this.bytesPerSecond = bytesPerSecond
}

// 消耗指定字节数,若超过限制则阻塞
@Synchronized
fun consume(bytes: Long) {
byteCount.addAndGet(bytes)

val currentTime = System.currentTimeMillis()
val timeElapsed = currentTime - lastResetTime

if (timeElapsed >= 1000) {
// 每秒重置计数
byteCount.set(0)
lastResetTime = currentTime
return
}

val currentBytesPerSecond = (byteCount.get() * 1000 / timeElapsed.coerceAtLeast(1))
if (currentBytesPerSecond > bytesPerSecond && bytesPerSecond > 0) {
// 需要限速,计算需要等待的时间
val waitTime = ((currentBytesPerSecond.toDouble() / bytesPerSecond) * 1000 - timeElapsed).toLong()
if (waitTime > 0) {
try {
Thread.sleep(waitTime.coerceAtMost(1000))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}
}
}
}

/**
* 下载管理器
*/
class DownloadManager private constructor() {
private val client = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()

private val downloadTasks = ConcurrentHashMap<String, DownloadTask>()
private val downloadWorkers = ConcurrentHashMap<String, DownloadWorker>()
private val taskListeners = ConcurrentHashMap<String, MutableList<DownloadListener>>()
private val rateLimiter = RateLimiter(0) // 默认不限速
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 最大并发下载数
private var maxConcurrentDownloads = 3
// 当前运行的下载数
private var runningDownloads = AtomicLong(0)
// 等待队列
private val waitingQueue = LinkedBlockingQueue<String>()

companion object {
@Volatile
private var instance: DownloadManager? = null

fun getInstance(): DownloadManager {
return instance ?: synchronized(this) {
instance ?: DownloadManager().also { instance = it }
}
}
}

init {
// 启动队列处理协程
downloadScope.launch {
processQueue()
}
}

// 处理等待队列
private suspend fun processQueue() {
while (isActive) {
try {
if (runningDownloads.get() < maxConcurrentDownloads) {
val taskId = waitingQueue.poll()
if (taskId != null) {
val task = downloadTasks[taskId]
if (task != null && task.status == DownloadStatus.WAITING) {
startDownloadInternal(task)
}
}
}
delay(100) // 避免CPU过度使用
} catch (e: Exception) {
e.printStackTrace()
}
}
}

// 设置最大并发下载数
fun setMaxConcurrentDownloads(max: Int) {
maxConcurrentDownloads = max
}

// 设置下载速度限制 (KB/s)
fun setSpeedLimit(kbPerSecond: Int) {
rateLimiter.setBytesPerSecond(kbPerSecond * 1024L)
}

// 添加下载任务
fun addTask(url: String, savePath: String, fileName: String, threadCount: Int = 3): DownloadTask {
val id = url.hashCode().toString()
var task = downloadTasks[id]

if (task == null) {
File(savePath).mkdirs() // 确保目录存在
task = DownloadTask(id, url, savePath, fileName, threadCount = threadCount)
downloadTasks[id] = task
}

return task
}

// 添加监听器
fun addListener(taskId: String, listener: DownloadListener) {
val listeners = taskListeners.getOrPut(taskId) { mutableListOf() }
listeners.add(listener)
}

// 移除监听器
fun removeListener(taskId: String, listener: DownloadListener) {
taskListeners[taskId]?.remove(listener)
}

// 开始下载
fun startDownload(taskId: String) {
val task = downloadTasks[taskId] ?: return

when (task.status) {
DownloadStatus.IDLE, DownloadStatus.FAILED, DownloadStatus.CANCELED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
DownloadStatus.PAUSED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
else -> { /* 其他状态不处理 */ }
}
}

// 内部启动下载方法
private fun startDownloadInternal(task: DownloadTask) {
runningDownloads.incrementAndGet()

if (task.status != DownloadStatus.WAITING) return

task.status = DownloadStatus.DOWNLOADING
notifyStatusChanged(task)

// 创建并启动下载工作线程
val worker = DownloadWorker(task, client, rateLimiter, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
notifyProgress(task, progress, downloadedBytes, totalBytes)
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
if (status == DownloadStatus.COMPLETED || status == DownloadStatus.FAILED || status == DownloadStatus.CANCELED) {
runningDownloads.decrementAndGet()
}
notifyStatusChanged(task)
}

override fun onSuccess(task: DownloadTask, file: File) {
notifySuccess(task, file)
}

override fun onFailed(task: DownloadTask, e: Exception) {
notifyFailed(task, e)
}
})

downloadWorkers[task.id] = worker
worker.start()
}

// 暂停下载
fun pauseDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.pause()
}

// 恢复下载
fun resumeDownload(taskId: String) {
val task = downloadTasks[taskId]
if (task?.status == DownloadStatus.PAUSED) {
startDownload(taskId)
}
}

// 取消下载
fun cancelDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.cancel()
waitingQueue.remove(taskId)
}

// 获取下载任务
fun getTask(taskId: String): DownloadTask? {
return downloadTasks[taskId]
}

// 获取所有下载任务
fun getAllTasks(): List<DownloadTask> {
return downloadTasks.values.toList()
}

// 通知进度更新
private fun notifyProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
taskListeners[task.id]?.forEach { listener ->
listener.onProgress(task, progress, downloadedBytes, totalBytes)
}
}

// 通知状态变化
private fun notifyStatusChanged(task: DownloadTask) {
taskListeners[task.id]?.forEach { listener ->
listener.onStatusChanged(task, task.status)
}
}

// 通知下载成功
private fun notifySuccess(task: DownloadTask, file: File) {
taskListeners[task.id]?.forEach { listener ->
listener.onSuccess(task, file)
}
}

// 通知下载失败
private fun notifyFailed(task: DownloadTask, e: Exception) {
taskListeners[task.id]?.forEach { listener ->
listener.onFailed(task, e)
}
}

// 清理资源
fun release() {
downloadScope.cancel()
downloadWorkers.values.forEach { it.cancel() }
downloadTasks.clear()
downloadWorkers.clear()
taskListeners.clear()
waitingQueue.clear()
}
}

/**
* 下载工作线程
*/
class DownloadWorker(
private val task: DownloadTask,
private val client: OkHttpClient,
private val rateLimiter: RateLimiter,
private val listener: DownloadListener
) {
private val isRunning = AtomicBoolean(false)
private val isPaused = AtomicBoolean(false)
private val isCanceled = AtomicBoolean(false)
private val workers = mutableListOf<Job>()
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 开始下载
fun start() {
if (isRunning.getAndSet(true)) return

downloadScope.launch {
try {
// 1. 获取文件信息
fetchFileInfo()

// 2. 如果文件大小为0或不支持范围请求,使用单线程下载
if (task.totalBytes <= 0 || !isSupportRange()) {
singleThreadDownload()
} else {
// 3. 多线程下载
multiThreadDownload()
}
} catch (e: Exception) {
if (!isPaused.get() && !isCanceled.get()) {
task.status = DownloadStatus.FAILED
listener.onStatusChanged(task, task.status)
listener.onFailed(task, e)
}
} finally {
isRunning.set(false)
}
}
}

// 暂停下载
fun pause() {
if (isPaused.getAndSet(true)) return

task.status = DownloadStatus.PAUSED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()
}

// 取消下载
fun cancel() {
if (isCanceled.getAndSet(true)) return

task.status = DownloadStatus.CANCELED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()

// 删除临时文件
for (i in 0 until task.threadCount) {
task.getTempFile(i).delete()
}
}

// 获取文件信息
private suspend fun fetchFileInfo() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.head() // 使用HEAD请求获取文件信息
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to fetch file info: ${response.code}")
}

// 获取文件大小
val contentLength = response.header("Content-Length")?.toLongOrNull() ?: -1L
task.totalBytes = contentLength

// 计算每个线程的下载范围
if (contentLength > 0) {
calculateRanges(contentLength)
}
}
}

// 检查是否支持范围请求
private suspend fun isSupportRange(): Boolean = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=0-0")
.build()

client.newCall(request).execute().use { response ->
return@withContext response.code == 206 // 返回206表示支持范围请求
}
}

// 计算每个线程的下载范围
private fun calculateRanges(contentLength: Long) {
task.ranges.clear()

val blockSize = contentLength / task.threadCount
var start: Long
var end: Long

for (i in 0 until task.threadCount) {
start = i * blockSize
end = if (i == task.threadCount - 1) contentLength - 1 else (i + 1) * blockSize - 1

// 检查临时文件是否存在
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val downloadedSize = tempFile.length()
if (downloadedSize > 0 && downloadedSize < (end - start + 1)) {
// 更新起始位置
start += downloadedSize
task.downloadedBytes += downloadedSize
}
}

task.ranges.add(Pair(start, end))
}
}

// 单线程下载
private suspend fun singleThreadDownload() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to download file: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
task.totalBytes = responseBody.contentLength()

// 创建输出文件
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = 0L

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新进度
task.downloadedBytes = downloadedSize
val progress = if (task.totalBytes > 0) {
(downloadedSize * 100 / task.totalBytes).toInt()
} else 0

listener.onProgress(task, progress, downloadedSize, task.totalBytes)
}

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, outputFile)

} finally {
outputStream.close()
}
}
}

// 多线程下载
private suspend fun multiThreadDownload() = withContext(Dispatchers.IO) {
// 创建多个工作协程
for (i in 0 until task.threadCount) {
val job = downloadScope.launch {
downloadFilePart(i)
}
workers.add(job)
}

// 等待所有下载任务完成
workers.joinAll()

// 如果暂停或取消,直接返回
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 合并文件
mergeFiles()

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, task.getFile())
}

// 下载文件片段
private suspend fun downloadFilePart(index: Int) = withContext(Dispatchers.IO) {
if (index >= task.ranges.size) return@withContext

val range = task.ranges[index]
val start = range.first
val end = range.second

// 如果该部分已经下载完成,直接返回
if (start > end) return@withContext

val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=$start-$end")
.build()

client.newCall(request).execute().use { response ->
if (response.code != 206) {
throw IOException("Server doesn't support range requests: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
val tempFile = task.getTempFile(index)
val outputStream = FileOutputStream(tempFile, tempFile.exists())

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = tempFile.length()

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新总下载进度
synchronized(this@DownloadWorker) {
task.downloadedBytes += read
val progress = (task.downloadedBytes * 100 / task.totalBytes).toInt()
listener.onProgress(task, progress, task.downloadedBytes, task.totalBytes)
}
}

// 更新该部分的下载范围(标记为已完成)
task.ranges[index] = Pair(end + 1, end)

} finally {
outputStream.close()
}
}
}

// 合并文件
private suspend fun mergeFiles() = withContext(Dispatchers.IO) {
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
for (i in 0 until task.threadCount) {
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val tempData = tempFile.readBytes()
outputStream.write(tempData)
tempFile.delete()
}
}
} finally {
outputStream.close()
}
}
}

使用示例

以下是如何使用这个下载框架的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 使用示例

fun main() {
// 获取下载管理器实例
val downloadManager = DownloadManager.getInstance()

// 设置最大并发下载数
downloadManager.setMaxConcurrentDownloads(3)

// 设置下载速度限制(500KB/s)
downloadManager.setSpeedLimit(500)

// 添加下载任务
val task = downloadManager.addTask(
url = "https://example.com/large-file.zip",
savePath = "/downloads",
fileName = "large-file.zip",
threadCount = 5 // 使用5个线程下载
)

// 添加下载监听器
downloadManager.addListener(task.id, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
println("下载进度: $progress%, ${downloadedBytes/1024}KB/${totalBytes/1024}KB")
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
println("状态变更: $status")
}

override fun onSuccess(task: DownloadTask, file: File) {
println("下载成功: ${file.absolutePath}")
}

override fun onFailed(task: DownloadTask, e: Exception) {
println("下载失败: ${e.message}")
}
})

// 开始下载
downloadManager.startDownload(task.id)

// 模拟5秒后暂停
Thread.sleep(5000)
println("暂停下载")
downloadManager.pauseDownload(task.id)

// 模拟2秒后恢复
Thread.sleep(2000)
println("恢复下载")
downloadManager.resumeDownload(task.id)

// 等待下载完成
readLine()

// 清理资源
downloadManager.release()
}

框架功能说明

  1. 多线程下载

    • 通过将文件分割成多个部分,同时使用多个线程下载
    • 每个部分使用HTTP Range请求独立下载
    • 下载完成后自动合并为完整文件
  2. 多任务下载

    • DownloadManager管理多个下载任务
    • 支持设置最大并发下载数
    • 使用队列管理等待下载的任务
  3. 下载限速

    • RateLimiter类控制下载速度
    • 可动态调整限速大小
    • 基于时间窗口算法实现平滑限速
  4. 任务暂停和恢复

    • 支持随时暂停下载任务
    • 通过保存已下载部分信息实现断点续传
    • 恢复时从上次下载位置继续
  5. 下载进度

    • 实时计算并通知下载进度
    • 提供详细的状态回调
    • 支持获取下载速度、已下载大小等信息

这个框架设计合理、功能完整,可以满足各种下载需求。您可以根据实际需要进一步扩展或优化,比如添加网络状态监听、下载优先级设置等功能。

作者

Dench

发布于

2025-04-12

更新于

2025-04-12

许可协议

CC BY-NC-SA 4.0

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×