使用OkHttp实现多功能下载框架二-支持M3U8文件下载

需要扩展框架以支持M3U8文件下载。M3U8是HTTP Live Streaming (HLS)协议使用的文件格式,是一种用于流媒体的文件列表格式。

M3U8文件下载的特点:

  1. M3U8文件本身是一个索引文件,包含了多个.ts分片文件的URL
  2. 需要先下载M3U8文件,解析其中的内容,获取所有的分片URL
  3. 然后依次或并行下载所有的.ts分片
  4. 最后将所有分片合并成一个完整的媒体文件

我需要设计以下内容:

  1. M3U8文件解析器,用于解析M3U8文件格式并提取分片URL
  2. 分片下载管理器,用于管理所有分片的下载状态
  3. 分片合并器,用于将所有分片合并成完整媒体文件

我会基于之前设计的下载框架,扩展一个专门用于M3U8下载的功能模块。

扩展下载框架,支持M3U8文件下载

下面是针对M3U8文件下载的功能扩展,可以集成到之前的下载框架中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
// M3U8DownloadManager.kt

import kotlinx.coroutines.*
import okhttp3.*
import okio.buffer
import okio.sink
import okio.source
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import java.net.URL

/**
* M3U8文件解析器
*/
class M3U8Parser {
// M3U8文件信息
data class M3U8Info(
val masterUrl: String, // 主M3U8 URL
val baseUrl: String, // 基础URL
val duration: Float = 0f, // 总时长(秒)
val segmentList: List<M3U8Segment> // 分片列表
)

// 分片信息
data class M3U8Segment(
val url: String, // 分片URL
val duration: Float, // 分片时长(秒)
val index: Int, // 分片索引
var tsFilePath: String = "" // 本地保存路径
)

// 解析M3U8文件
suspend fun parse(url: String, client: OkHttpClient): M3U8Info = withContext(Dispatchers.IO) {
// 发送请求获取M3U8内容
val request = Request.Builder()
.url(url)
.build()

val response = client.newCall(request).execute()
if (!response.isSuccessful) {
throw IOException("Failed to fetch M3U8 file: ${response.code}")
}

val content = response.body?.string() ?: throw IOException("Empty M3U8 content")

// 判断是否是主播放列表
if (content.contains("#EXT-X-STREAM-INF")) {
return@withContext parseMainPlaylist(url, content, client)
} else {
return@withContext parseMediaPlaylist(url, content)
}
}

// 解析主播放列表
private suspend fun parseMainPlaylist(masterUrl: String, content: String, client: OkHttpClient): M3U8Info = withContext(Dispatchers.IO) {
// 提取带宽最高的子播放列表
val lines = content.split("\n")
var maxBandwidth = 0
var selectedUrl = ""

for (i in lines.indices) {
val line = lines[i]
if (line.startsWith("#EXT-X-STREAM-INF")) {
// 提取带宽参数
val bandwidthPattern = Pattern.compile("BANDWIDTH=(\\d+)")
val matcher = bandwidthPattern.matcher(line)
if (matcher.find()) {
val bandwidth = matcher.group(1).toInt()
if (bandwidth > maxBandwidth) {
maxBandwidth = bandwidth
// 下一行应该是URL
if (i + 1 < lines.size) {
val urlLine = lines[i + 1].trim()
if (!urlLine.startsWith("#")) {
selectedUrl = urlLine
}
}
}
}
}
}

// 如果URL是相对路径,转换为完整URL
val fullUrl = if (selectedUrl.startsWith("http")) {
selectedUrl
} else {
val baseUrl = URL(masterUrl)
val path = if (selectedUrl.startsWith("/")) {
selectedUrl
} else {
val masterPath = baseUrl.path
val lastSlash = masterPath.lastIndexOf('/')
if (lastSlash != -1) {
masterPath.substring(0, lastSlash + 1) + selectedUrl
} else {
"/$selectedUrl"
}
}
URL(baseUrl.protocol, baseUrl.host, baseUrl.port, path).toString()
}

// 解析子播放列表
return@withContext parse(fullUrl, client)
}

// 解析媒体播放列表
private fun parseMediaPlaylist(url: String, content: String): M3U8Info {
val lines = content.split("\n")
val segments = mutableListOf<M3U8Segment>()
var totalDuration = 0f
var segmentDuration = 0f
var segmentIndex = 0
var isFirstSegment = true

// 提取基础URL
val baseUrl = getBaseUrl(url)

for (line in lines) {
val trimmedLine = line.trim()

// 提取片段时长
if (trimmedLine.startsWith("#EXTINF:")) {
val durationText = trimmedLine.substring(8).split(",")[0]
segmentDuration = durationText.toFloat()
isFirstSegment = false
continue
}

// 提取片段URL
if (!trimmedLine.startsWith("#") && trimmedLine.isNotEmpty() && !isFirstSegment) {
// 构建完整URL
val segmentUrl = if (trimmedLine.startsWith("http")) {
trimmedLine
} else {
if (trimmedLine.startsWith("/")) {
// 绝对路径
val urlObj = URL(url)
URL(urlObj.protocol, urlObj.host, urlObj.port, trimmedLine).toString()
} else {
// 相对路径
"$baseUrl$trimmedLine"
}
}

segments.add(M3U8Segment(segmentUrl, segmentDuration, segmentIndex++))
totalDuration += segmentDuration
}
}

return M3U8Info(url, baseUrl, totalDuration, segments)
}

// 获取基础URL
private fun getBaseUrl(url: String): String {
val lastSlash = url.lastIndexOf('/')
return if (lastSlash != -1) {
url.substring(0, lastSlash + 1)
} else {
"$url/"
}
}
}

/**
* M3U8下载管理器
*/
class M3U8DownloadManager {
private val client = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.build()

private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val m3u8Parser = M3U8Parser()
private val downloadManager = DownloadManager.getInstance()

// 下载状态
enum class M3U8DownloadStatus {
IDLE, PARSING, DOWNLOADING, MERGING, COMPLETED, FAILED, CANCELED
}

// M3U8下载监听器
interface M3U8DownloadListener {
fun onParsingProgress(url: String, progress: Int)
fun onDownloadProgress(url: String, progress: Int, downloadedSegments: Int, totalSegments: Int)
fun onMergingProgress(url: String, progress: Int)
fun onStatusChanged(url: String, status: M3U8DownloadStatus)
fun onSuccess(url: String, filePath: String)
fun onFailed(url: String, e: Exception)
}

// M3U8下载任务
data class M3U8DownloadTask(
val url: String,
val savePath: String,
val fileName: String,
var status: M3U8DownloadStatus = M3U8DownloadStatus.IDLE,
var m3u8Info: M3U8Parser.M3U8Info? = null,
var downloadedSegments: AtomicInteger = AtomicInteger(0),
var mergeProgress: AtomicInteger = AtomicInteger(0)
)

private val downloadTasks = ConcurrentHashMap<String, M3U8DownloadTask>()
private val listeners = ConcurrentHashMap<String, MutableList<M3U8DownloadListener>>()
private val activeDownloads = AtomicInteger(0)
private val maxConcurrentDownloads = 3

// 添加下载任务
fun addTask(url: String, savePath: String, fileName: String): M3U8DownloadTask {
val taskId = url.hashCode().toString()

return downloadTasks.getOrPut(taskId) {
// 创建保存目录
val saveDir = File(savePath)
if (!saveDir.exists()) {
saveDir.mkdirs()
}

// 创建临时目录
val tempDir = File(savePath, ".temp_$fileName")
if (!tempDir.exists()) {
tempDir.mkdirs()
}

M3U8DownloadTask(url, savePath, fileName)
}
}

// 添加监听器
fun addListener(url: String, listener: M3U8DownloadListener) {
val taskId = url.hashCode().toString()
val taskListeners = listeners.getOrPut(taskId) { mutableListOf() }
taskListeners.add(listener)
}

// 移除监听器
fun removeListener(url: String, listener: M3U8DownloadListener) {
val taskId = url.hashCode().toString()
listeners[taskId]?.remove(listener)
}

// 开始下载
fun startDownload(url: String) {
val taskId = url.hashCode().toString()
val task = downloadTasks[taskId] ?: return

if (task.status != M3U8DownloadStatus.IDLE &&
task.status != M3U8DownloadStatus.FAILED) {
return
}

// 修改状态为解析中
task.status = M3U8DownloadStatus.PARSING
notifyStatusChanged(task.url, task.status)

downloadScope.launch {
try {
// 1. 解析M3U8文件
notifyParsingProgress(task.url, 0)
val m3u8Info = m3u8Parser.parse(task.url, client)
task.m3u8Info = m3u8Info
notifyParsingProgress(task.url, 100)

// 2. 准备下载分片
task.status = M3U8DownloadStatus.DOWNLOADING
notifyStatusChanged(task.url, task.status)

// 重置计数器
task.downloadedSegments.set(0)

// 3. 下载所有分片
val tempDir = File(task.savePath, ".temp_${task.fileName}")
downloadAllSegments(task, m3u8Info, tempDir.absolutePath)

// 4. 合并分片
task.status = M3U8DownloadStatus.MERGING
notifyStatusChanged(task.url, task.status)
task.mergeProgress.set(0)

mergeSegments(task, m3u8Info, tempDir.absolutePath)

// 5. 完成
task.status = M3U8DownloadStatus.COMPLETED
notifyStatusChanged(task.url, task.status)

// 6. 清理临时文件
cleanupTempFiles(tempDir)

// 7. 通知成功
val outputFilePath = File(task.savePath, task.fileName).absolutePath
notifySuccess(task.url, outputFilePath)

} catch (e: Exception) {
if (task.status != M3U8DownloadStatus.CANCELED) {
task.status = M3U8DownloadStatus.FAILED
notifyStatusChanged(task.url, task.status)
notifyFailed(task.url, e)
}
}
}
}

// 下载所有分片
private suspend fun downloadAllSegments(
task: M3U8DownloadTask,
m3u8Info: M3U8Parser.M3U8Info,
tempDirPath: String
) = withContext(Dispatchers.IO) {
val totalSegments = m3u8Info.segmentList.size
val segmentJobs = mutableListOf<Deferred<Boolean>>()
val semaphore = Semaphore(20) // 限制同时下载的分片数

for (segment in m3u8Info.segmentList) {
val segmentFile = File(tempDirPath, "segment_${segment.index}.ts")
segment.tsFilePath = segmentFile.absolutePath

// 如果分片已存在且大小大于0,则跳过
if (segmentFile.exists() && segmentFile.length() > 0) {
task.downloadedSegments.incrementAndGet()
val progress = (task.downloadedSegments.get() * 100 / totalSegments)
notifyDownloadProgress(task.url, progress, task.downloadedSegments.get(), totalSegments)
continue
}

// 使用信号量限制并发
semaphore.acquire()

val job = downloadScope.async {
try {
// 下载分片
val request = Request.Builder()
.url(segment.url)
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to download segment: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")

// 保存分片
segmentFile.sink().buffer().use { sink ->
responseBody.source().use { source ->
sink.writeAll(source)
}
}

// 更新进度
val downloaded = task.downloadedSegments.incrementAndGet()
val progress = (downloaded * 100 / totalSegments)
notifyDownloadProgress(task.url, progress, downloaded, totalSegments)

return@async true
}
} catch (e: Exception) {
return@async false
} finally {
semaphore.release()
}
}

segmentJobs.add(job)
}

// 等待所有分片下载完成
val results = segmentJobs.awaitAll()

// 检查是否有失败的分片
if (results.contains(false)) {
throw IOException("Some segments failed to download")
}
}

// 合并分片
private suspend fun mergeSegments(
task: M3U8DownloadTask,
m3u8Info: M3U8Parser.M3U8Info,
tempDirPath: String
) = withContext(Dispatchers.IO) {
val segments = m3u8Info.segmentList
val totalSegments = segments.size
val outputFile = File(task.savePath, task.fileName)

// 如果是mp4文件,需要先合并为ts,再转换为mp4
val isMp4 = task.fileName.endsWith(".mp4", ignoreCase = true)
val tempOutputFile = if (isMp4) {
File(task.savePath, "${task.fileName}.ts")
} else {
outputFile
}

// 创建输出流
val outputStream = FileOutputStream(tempOutputFile)

try {
// 依次写入每个分片
for ((index, segment) in segments.withIndex()) {
val segmentFile = File(segment.tsFilePath)
if (!segmentFile.exists()) {
throw IOException("Segment file not found: ${segment.tsFilePath}")
}

// 写入分片数据
segmentFile.source().buffer().use { source ->
outputStream.sink().buffer().use { sink ->
sink.writeAll(source)
}
}

// 更新进度
val progress = ((index + 1) * 100 / totalSegments)
task.mergeProgress.set(progress)
notifyMergingProgress(task.url, progress)
}
} finally {
outputStream.close()
}

// 如果是mp4,需要进行格式转换
if (isMp4) {
// 这里使用简单文件重命名代替格式转换
// 实际应用中,应该使用FFmpeg等工具进行正确的格式转换
if (tempOutputFile.exists()) {
tempOutputFile.renameTo(outputFile)
}
}
}

// 清理临时文件
private fun cleanupTempFiles(tempDir: File) {
if (tempDir.exists()) {
tempDir.listFiles()?.forEach { it.delete() }
tempDir.delete()
}
}

// 取消下载
fun cancelDownload(url: String) {
val taskId = url.hashCode().toString()
val task = downloadTasks[taskId] ?: return

task.status = M3U8DownloadStatus.CANCELED
notifyStatusChanged(task.url, task.status)

// 清理临时目录
val tempDir = File(task.savePath, ".temp_${task.fileName}")
cleanupTempFiles(tempDir)
}

// 获取下载任务
fun getTask(url: String): M3U8DownloadTask? {
val taskId = url.hashCode().toString()
return downloadTasks[taskId]
}

// 通知解析进度
private fun notifyParsingProgress(url: String, progress: Int) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onParsingProgress(url, progress)
}
}

// 通知下载进度
private fun notifyDownloadProgress(url: String, progress: Int, downloadedSegments: Int, totalSegments: Int) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onDownloadProgress(url, progress, downloadedSegments, totalSegments)
}
}

// 通知合并进度
private fun notifyMergingProgress(url: String, progress: Int) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onMergingProgress(url, progress)
}
}

// 通知状态变化
private fun notifyStatusChanged(url: String, status: M3U8DownloadStatus) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onStatusChanged(url, status)
}
}

// 通知下载成功
private fun notifySuccess(url: String, filePath: String) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onSuccess(url, filePath)
}
}

// 通知下载失败
private fun notifyFailed(url: String, e: Exception) {
val taskId = url.hashCode().toString()
listeners[taskId]?.forEach { listener ->
listener.onFailed(url, e)
}
}

// 释放资源
fun release() {
downloadScope.cancel()
downloadTasks.clear()
listeners.clear()
}
}

使用示例

以下是如何使用M3U8下载功能的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// M3U8下载示例

fun main() {
// 创建M3U8下载管理器
val m3u8DownloadManager = M3U8DownloadManager()

// 添加下载任务
val task = m3u8DownloadManager.addTask(
url = "https://example.com/video.m3u8",
savePath = "/downloads",
fileName = "video.mp4" // 可以是.mp4或.ts格式
)

// 添加下载监听器
m3u8DownloadManager.addListener(task.url, object : M3U8DownloadManager.M3U8DownloadListener {
override fun onParsingProgress(url: String, progress: Int) {
println("解析进度: $progress%")
}

override fun onDownloadProgress(url: String, progress: Int, downloadedSegments: Int, totalSegments: Int) {
println("下载进度: $progress%, 已下载分片: $downloadedSegments/$totalSegments")
}

override fun onMergingProgress(url: String, progress: Int) {
println("合并进度: $progress%")
}

override fun onStatusChanged(url: String, status: M3U8DownloadManager.M3U8DownloadStatus) {
println("状态变更: $status")
}

override fun onSuccess(url: String, filePath: String) {
println("下载成功: $filePath")
}

override fun onFailed(url: String, e: Exception) {
println("下载失败: ${e.message}")
}
})

// 开始下载
m3u8DownloadManager.startDownload(task.url)

// 等待下载完成
readLine()

// 清理资源
m3u8DownloadManager.release()
}

M3U8下载功能说明

  1. M3U8文件解析

    • 支持解析主播放列表和媒体播放列表
    • 自动选择最高质量的流进行下载
    • 处理相对路径和绝对路径的URL
  2. 分片下载

    • 支持高并发分片下载
    • 使用信号量控制并发数量
    • 支持已下载分片检测,避免重复下载
  3. 下载进度

    • 提供三个阶段的进度回调:解析进度、下载进度、合并进度
    • 详细的状态变化通知
  4. 文件合并

    • 按顺序合并所有TS分片
    • 支持MP4格式输出(需配合实际工具进行格式转换)
    • 自动清理临时文件
  5. 异常处理

    • 完善的错误处理机制
    • 支持取消下载操作
    • 出错时自动清理临时文件

注意事项

  1. 关于格式转换
    本示例中的MP4格式转换仅做演示,实际应用中应该使用专业工具(如FFmpeg)进行正确的格式转换。

  2. 有关DRM保护
    一些M3U8流可能有DRM保护,需要额外的解密逻辑。您可以扩展此框架添加解密功能。

  3. 性能优化

    • 调整Semaphore的许可数量来控制并发下载量
    • 可以根据网络状况动态调整并发数
    • 对于大型视频,考虑增加磁盘缓冲区大小

希望这个M3U8下载框架能满足您的需求!您可以根据实际项目需要进一步扩展其功能。

使用OkHttp实现多功能下载框架二-Android后台下载2

这个请求要求我提供一个综合了WorkManager和网络状态感知功能的完整后台下载管理器实现。看起来用户已经有了一些代码,但想要更完整的解决方案。

根据之前的讨论和用户提供的文件内容,我可以看出他们想要的是一个能够在以下场景正常工作的下载管理器:

  1. 能够在应用后台运行
  2. 即使应用被杀死也能继续下载
  3. 能够感知网络变化并自动处理
  4. 支持断点续传和任务队列管理

解决方案可以包含以下组件:

  1. WorkManager:用于管理后台任务,即使应用被终止也能恢复任务
  2. ForegroundService:在下载过程中显示通知,降低系统杀死服务的概率
  3. Room 数据库:存储下载任务状态,便于恢复
  4. 生命周期感知:监测应用前台/后台状态转换
  5. 下载队列管理:正确管理任务队列和状态转换

我需要提供一个综合的、能够工作的解决方案,结合 WorkManager 进行任务调度,并添加网络感知功能。这个实现应该很实用,能够解决 Android 后台下载的各种限制问题。

综合型 WorkManager 网络感知下载管理器

下面是一个结合 WorkManager 和网络状态感知的完整下载管理器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
// WorkManagerNetworkDownloader.kt

import android.app.*
import android.content.Context
import android.content.Intent
import android.net.ConnectivityManager
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import android.os.Build
import androidx.annotation.RequiresApi
import androidx.core.app.NotificationCompat
import androidx.lifecycle.*
import androidx.room.*
import androidx.work.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import okhttp3.*
import java.io.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

/**
* 下载任务实体类
*/
@Entity(tableName = "download_tasks")
data class DownloadTask(
@PrimaryKey val id: String,
val url: String,
val fileName: String,
val savePath: String,
val totalBytes: Long = 0,
val downloadedBytes: Long = 0,
val status: DownloadStatus = DownloadStatus.PENDING,
val priority: Int = 0,
val createTime: Long = System.currentTimeMillis(),
val lastUpdated: Long = System.currentTimeMillis(),
val errorCount: Int = 0,
val errorMessage: String? = null
)

/**
* 下载状态枚举
*/
enum class DownloadStatus {
PENDING, // 等待下载
CONNECTING, // 连接中
DOWNLOADING, // 下载中
PAUSED, // 暂停状态(用户主动暂停)
WAITING_FOR_NETWORK,// 等待网络连接
COMPLETED, // 下载完成
FAILED, // 下载失败
CANCELED // 下载取消
}

/**
* 下载任务DAO
*/
@Dao
interface DownloadTaskDao {
@Query("SELECT * FROM download_tasks ORDER BY priority DESC, createTime ASC")
fun getAllTasks(): Flow<List<DownloadTask>>

@Query("SELECT * FROM download_tasks WHERE status = :status ORDER BY priority DESC, createTime ASC")
fun getTasksByStatus(status: DownloadStatus): Flow<List<DownloadTask>>

@Query("SELECT * FROM download_tasks WHERE status IN (:statuses) ORDER BY priority DESC, createTime ASC")
fun getTasksByStatuses(statuses: List<DownloadStatus>): Flow<List<DownloadTask>>

@Query("SELECT * FROM download_tasks WHERE id = :taskId")
suspend fun getTaskById(taskId: String): DownloadTask?

@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insertTask(task: DownloadTask)

@Update
suspend fun updateTask(task: DownloadTask)

@Delete
suspend fun deleteTask(task: DownloadTask)

@Query("UPDATE download_tasks SET status = :newStatus WHERE id = :taskId")
suspend fun updateTaskStatus(taskId: String, newStatus: DownloadStatus)

@Query("UPDATE download_tasks SET downloadedBytes = :downloadedBytes, lastUpdated = :lastUpdated WHERE id = :taskId")
suspend fun updateTaskProgress(taskId: String, downloadedBytes: Long, lastUpdated: Long)

@Query("UPDATE download_tasks SET status = :newStatus WHERE status = :currentStatus")
suspend fun updateTasksStatus(currentStatus: DownloadStatus, newStatus: DownloadStatus)

@Query("UPDATE download_tasks SET status = :newStatus WHERE status IN (:currentStatuses)")
suspend fun updateMultipleTasksStatus(currentStatuses: List<DownloadStatus>, newStatus: DownloadStatus)
}

/**
* 下载数据库
*/
@Database(entities = [DownloadTask::class], version = 1, exportSchema = false)
abstract class DownloadDatabase : RoomDatabase() {
abstract fun downloadTaskDao(): DownloadTaskDao

companion object {
@Volatile
private var INSTANCE: DownloadDatabase? = null

fun getDatabase(context: Context): DownloadDatabase {
return INSTANCE ?: synchronized(this) {
val instance = Room.databaseBuilder(
context.applicationContext,
DownloadDatabase::class.java,
"download_database"
).build()
INSTANCE = instance
instance
}
}
}
}

/**
* 下载事件 - 用于UI层观察
*/
sealed class DownloadEvent {
data class Progress(val taskId: String, val progress: Int, val downloadedBytes: Long, val totalBytes: Long) : DownloadEvent()
data class StatusChanged(val taskId: String, val status: DownloadStatus) : DownloadEvent()
data class Complete(val taskId: String, val filePath: String) : DownloadEvent()
data class Error(val taskId: String, val error: String) : DownloadEvent()
}

/**
* 网络状态监听器
*/
class NetworkMonitor(private val context: Context) {

interface NetworkListener {
fun onNetworkAvailable()
fun onNetworkUnavailable()
}

private val listeners = mutableListOf<NetworkListener>()
private var isNetworkAvailable = false
private var isMonitoring = false

private val connectivityManager by lazy {
context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
}

// 网络回调
private val networkCallback = object : ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
if (!isNetworkAvailable) {
isNetworkAvailable = true
notifyNetworkAvailable()
}
}

override fun onLost(network: Network) {
if (!isAnyNetworkAvailable()) {
isNetworkAvailable = false
notifyNetworkUnavailable()
}
}
}

// 开始监听
fun startMonitoring() {
if (isMonitoring) return

isNetworkAvailable = isAnyNetworkAvailable()

if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
val request = NetworkRequest.Builder()
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.build()
connectivityManager.registerNetworkCallback(request, networkCallback)
} else {
// 旧版本Android可以使用广播接收器,此处省略
}

isMonitoring = true
}

// 停止监听
fun stopMonitoring() {
if (!isMonitoring) return

if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
connectivityManager.unregisterNetworkCallback(networkCallback)
}

isMonitoring = false
}

// 添加监听器
fun addListener(listener: NetworkListener) {
if (!listeners.contains(listener)) {
listeners.add(listener)
}
}

// 移除监听器
fun removeListener(listener: NetworkListener) {
listeners.remove(listener)
}

// 检查是否有可用网络
fun isNetworkAvailable(): Boolean {
return isAnyNetworkAvailable()
}

// 检查是否有任何可用网络
private fun isAnyNetworkAvailable(): Boolean {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
val network = connectivityManager.activeNetwork ?: return false
val capabilities = connectivityManager.getNetworkCapabilities(network) ?: return false

return capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
} else {
@Suppress("DEPRECATION")
val networkInfo = connectivityManager.activeNetworkInfo
return networkInfo != null && networkInfo.isConnected
}
}

// 通知网络可用
private fun notifyNetworkAvailable() {
listeners.forEach { it.onNetworkAvailable() }
}

// 通知网络不可用
private fun notifyNetworkUnavailable() {
listeners.forEach { it.onNetworkUnavailable() }
}
}

/**
* 下载工作器 - 基于WorkManager
*/
class DownloadWorker(
appContext: Context,
workerParams: WorkerParameters
) : CoroutineWorker(appContext, workerParams) {

companion object {
const val TAG_DOWNLOAD = "download_worker"
const val KEY_TASK_ID = "task_id"
const val KEY_RETRY_COUNT = "retry_count"
const val MAX_RETRY_COUNT = 5
const val NOTIFICATION_CHANNEL_ID = "download_channel"
const val NOTIFICATION_ID = 1

// 调度下载任务
fun scheduleDownload(context: Context, taskId: String, retryCount: Int = 0): Operation {
val constraints = Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.setRequiresStorageNotLow(true)
.build()

val inputData = workDataOf(
KEY_TASK_ID to taskId,
KEY_RETRY_COUNT to retryCount
)

val request = OneTimeWorkRequestBuilder<DownloadWorker>()
.setConstraints(constraints)
.setInputData(inputData)
.addTag(TAG_DOWNLOAD)
.addTag(taskId)
.setBackoffCriteria(
BackoffPolicy.EXPONENTIAL,
WorkRequest.MIN_BACKOFF_MILLIS,
TimeUnit.MILLISECONDS
)
.build()

return WorkManager.getInstance(context)
.enqueueUniqueWork(
"download_$taskId",
ExistingWorkPolicy.REPLACE,
request
)
}

// 取消下载任务
fun cancelDownload(context: Context, taskId: String) {
WorkManager.getInstance(context).cancelUniqueWork("download_$taskId")
}
}

private val downloadManager by lazy {
WorkManagerNetworkDownloader.getInstance(applicationContext)
}

@RequiresApi(Build.VERSION_CODES.O)
private fun createNotificationChannel() {
val notificationManager = applicationContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager

val channel = NotificationChannel(
NOTIFICATION_CHANNEL_ID,
"下载通知",
NotificationManager.IMPORTANCE_LOW
).apply {
description = "显示下载进度"
setShowBadge(false)
}

notificationManager.createNotificationChannel(channel)
}

// 创建前台通知信息
private fun createForegroundInfo(title: String, progress: Int = 0): ForegroundInfo {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
createNotificationChannel()
}

val cancelIntent = WorkManager.getInstance(applicationContext)
.createCancelPendingIntent(id)

val notification = NotificationCompat.Builder(applicationContext, NOTIFICATION_CHANNEL_ID)
.setContentTitle(title)
.setTicker(title)
.setContentText("下载中...")
.setSmallIcon(android.R.drawable.stat_sys_download)
.setOngoing(true)
.setProgress(100, progress, progress == 0)
.addAction(android.R.drawable.ic_delete, "取消", cancelIntent)
.build()

return ForegroundInfo(NOTIFICATION_ID, notification)
}

override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
val taskId = inputData.getString(KEY_TASK_ID) ?: return@withContext Result.failure()
val retryCount = inputData.getInt(KEY_RETRY_COUNT, 0)

try {
// 获取下载任务
val task = downloadManager.getTaskById(taskId) ?: return@withContext Result.failure()

// 设置为前台Worker
setForeground(createForegroundInfo("下载: ${task.fileName}"))

// 更新任务状态
downloadManager.updateTaskStatus(taskId, DownloadStatus.DOWNLOADING)

// 执行下载
val file = downloadManager.performDownload(task, this@DownloadWorker)

// 下载完成
downloadManager.updateTaskStatus(taskId, DownloadStatus.COMPLETED)
downloadManager.notifyDownloadComplete(task.id, file.absolutePath)

Result.success()
} catch (e: Exception) {
e.printStackTrace()

// 判断错误类型
val isNetworkError = e is IOException && (
e.message?.contains("network", ignoreCase = true) == true ||
e.message?.contains("connect", ignoreCase = true) == true ||
e.message?.contains("timeout", ignoreCase = true) == true
)

if (isNetworkError) {
// 网络错误,更新状态为等待网络
downloadManager.updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)

// 判断是否需要重试
if (retryCount < MAX_RETRY_COUNT) {
Result.retry()
} else {
downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
downloadManager.updateTaskError(
taskId,
"网络连接失败: ${e.message ?: "未知错误"}"
)
Result.failure()
}
} else if (isStopped) {
// Worker被停止,可能是系统限制或用户取消
downloadManager.updateTaskStatus(taskId, DownloadStatus.PAUSED)
Result.retry()
} else {
// 其他错误
downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
downloadManager.updateTaskError(
taskId,
"下载失败: ${e.message ?: "未知错误"}"
)
Result.failure()
}
}
}
}

/**
* 应用生命周期观察者
*/
class AppLifecycleObserver(private val context: Context) : DefaultLifecycleObserver {

private val downloader = WorkManagerNetworkDownloader.getInstance(context)

override fun onStart(owner: LifecycleOwner) {
// 应用进入前台
downloader.onAppForeground()
}

override fun onStop(owner: LifecycleOwner) {
// 应用进入后台
downloader.onAppBackground()
}
}

/**
* 主下载管理器类
*/
class WorkManagerNetworkDownloader private constructor(private val context: Context) : NetworkMonitor.NetworkListener {

companion object {
@Volatile
private var INSTANCE: WorkManagerNetworkDownloader? = null

fun getInstance(context: Context): WorkManagerNetworkDownloader {
return INSTANCE ?: synchronized(this) {
INSTANCE ?: WorkManagerNetworkDownloader(context.applicationContext).also {
INSTANCE = it
}
}
}
}

private val database = DownloadDatabase.getDatabase(context)
private val downloadTaskDao = database.downloadTaskDao()
private val workManager = WorkManager.getInstance(context)
private val networkMonitor = NetworkMonitor(context)

private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private val activeDownloads = AtomicInteger(0)

private val _downloadEvents = MutableSharedFlow<DownloadEvent>(extraBufferCapacity = 64)
val downloadEvents: SharedFlow<DownloadEvent> = _downloadEvents.asSharedFlow()

// 任务列表Flow
val allTasks = downloadTaskDao.getAllTasks()
val pendingTasks = downloadTaskDao.getTasksByStatus(DownloadStatus.PENDING)
val activeTasks = downloadTaskDao.getTasksByStatuses(
listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING)
)
val completedTasks = downloadTaskDao.getTasksByStatus(DownloadStatus.COMPLETED)
val failedTasks = downloadTaskDao.getTasksByStatus(DownloadStatus.FAILED)
val pausedTasks = downloadTaskDao.getTasksByStatuses(
listOf(DownloadStatus.PAUSED, DownloadStatus.WAITING_FOR_NETWORK)
)

// OkHttp客户端
private val okHttpClient = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()

init {
// 启动网络监控
networkMonitor.addListener(this)
networkMonitor.startMonitoring()

// 初始化时恢复所有应该运行的任务
downloadScope.launch {
// 将之前正在下载的任务标记为等待状态
downloadTaskDao.updateMultipleTasksStatus(
listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING),
DownloadStatus.PENDING
)

// 如果网络可用,启动所有等待网络的任务
if (networkMonitor.isNetworkAvailable()) {
downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
resumeDownload(task.id)
}
}

// 恢复所有待处理的任务
downloadTaskDao.getTasksByStatus(DownloadStatus.PENDING).first().forEach { task ->
startDownload(task.id)
}
}
}

// 添加下载任务
suspend fun addDownloadTask(
url: String,
fileName: String,
savePath: String,
priority: Int = 0
): String {
val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"

// 确保保存目录存在
val saveDir = File(savePath)
if (!saveDir.exists()) {
saveDir.mkdirs()
}

// 创建任务
val task = DownloadTask(
id = taskId,
url = url,
fileName = fileName,
savePath = savePath,
status = DownloadStatus.PENDING,
priority = priority
)

// 保存到数据库
downloadTaskDao.insertTask(task)

// 触发状态变更事件
_downloadEvents.emit(DownloadEvent.StatusChanged(taskId, DownloadStatus.PENDING))

// 如果网络可用,立即开始下载
if (networkMonitor.isNetworkAvailable()) {
startDownload(taskId)
} else {
// 网络不可用,标记为等待网络
updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
}

return taskId
}

// 开始下载
suspend fun startDownload(taskId: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

if (task.status == DownloadStatus.PENDING || task.status == DownloadStatus.PAUSED ||
task.status == DownloadStatus.WAITING_FOR_NETWORK || task.status == DownloadStatus.FAILED) {

// 如果网络不可用,标记为等待网络
if (!networkMonitor.isNetworkAvailable()) {
updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
return
}

// 更新状态为等待中
updateTaskStatus(taskId, DownloadStatus.CONNECTING)

// 调度下载任务
DownloadWorker.scheduleDownload(context, taskId)

// 增加活跃下载计数
activeDownloads.incrementAndGet()
}
}

// 暂停下载
suspend fun pauseDownload(taskId: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING ||
task.status == DownloadStatus.PENDING || task.status == DownloadStatus.WAITING_FOR_NETWORK) {

// 取消Worker
DownloadWorker.cancelDownload(context, taskId)

// 更新状态
updateTaskStatus(taskId, DownloadStatus.PAUSED)

// 减少活跃下载计数
val activeCount = activeDownloads.decrementAndGet()
if (activeCount < 0) {
activeDownloads.set(0)
}
}
}

// 恢复下载
suspend fun resumeDownload(taskId: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED ||
task.status == DownloadStatus.WAITING_FOR_NETWORK) {

// 如果网络可用,开始下载
if (networkMonitor.isNetworkAvailable()) {
updateTaskStatus(taskId, DownloadStatus.PENDING)
startDownload(taskId)
} else {
// 网络不可用,标记为等待网络
updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
}
}
}

// 取消下载
suspend fun cancelDownload(taskId: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

// 取消Worker
DownloadWorker.cancelDownload(context, taskId)

// 更新状态
updateTaskStatus(taskId, DownloadStatus.CANCELED)

// 减少活跃下载计数
val activeCount = activeDownloads.decrementAndGet()
if (activeCount < 0) {
activeDownloads.set(0)
}

// 删除部分下载的文件
val file = File(task.savePath, task.fileName)
if (file.exists()) {
file.delete()
}
}

// 删除任务
suspend fun deleteTask(taskId: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

// 如果正在下载,先取消
if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING ||
task.status == DownloadStatus.PENDING || task.status == DownloadStatus.WAITING_FOR_NETWORK) {
cancelDownload(taskId)
}

// 从数据库删除
downloadTaskDao.deleteTask(task)
}

// 获取任务
suspend fun getTaskById(taskId: String): DownloadTask? {
return downloadTaskDao.getTaskById(taskId)
}

// 更新任务状态
suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

if (task.status != status) {
val updatedTask = task.copy(
status = status,
lastUpdated = System.currentTimeMillis()
)

downloadTaskDao.updateTask(updatedTask)
_downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
}
}

// 更新任务错误信息
suspend fun updateTaskError(taskId: String, errorMessage: String) {
val task = downloadTaskDao.getTaskById(taskId) ?: return

val updatedTask = task.copy(
errorCount = task.errorCount + 1,
errorMessage = errorMessage,
lastUpdated = System.currentTimeMillis()
)

downloadTaskDao.updateTask(updatedTask)
_downloadEvents.emit(DownloadEvent.Error(taskId, errorMessage))
}

// 通知下载完成
suspend fun notifyDownloadComplete(taskId: String, filePath: String) {
_downloadEvents.emit(DownloadEvent.Complete(taskId, filePath))
}

// 执行文件下载
suspend fun performDownload(task: DownloadTask, worker: DownloadWorker): File = withContext(Dispatchers.IO) {
// 确保目录存在
val saveDir = File(task.savePath)
if (!saveDir.exists()) {
saveDir.mkdirs()
}

val file = File(saveDir, task.fileName)
var totalBytes = task.totalBytes
var downloadedBytes = task.downloadedBytes

// 检查文件是否已存在且有部分数据
if (file.exists() && downloadedBytes > 0) {
if (file.length() != downloadedBytes) {
// 文件大小不一致,使用实际文件大小
downloadedBytes = file.length()
}
} else if (downloadedBytes > 0) {
// 文件不存在但下载记录不为0,重置下载记录
downloadedBytes = 0
}

// 构建请求
val requestBuilder = Request.Builder()
.url(task.url)

// 添加Range头以支持断点续传
if (downloadedBytes > 0) {
requestBuilder.addHeader("Range", "bytes=$downloadedBytes-")
}

val request = requestBuilder.build()

// 执行请求
okHttpClient.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("HTTP error: ${response.code}")
}

val body = response.body ?: throw IOException("Empty response body")
val contentLength = body.contentLength()

// 确定总大小
if (totalBytes <= 0 && contentLength > 0) {
totalBytes = when {
response.code == 206 -> { // 断点续传
val contentRange = response.header("Content-Range")
if (contentRange != null) {
// 解析形如 "bytes 100-999/1000" 的Content-Range头
val rangeMatcher = Regex("bytes \\d+-\\d+/(\\d+)").find(contentRange)
rangeMatcher?.groupValues?.get(1)?.toLongOrNull() ?: (downloadedBytes + contentLength)
} else {
downloadedBytes + contentLength
}
}
else -> contentLength // 完整下载
}

// 更新任务总大小
val updatedTask = task.copy(totalBytes = totalBytes)
downloadTaskDao.updateTask(updatedTask)
}

// 准备输出流
val outputStream = if (downloadedBytes > 0 && file.exists()) {
FileOutputStream(file, true) // 断点续传,追加模式
} else {
FileOutputStream(file) // 从头下载,覆盖模式
}

outputStream.use { output ->
val buffer = ByteArray(8192)
val input = body.byteStream()
var bytesRead: Int
var lastProgressUpdate = 0L

while (input.read(buffer).also { bytesRead = it } != -1) {
// 检查Worker是否已停止
if (worker.isStopped) {
throw InterruptedException("下载已停止")
}

// 检查网络状态
if (!networkMonitor.isNetworkAvailable()) {
throw IOException("网络连接已断开")
}

output.write(buffer, 0, bytesRead)
downloadedBytes += bytesRead

// 更新进度
val now = System.currentTimeMillis()
if (now - lastProgressUpdate > 500) { // 每500ms更新一次
lastProgressUpdate = now

val progress = if (totalBytes > 0) {
(downloadedBytes * 100 / totalBytes).toInt()
} else 0

// 更新数据库
downloadTaskDao.updateTaskProgress(task.id, downloadedBytes, now)

// 发送进度事件
_downloadEvents.emit(
DownloadEvent.Progress(
task.id,
progress,
downloadedBytes,
totalBytes
)
)

// 更新前台服务通知
worker.setForeground(
worker.createForegroundInfo(
"下载: ${task.fileName}",
progress
)
)
}
}
}
}

// 返回下载完成的文件
file
}

// 网络变为可用
override fun onNetworkAvailable() {
downloadScope.launch {
// 恢复所有等待网络的任务
downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
resumeDownload(task.id)
}
}
}

// 网络变为不可用
override fun onNetworkUnavailable() {
downloadScope.launch {
// 暂停所有正在下载的任务
downloadTaskDao.getTasksByStatuses(
listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING, DownloadStatus.PENDING)
).first().forEach { task ->
// 标记为等待网络
updateTaskStatus(task.id, DownloadStatus.WAITING_FOR_NETWORK)

// 取消当前工作
DownloadWorker.cancelDownload(context, task.id)
}
}
}

// 应用进入前台
fun onAppForeground() {
downloadScope.launch {
// 检查并恢复可能被系统暂停的任务
downloadTaskDao.getTasksByStatus(DownloadStatus.PAUSED).first().forEach { task ->
// 如果任务是因为系统限制被暂停的(而非用户主动暂停),则尝试恢复
// 这里可以通过某些标记或时间戳来判断
if (task.errorMessage?.contains("系统限制") == true ||
System.currentTimeMillis() - task.lastUpdated > 30000) { // 30秒内未更新

if (networkMonitor.isNetworkAvailable()) {
resumeDownload(task.id)
} else {
updateTaskStatus(task.id, DownloadStatus.WAITING_FOR_NETWORK)
}
}
}
}
}

// 应用进入后台
fun onAppBackground() {
// 应用进入后台时,不需要特别处理,
// 因为我们使用WorkManager来执行下载任务,
// 这允许任务在后台继续执行
}

// 释放资源
fun release() {
networkMonitor.removeListener(this)
networkMonitor.stopMonitoring()
downloadScope.cancel()
}
}

使用方法

下面是如何在应用中使用这个下载管理器的完整示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// 在Application类中初始化
class MyApplication : Application() {
override fun onCreate() {
super.onCreate()

// 注册生命周期观察者,用于监控应用前台/后台状态
ProcessLifecycleOwner.get().lifecycle.addObserver(AppLifecycleObserver(this))
}
}

// 在Activity或Fragment中使用下载管理器
class DownloadActivity : AppCompatActivity() {

private val downloader by lazy {
WorkManagerNetworkDownloader.getInstance(applicationContext)
}

private var downloadEventJob: Job? = null
private var adaptor: DownloadAdaptor? = null

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_download)

// 初始化RecyclerView适配器
val recyclerView = findViewById<RecyclerView>(R.id.recyclerViewDownloads)
adaptor = DownloadAdaptor(emptyList()) { task, action ->
handleAction(task, action)
}
recyclerView.adapter = adaptor
recyclerView.layoutManager = LinearLayoutManager(this)

// 监听下载事件
downloadEventJob = lifecycleScope.launch {
downloader.downloadEvents.collect { event ->
when (event) {
is DownloadEvent.Progress -> {
adaptor?.updateProgress(event.taskId, event.progress)
}
is DownloadEvent.StatusChanged -> {
// 刷新UI或显示通知
showToast("任务${event.taskId}状态变更: ${event.status}")
}
is DownloadEvent.Complete -> {
showToast("下载完成: ${event.filePath}")
}
is DownloadEvent.Error -> {
showToast("下载错误: ${event.error}")
}
}
}
}

// 观察下载任务列表
lifecycleScope.launch {
downloader.allTasks.collect { tasks ->
adaptor?.updateTasks(tasks)
}
}

// 添加下载按钮点击事件
findViewById<Button>(R.id.btnAddDownload).setOnClickListener {
showAddDownloadDialog()
}
}

private fun showAddDownloadDialog() {
val urlEditText = EditText(this).apply {
hint = "请输入下载URL"
setText("https://example.com/large-file.zip")
}

AlertDialog.Builder(this)
.setTitle("添加下载")
.setView(urlEditText)
.setPositiveButton("下载") { _, _ ->
val url = urlEditText.text.toString()
if (url.isNotEmpty()) {
addDownload(url)
}
}
.setNegativeButton("取消", null)
.show()
}

private fun addDownload(url: String) {
lifecycleScope.launch {
try {
val fileName = url.substringAfterLast("/")
val savePath = getExternalFilesDir(null)?.absolutePath ?: filesDir.absolutePath

val taskId = downloader.addDownloadTask(
url = url,
fileName = fileName,
savePath = savePath
)

showToast("下载任务已添加")
} catch (e: Exception) {
showToast("添加下载失败: ${e.message}")
}
}
}

private fun handleAction(task: DownloadTask, action: String) {
lifecycleScope.launch {
when (action) {
"pause" -> downloader.pauseDownload(task.id)
"resume" -> downloader.resumeDownload(task.id)
"cancel" -> downloader.cancelDownload(task.id)
"delete" -> downloader.deleteTask(task.id)
}
}
}

private fun showToast(message: String) {
Toast.makeText(this, message, Toast.LENGTH_SHORT).show()
}

override fun onDestroy() {
super.onDestroy()
downloadEventJob?.cancel()
}

使用OkHttp实现多功能下载框架二-Android后台下载

这个问题很有挑战性,因为Android系统对后台任务的限制越来越严格,特别是在Android 8及更高版本上。让我们设计一个能在后台下载长任务队列的下载管理器,并在应用回到前台时恢复下载。

主要需要考虑的挑战:

  1. Android系统的电池优化和后台任务限制
  2. 在应用进入后台后,下载可能被系统暂停
  3. 在应用返回前台时需要恢复下载
  4. 需要正确处理应用被终止的情况
  5. 保持下载状态的一致性

解决方案可以包含以下组件:

  1. WorkManager:用于管理后台任务,即使应用被终止也能恢复任务
  2. ForegroundService:在下载过程中显示通知,降低系统杀死服务的概率
  3. Room数据库:存储下载任务状态,便于恢复
  4. 生命周期感知:监测应用前台/后台状态转换
  5. 下载队列管理:正确管理任务队列和状态转换

下面我将设计一个完整的解决方案,包括:

  • 下载管理器架构
  • 持久化存储
  • 应用生命周期处理
  • 后台服务
  • 任务队列管理
  • 暂停/恢复机制

Android长任务队列后台下载管理器设计

下面是一个基于WorkManager、ForegroundService和Room数据库的下载管理器实现,专注于解决Android后台下载限制问题:

// BackgroundDownloadManager.kt

import android.app.*
import android.content.Context
import android.content.Intent
import android.os.*
import androidx.annotation.RequiresApi
import androidx.core.app.NotificationCompat
import androidx.lifecycle.*
import androidx.room.*
import androidx.work.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import okhttp3.*
import java.io.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

/**
 * 下载任务数据类 - Room数据库实体
 */
@Entity(tableName = "download_tasks")
data class DownloadTask(
    @PrimaryKey val id: String,
    val url: String,
    val fileName: String,
    val savePath: String,
    val totalBytes: Long = 0,
    val downloadedBytes: Long = 0,
    val status: DownloadStatus = DownloadStatus.PENDING,
    val priority: Int = 0,
    val createTime: Long = System.currentTimeMillis(),
    val lastUpdated: Long = System.currentTimeMillis(),
    val attemptCount: Int = 0,
    val retryDelayMillis: Long = 0,
    val chunkStart: Long = 0,
    val chunkEnd: Long = 0
)

/**
 * 下载状态枚举
 */
enum class DownloadStatus {
    PENDING,         // 等待下载
    CONNECTING,      // 连接中
    DOWNLOADING,     // 下载中
    PAUSED,          // 暂停状态
    COMPLETED,       // 下载完成
    FAILED,          // 下载失败
    CANCELED         // 下载取消
}

/**
 * Room DAO接口
 */
@Dao
interface DownloadTaskDao {
    @Query("SELECT * FROM download_tasks ORDER BY priority DESC, createTime ASC")
    fun getAllTasks(): Flow<List<DownloadTask>>

    @Query("SELECT * FROM download_tasks WHERE status IN (:statuses) ORDER BY priority DESC, createTime ASC")
    fun getTasksByStatus(statuses: List<DownloadStatus>): Flow<List<DownloadTask>>

    @Query("SELECT * FROM download_tasks WHERE id = :taskId")
    suspend fun getTaskById(taskId: String): DownloadTask?

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertTask(task: DownloadTask)

    @Update
    suspend fun updateTask(task: DownloadTask)

    @Delete
    suspend fun deleteTask(task: DownloadTask)

    @Query("DELETE FROM download_tasks WHERE status = :status")
    suspend fun deleteTasksByStatus(status: DownloadStatus)

    @Query("UPDATE download_tasks SET status = :newStatus WHERE status = :currentStatus")
    suspend fun updateTasksStatus(currentStatus: DownloadStatus, newStatus: DownloadStatus)
}

/**
 * Room数据库
 */
@Database(entities = [DownloadTask::class], version = 1, exportSchema = false)
abstract class DownloadDatabase : RoomDatabase() {
    abstract fun downloadTaskDao(): DownloadTaskDao

    companion object {
        @Volatile
        private var INSTANCE: DownloadDatabase? = null

        fun getDatabase(context: Context): DownloadDatabase {
            return INSTANCE ?: synchronized(this) {
                val instance = Room.databaseBuilder(
                    context.applicationContext,
                    DownloadDatabase::class.java,
                    "download_database"
                ).build()
                INSTANCE = instance
                instance
            }
        }
    }
}

/**
 * 下载事件 - 用于通知UI层
 */
sealed class DownloadEvent {
    data class Progress(val taskId: String, val progress: Int, val downloadedBytes: Long, val totalBytes: Long) : DownloadEvent()
    data class StatusChanged(val taskId: String, val status: DownloadStatus) : DownloadEvent()
    data class Error(val taskId: String, val error: Throwable) : DownloadEvent()
    data class Complete(val taskId: String, val filePath: String) : DownloadEvent()
}

/**
 * 下载服务 - 使用前台服务确保在后台也能下载
 */
class DownloadService : Service() {
    companion object {
        private const val NOTIFICATION_ID = 1001
        private const val CHANNEL_ID = "download_channel"
        private const val CHANNEL_NAME = "Downloads"

        fun startService(context: Context) {
            val intent = Intent(context, DownloadService::class.java)

            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
                context.startForegroundService(intent)
            } else {
                context.startService(intent)
            }
        }

        fun stopService(context: Context) {
            val intent = Intent(context, DownloadService::class.java)
            context.stopService(intent)
        }
    }

    private lateinit var notificationManager: NotificationManager
    private var isServiceRunning = false

    override fun onCreate() {
        super.onCreate()
        notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
        createNotificationChannel()
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        if (!isServiceRunning) {
            isServiceRunning = true
            startForeground(NOTIFICATION_ID, createNotification("下载管理器正在运行"))

            // 通知下载管理器服务已启动
            BackgroundDownloadManager.getInstance(applicationContext).onServiceStarted()
        }

        return START_STICKY
    }

    override fun onBind(intent: Intent?): IBinder? = null

    override fun onDestroy() {
        isServiceRunning = false
        // 通知下载管理器服务已停止
        BackgroundDownloadManager.getInstance(applicationContext).onServiceStopped()
        super.onDestroy()
    }

    fun updateNotification(title: String, message: String, progress: Int) {
        val notification = createNotification(title, message, progress)
        notificationManager.notify(NOTIFICATION_ID, notification)
    }

    @RequiresApi(Build.VERSION_CODES.O)
    private fun createNotificationChannel() {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            val channel = NotificationChannel(
                CHANNEL_ID,
                CHANNEL_NAME,
                NotificationManager.IMPORTANCE_LOW
            ).apply {
                setShowBadge(false)
                lockscreenVisibility = Notification.VISIBILITY_PUBLIC
            }
            notificationManager.createNotificationChannel(channel)
        }
    }

    private fun createNotification(
        title: String, 
        message: String = "正在处理下载任务",
        progress: Int = 0
    ): Notification {
        val pendingIntent = PendingIntent.getActivity(
            this,
            0,
            packageManager.getLaunchIntentForPackage(packageName),
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
                PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE
            } else {
                PendingIntent.FLAG_UPDATE_CURRENT
            }
        )

        val builder = NotificationCompat.Builder(this, CHANNEL_ID)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setContentTitle(title)
            .setContentText(message)
            .setContentIntent(pendingIntent)
            .setPriority(NotificationCompat.PRIORITY_LOW)
            .setOnlyAlertOnce(true)
            .setOngoing(true)

        if (progress > 0) {
            builder.setProgress(100, progress, false)
        }

        return builder.build()
    }
}

/**
 * 下载工作器 - 基于WorkManager
 */
class DownloadWorker(
    context: Context,
    params: WorkerParameters
) : CoroutineWorker(context, params) {

    companion object {
        const val KEY_TASK_ID = "task_id"
        private const val TAG_DOWNLOAD = "download_worker"

        fun enqueueWork(context: Context, taskId: String): Operation {
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .build()

            val inputData = workDataOf(KEY_TASK_ID to taskId)

            val request = OneTimeWorkRequestBuilder<DownloadWorker>()
                .setConstraints(constraints)
                .setInputData(inputData)
                .addTag(TAG_DOWNLOAD)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.LINEAR,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()

            return WorkManager.getInstance(context)
                .enqueueUniqueWork(
                    taskId,
                    ExistingWorkPolicy.REPLACE,
                    request
                )
        }

        fun cancelWork(context: Context, taskId: String) {
            WorkManager.getInstance(context).cancelUniqueWork(taskId)
        }
    }

    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return@withContext Result.failure()
        val downloadManager = BackgroundDownloadManager.getInstance(applicationContext)

        return@withContext try {
            val task = downloadManager.getTaskById(taskId) ?: return@withContext Result.failure()

            // 更新任务状态为正在下载
            downloadManager.updateTaskStatus(task.id, DownloadStatus.DOWNLOADING)

            // 执行下载
            downloadManager.performDownload(task)

            Result.success()
        } catch (e: Exception) {
            // 如果是因为应用进入后台被系统限制,将任务状态设为PAUSED
            if (e is InterruptedException || e.cause is InterruptedException) {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.PAUSED)
                Result.retry()
            } else {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
                Result.failure()
            }
        }
    }
}

/**
 * 应用生命周期监听
 */
class AppLifecycleObserver(private val context: Context) : DefaultLifecycleObserver {

    private val downloadManager = BackgroundDownloadManager.getInstance(context)

    override fun onStart(owner: LifecycleOwner) {
        // 应用进入前台
        downloadManager.onAppForeground()
    }

    override fun onStop(owner: LifecycleOwner) {
        // 应用进入后台
        downloadManager.onAppBackground()
    }
}

/**
 * 后台下载管理器
 */
class BackgroundDownloadManager private constructor(private val context: Context) {

    companion object {
        @Volatile
        private var INSTANCE: BackgroundDownloadManager? = null

        fun getInstance(context: Context): BackgroundDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: BackgroundDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }

    private val database = DownloadDatabase.getDatabase(context)
    private val downloadTaskDao = database.downloadTaskDao()
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .writeTimeout(30, TimeUnit.SECONDS)
        .build()

    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // 事件流
    private val _downloadEvents = MutableSharedFlow<DownloadEvent>(extraBufferCapacity = 100)
    val downloadEvents: SharedFlow<DownloadEvent> = _downloadEvents.asSharedFlow()

    // 任务列表流
    val allTasks = downloadTaskDao.getAllTasks()
    val activeTasks = downloadTaskDao.getTasksByStatus(
        listOf(DownloadStatus.PENDING, DownloadStatus.CONNECTING, DownloadStatus.DOWNLOADING)
    )

    // 应用前台状态追踪
    private val isAppInForeground = AtomicBoolean(true)
    private val isServiceRunning = AtomicBoolean(false)
    private val activeDownloads = AtomicInteger(0)

    // 初始化
    init {
        // 恢复被暂停的任务
        downloadScope.launch {
            downloadTaskDao.updateTasksStatus(DownloadStatus.DOWNLOADING, DownloadStatus.PENDING)
            downloadTaskDao.updateTasksStatus(DownloadStatus.CONNECTING, DownloadStatus.PENDING)

            // 获取等待中的任务并启动
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PENDING)).first().forEach { task ->
                enqueueDownload(task.id)
            }
        }
    }

    // 添加下载任务
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"

        val downloadTask = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            priority = priority,
            status = DownloadStatus.PENDING
        )

        downloadTaskDao.insertTask(downloadTask)
        enqueueDownload(taskId)

        return taskId
    }

    // 获取任务信息
    suspend fun getTaskById(taskId: String): DownloadTask? {
        return downloadTaskDao.getTaskById(taskId)
    }

    // 更新任务状态
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        val updatedTask = task.copy(
            status = status,
            lastUpdated = System.currentTimeMillis()
        )
        downloadTaskDao.updateTask(updatedTask)
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    // 暂停下载
    suspend fun pauseDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING) {
            DownloadWorker.cancelWork(context, taskId)
            updateTaskStatus(taskId, DownloadStatus.PAUSED)
        }
    }

    // 恢复下载
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            enqueueDownload(taskId)
        }
    }

    // 取消下载
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        DownloadWorker.cancelWork(context, taskId)
        updateTaskStatus(taskId, DownloadStatus.CANCELED)

        // 删除部分下载的文件
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    // 删除任务
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING) {
            DownloadWorker.cancelWork(context, taskId)
        }

        downloadTaskDao.deleteTask(task)
    }

    // 清理已完成任务
    suspend fun clearCompletedTasks() {
        downloadTaskDao.deleteTasksByStatus(DownloadStatus.COMPLETED)
    }

    // 应用进入前台
    fun onAppForeground() {
        isAppInForeground.set(true)

        // 恢复被暂停的下载
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PAUSED)).first().forEach { task ->
                // 只恢复因进入后台而被暂停的任务
                if (task.attemptCount > 0) {
                    resumeDownload(task.id)
                }
            }
        }
    }

    // 应用进入后台
    fun onAppBackground() {
        isAppInForeground.set(false)

        // 根据设置决定是否在后台继续下载
        val continueInBackground = true // 这里可以从SharedPreferences读取配置

        if (!continueInBackground) {
            downloadScope.launch {
                // 暂停所有正在下载的任务
                downloadTaskDao.getTasksByStatus(
                    listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING)
                ).first().forEach { task ->
                    pauseDownload(task.id)
                }
            }
        } else {
            // 确保前台服务正在运行
            if (activeDownloads.get() > 0 && !isServiceRunning.get()) {
                DownloadService.startService(context)
            }
        }
    }

    // 服务启动回调
    fun onServiceStarted() {
        isServiceRunning.set(true)
    }

    // 服务停止回调
    fun onServiceStopped() {
        isServiceRunning.set(false)
    }

    // 入队下载任务
    private fun enqueueDownload(taskId: String) {
        if (!isAppInForeground.get()) {
            // 如果应用在后台,确保服务正在运行
            DownloadService.startService(context)
        }

        // 使用WorkManager调度下载任务
        DownloadWorker.enqueueWork(context, taskId)

        // 跟踪活跃下载数量
        activeDownloads.incrementAndGet()
    }

    // 执行下载
    suspend fun performDownload(task: DownloadTask) = withContext(Dispatchers.IO) {
        try {
            // 确保目录存在
            val saveDir = File(task.savePath)
            if (!saveDir.exists()) {
                saveDir.mkdirs()
            }

            val file = File(saveDir, task.fileName)

            // 获取文件信息
            var totalBytes = task.totalBytes
            var downloadedBytes = task.downloadedBytes

            // 如果文件已存在且下载过,获取已下载的字节数
            if (file.exists() && downloadedBytes > 0) {
                if (file.length() < downloadedBytes) {
                    // 文件大小异常,从头开始下载
                    file.delete()
                    downloadedBytes = 0
                }
            } else if (downloadedBytes > 0) {
                // 数据不一致,从头开始下载
                downloadedBytes = 0
            }

            // 构建请求
            val request = Request.Builder()
                .url(task.url)
                .apply {
                    if (downloadedBytes > 0) {
                        // 断点续传
                        header("Range", "bytes=$downloadedBytes-")
                    }
                }
                .build()

            // 执行请求
            okHttpClient.newCall(request).execute().use { response ->
                if (!response.isSuccessful) {
                    throw IOException("HTTP error: ${response.code}")
                }

                // 获取总大小
                val body = response.body ?: throw IOException("Empty response body")
                val contentLength = body.contentLength()

                if (totalBytes <= 0 && contentLength > 0) {
                    totalBytes = contentLength + downloadedBytes

                    // 更新任务信息
                    val updatedTask = task.copy(totalBytes = totalBytes)
                    downloadTaskDao.updateTask(updatedTask)
                }

                // 打开输出流
                val outputStream = if (downloadedBytes > 0) {
                    FileOutputStream(file, true) // 续传模式
                } else {
                    FileOutputStream(file) // 从头开始
                }

                // 下载文件
                outputStream.use { output ->
                    val buffer = ByteArray(8192)
                    val input = body.byteStream()
                    var bytesRead: Int
                    var lastProgressUpdate = 0L

                    while (input.read(buffer).also { bytesRead = it } != -1) {
                        // 检查是否应该暂停
                        if (!isAppInForeground.get() && !isServiceRunning.get()) {
                            throw InterruptedException("Download paused due to app background")
                        }

                        output.write(buffer, 0, bytesRead)
                        downloadedBytes += bytesRead

                        // 更新进度,但控制更新频率
                        val now = System.currentTimeMillis()
                        if (now - lastProgressUpdate > 500) { // 每500ms更新一次
                            lastProgressUpdate = now

                            val progress = if (totalBytes > 0) {
                                (downloadedBytes * 100 / totalBytes).toInt()
                            } else {
                                0
                            }

                            // 更新任务信息
                            val updatedTask = task.copy(
                                downloadedBytes = downloadedBytes,
                                lastUpdated = now
                            )
                            downloadTaskDao.updateTask(updatedTask)

                            // 发送进度事件
                            _downloadEvents.emit(
                                DownloadEvent.Progress(
                                    task.id,
                                    progress,
                                    downloadedBytes,
                                    totalBytes
                                )
                            )

                            // 更新通知
                            if (isServiceRunning.get()) {
                                (context.getSystemService(Context.NOTIFICATION_SERVICE) as? NotificationManager)?.let { notificationManager ->
                                    val notification = NotificationCompat.Builder(context, DownloadService.CHANNEL_ID)
                                        .setSmallIcon(android.R.drawable.stat_sys_download)
                                        .setContentTitle("正在下载 ${task.fileName}")
                                        .setContentText("$progress% - ${formatBytes(downloadedBytes)}/${formatBytes(totalBytes)}")
                                        .setProgress(100, progress, false)
                                        .build()

                                    notificationManager.notify(DownloadService.NOTIFICATION_ID, notification)
                                }
                            }
                        }
                    }
                }

                // 下载完成
                val finalTask = task.copy(
                    status = DownloadStatus.COMPLETED,
                    downloadedBytes = totalBytes,
                    lastUpdated = System.currentTimeMillis()
                )
                downloadTaskDao.updateTask(finalTask)

                // 发送完成事件
                _downloadEvents.emit(DownloadEvent.Complete(task.id, file.absolutePath))

                // 更新活跃下载数量
                val remaining = activeDownloads.decrementAndGet()

                // 如果没有活跃下载且服务正在运行,停止服务
                if (remaining <= 0 && isServiceRunning.get()) {
                    DownloadService.stopService(context)
                }
            }
        } catch (e: Exception) {
            // 更新活跃下载数量
            activeDownloads.decrementAndGet()

            // 重新抛出异常
            throw e
        }
    }

    // 格式化字节大小
    private fun formatBytes(bytes: Long): String {
        if (bytes <= 0) return "0 B"

        val units = arrayOf("B", "KB", "MB", "GB", "TB")
        val digitGroups = (Math.log10(bytes.toDouble()) / Math.log10(1024.0)).toInt()

        return String.format("%.1f %s", bytes / Math.pow(1024.0, digitGroups.toDouble()), units[digitGroups])
    }
}

// 增强的网络感知下载解决方案

import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.net.ConnectivityManager
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import android.os.Build
import androidx.work.*
import java.io.IOException
import java.util.concurrent.TimeUnit

/**
 * 网络状态监听器
 */
class NetworkMonitor(private val context: Context) {

    interface NetworkListener {
        fun onNetworkAvailable()
        fun onNetworkUnavailable()
    }

    private val listeners = mutableListOf<NetworkListener>()
    private var isNetworkAvailable = false
    private var isMonitoring = false

    private val connectivityManager by lazy {
        context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    }

    // 网络回调 (Android 5.0+)
    private val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            val wasUnavailable = !isNetworkAvailable
            isNetworkAvailable = true

            if (wasUnavailable) {
                notifyNetworkAvailable()
            }
        }

        override fun onLost(network: Network) {
            // 检查是否真的没有可用网络了
            if (isAnyNetworkAvailable()) {
                return
            }

            isNetworkAvailable = false
            notifyNetworkUnavailable()
        }
    }

    // 广播接收器 (兼容旧版Android)
    private val networkReceiver = object : BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent) {
            val wasUnavailable = !isNetworkAvailable
            isNetworkAvailable = isAnyNetworkAvailable()

            if (isNetworkAvailable && wasUnavailable) {
                notifyNetworkAvailable()
            } else if (!isNetworkAvailable) {
                notifyNetworkUnavailable()
            }
        }
    }

    // 开始监听
    fun startMonitoring() {
        if (isMonitoring) return

        isNetworkAvailable = isAnyNetworkAvailable()

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            // Android 7.0+ 使用NetworkCallback
            val request = NetworkRequest.Builder()
                .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
                .build()
            connectivityManager.registerNetworkCallback(request, networkCallback)
        } else {
            // 旧版Android使用广播
            val filter = IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)
            context.registerReceiver(networkReceiver, filter)
        }

        isMonitoring = true
    }

    // 停止监听
    fun stopMonitoring() {
        if (!isMonitoring) return

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            connectivityManager.unregisterNetworkCallback(networkCallback)
        } else {
            try {
                context.unregisterReceiver(networkReceiver)
            } catch (e: Exception) {
                // 忽略接收器未注册的异常
            }
        }

        isMonitoring = false
    }

    // 添加监听器
    fun addListener(listener: NetworkListener) {
        if (!listeners.contains(listener)) {
            listeners.add(listener)
        }
    }

    // 移除监听器
    fun removeListener(listener: NetworkListener) {
        listeners.remove(listener)
    }

    // 检查是否有可用网络
    fun isNetworkAvailable(): Boolean {
        return isAnyNetworkAvailable()
    }

    // 检查是否有可用网络的实现
    private fun isAnyNetworkAvailable(): Boolean {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
            val network = connectivityManager.activeNetwork ?: return false
            val capabilities = connectivityManager.getNetworkCapabilities(network) ?: return false

            return capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        } else {
            @Suppress("DEPRECATION")
            val networkInfo = connectivityManager.activeNetworkInfo
            return networkInfo != null && networkInfo.isConnected
        }
    }

    // 通知网络可用
    private fun notifyNetworkAvailable() {
        listeners.forEach { it.onNetworkAvailable() }
    }

    // 通知网络不可用
    private fun notifyNetworkUnavailable() {
        listeners.forEach { it.onNetworkUnavailable() }
    }
}

/**
 * 增强型下载工作器,支持网络感知
 */
class NetworkAwareDownloadWorker(
    appContext: Context,
    workerParams: WorkerParameters
) : CoroutineWorker(appContext, workerParams) {

    companion object {
        const val TAG_DOWNLOAD_WORKER = "network_aware_download_worker"
        const val KEY_TASK_ID = "task_id"
        const val KEY_RETRY_COUNT = "retry_count"
        const val MAX_RETRY_COUNT = 10
        const val NOTIFICATION_CHANNEL_ID = "download_channel"
        const val NOTIFICATION_ID = 1

        // 调度下载任务
        fun scheduleDownload(context: Context, taskId: String, retryCount: Int = 0) {
            val data = workDataOf(
                KEY_TASK_ID to taskId,
                KEY_RETRY_COUNT to retryCount
            )

            // 设置网络约束
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .setRequiresStorageNotLow(true)
                .build()

            val workRequest = OneTimeWorkRequestBuilder<NetworkAwareDownloadWorker>()
                .setConstraints(constraints)
                .setInputData(data)
                .addTag(TAG_DOWNLOAD_WORKER)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.EXPONENTIAL,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()

            WorkManager.getInstance(context)
                .enqueueUniqueWork(
                    "download_$taskId",
                    ExistingWorkPolicy.REPLACE,
                    workRequest
                )
        }

        // 取消下载任务
        fun cancelDownload(context: Context, taskId: String) {
            WorkManager.getInstance(context)
                .cancelUniqueWork("download_$taskId")
        }
    }

    override suspend fun doWork(): Result {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return Result.failure()
        val retryCount = inputData.getInt(KEY_RETRY_COUNT, 0)

        // 获取下载任务
        val downloadManager = NetworkAwareDownloadManager.getInstance(applicationContext)
        val task = downloadManager.getTaskById(taskId) ?: return Result.failure()

        // 更新任务状态为下载中
        downloadManager.updateTaskStatus(taskId, DownloadStatus.RUNNING)

        try {
            // 创建前台服务通知
            setForeground(createForegroundInfo("准备下载..."))

            // 执行下载
            downloadManager.downloadFile(task, this)

            // 下载完成
            downloadManager.updateTaskStatus(taskId, DownloadStatus.COMPLETED)
            return Result.success()

        } catch (e: Exception) {
            e.printStackTrace()

            // 确定错误类型
            val isNetworkError = e is IOException && 
                (e.message?.contains("network", ignoreCase = true) == true ||
                 e.message?.contains("connect", ignoreCase = true) == true ||
                 e.message?.contains("timeout", ignoreCase = true) == true)

            if (isNetworkError) {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)

                // 网络错误,判断是否需要重试
                if (retryCount < MAX_RETRY_COUNT) {
                    val newRetryCount = retryCount + 1

                    // 等待时间随重试次数增加
                    val delayInSeconds = Math.min(30, Math.pow(2.0, newRetryCount.toDouble())).toLong()

                    // 安排下一次重试
                    val retryData = workDataOf(
                        KEY_TASK_ID to taskId,
                        KEY_RETRY_COUNT to newRetryCount
                    )

                    return Result.retry()
                }
            }

            // 达到最大重试次数或非网络错误
            downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
            return Result.failure()
        }
    }

    // 创建前台服务信息
    private fun createForegroundInfo(message: String, progress: Int = 0): ForegroundInfo {
        // 实现创建通知的逻辑
        // ...与之前的实现类似...

        // 简化版本
        val notification = NotificationCompat.Builder(applicationContext, NOTIFICATION_CHANNEL_ID)
            .setContentTitle("下载中")
            .setContentText(message)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setOngoing(true)
            .apply {
                if (progress > 0) {
                    setProgress(100, progress, false)
                }
            }
            .build()

        return ForegroundInfo(NOTIFICATION_ID, notification)
    }
}

/**
 * 增强的下载状态枚举
 */
enum class DownloadStatus {
    PENDING,             // 等待下载
    RUNNING,             // 下载中
    PAUSED,              // 手动暂停
    WAITING_FOR_NETWORK, // 等待网络
    COMPLETED,           // 完成
    FAILED,              // 失败
    CANCELED             // 取消
}

/**
 * 网络感知下载管理器
 */
class NetworkAwareDownloadManager private constructor(private val context: Context) : NetworkMonitor.NetworkListener {

    companion object {
        @Volatile
        private var INSTANCE: NetworkAwareDownloadManager? = null

        fun getInstance(context: Context): NetworkAwareDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: NetworkAwareDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }

    private val downloadTaskDao: DownloadTaskDao by lazy {
        DownloadDatabase.getDatabase(context).downloadTaskDao()
    }

    private val networkMonitor = NetworkMonitor(context)
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .build()

    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    private val _downloadEvents = MutableSharedFlow<DownloadEvent>()
    val downloadEvents = _downloadEvents.asSharedFlow()

    val allTasks = downloadTaskDao.getAllTasks()

    init {
        // 开始监听网络状态
        networkMonitor.addListener(this)
        networkMonitor.startMonitoring()

        // 应用启动时检查和恢复下载任务
        downloadScope.launch {
            // 当状态是WAITING_FOR_NETWORK且有网络连接时,恢复下载
            if (networkMonitor.isNetworkAvailable()) {
                downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
                    resumeDownloadWithNetwork(task.id)
                }
            }
        }
    }

    override fun onNetworkAvailable() {
        // 网络恢复时恢复等待网络的下载任务
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
                resumeDownloadWithNetwork(task.id)
            }
        }
    }

    override fun onNetworkUnavailable() {
        // 网络断开时暂停正在下载的任务
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(DownloadStatus.RUNNING).first().forEach { task ->
                pauseDownloadDueToNetwork(task.id)
            }
        }
    }

    // 添加下载任务
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"

        // 创建保存目录
        val saveDir = File(savePath)
        if (!saveDir.exists()) {
            saveDir.mkdirs()
        }

        // 创建任务
        val task = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            status = if (networkMonitor.isNetworkAvailable()) DownloadStatus.PENDING else DownloadStatus.WAITING_FOR_NETWORK,
            priority = priority
        )

        // 保存到数据库
        downloadTaskDao.insertTask(task)

        // 如果网络可用,立即开始下载
        if (networkMonitor.isNetworkAvailable()) {
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        } else {
            _downloadEvents.emit(
                DownloadEvent.StatusChanged(
                    taskId,
                    DownloadStatus.WAITING_FOR_NETWORK
                )
            )
        }

        return taskId
    }

    // 暂停下载
    suspend fun pauseDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.RUNNING || 
            task.status == DownloadStatus.PENDING || 
            task.status == DownloadStatus.WAITING_FOR_NETWORK) {
            // 取消Worker
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)

            // 更新状态
            updateTaskStatus(taskId, DownloadStatus.PAUSED)
        }
    }

    // 因网络问题暂停下载
    private suspend fun pauseDownloadDueToNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.RUNNING || task.status == DownloadStatus.PENDING) {
            // 更新状态为等待网络
            updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
        }
    }

    // 恢复下载
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            if (networkMonitor.isNetworkAvailable()) {
                // 网络可用,开始下载
                updateTaskStatus(taskId, DownloadStatus.PENDING)
                NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
            } else {
                // 网络不可用,标记为等待网络
                updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
            }
        }
    }

    // 网络恢复时恢复下载
    private suspend fun resumeDownloadWithNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.WAITING_FOR_NETWORK) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        }
    }

    // 取消下载
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        // 取消Worker
        NetworkAwareDownloadWorker.cancelDownload(context, taskId)

        // 更新状态
        updateTaskStatus(taskId, DownloadStatus.CANCELED)

        // 删除部分下载的文件
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    // 删除任务
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status != DownloadStatus.COMPLETED) {
            // 取消下载
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)
        }

        // 从数据库删除
        downloadTaskDao.deleteTask(task)
    }

    // 获取任务
    suspend fun getTaskById(taskId: String): DownloadTask? {
        return downloadTaskDao.getTaskById(taskId)
    }

    // 更新任务状态
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        val updatedTask = task.copy(
            status = status,
            lastUpdated = System.currentTimeMillis()
        )

        downloadTaskDao.updateTask(updatedTask)
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    // 执行文件下载
    suspend fun downloadFile(task: DownloadTask, worker: NetworkAwareDownloadWorker) {
        // 检查网络状态
        if (!networkMonitor.isNetworkAvailable()) {
            throw IOException("网络不可用")
        }

        // 确保目录存在
        val saveDir = File(task.savePath)
        if (!saveDir.exists()) {
            saveDir.mkdirs()
        }

        val file = File(saveDir, task.fileName)
        var totalBytes = task.totalBytes
        var downloadedBytes = task.downloadedBytes

        // 检查文件是否已存在且有部分数据
        if (file.exists() && downloadedBytes > 0) {
            val fileLength = file.length()
            if (fileLength != downloadedBytes) {
                // 文件大小不一致,使用实际文件大小
                downloadedBytes = fileLength
            }
        } else if (downloadedBytes > 0) {
            // 文件不存在但记录有下载字节,重置
            downloadedBytes = 0
        }

        // 构建请求
        val requestBuilder = Request.Builder()
            .url(task.url)

        // 添加Range头以支持断点续传
        if (downloadedBytes > 0) {
            requestBuilder.addHeader("Range", "bytes=$downloadedBytes-")
        }

        val request = requestBuilder.build()

        // 执行请求
        okHttpClient.newCall(request).execute().use { response ->
            if (!response.isSuccessful) {
                throw IOException("HTTP error: ${response.code}")
            }

            val body = response.body ?: throw IOException("Empty response body")
            val contentLength = body.contentLength()

            // 确定总大小
            if (totalBytes <= 0 && contentLength > 0) {
                totalBytes = when {
                    response.code == 206 -> { // 断点续传
                        val contentRange = response.header("Content-Range")
                        if (contentRange != null) {
                            // 解析形如 "bytes 100-999/1000" 的Content-Range头
                            val rangeMatcher = Regex("bytes \\d+-\\d+/(\\d+)").find(contentRange)
                            rangeMatcher?.groupValues?.get(1)?.toLongOrNull() ?: (downloadedBytes + contentLength)
                        } else {
                            downloadedBytes + contentLength
                        }
                    }
                    else -> contentLength // 完整下载
                }

                // 更新任务总大小
                val updatedTask = task.copy(totalBytes = totalBytes)
                downloadTaskDao.updateTask(updatedTask)
            }

            // 准备输出流
            val outputStream = if (downloadedBytes > 0) {
                FileOutputStream(file, true) // 断点续传,追加模式
            } else {
                FileOutputStream(file) // 从头下载,覆盖模式
            }

            outputStream.use { output ->
                val buffer = ByteArray(8192)
                val input = body.byteStream()
                var bytesRead: Int
                var lastProgressUpdate = 0L

                while (input.read(buffer).also { bytesRead = it } != -1) {
                    // 定期检查网络状态
                    if (!networkMonitor.isNetworkAvailable()) {
                        throw IOException("网络连接已断开")
                    }

                    if (worker.isStopped) {
                        throw InterruptedException("下载已停止")
                    }

                    output.write(buffer, 0, bytesRead)
                    downloadedBytes += bytesRead

                    // 更新进度
                    val now = System.currentTimeMillis()
                    if (now - lastProgressUpdate > 500) { // 每500ms更新一次
                        lastProgressUpdate = now

                        val progress = if (totalBytes > 0) {
                            (downloadedBytes * 100 / totalBytes).toInt()
                        } else 0

                        // 更新数据库
                        val updatedTask = task.copy(
                            downloadedBytes = downloadedBytes,
                            lastUpdated = now
                        )
                        downloadTaskDao.updateTask(updatedTask)

                        // 通知进度
                        _downloadEvents.emit(
                            DownloadEvent.Progress(
                                task.id,
                                progress,
                                downloadedBytes,
                                totalBytes
                            )
                        )

                        // 更新Worker通知
                        worker.setForeground(
                            worker.createForegroundInfo(
                                "下载中...",
                                progress
                            )
                        )
                    }
                }
            }
        }

        // 下载完成
        val updatedTask = task.copy(
            downloadedBytes = totalBytes,
            status = DownloadStatus.COMPLETED,
            lastUpdated = System.currentTimeMillis()
        )
        downloadTaskDao.updateTask(updatedTask)

        _downloadEvents.emit(
            DownloadEvent.Complete(
                task.id,
                File(task.savePath, task.fileName).absolutePath
            )
        )
    }

    // 释放资源
    fun release() {
        networkMonitor.removeListener(this)
        networkMonitor.stopMonitoring()
        downloadScope.cancel()
    }
}

使用OkHttp实现多功能下载框架一

您要我使用OkHttp实现一个具有多线程、多任务、限速、暂停恢复和进度显示功能的下载器。我需要设计一个完整的下载框架。让我规划一下:

系统架构

  1. DownloadManager - 全局下载管理器,处理多任务调度
  2. DownloadTask - 单个下载任务,包含URL、目标文件路径等信息
  3. DownloadWorker - 工作线程,处理实际下载逻辑
  4. DownloadListener - 回调接口,通知下载进度、状态变化
  5. RateLimiter - 限速器,控制下载速度
  6. TaskStorage - 任务持久化存储,用于断点续传

主要类设计

1. DownloadManager

  • 管理下载任务队列
  • 控制并发下载数量
  • 提供添加、暂停、恢复、取消任务方法

2. DownloadTask

  • 存储下载URL、目标路径、文件大小等信息
  • 记录下载状态(等待、下载中、暂停、完成、失败)
  • 保存断点续传所需信息(已下载部分)

3. DownloadWorker

  • 使用OkHttp执行HTTP请求
  • 支持Range请求实现断点续传
  • 将下载内容写入文件

4. RateLimiter

  • 限制下载速度
  • 可以根据网络状况动态调整

5. 断点续传

  • 通过HTTP Range头实现
  • 保存已下载部分信息

6. 多线程下载

  • 将一个文件分成多个部分
  • 每个部分由单独线程下载
  • 合并各部分得到完整文件

让我开始编写代码实现这些功能。

使用OkHttp实现多功能下载框架

下面是一个基于OkHttp实现的功能完整的下载框架,支持您需要的所有功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
// DownloadManager.kt

import kotlinx.coroutines.*
import okhttp3.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
* 下载状态
*/
enum class DownloadStatus {
IDLE, // 空闲状态
WAITING, // 等待下载
DOWNLOADING,// 下载中
PAUSED, // 暂停状态
COMPLETED, // 下载完成
FAILED, // 下载失败
CANCELED // 下载取消
}

/**
* 下载监听器
*/
interface DownloadListener {
fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long)
fun onStatusChanged(task: DownloadTask, status: DownloadStatus)
fun onSuccess(task: DownloadTask, file: File)
fun onFailed(task: DownloadTask, e: Exception)
}

/**
* 下载任务
*/
class DownloadTask(
val id: String, // 任务唯一ID
val url: String, // 下载URL
val savePath: String, // 保存路径
val fileName: String, // 文件名
var totalBytes: Long = 0, // 文件总大小
var downloadedBytes: Long = 0, // 已下载大小
var threadCount: Int = 3, // 线程数
var status: DownloadStatus = DownloadStatus.IDLE, // 下载状态
var ranges: MutableList<Pair<Long, Long>> = mutableListOf() // 分片信息
) {
// 获取进度百分比
fun getProgress(): Int {
return if (totalBytes > 0) {
(downloadedBytes * 100 / totalBytes).toInt()
} else 0
}

// 获取下载文件
fun getFile(): File = File(savePath, fileName)

// 获取临时文件
fun getTempFile(index: Int): File = File(savePath, "$fileName.part$index")
}

/**
* 下载速度限制器
*/
class RateLimiter(private var bytesPerSecond: Long) {
private val byteCount = AtomicLong(0)
private var lastResetTime = System.currentTimeMillis()

// 设置速度限制 (字节/秒)
fun setBytesPerSecond(bytesPerSecond: Long) {
this.bytesPerSecond = bytesPerSecond
}

// 消耗指定字节数,若超过限制则阻塞
@Synchronized
fun consume(bytes: Long) {
byteCount.addAndGet(bytes)

val currentTime = System.currentTimeMillis()
val timeElapsed = currentTime - lastResetTime

if (timeElapsed >= 1000) {
// 每秒重置计数
byteCount.set(0)
lastResetTime = currentTime
return
}

val currentBytesPerSecond = (byteCount.get() * 1000 / timeElapsed.coerceAtLeast(1))
if (currentBytesPerSecond > bytesPerSecond && bytesPerSecond > 0) {
// 需要限速,计算需要等待的时间
val waitTime = ((currentBytesPerSecond.toDouble() / bytesPerSecond) * 1000 - timeElapsed).toLong()
if (waitTime > 0) {
try {
Thread.sleep(waitTime.coerceAtMost(1000))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}
}
}
}

/**
* 下载管理器
*/
class DownloadManager private constructor() {
private val client = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()

private val downloadTasks = ConcurrentHashMap<String, DownloadTask>()
private val downloadWorkers = ConcurrentHashMap<String, DownloadWorker>()
private val taskListeners = ConcurrentHashMap<String, MutableList<DownloadListener>>()
private val rateLimiter = RateLimiter(0) // 默认不限速
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 最大并发下载数
private var maxConcurrentDownloads = 3
// 当前运行的下载数
private var runningDownloads = AtomicLong(0)
// 等待队列
private val waitingQueue = LinkedBlockingQueue<String>()

companion object {
@Volatile
private var instance: DownloadManager? = null

fun getInstance(): DownloadManager {
return instance ?: synchronized(this) {
instance ?: DownloadManager().also { instance = it }
}
}
}

init {
// 启动队列处理协程
downloadScope.launch {
processQueue()
}
}

// 处理等待队列
private suspend fun processQueue() {
while (isActive) {
try {
if (runningDownloads.get() < maxConcurrentDownloads) {
val taskId = waitingQueue.poll()
if (taskId != null) {
val task = downloadTasks[taskId]
if (task != null && task.status == DownloadStatus.WAITING) {
startDownloadInternal(task)
}
}
}
delay(100) // 避免CPU过度使用
} catch (e: Exception) {
e.printStackTrace()
}
}
}

// 设置最大并发下载数
fun setMaxConcurrentDownloads(max: Int) {
maxConcurrentDownloads = max
}

// 设置下载速度限制 (KB/s)
fun setSpeedLimit(kbPerSecond: Int) {
rateLimiter.setBytesPerSecond(kbPerSecond * 1024L)
}

// 添加下载任务
fun addTask(url: String, savePath: String, fileName: String, threadCount: Int = 3): DownloadTask {
val id = url.hashCode().toString()
var task = downloadTasks[id]

if (task == null) {
File(savePath).mkdirs() // 确保目录存在
task = DownloadTask(id, url, savePath, fileName, threadCount = threadCount)
downloadTasks[id] = task
}

return task
}

// 添加监听器
fun addListener(taskId: String, listener: DownloadListener) {
val listeners = taskListeners.getOrPut(taskId) { mutableListOf() }
listeners.add(listener)
}

// 移除监听器
fun removeListener(taskId: String, listener: DownloadListener) {
taskListeners[taskId]?.remove(listener)
}

// 开始下载
fun startDownload(taskId: String) {
val task = downloadTasks[taskId] ?: return

when (task.status) {
DownloadStatus.IDLE, DownloadStatus.FAILED, DownloadStatus.CANCELED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
DownloadStatus.PAUSED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
else -> { /* 其他状态不处理 */ }
}
}

// 内部启动下载方法
private fun startDownloadInternal(task: DownloadTask) {
runningDownloads.incrementAndGet()

if (task.status != DownloadStatus.WAITING) return

task.status = DownloadStatus.DOWNLOADING
notifyStatusChanged(task)

// 创建并启动下载工作线程
val worker = DownloadWorker(task, client, rateLimiter, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
notifyProgress(task, progress, downloadedBytes, totalBytes)
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
if (status == DownloadStatus.COMPLETED || status == DownloadStatus.FAILED || status == DownloadStatus.CANCELED) {
runningDownloads.decrementAndGet()
}
notifyStatusChanged(task)
}

override fun onSuccess(task: DownloadTask, file: File) {
notifySuccess(task, file)
}

override fun onFailed(task: DownloadTask, e: Exception) {
notifyFailed(task, e)
}
})

downloadWorkers[task.id] = worker
worker.start()
}

// 暂停下载
fun pauseDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.pause()
}

// 恢复下载
fun resumeDownload(taskId: String) {
val task = downloadTasks[taskId]
if (task?.status == DownloadStatus.PAUSED) {
startDownload(taskId)
}
}

// 取消下载
fun cancelDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.cancel()
waitingQueue.remove(taskId)
}

// 获取下载任务
fun getTask(taskId: String): DownloadTask? {
return downloadTasks[taskId]
}

// 获取所有下载任务
fun getAllTasks(): List<DownloadTask> {
return downloadTasks.values.toList()
}

// 通知进度更新
private fun notifyProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
taskListeners[task.id]?.forEach { listener ->
listener.onProgress(task, progress, downloadedBytes, totalBytes)
}
}

// 通知状态变化
private fun notifyStatusChanged(task: DownloadTask) {
taskListeners[task.id]?.forEach { listener ->
listener.onStatusChanged(task, task.status)
}
}

// 通知下载成功
private fun notifySuccess(task: DownloadTask, file: File) {
taskListeners[task.id]?.forEach { listener ->
listener.onSuccess(task, file)
}
}

// 通知下载失败
private fun notifyFailed(task: DownloadTask, e: Exception) {
taskListeners[task.id]?.forEach { listener ->
listener.onFailed(task, e)
}
}

// 清理资源
fun release() {
downloadScope.cancel()
downloadWorkers.values.forEach { it.cancel() }
downloadTasks.clear()
downloadWorkers.clear()
taskListeners.clear()
waitingQueue.clear()
}
}

/**
* 下载工作线程
*/
class DownloadWorker(
private val task: DownloadTask,
private val client: OkHttpClient,
private val rateLimiter: RateLimiter,
private val listener: DownloadListener
) {
private val isRunning = AtomicBoolean(false)
private val isPaused = AtomicBoolean(false)
private val isCanceled = AtomicBoolean(false)
private val workers = mutableListOf<Job>()
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 开始下载
fun start() {
if (isRunning.getAndSet(true)) return

downloadScope.launch {
try {
// 1. 获取文件信息
fetchFileInfo()

// 2. 如果文件大小为0或不支持范围请求,使用单线程下载
if (task.totalBytes <= 0 || !isSupportRange()) {
singleThreadDownload()
} else {
// 3. 多线程下载
multiThreadDownload()
}
} catch (e: Exception) {
if (!isPaused.get() && !isCanceled.get()) {
task.status = DownloadStatus.FAILED
listener.onStatusChanged(task, task.status)
listener.onFailed(task, e)
}
} finally {
isRunning.set(false)
}
}
}

// 暂停下载
fun pause() {
if (isPaused.getAndSet(true)) return

task.status = DownloadStatus.PAUSED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()
}

// 取消下载
fun cancel() {
if (isCanceled.getAndSet(true)) return

task.status = DownloadStatus.CANCELED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()

// 删除临时文件
for (i in 0 until task.threadCount) {
task.getTempFile(i).delete()
}
}

// 获取文件信息
private suspend fun fetchFileInfo() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.head() // 使用HEAD请求获取文件信息
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to fetch file info: ${response.code}")
}

// 获取文件大小
val contentLength = response.header("Content-Length")?.toLongOrNull() ?: -1L
task.totalBytes = contentLength

// 计算每个线程的下载范围
if (contentLength > 0) {
calculateRanges(contentLength)
}
}
}

// 检查是否支持范围请求
private suspend fun isSupportRange(): Boolean = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=0-0")
.build()

client.newCall(request).execute().use { response ->
return@withContext response.code == 206 // 返回206表示支持范围请求
}
}

// 计算每个线程的下载范围
private fun calculateRanges(contentLength: Long) {
task.ranges.clear()

val blockSize = contentLength / task.threadCount
var start: Long
var end: Long

for (i in 0 until task.threadCount) {
start = i * blockSize
end = if (i == task.threadCount - 1) contentLength - 1 else (i + 1) * blockSize - 1

// 检查临时文件是否存在
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val downloadedSize = tempFile.length()
if (downloadedSize > 0 && downloadedSize < (end - start + 1)) {
// 更新起始位置
start += downloadedSize
task.downloadedBytes += downloadedSize
}
}

task.ranges.add(Pair(start, end))
}
}

// 单线程下载
private suspend fun singleThreadDownload() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to download file: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
task.totalBytes = responseBody.contentLength()

// 创建输出文件
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = 0L

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新进度
task.downloadedBytes = downloadedSize
val progress = if (task.totalBytes > 0) {
(downloadedSize * 100 / task.totalBytes).toInt()
} else 0

listener.onProgress(task, progress, downloadedSize, task.totalBytes)
}

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, outputFile)

} finally {
outputStream.close()
}
}
}

// 多线程下载
private suspend fun multiThreadDownload() = withContext(Dispatchers.IO) {
// 创建多个工作协程
for (i in 0 until task.threadCount) {
val job = downloadScope.launch {
downloadFilePart(i)
}
workers.add(job)
}

// 等待所有下载任务完成
workers.joinAll()

// 如果暂停或取消,直接返回
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 合并文件
mergeFiles()

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, task.getFile())
}

// 下载文件片段
private suspend fun downloadFilePart(index: Int) = withContext(Dispatchers.IO) {
if (index >= task.ranges.size) return@withContext

val range = task.ranges[index]
val start = range.first
val end = range.second

// 如果该部分已经下载完成,直接返回
if (start > end) return@withContext

val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=$start-$end")
.build()

client.newCall(request).execute().use { response ->
if (response.code != 206) {
throw IOException("Server doesn't support range requests: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
val tempFile = task.getTempFile(index)
val outputStream = FileOutputStream(tempFile, tempFile.exists())

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = tempFile.length()

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新总下载进度
synchronized(this@DownloadWorker) {
task.downloadedBytes += read
val progress = (task.downloadedBytes * 100 / task.totalBytes).toInt()
listener.onProgress(task, progress, task.downloadedBytes, task.totalBytes)
}
}

// 更新该部分的下载范围(标记为已完成)
task.ranges[index] = Pair(end + 1, end)

} finally {
outputStream.close()
}
}
}

// 合并文件
private suspend fun mergeFiles() = withContext(Dispatchers.IO) {
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
for (i in 0 until task.threadCount) {
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val tempData = tempFile.readBytes()
outputStream.write(tempData)
tempFile.delete()
}
}
} finally {
outputStream.close()
}
}
}

使用示例

以下是如何使用这个下载框架的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 使用示例

fun main() {
// 获取下载管理器实例
val downloadManager = DownloadManager.getInstance()

// 设置最大并发下载数
downloadManager.setMaxConcurrentDownloads(3)

// 设置下载速度限制(500KB/s)
downloadManager.setSpeedLimit(500)

// 添加下载任务
val task = downloadManager.addTask(
url = "https://example.com/large-file.zip",
savePath = "/downloads",
fileName = "large-file.zip",
threadCount = 5 // 使用5个线程下载
)

// 添加下载监听器
downloadManager.addListener(task.id, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
println("下载进度: $progress%, ${downloadedBytes/1024}KB/${totalBytes/1024}KB")
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
println("状态变更: $status")
}

override fun onSuccess(task: DownloadTask, file: File) {
println("下载成功: ${file.absolutePath}")
}

override fun onFailed(task: DownloadTask, e: Exception) {
println("下载失败: ${e.message}")
}
})

// 开始下载
downloadManager.startDownload(task.id)

// 模拟5秒后暂停
Thread.sleep(5000)
println("暂停下载")
downloadManager.pauseDownload(task.id)

// 模拟2秒后恢复
Thread.sleep(2000)
println("恢复下载")
downloadManager.resumeDownload(task.id)

// 等待下载完成
readLine()

// 清理资源
downloadManager.release()
}

框架功能说明

  1. 多线程下载

    • 通过将文件分割成多个部分,同时使用多个线程下载
    • 每个部分使用HTTP Range请求独立下载
    • 下载完成后自动合并为完整文件
  2. 多任务下载

    • DownloadManager管理多个下载任务
    • 支持设置最大并发下载数
    • 使用队列管理等待下载的任务
  3. 下载限速

    • RateLimiter类控制下载速度
    • 可动态调整限速大小
    • 基于时间窗口算法实现平滑限速
  4. 任务暂停和恢复

    • 支持随时暂停下载任务
    • 通过保存已下载部分信息实现断点续传
    • 恢复时从上次下载位置继续
  5. 下载进度

    • 实时计算并通知下载进度
    • 提供详细的状态回调
    • 支持获取下载速度、已下载大小等信息

这个框架设计合理、功能完整,可以满足各种下载需求。您可以根据实际需要进一步扩展或优化,比如添加网络状态监听、下载优先级设置等功能。

OkHttp 请求构建和响应处理的完整流程

OkHttp 请求构建和响应处理的完整流程

一、请求构建流程代码示例

1. GET 请求构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建OkHttpClient实例
val client = OkHttpClient()

// 构建GET请求
val request = Request.Builder()
.url("https://api.example.com/users")
.header("Authorization", "Bearer token123")
.addHeader("Accept", "application/json")
.get() // GET请求,可省略,因为默认就是GET
.tag(String::class.java, "用户列表请求")
.build()

// 执行请求
val response = client.newCall(request).execute()

2. POST 请求构建 (JSON 数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建JSON请求体
val json = """
{
"username": "test_user",
"password": "test_password"
}
""".trimIndent()

val mediaType = "application/json; charset=utf-8".toMediaType()
val requestBody = json.toRequestBody(mediaType)

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/login")
.header("Content-Type", "application/json")
.post(requestBody) // 设置POST方法和请求体
.build()

// 执行请求
val response = client.newCall(request).execute()

3. POST 表单提交

1
2
3
4
5
6
7
8
9
10
11
// 创建表单请求体
val formBody = FormBody.Builder()
.add("username", "test_user")
.add("password", "test_password")
.build()

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/login")
.post(formBody) // 设置POST方法和表单请求体
.build()

4. 文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建MultipartBody
val file = File("/path/to/file.jpg")
val mediaType = "image/jpeg".toMediaType()
val requestFile = file.asRequestBody(mediaType)

val multipartBody = MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("title", "Profile Picture")
.addFormDataPart("image", "file.jpg", requestFile)
.build()

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/upload")
.post(multipartBody)
.build()

二、响应处理流程代码示例

1. 基本响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 执行请求获取响应
val response = client.newCall(request).execute()

try {
// 检查响应是否成功
if (response.isSuccessful) {
// 状态码在200-299之间
val statusCode = response.code
val headers = response.headers

// 获取响应体并转换为字符串
val responseBody = response.body
val responseString = responseBody?.string()

println("响应成功: $statusCode")
println("响应内容: $responseString")
} else {
// 处理错误响应
println("请求失败: ${response.code}")
println("错误信息: ${response.message}")
}
} finally {
// 关闭响应,释放资源
response.close()
}

2. JSON 响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 假设使用Gson进行JSON解析
val gson = Gson()

try {
if (response.isSuccessful) {
val responseBody = response.body

// 将响应体转换为字符串
val jsonString = responseBody?.string()

// 解析JSON到数据类
data class User(val id: Int, val name: String, val email: String)
val user = gson.fromJson(jsonString, User::class.java)

println("用户ID: ${user.id}")
println("用户名: ${user.name}")
println("邮箱: ${user.email}")
}
} finally {
response.close()
}

3. 流式处理大文件下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 下载文件示例
val request = Request.Builder()
.url("https://example.com/large-file.zip")
.build()

val response = client.newCall(request).execute()

try {
if (!response.isSuccessful) {
throw IOException("下载失败: ${response.code}")
}

// 获取响应体
val responseBody = response.body ?: throw IOException("响应体为空")

// 创建输出文件
val downloadFile = File("/path/to/save/file.zip")
val outputStream = FileOutputStream(downloadFile)

// 使用BufferedSink进行高效写入
val sink = outputStream.sink().buffer()

// 从响应体获取源
val source = responseBody.source()

// 读取数据并写入文件
val bufferSize = 8 * 1024 // 8KB缓冲区
val buffer = Buffer()
var bytesRead: Long

// 显示下载进度
val contentLength = responseBody.contentLength()
var totalBytesRead = 0L

while (source.read(buffer, bufferSize.toLong()).also { bytesRead = it } != -1L) {
sink.write(buffer, bytesRead)
totalBytesRead += bytesRead

// 计算下载进度
if (contentLength > 0) {
val progress = (totalBytesRead * 100 / contentLength).toInt()
println("下载进度: $progress%")
}
}

// 确保所有数据都写入
sink.flush()
sink.close()

println("文件下载完成: ${downloadFile.absolutePath}")
} finally {
response.close()
}

4. 异步请求与响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 异步执行请求
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
// 请求失败处理
println("请求失败: ${e.message}")
}

override fun onResponse(call: Call, response: Response) {
// 使用try-finally确保响应关闭
try {
if (response.isSuccessful) {
val responseData = response.body?.string()
println("异步请求成功: $responseData")

// 注意:这里是在OkHttp的工作线程中
// 如果需要更新UI,需要切换到主线程
} else {
println("请求返回错误: ${response.code}")
}
} finally {
response.close()
}
}
})

三、请求和响应的完整生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 1. 创建OkHttpClient
val client = OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.build()

// 2. 构建请求
val request = Request.Builder()
.url("https://api.example.com/data")
.header("User-Agent", "OkHttp Example")
.build()

// 3. 创建Call对象
val call = client.newCall(request)

// 4. 执行请求
val response = call.execute()

// 5. 处理响应
try {
// 5.1 检查响应状态
val statusCode = response.code

// 5.2 获取响应头
val contentType = response.header("Content-Type")

// 5.3 处理响应体
if (response.isSuccessful) {
val responseBody = response.body

// 根据Content-Type处理不同类型的响应
when {
contentType?.contains("application/json") == true -> {
// 处理JSON响应
val jsonString = responseBody?.string()
println("JSON响应: $jsonString")
}
contentType?.contains("text/") == true -> {
// 处理文本响应
val textResponse = responseBody?.string()
println("文本响应: $textResponse")
}
else -> {
// 处理二进制响应
val bytes = responseBody?.bytes()
println("二进制响应长度: ${bytes?.size} 字节")
}
}
} else {
// 处理错误响应
println("错误: ${response.code} ${response.message}")
}
} catch (e: Exception) {
println("处理响应时发生异常: ${e.message}")
} finally {
// 6. 关闭响应
response.close()
}

以上代码示例展示了 OkHttp 请求构建和响应处理的完整流程,包括:

  1. 不同类型的请求构建(GET、POST、文件上传等)
  2. 各种响应处理方式(字符串、JSON、文件下载等)
  3. 同步和异步请求的处理差异
  4. 完整的请求-响应生命周期管理

这些示例可以作为实际开发中的参考模板。

OkHttp 请求构建和响应处理流程源码分析

OkHttp 请求构建和响应处理流程源码分析

我需要分析 OkHttp 中请求构建和响应处理的源码流程。让我整理一下需要涵盖的内容:

一、请求构建流程

Request 构建代码

1
2
3
val request = Request.Builder()
.url("https://api.example.com")
.build()
  1. 请求构建流程

    • Request.Builder 的使用
    • 各种组件的构建(URL、Headers、RequestBody 等)
    • 请求构建的设计模式分析
  2. Request 构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 典型的构建器模式实现
class Request private constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
val tags: Map<Class<*>, Any>
) {
// 构建器类
class Builder {
private var url: HttpUrl? = null
private var method = "GET"
private var headers = Headers.Builder()
private var body: RequestBody? = null
private var tags = mutableMapOf<Class<*>, Any>()

// 设置URL (支持多种形式)
fun url(url: String): Builder = url(url.toHttpUrl())
fun url(url: HttpUrl): Builder {
this.url = url
return this
}

// 设置请求方法
fun method(method: String, body: RequestBody?): Builder {
// 检查方法名和body的合法性
if (method.isEmpty()) throw IllegalArgumentException("method.isEmpty()")
if (method !in METHODS_ALLOWING_BODY && body != null) {
throw IllegalArgumentException("method $method must not have a request body")
}
if (method in METHODS_REQUIRING_BODY && body == null) {
throw IllegalArgumentException("method $method must have a request body")
}

this.method = method
this.body = body
return this
}

// 便捷方法: GET请求
fun get(): Builder = method("GET", null)

// 便捷方法: POST请求
fun post(body: RequestBody): Builder = method("POST", body)

// 添加Header
fun addHeader(name: String, value: String): Builder {
headers.add(name, value)
return this
}

// 设置完整Headers
fun headers(headers: Headers): Builder {
this.headers = headers.newBuilder()
return this
}

// 构建最终Request对象
fun build(): Request {
// 检查URL是否已设置
val url = checkNotNull(this.url) { "url == null" }
return Request(
url = url,
method = method,
headers = headers.build(),
body = body,
tags = tags.toMap()
)
}
}
}
  1. RequestBody 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 抽象类定义
abstract class RequestBody {
// 返回内容类型
abstract fun contentType(): MediaType?

// 返回内容长度
open fun contentLength(): Long = -1L

// 写入内容到缓冲区
abstract fun writeTo(sink: BufferedSink)

// 创建请求体的工厂方法
companion object {
// 从字符串创建
@JvmStatic
fun create(
contentType: MediaType?,
content: String
): RequestBody = create(contentType, content.toByteArray())

// 从字节数组创建
@JvmStatic
fun create(
contentType: MediaType?,
content: ByteArray
): RequestBody = create(contentType, content, 0, content.size)

// 从文件创建
@JvmStatic
fun create(file: File, contentType: MediaType?): RequestBody {
return object : RequestBody() {
override fun contentType() = contentType

override fun contentLength(): Long = file.length()

override fun writeTo(sink: BufferedSink) {
file.source().use { source ->
sink.writeAll(source)
}
}
}
}
}
}

二、响应处理流程

  1. 响应处理流程

    • Response 的结构
    • ResponseBody 的处理
    • 响应解析和转换机制
  2. Response 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Response private constructor(
@get:JvmName("request") val request: Request,
@get:JvmName("protocol") val protocol: Protocol,
@get:JvmName("message") val message: String,
@get:JvmName("code") val code: Int,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: ResponseBody?,
@get:JvmName("networkResponse") val networkResponse: Response?,
@get:JvmName("cacheResponse") val cacheResponse: Response?,
@get:JvmName("priorResponse") val priorResponse: Response?,
internal val sentRequestAtMillis: Long,
internal val receivedResponseAtMillis: Long,
@get:JvmName("exchange") internal val exchange: Exchange?,
) {
// 检查响应是否成功
val isSuccessful: Boolean
get() = code in 200..299

// 构建器类
class Builder {
// 省略实现细节...
}

// 关闭响应体
fun close() {
body?.close()
}
}
  1. ResponseBody 处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
abstract class ResponseBody : Closeable {
// 返回内容类型
abstract fun contentType(): MediaType?

// 返回内容长度
abstract fun contentLength(): Long

// 获取字节流
abstract fun source(): BufferedSource

// 获取字节数组
fun bytes(): ByteArray {
val contentLength = contentLength()
if (contentLength > Int.MAX_VALUE) {
throw IOException("Cannot buffer entire body for content length: $contentLength")
}

val bytes = source().use { it.readByteArray() }
if (contentLength != -1L && contentLength != bytes.size.toLong()) {
throw IOException("Content-Length and stream length disagree")
}
return bytes
}

// 获取字符串
fun string(): String {
val source = source()
try {
return source.readString(source.readByteArray().size.toLong(), Charsets.UTF_8)
} finally {
source.close()
}
}
}
  1. 响应处理流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 使用Response的典型流程
val response = client.newCall(request).execute()

try {
// 1. 检查响应状态
if (response.isSuccessful) {
// 2. 获取响应体
val responseBody = response.body

// 3. 根据Content-Type处理数据
val contentType = responseBody?.contentType()

// 4. 读取响应数据
val responseString = responseBody?.string()
// 或者
val responseBytes = responseBody?.bytes()
// 或者
val responseStream = responseBody?.source()
} else {
// 处理错误响应
println("请求失败: ${response.code}")
}
} finally {
// 5. 关闭响应
response.close()
}

响应流程详解

  1. 响应创建

    • 通过拦截器链处理网络请求
    • CallServerInterceptor生成原始 Response
    • 各拦截器可能修改 Response(如缓存拦截器)
  2. 响应体流式处理

    • ResponseBody 采用流式 API
    • 数据按需加载,避免一次性加载大文件
    • 支持多种数据转换形式(string(), bytes(), source())
  3. 响应关闭机制

    • 必须手动关闭响应以释放连接
    • 适用 try-finally 模式确保资源释放
    • 未关闭的响应可能导致连接池耗尽

设计亮点分析

  1. 构建器模式的精巧应用

    • Request.Builder 和 Response.Builder 使构建过程简洁直观
    • 链式调用提高代码可读性
    • 参数验证在构建时进行,避免运行时错误
  2. 请求体的抽象设计

    • RequestBody 抽象类允许多种请求数据形式
    • 工厂方法简化常见请求体创建
    • 流式API高效处理大文件上传
  3. 响应处理的流式设计

    • 延迟加载机制减少内存占用
    • BufferedSource 接口提供灵活的数据读取方式
    • 显式关闭机制确保资源正确释放
  4. 类型安全与灵活性平衡

    • MediaType 类型处理确保内容类型正确
    • Headers 类封装HTTP头部复杂处理逻辑
    • 支持同步/异步两种处理模式

核心流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
请求构建流程:
Request.Builder
→ 设置URL(url())
→ 设置方法(get/post/method())
→ 设置头部(headers/addHeader())
→ 设置请求体(body)
→ build() → Request对象

响应处理流程:
Response对象
→ 检查状态码(isSuccessful)
→ 获取ResponseBody
→ 读取数据(string/bytes/source)
→ 处理业务逻辑
→ 关闭响应(close)

OkHttp 源码分析路线图

OkHttp 源码分析路线图

1. 基础组件模块

1
2
3
4
OkHttpClient(核心配置类)
├── 连接池管理
├── 线程池管理
└── 各种配置参数

2. 请求响应模块

1
2
3
4
5
Request/Response(请求响应模型)
├── RequestBody
├── ResponseBody
├── Headers
└── HttpUrl

3. 核心流程模块

1
2
3
Call(请求执行流程)
├── RealCall(同步请求)
└── AsyncCall(异步请求)

让我们按照以下顺序深入分析:

一、基础组件层

  1. OkHttpClient 分析
  • 构建器模式的使用
  • 默认配置项的设定
  • 连接池的管理机制
  1. 连接池管理
1
2
3
4
5
6
7
8
class ConnectionPool(
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES
) {
// 连接复用机制
// 空闲连接清理
}

二、请求响应层

  1. 请求构建流程
1
2
3
val request = Request.Builder()
.url("https://api.example.com")
.build()
  1. 响应处理机制
  • 响应体的流式处理
  • 数据转换机制

三、核心执行层

  1. 同步请求流程
1
2
3
4
5
6
// RealCall 执行流程
class RealCall implements Call {
override fun execute(): Response {
// 同步请求实现
}
}
  1. 异步请求流程
1
2
3
4
// AsyncCall 实现原理
class AsyncCall extends NamedRunnable {
// 异步请求实现
}

四、重点关注的核心机制

  1. 连接复用机制
  • 连接池的实现
  • 连接的生命周期管理
  • Keep-Alive 策略
  1. 缓存机制
  • 缓存策略的实现
  • 缓存验证机制
  • 缓存替换策略
  1. DNS 解析
  • DNS 缓存机制
  • 自定义 DNS 实现
  1. 失败重试机制
  • 重试策略
  • 重定向处理
  1. HTTP/2 支持
  • 多路复用实现
  • 流量控制

分析建议

  1. 循序渐进
  • 先了解整体架构
  • 再深入具体模块
  • 最后研究细节实现
  1. 重点关注
  • 类的职责划分
  • 接口设计原则
  • 性能优化手段
  1. 实践验证
  • 编写测试用例
  • 修改源码观察效果
  • 对比不同版本变化
  1. 源码阅读技巧
  • 从使用示例开始
  • 跟踪调用链路
  • 关注设计模式应用

建议的学习顺序

  1. 第一阶段:基础认知
  • OkHttpClient 的配置项
  • Request/Response 的基本结构
  • 同步/异步调用方式
  1. 第二阶段:核心流程
  • 拦截器链的完整执行过程
  • 网络请求的处理流程
  • 连接池的管理机制
  1. 第三阶段:深入细节
  • HTTP/2 的实现原理
  • 缓存机制的具体实现
  • 安全机制(证书、HTTPS 等)
  1. 第四阶段:性能优化
  • 连接复用策略
  • 内存管理优化
  • 并发控制机制

通过这样系统的分析,您可以:

  • 深入理解 OkHttp 的设计思想
  • 掌握网络库的核心实现
  • 学习优秀的代码设计方法

需要深入了解哪个具体模块,我可以为您详细展开分析。

OkHttp 拦截器的核心源码分析

OkHttp 拦截器的核心源码分析

1. 拦截器接口定义

1
2
3
4
5
6
7
8
9
10
interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response

interface Chain {
fun request(): Request
fun proceed(request: Request): Response
// ... 其他方法
}
}

2. RealInterceptorChain 的核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class RealInterceptorChain(
private val interceptors: List<Interceptor>,
private val index: Int,
private val request: Request,
// ... 其他参数
) : Interceptor.Chain {

override fun proceed(request: Request): Response {
// 检查是否到达链条末端
if (index >= interceptors.size) throw AssertionError()

// 创建下一个拦截器链
val next = RealInterceptorChain(
interceptors = interceptors,
index = index + 1,
request = request,
// ... 其他参数
)

// 获取当前拦截器并执行
val interceptor = interceptors[index]
val response = interceptor.intercept(next)

return response
}
}

3. OkHttpClient 中的拦截器管理

1
2
3
4
5
6
7
8
9
10
11
12
class OkHttpClient internal constructor(builder: Builder) {
// 应用拦截器列表
internal val interceptors: List<Interceptor> = builder.interceptors

// 网络拦截器列表
internal val networkInterceptors: List<Interceptor> = builder.networkInterceptors

class Builder {
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
}
}

4. 核心执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RealCall(
private val client: OkHttpClient,
// ... 其他参数
) {
override fun execute(): Response {
// 构建完整的拦截器链
val interceptors = mutableListOf<Interceptor>()

// 1. 添加用户自定义的应用拦截器
interceptors.addAll(client.interceptors)

// 2. 添加OkHttp内置的核心拦截器
interceptors.add(RetryAndFollowUpInterceptor(client))
interceptors.add(BridgeInterceptor(client.cookieJar))
interceptors.add(CacheInterceptor(client.cache))
interceptors.add(ConnectInterceptor(client))

// 3. 添加用户自定义的网络拦截器
interceptors.addAll(client.networkInterceptors)

// 4. 添加服务器通信拦截器
interceptors.add(CallServerInterceptor())

// 创建拦截器链并执行
val chain = RealInterceptorChain(
interceptors = interceptors,
index = 0,
request = originalRequest,
// ... 其他参数
)

return chain.proceed(originalRequest)
}
}

5. 内置核心拦截器的职责

  • RetryAndFollowUpInterceptor: 处理重试和重定向
  • BridgeInterceptor: 处理请求头和响应头的转换
  • CacheInterceptor: 处理请求缓存
  • ConnectInterceptor: 建立连接
  • CallServerInterceptor: 与服务器进行实际通信

6. 责任链模式的关键点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class CustomInterceptor : Interceptor {
override fun intercept(chain: Chain): Response {
// 1. 获取原始请求
val originalRequest = chain.request()

// 2. 处理请求(可选)
val modifiedRequest = originalRequest.newBuilder()
.addHeader("Custom-Header", "Value")
.build()

// 3. 转发给下一个拦截器
val response = chain.proceed(modifiedRequest)

// 4. 处理响应(可选)
return response.newBuilder()
.addHeader("Response-Header", "Value")
.build()
}
}

这种设计的优点:

  1. 高度解耦,每个拦截器只负责自己的任务
  2. 灵活可扩展,可以随时添加或移除拦截器
  3. 双向处理能力,可以同时处理请求和响应
  4. 链式调用清晰,调试方便

注意事项:

  1. 拦截器的添加顺序会影响最终的执行顺序
  2. 必须调用 chain.proceed() 来继续链条,除非你想中断请求
  3. 注意处理异常,确保资源正确释放
  4. 避免在拦截器中执行耗时操作,可能会影响整体性能
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×