主要需要考虑的挑战:
- Android系统的电池优化和后台任务限制
- 在应用进入后台后,下载可能被系统暂停
- 在应用返回前台时需要恢复下载
- 需要正确处理应用被终止的情况
- 保持下载状态的一致性
解决方案可以包含以下组件:
- WorkManager:用于管理后台任务,即使应用被终止也能恢复任务
- ForegroundService:在下载过程中显示通知,降低系统杀死服务的概率
- Room数据库:存储下载任务状态,便于恢复
- 生命周期感知:监测应用前台/后台状态转换
- 下载队列管理:正确管理任务队列和状态转换
下面我将设计一个完整的解决方案,包括:
- 下载管理器架构
- 持久化存储
- 应用生命周期处理
- 后台服务
- 任务队列管理
- 暂停/恢复机制
Android长任务队列后台下载管理器设计
下面是一个基于WorkManager、ForegroundService和Room数据库的下载管理器实现,专注于解决Android后台下载限制问题:
// BackgroundDownloadManager.kt
import android.app.*
import android.content.Context
import android.content.Intent
import android.os.*
import androidx.annotation.RequiresApi
import androidx.core.app.NotificationCompat
import androidx.lifecycle.*
import androidx.room.*
import androidx.work.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import okhttp3.*
import java.io.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
/**
 * A single download job, persisted as a Room entity.
 *
 * No explicit table name is configured, so Room uses the class name
 * (`DownloadTask`) as the table name.
 *
 * NOTE(review): [status] is an enum; Room persists enums without a custom
 * converter only from version 2.3 on — confirm the Room version in use.
 *
 * @property id stable unique identifier of the task (primary key).
 * @property url remote location of the file.
 * @property fileName name of the file inside [savePath].
 * @property savePath directory the file is written to.
 * @property totalBytes full size of the file, or 0 while it is still unknown.
 * @property downloadedBytes number of bytes already written to disk.
 * @property status current lifecycle state of the task.
 * @property priority scheduling priority supplied by the caller.
 * @property createTime creation timestamp in epoch milliseconds.
 * @property lastUpdated timestamp of the last modification in epoch milliseconds.
 * @property attemptCount attempt counter; managers in this file test it with `> 0`.
 * @property retryDelayMillis delay before the next retry, in milliseconds.
 * @property chunkStart first byte of the chunk handled by this task.
 * @property chunkEnd last byte of the chunk handled by this task.
 */
@Entity
data class DownloadTask(
    @PrimaryKey val id: String,
    val url: String,
    val fileName: String,
    val savePath: String,
    val totalBytes: Long = 0,
    val downloadedBytes: Long = 0,
    val status: DownloadStatus = DownloadStatus.PENDING,
    val priority: Int = 0,
    val createTime: Long = System.currentTimeMillis(),
    val lastUpdated: Long = System.currentTimeMillis(),
    val attemptCount: Int = 0,
    val retryDelayMillis: Long = 0,
    val chunkStart: Long = 0,
    val chunkEnd: Long = 0
)
/**
 * Lifecycle states a download task can be in.
 */
enum class DownloadStatus {
PENDING, // Waiting to be downloaded
CONNECTING, // Connecting to the server
DOWNLOADING, // Download in progress
PAUSED, // Paused
COMPLETED, // Download finished
FAILED, // Download failed
CANCELED // Download canceled
}
/**
 * Room DAO for [DownloadTask] rows.
 *
 * The queries address the table as `DownloadTask`, which is Room's default
 * table name for an entity class of that name.
 *
 * NOTE(review): binding [DownloadStatus] values directly in queries relies on
 * Room's built-in enum support (Room 2.3+) — confirm the Room version in use.
 */
@Dao
interface DownloadTaskDao {

    /** Observes every stored task. */
    @Query("SELECT * FROM DownloadTask")
    fun getAllTasks(): Flow<List<DownloadTask>>

    /** Observes the tasks whose status is one of [statuses]. */
    @Query("SELECT * FROM DownloadTask WHERE status IN (:statuses)")
    fun getTasksByStatus(statuses: List<DownloadStatus>): Flow<List<DownloadTask>>

    /** Returns the task with the given id, or `null` when none exists. */
    @Query("SELECT * FROM DownloadTask WHERE id = :taskId LIMIT 1")
    suspend fun getTaskById(taskId: String): DownloadTask?

    /** Inserts a new task. */
    @Insert
    suspend fun insertTask(task: DownloadTask)

    /** Overwrites the stored row that has the same primary key as [task]. */
    @Update
    suspend fun updateTask(task: DownloadTask)

    /** Deletes the row that has the same primary key as [task]. */
    @Delete
    suspend fun deleteTask(task: DownloadTask)

    /** Deletes every task currently in [status]. */
    @Query("DELETE FROM DownloadTask WHERE status = :status")
    suspend fun deleteTasksByStatus(status: DownloadStatus)

    /** Moves every task from [currentStatus] to [newStatus]. */
    @Query("UPDATE DownloadTask SET status = :newStatus WHERE status = :currentStatus")
    suspend fun updateTasksStatus(currentStatus: DownloadStatus, newStatus: DownloadStatus)
}
/**
 * Room database holding the download task table.
 *
 * Obtain the process-wide instance through [getDatabase].
 */
@Database(entities = [DownloadTask::class], version = 1, exportSchema = false)
abstract class DownloadDatabase : RoomDatabase() {

    /** DAO used to read and write [DownloadTask] rows. */
    abstract fun downloadTaskDao(): DownloadTaskDao

    companion object {
        // Volatile so the unsynchronized fast-path read sees a fully published instance.
        @Volatile
        private var INSTANCE: DownloadDatabase? = null

        /**
         * Returns the shared database, creating it on first use.
         *
         * The instance is re-checked inside the lock so that two threads racing
         * on the first call cannot each build their own database.
         *
         * @param context any context; only its application context is retained.
         */
        fun getDatabase(context: Context): DownloadDatabase =
            INSTANCE ?: synchronized(this) {
                INSTANCE ?: Room.databaseBuilder(
                    context.applicationContext,
                    DownloadDatabase::class.java,
                    "download_database"
                ).build().also { INSTANCE = it }
            }
    }
}
/**
 * Download events, used to notify the UI layer.
 */
sealed class DownloadEvent {
// Progress report for a task: a progress value plus the byte counters it was derived from.
data class Progress(val taskId: String, val progress: Int, val downloadedBytes: Long, val totalBytes: Long) : DownloadEvent()
// The task moved to a new status.
data class StatusChanged(val taskId: String, val status: DownloadStatus) : DownloadEvent()
// The task failed with the given error.
data class Error(val taskId: String, val error: Throwable) : DownloadEvent()
// The task finished; filePath is the location of the downloaded file.
data class Complete(val taskId: String, val filePath: String) : DownloadEvent()
}
/**
 * Foreground service that keeps the process alive while downloads run in the
 * background and shows an ongoing notification.
 *
 * The service performs no downloading itself; it only reports its start and
 * stop to [BackgroundDownloadManager].
 */
class DownloadService : Service() {

    private lateinit var notificationManager: NotificationManager
    private var isServiceRunning = false

    override fun onCreate() {
        super.onCreate()
        notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
        createNotificationChannel()
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        if (!isServiceRunning) {
            isServiceRunning = true
            startForeground(NOTIFICATION_ID, createNotification("下载管理器正在运行"))
            // Tell the download manager that the service is up.
            BackgroundDownloadManager.getInstance(applicationContext).onServiceStarted()
        }
        return START_STICKY
    }

    override fun onBind(intent: Intent?): IBinder? = null

    override fun onDestroy() {
        isServiceRunning = false
        // Tell the download manager that the service is gone.
        BackgroundDownloadManager.getInstance(applicationContext).onServiceStopped()
        super.onDestroy()
    }

    /**
     * Replaces the foreground notification content.
     *
     * @param title notification title.
     * @param message notification text.
     * @param progress progress value out of 100; a progress bar is shown only when it is > 0.
     */
    fun updateNotification(title: String, message: String, progress: Int) {
        notificationManager.notify(NOTIFICATION_ID, createNotification(title, message, progress))
    }

    /** Registers the notification channel on API 26+, where channels are mandatory. */
    private fun createNotificationChannel() {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            val channel = NotificationChannel(
                CHANNEL_ID,
                CHANNEL_NAME,
                NotificationManager.IMPORTANCE_LOW
            ).apply {
                setShowBadge(false)
                lockscreenVisibility = Notification.VISIBILITY_PUBLIC
            }
            notificationManager.createNotificationChannel(channel)
        }
    }

    /** Builds the ongoing notification; tapping it opens the app's launch activity when one exists. */
    private fun createNotification(
        title: String,
        message: String = "正在处理下载任务",
        progress: Int = 0
    ): Notification {
        val pendingIntentFlags = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
            PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE
        } else {
            PendingIntent.FLAG_UPDATE_CURRENT
        }
        // getLaunchIntentForPackage() may return null (no launcher activity);
        // in that case the notification simply gets no tap action.
        val contentIntent = packageManager.getLaunchIntentForPackage(packageName)?.let { launchIntent ->
            PendingIntent.getActivity(this, 0, launchIntent, pendingIntentFlags)
        }

        val builder = NotificationCompat.Builder(this, CHANNEL_ID)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setContentTitle(title)
            .setContentText(message)
            .setPriority(NotificationCompat.PRIORITY_LOW)
            .setOnlyAlertOnce(true)
            .setOngoing(true)
        contentIntent?.let { builder.setContentIntent(it) }
        if (progress > 0) {
            builder.setProgress(100, progress, false)
        }
        return builder.build()
    }

    companion object {
        // Not private: BackgroundDownloadManager posts progress updates to the same
        // notification id / channel and reads both constants.
        const val NOTIFICATION_ID = 1001
        const val CHANNEL_ID = "download_channel"
        private const val CHANNEL_NAME = "Downloads"

        /** Starts the service, as a foreground service on API 26+. */
        fun startService(context: Context) {
            val intent = Intent(context, DownloadService::class.java)
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
                context.startForegroundService(intent)
            } else {
                context.startService(intent)
            }
        }

        /** Stops the service. */
        fun stopService(context: Context) {
            context.stopService(Intent(context, DownloadService::class.java))
        }
    }
}
/**
 * WorkManager worker that runs one download task through [BackgroundDownloadManager].
 *
 * The id of the task to run is passed in the input data under [KEY_TASK_ID].
 */
class DownloadWorker(
    context: Context,
    params: WorkerParameters
) : CoroutineWorker(context, params) {

    /**
     * Runs the download.
     *
     * @return `success` when the download finished; `retry` when it was interrupted
     * (the task is marked PAUSED); `failure` when the task id is missing, the task
     * does not exist, or any other error occurred (the task is marked FAILED).
     */
    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return@withContext Result.failure()
        val downloadManager = BackgroundDownloadManager.getInstance(applicationContext)
        try {
            downloadManager.getTaskById(taskId) ?: return@withContext Result.failure()
            // Mark the task as downloading.
            downloadManager.updateTaskStatus(taskId, DownloadStatus.DOWNLOADING)
            // Re-read after the status change so the snapshot handed to the manager
            // already carries DOWNLOADING instead of the stale pre-update status.
            val task = downloadManager.getTaskById(taskId) ?: return@withContext Result.failure()
            downloadManager.performDownload(task)
            Result.success()
        } catch (e: CancellationException) {
            // The work was cancelled (pause / cancel / replace). Whoever cancelled it
            // owns the task status; propagate so the coroutine terminates properly.
            throw e
        } catch (e: Exception) {
            if (e is InterruptedException || e.cause is InterruptedException) {
                // Interrupted because the app went to the background: pause and retry later.
                downloadManager.updateTaskStatus(taskId, DownloadStatus.PAUSED)
                Result.retry()
            } else {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
                Result.failure()
            }
        }
    }

    companion object {
        const val KEY_TASK_ID = "task_id"
        private const val TAG_DOWNLOAD = "download_worker"

        /**
         * Enqueues unique work named [taskId], replacing any existing work of that name.
         * The work requires a connected network and uses linear backoff.
         */
        fun enqueueWork(context: Context, taskId: String): Operation {
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .build()
            val request = OneTimeWorkRequestBuilder<DownloadWorker>()
                .setConstraints(constraints)
                .setInputData(workDataOf(KEY_TASK_ID to taskId))
                .addTag(TAG_DOWNLOAD)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.LINEAR,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()
            return WorkManager.getInstance(context)
                .enqueueUniqueWork(taskId, ExistingWorkPolicy.REPLACE, request)
        }

        /** Cancels the unique work named [taskId]. */
        fun cancelWork(context: Context, taskId: String) {
            WorkManager.getInstance(context).cancelUniqueWork(taskId)
        }
    }
}
/**
 * Lifecycle observer that forwards start/stop transitions of the observed
 * owner to [BackgroundDownloadManager] as foreground/background signals.
 */
class AppLifecycleObserver(private val context: Context) : DefaultLifecycleObserver {

    private val downloadManager = BackgroundDownloadManager.getInstance(context)

    /** The owner started: report the app as being in the foreground. */
    override fun onStart(owner: LifecycleOwner) = downloadManager.onAppForeground()

    /** The owner stopped: report the app as being in the background. */
    override fun onStop(owner: LifecycleOwner) = downloadManager.onAppBackground()
}
/**
 * Process-wide download manager.
 *
 * Task state is persisted through [DownloadTaskDao], the actual work is
 * scheduled with [DownloadWorker], and [DownloadService] is started while the
 * app is in the background. Status and progress changes are published on
 * [downloadEvents].
 *
 * `attemptCount > 0` on a PAUSED task marks it as "paused because the app went
 * to the background"; only such tasks are resumed by [onAppForeground]. A
 * user-initiated [pauseDownload] resets the counter to 0.
 */
class BackgroundDownloadManager private constructor(private val context: Context) {

    private val database = DownloadDatabase.getDatabase(context)
    private val downloadTaskDao = database.downloadTaskDao()
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .writeTimeout(30, TimeUnit.SECONDS)
        .build()
    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Event stream
    private val _downloadEvents = MutableSharedFlow<DownloadEvent>(extraBufferCapacity = 100)
    val downloadEvents: SharedFlow<DownloadEvent> = _downloadEvents.asSharedFlow()

    // Task list streams
    val allTasks = downloadTaskDao.getAllTasks()
    val activeTasks = downloadTaskDao.getTasksByStatus(
        listOf(DownloadStatus.PENDING, DownloadStatus.CONNECTING, DownloadStatus.DOWNLOADING)
    )

    // Foreground / service / counter tracking
    private val isAppInForeground = AtomicBoolean(true)
    private val isServiceRunning = AtomicBoolean(false)
    private val activeDownloads = AtomicInteger(0)

    init {
        // Tasks left in an active state by a previous process are re-queued.
        downloadScope.launch {
            downloadTaskDao.updateTasksStatus(DownloadStatus.DOWNLOADING, DownloadStatus.PENDING)
            downloadTaskDao.updateTasksStatus(DownloadStatus.CONNECTING, DownloadStatus.PENDING)
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PENDING)).first().forEach { task ->
                enqueueDownload(task.id)
            }
        }
    }

    /**
     * Creates a task in PENDING state, stores it and schedules it.
     *
     * @return the id of the new task.
     */
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"
        val downloadTask = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            priority = priority,
            status = DownloadStatus.PENDING
        )
        downloadTaskDao.insertTask(downloadTask)
        enqueueDownload(taskId)
        return taskId
    }

    /** Returns the stored task, or `null` when it does not exist. */
    suspend fun getTaskById(taskId: String): DownloadTask? = downloadTaskDao.getTaskById(taskId)

    /** Stores [status] on the task (no-op when it does not exist) and emits a StatusChanged event. */
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        downloadTaskDao.updateTask(
            task.copy(status = status, lastUpdated = System.currentTimeMillis())
        )
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    /** Pauses a DOWNLOADING / CONNECTING task on behalf of the user; it is not auto-resumed. */
    suspend fun pauseDownload(taskId: String) {
        pauseTask(taskId, resumeOnForeground = false)
    }

    /** Re-queues a PAUSED or FAILED task. */
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            enqueueDownload(taskId)
        }
    }

    /** Cancels the task's work, marks it CANCELED and deletes the partial file. */
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        DownloadWorker.cancelWork(context, taskId)
        updateTaskStatus(taskId, DownloadStatus.CANCELED)
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    /** Removes the task from storage, cancelling its work first when it is active. */
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING) {
            DownloadWorker.cancelWork(context, taskId)
        }
        downloadTaskDao.deleteTask(task)
    }

    /** Deletes every COMPLETED task from storage. */
    suspend fun clearCompletedTasks() {
        downloadTaskDao.deleteTasksByStatus(DownloadStatus.COMPLETED)
    }

    /** App entered the foreground: resume the tasks that were paused because of backgrounding. */
    fun onAppForeground() {
        isAppInForeground.set(true)
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PAUSED)).first().forEach { task ->
                // attemptCount > 0 is the "paused by background" marker; user pauses carry 0.
                if (task.attemptCount > 0) {
                    resumeDownload(task.id)
                }
            }
        }
    }

    /** App entered the background: either pause active tasks or make sure the foreground service runs. */
    fun onAppBackground() {
        isAppInForeground.set(false)
        // Whether to keep downloading in the background; could be read from SharedPreferences.
        val continueInBackground = true
        if (!continueInBackground) {
            downloadScope.launch {
                downloadTaskDao.getTasksByStatus(
                    listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING)
                ).first().forEach { task ->
                    // Flag these so onAppForeground() picks them up again.
                    pauseTask(task.id, resumeOnForeground = true)
                }
            }
        } else if (activeDownloads.get() > 0 && !isServiceRunning.get()) {
            DownloadService.startService(context)
        }
    }

    /** Callback from [DownloadService] when it has started. */
    fun onServiceStarted() {
        isServiceRunning.set(true)
    }

    /** Callback from [DownloadService] when it has been destroyed. */
    fun onServiceStopped() {
        isServiceRunning.set(false)
    }

    /**
     * Downloads [task] to `savePath/fileName`, resuming from the recorded offset when possible.
     *
     * Progress is persisted and published at most every 500 ms. Persisted updates
     * are applied on top of the row currently stored, so a status written by
     * someone else in the meantime is not overwritten by this method's snapshot.
     *
     * @throws IOException on HTTP or I/O failure.
     * @throws InterruptedException when the app is in the background and the
     * foreground service is not running.
     */
    suspend fun performDownload(task: DownloadTask): Unit = withContext(Dispatchers.IO) {
        try {
            val saveDir = File(task.savePath)
            if (!saveDir.exists()) {
                saveDir.mkdirs()
            }
            val file = File(saveDir, task.fileName)

            var totalBytes = task.totalBytes
            var downloadedBytes = task.downloadedBytes

            // Reconcile the recorded offset with what is really on disk.
            if (downloadedBytes > 0) {
                val onDisk = if (file.exists()) file.length() else 0L
                when {
                    // File missing or shorter than recorded: the record is unusable, start over.
                    onDisk < downloadedBytes -> {
                        file.delete()
                        downloadedBytes = 0
                    }
                    // The record is only refreshed every 500 ms, so the file can be ahead of it.
                    // Cut it back to the recorded offset; appending after the extra bytes
                    // would duplicate data.
                    onDisk > downloadedBytes ->
                        RandomAccessFile(file, "rw").use { it.setLength(downloadedBytes) }
                }
            }

            val request = Request.Builder()
                .url(task.url)
                .apply {
                    if (downloadedBytes > 0) {
                        // Resume from the recorded offset.
                        header("Range", "bytes=$downloadedBytes-")
                    }
                }
                .build()

            okHttpClient.newCall(request).execute().use { response ->
                if (!response.isSuccessful) {
                    throw IOException("HTTP error: ${response.code}")
                }
                val body = response.body ?: throw IOException("Empty response body")

                // Anything but 206 means the server sent the whole file again:
                // overwrite from the beginning instead of appending.
                if (downloadedBytes > 0 && response.code != 206) {
                    downloadedBytes = 0
                    totalBytes = 0
                }

                val contentLength = body.contentLength()
                if (totalBytes <= 0 && contentLength > 0) {
                    totalBytes = contentLength + downloadedBytes
                    saveProgress(task.id, downloadedBytes, totalBytes)
                }

                FileOutputStream(file, downloadedBytes > 0).use { output ->
                    val buffer = ByteArray(8192)
                    val input = body.byteStream()
                    var lastProgressUpdate = 0L
                    while (true) {
                        val bytesRead = input.read(buffer)
                        if (bytesRead == -1) break

                        // Stop promptly when the work has been cancelled.
                        ensureActive()

                        // In the background without the foreground service: stop and let the worker pause.
                        if (!isAppInForeground.get() && !isServiceRunning.get()) {
                            saveProgress(task.id, downloadedBytes, totalBytes, pausedByBackground = true)
                            throw InterruptedException("Download paused due to app background")
                        }

                        output.write(buffer, 0, bytesRead)
                        downloadedBytes += bytesRead

                        // Throttle persistence / events / notification to one update per 500 ms.
                        val now = System.currentTimeMillis()
                        if (now - lastProgressUpdate > 500) {
                            lastProgressUpdate = now
                            val progress = if (totalBytes > 0) {
                                (downloadedBytes * 100 / totalBytes).toInt()
                            } else {
                                0
                            }
                            saveProgress(task.id, downloadedBytes, totalBytes)
                            _downloadEvents.emit(
                                DownloadEvent.Progress(task.id, progress, downloadedBytes, totalBytes)
                            )
                            if (isServiceRunning.get()) {
                                postProgressNotification(task.fileName, progress, downloadedBytes, totalBytes)
                            }
                        }
                    }
                }

                // Finished. When the size was never announced, the bytes written are the size.
                if (totalBytes <= 0) {
                    totalBytes = downloadedBytes
                }
                val latest = downloadTaskDao.getTaskById(task.id) ?: task
                downloadTaskDao.updateTask(
                    latest.copy(
                        status = DownloadStatus.COMPLETED,
                        totalBytes = totalBytes,
                        downloadedBytes = downloadedBytes,
                        lastUpdated = System.currentTimeMillis()
                    )
                )
                _downloadEvents.emit(DownloadEvent.Complete(task.id, file.absolutePath))
            }
        } finally {
            // Exactly one release per run, on success and on failure alike; stop the
            // foreground service once nothing is left to download.
            if (releaseActiveDownload() == 0 && isServiceRunning.get()) {
                DownloadService.stopService(context)
            }
        }
    }

    /**
     * Cancels the work of a DOWNLOADING / CONNECTING task and stores it as PAUSED.
     *
     * @param resumeOnForeground when true the task keeps a positive `attemptCount`
     * so that [onAppForeground] resumes it; otherwise the counter is reset to 0.
     */
    private suspend fun pauseTask(taskId: String, resumeOnForeground: Boolean) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status != DownloadStatus.DOWNLOADING && task.status != DownloadStatus.CONNECTING) return
        DownloadWorker.cancelWork(context, taskId)
        downloadTaskDao.updateTask(
            task.copy(
                status = DownloadStatus.PAUSED,
                attemptCount = if (resumeOnForeground) maxOf(task.attemptCount, 1) else 0,
                lastUpdated = System.currentTimeMillis()
            )
        )
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, DownloadStatus.PAUSED))
    }

    /**
     * Writes the byte counters onto the row currently stored for [taskId], leaving
     * its other columns (notably `status`) as they are. Does nothing when the task
     * no longer exists.
     */
    private suspend fun saveProgress(
        taskId: String,
        downloadedBytes: Long,
        totalBytes: Long,
        pausedByBackground: Boolean = false
    ) {
        val stored = downloadTaskDao.getTaskById(taskId) ?: return
        downloadTaskDao.updateTask(
            stored.copy(
                downloadedBytes = downloadedBytes,
                totalBytes = totalBytes,
                attemptCount = if (pausedByBackground) stored.attemptCount + 1 else stored.attemptCount,
                lastUpdated = System.currentTimeMillis()
            )
        )
    }

    /** Replaces the foreground service notification with the current progress. */
    private fun postProgressNotification(fileName: String, progress: Int, downloadedBytes: Long, totalBytes: Long) {
        val notificationManager =
            context.getSystemService(Context.NOTIFICATION_SERVICE) as? NotificationManager ?: return
        val notification = NotificationCompat.Builder(context, DownloadService.CHANNEL_ID)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setContentTitle("正在下载 $fileName")
            .setContentText("$progress% - ${formatBytes(downloadedBytes)}/${formatBytes(totalBytes)}")
            .setProgress(100, progress, false)
            .setOnlyAlertOnce(true)
            .setOngoing(true)
            .build()
        notificationManager.notify(DownloadService.NOTIFICATION_ID, notification)
    }

    /** Schedules the task with WorkManager, starting the foreground service first when in the background. */
    private fun enqueueDownload(taskId: String) {
        if (!isAppInForeground.get()) {
            DownloadService.startService(context)
        }
        DownloadWorker.enqueueWork(context, taskId)
        activeDownloads.incrementAndGet()
    }

    /** Decrements the active counter without letting it drop below zero; returns the new value. */
    private fun releaseActiveDownload(): Int {
        while (true) {
            val current = activeDownloads.get()
            if (current <= 0) return 0
            if (activeDownloads.compareAndSet(current, current - 1)) return current - 1
        }
    }

    /** Formats a byte count as e.g. "1.5 MB"; values <= 0 give "0 B", the largest unit is TB. */
    private fun formatBytes(bytes: Long): String {
        if (bytes <= 0) return "0 B"
        val units = arrayOf("B", "KB", "MB", "GB", "TB")
        var value = bytes.toDouble()
        var unitIndex = 0
        // Repeated division avoids the rounding error of a log-based unit lookup
        // and cannot run past the last unit.
        while (value >= 1024 && unitIndex < units.lastIndex) {
            value /= 1024
            unitIndex++
        }
        return String.format("%.1f %s", value, units[unitIndex])
    }

    companion object {
        // Volatile so the unsynchronized fast-path read sees a fully published instance.
        @Volatile
        private var INSTANCE: BackgroundDownloadManager? = null

        /** Returns the shared manager, creating it on first use; only the application context is retained. */
        fun getInstance(context: Context): BackgroundDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: BackgroundDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }
}
// 增强的网络感知下载解决方案
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.net.ConnectivityManager
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import android.os.Build
import androidx.work.*
import java.io.IOException
import java.util.concurrent.TimeUnit
/**
 * Watches connectivity and tells registered [NetworkListener]s when the device
 * goes from "no network" to "network" and back.
 *
 * On API 24+ a [ConnectivityManager.NetworkCallback] is used, below that the
 * connectivity broadcast. Listeners are notified only on an actual change of
 * state, and may be called on a thread other than the one that registered them.
 */
class NetworkMonitor(private val context: Context) {

    interface NetworkListener {
        fun onNetworkAvailable()
        fun onNetworkUnavailable()
    }

    // Copy-on-write: listeners are added/removed by clients while the connectivity
    // callback iterates the list from its own thread.
    private val listeners = java.util.concurrent.CopyOnWriteArrayList<NetworkListener>()

    // Last state reported to listeners; guarded by stateLock.
    private var networkAvailable = false
    private val stateLock = Any()

    private var isMonitoring = false

    private val connectivityManager by lazy {
        context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    }

    // Network callback path
    private val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            publish(available = true)
        }

        override fun onLost(network: Network) {
            // Losing one network does not mean being offline: check for another one first.
            if (isAnyNetworkAvailable()) {
                return
            }
            publish(available = false)
        }
    }

    // Broadcast path for older Android versions
    private val networkReceiver = object : BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent) {
            publish(available = isAnyNetworkAvailable())
        }
    }

    /** Starts monitoring; calling it again while already monitoring does nothing. */
    fun startMonitoring() {
        if (isMonitoring) return
        synchronized(stateLock) { networkAvailable = isAnyNetworkAvailable() }
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            val request = NetworkRequest.Builder()
                .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
                .build()
            connectivityManager.registerNetworkCallback(request, networkCallback)
        } else {
            val filter = IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)
            context.registerReceiver(networkReceiver, filter)
        }
        isMonitoring = true
    }

    /** Stops monitoring; calling it while not monitoring does nothing. */
    fun stopMonitoring() {
        if (!isMonitoring) return
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            connectivityManager.unregisterNetworkCallback(networkCallback)
        } else {
            try {
                context.unregisterReceiver(networkReceiver)
            } catch (e: Exception) {
                // Deliberately ignored: the receiver was not registered.
            }
        }
        isMonitoring = false
    }

    /** Registers [listener]; registering the same listener twice has no effect. */
    fun addListener(listener: NetworkListener) {
        listeners.addIfAbsent(listener)
    }

    /** Unregisters [listener]. */
    fun removeListener(listener: NetworkListener) {
        listeners.remove(listener)
    }

    /** Queries the system for the current state (not the cached one). */
    fun isNetworkAvailable(): Boolean = isAnyNetworkAvailable()

    /** True when the active network reports internet capability (API 23+) or is connected (older). */
    private fun isAnyNetworkAvailable(): Boolean {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
            val network = connectivityManager.activeNetwork ?: return false
            val capabilities = connectivityManager.getNetworkCapabilities(network) ?: return false
            return capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        } else {
            val networkInfo = connectivityManager.activeNetworkInfo
            return networkInfo != null && networkInfo.isConnected
        }
    }

    /**
     * Records the new state and notifies listeners, but only when it differs from
     * the last reported one. Listeners are called outside the lock.
     */
    private fun publish(available: Boolean) {
        val changed = synchronized(stateLock) {
            if (networkAvailable == available) {
                false
            } else {
                networkAvailable = available
                true
            }
        }
        if (!changed) return
        if (available) {
            listeners.forEach { it.onNetworkAvailable() }
        } else {
            listeners.forEach { it.onNetworkUnavailable() }
        }
    }
}
/**
 * WorkManager worker that runs one download task through
 * [NetworkAwareDownloadManager] and reacts to network failures by retrying.
 *
 * The task id is passed in the input data under [KEY_TASK_ID].
 */
class NetworkAwareDownloadWorker(
    appContext: Context,
    workerParams: WorkerParameters
) : CoroutineWorker(appContext, workerParams) {

    /**
     * Runs the download.
     *
     * @return `success` when the file was downloaded; `retry` after a network
     * error while fewer than [MAX_RETRY_COUNT] attempts were made (task marked
     * WAITING_FOR_NETWORK), or when the worker was stopped; `failure` otherwise
     * (task marked FAILED, or task id / task missing).
     */
    override suspend fun doWork(): Result {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return Result.failure()
        // Result.retry() re-runs the worker with the same input data, so the value in
        // KEY_RETRY_COUNT never advances by itself; runAttemptCount does.
        val retryCount = maxOf(inputData.getInt(KEY_RETRY_COUNT, 0), runAttemptCount)

        val downloadManager = NetworkAwareDownloadManager.getInstance(applicationContext)
        downloadManager.getTaskById(taskId) ?: return Result.failure()
        downloadManager.updateTaskStatus(taskId, DownloadStatus.RUNNING)
        // Re-read after the status change so the snapshot handed to the manager carries RUNNING.
        val task = downloadManager.getTaskById(taskId) ?: return Result.failure()

        return try {
            setForeground(createForegroundInfo("准备下载..."))
            downloadManager.downloadFile(task, this)
            downloadManager.updateTaskStatus(taskId, DownloadStatus.COMPLETED)
            Result.success()
        } catch (e: CancellationException) {
            // Cancelled work: whoever cancelled it owns the task status.
            throw e
        } catch (e: Exception) {
            e.printStackTrace()
            when {
                // The worker was stopped (pause, cancel, replace, constraint no longer met).
                // The status was already set by whoever stopped it; do not overwrite it.
                isStopped || e is InterruptedException -> Result.retry()
                isNetworkError(e) && retryCount < MAX_RETRY_COUNT -> {
                    downloadManager.updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
                    Result.retry()
                }
                // Retry budget used up, or not a network problem.
                else -> {
                    downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
                    Result.failure()
                }
            }
        }
    }

    /**
     * Builds the foreground notification for this worker.
     *
     * Not private because [NetworkAwareDownloadManager] calls it to refresh the
     * notification with progress.
     *
     * @param progress progress out of 100; a progress bar is shown only when it is > 0.
     */
    internal fun createForegroundInfo(message: String, progress: Int = 0): ForegroundInfo {
        ensureNotificationChannel()
        val notification = NotificationCompat.Builder(applicationContext, NOTIFICATION_CHANNEL_ID)
            .setContentTitle("下载中")
            .setContentText(message)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setOngoing(true)
            .apply {
                if (progress > 0) {
                    setProgress(100, progress, false)
                }
            }
            .build()
        return ForegroundInfo(NOTIFICATION_ID, notification)
    }

    /** Creates the notification channel on API 26+; creating an existing channel is a no-op. */
    private fun ensureNotificationChannel() {
        if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) return
        val manager = applicationContext.getSystemService(Context.NOTIFICATION_SERVICE)
            as? NotificationManager ?: return
        manager.createNotificationChannel(
            NotificationChannel(NOTIFICATION_CHANNEL_ID, "Downloads", NotificationManager.IMPORTANCE_LOW)
        )
    }

    /**
     * Classifies [e] as a connectivity problem: by exception type first, then by
     * message keywords (including the Chinese wording used in this file).
     */
    private fun isNetworkError(e: Exception): Boolean {
        if (e !is IOException) return false
        if (e is java.net.UnknownHostException ||
            e is java.net.SocketException ||
            e is java.net.SocketTimeoutException
        ) {
            return true
        }
        val message = e.message ?: return false
        return NETWORK_ERROR_MARKERS.any { message.contains(it, ignoreCase = true) }
    }

    companion object {
        const val TAG_DOWNLOAD_WORKER = "network_aware_download_worker"
        const val KEY_TASK_ID = "task_id"
        const val KEY_RETRY_COUNT = "retry_count"
        const val MAX_RETRY_COUNT = 10
        const val NOTIFICATION_CHANNEL_ID = "download_channel"
        const val NOTIFICATION_ID = 1

        private val NETWORK_ERROR_MARKERS = listOf("network", "connect", "timeout", "网络")

        /**
         * Enqueues unique work `download_<taskId>`, replacing existing work of that name.
         * Requires a connected network and non-low storage; uses exponential backoff.
         */
        fun scheduleDownload(context: Context, taskId: String, retryCount: Int = 0) {
            val data = workDataOf(
                KEY_TASK_ID to taskId,
                KEY_RETRY_COUNT to retryCount
            )
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .setRequiresStorageNotLow(true)
                .build()
            val workRequest = OneTimeWorkRequestBuilder<NetworkAwareDownloadWorker>()
                .setConstraints(constraints)
                .setInputData(data)
                .addTag(TAG_DOWNLOAD_WORKER)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.EXPONENTIAL,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()
            WorkManager.getInstance(context)
                .enqueueUniqueWork("download_$taskId", ExistingWorkPolicy.REPLACE, workRequest)
        }

        /** Cancels the unique work `download_<taskId>`. */
        fun cancelDownload(context: Context, taskId: String) {
            WorkManager.getInstance(context).cancelUniqueWork("download_$taskId")
        }
    }
}
/**
 * Extended download status enum, adding a state for tasks that wait for connectivity.
 *
 * NOTE(review): an enum with the same name (and a different constant set:
 * CONNECTING / DOWNLOADING instead of RUNNING / WAITING_FOR_NETWORK) is declared
 * earlier in this file; the two declarations conflict if compiled together —
 * presumably the two listings were meant to live in separate files. Confirm.
 */
enum class DownloadStatus {
PENDING, // Waiting to be downloaded
RUNNING, // Download in progress
PAUSED, // Paused manually
WAITING_FOR_NETWORK, // Waiting for network
COMPLETED, // Finished
FAILED, // Failed
CANCELED // Canceled
}
/**
 * Download manager that parks tasks in WAITING_FOR_NETWORK while the device is
 * offline and re-schedules them when [NetworkMonitor] reports connectivity.
 *
 * Task state is persisted through [DownloadTaskDao]; work is scheduled with
 * [NetworkAwareDownloadWorker]; status and progress are published on [downloadEvents].
 */
class NetworkAwareDownloadManager private constructor(private val context: Context) : NetworkMonitor.NetworkListener {

    private val downloadTaskDao: DownloadTaskDao by lazy {
        DownloadDatabase.getDatabase(context).downloadTaskDao()
    }
    private val networkMonitor = NetworkMonitor(context)
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .build()
    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Buffered so that a slow collector cannot stall the download loop on emit().
    private val _downloadEvents = MutableSharedFlow<DownloadEvent>(extraBufferCapacity = 100)
    val downloadEvents = _downloadEvents.asSharedFlow()
    val allTasks = downloadTaskDao.getAllTasks()

    init {
        networkMonitor.addListener(this)
        networkMonitor.startMonitoring()
        // On start-up, pick up tasks that were left waiting for a network that is now there.
        downloadScope.launch {
            if (networkMonitor.isNetworkAvailable()) {
                resumeTasksWaitingForNetwork()
            }
        }
    }

    /** Connectivity is back: re-schedule every task that is waiting for it. */
    override fun onNetworkAvailable() {
        downloadScope.launch { resumeTasksWaitingForNetwork() }
    }

    /** Connectivity is gone: mark running tasks as waiting for the network. */
    override fun onNetworkUnavailable() {
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.RUNNING)).first().forEach { task ->
                pauseDownloadDueToNetwork(task.id)
            }
        }
    }

    /**
     * Creates and stores a task. It is scheduled immediately when a network is
     * available, otherwise stored as WAITING_FOR_NETWORK.
     *
     * @return the id of the new task.
     */
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"
        val saveDir = File(savePath)
        if (!saveDir.exists()) {
            saveDir.mkdirs()
        }
        // Sample the network state once so the stored status and the action taken agree.
        val online = networkMonitor.isNetworkAvailable()
        val task = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            status = if (online) DownloadStatus.PENDING else DownloadStatus.WAITING_FOR_NETWORK,
            priority = priority
        )
        downloadTaskDao.insertTask(task)
        if (online) {
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        } else {
            _downloadEvents.emit(
                DownloadEvent.StatusChanged(taskId, DownloadStatus.WAITING_FOR_NETWORK)
            )
        }
        return taskId
    }

    /** Pauses a RUNNING / PENDING / WAITING_FOR_NETWORK task on behalf of the user. */
    suspend fun pauseDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.RUNNING ||
            task.status == DownloadStatus.PENDING ||
            task.status == DownloadStatus.WAITING_FOR_NETWORK
        ) {
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)
            updateTaskStatus(taskId, DownloadStatus.PAUSED)
        }
    }

    /** Re-queues a PAUSED / FAILED task, or parks it as WAITING_FOR_NETWORK when offline. */
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            if (networkMonitor.isNetworkAvailable()) {
                updateTaskStatus(taskId, DownloadStatus.PENDING)
                NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
            } else {
                updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
            }
        }
    }

    /** Cancels the task's work, marks it CANCELED and deletes the partial file. */
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        NetworkAwareDownloadWorker.cancelDownload(context, taskId)
        updateTaskStatus(taskId, DownloadStatus.CANCELED)
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    /** Removes the task from storage, cancelling its work first unless it is COMPLETED. */
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status != DownloadStatus.COMPLETED) {
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)
        }
        downloadTaskDao.deleteTask(task)
    }

    /** Returns the stored task, or `null` when it does not exist. */
    suspend fun getTaskById(taskId: String): DownloadTask? = downloadTaskDao.getTaskById(taskId)

    /** Stores [status] on the task (no-op when it does not exist) and emits a StatusChanged event. */
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        downloadTaskDao.updateTask(
            task.copy(status = status, lastUpdated = System.currentTimeMillis())
        )
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    /**
     * Downloads [task] to `savePath/fileName`, resuming a partial file when the
     * server honours the Range request. Runs on [Dispatchers.IO].
     *
     * Persisted updates are applied on top of the row currently stored, so a
     * status written elsewhere in the meantime is not overwritten by [task].
     *
     * @param worker the worker on whose behalf the download runs; its stop flag is
     * polled and its foreground notification is refreshed with progress.
     * @throws IOException on connectivity loss, HTTP error or I/O failure.
     * @throws InterruptedException when [worker] has been stopped.
     */
    suspend fun downloadFile(task: DownloadTask, worker: NetworkAwareDownloadWorker): Unit =
        withContext(Dispatchers.IO) {
            // The English prefix keeps the message recognisable to keyword-based error classification.
            if (!networkMonitor.isNetworkAvailable()) {
                throw IOException("Network unavailable (网络不可用)")
            }

            val saveDir = File(task.savePath)
            if (!saveDir.exists()) {
                saveDir.mkdirs()
            }
            val file = File(saveDir, task.fileName)

            var totalBytes = task.totalBytes
            var downloadedBytes = task.downloadedBytes

            // When a partial download is on record, the file on disk is the source of
            // truth for the resume offset; a missing file means starting over.
            if (downloadedBytes > 0) {
                downloadedBytes = if (file.exists()) file.length() else 0L
            }
            // A file at or beyond the known size cannot be resumed (the range would be
            // unsatisfiable): download it again from the start.
            if (totalBytes > 0 && downloadedBytes >= totalBytes) {
                downloadedBytes = 0
            }

            val requestBuilder = Request.Builder().url(task.url)
            if (downloadedBytes > 0) {
                requestBuilder.addHeader("Range", "bytes=$downloadedBytes-")
            }

            okHttpClient.newCall(requestBuilder.build()).execute().use { response ->
                if (!response.isSuccessful) {
                    throw IOException("HTTP error: ${response.code}")
                }
                val body = response.body ?: throw IOException("Empty response body")
                val contentLength = body.contentLength()

                // Anything but 206 means the server sent the whole file: overwrite, do not append.
                if (downloadedBytes > 0 && response.code != 206) {
                    downloadedBytes = 0
                    totalBytes = 0
                }

                if (totalBytes <= 0 && contentLength > 0) {
                    totalBytes = if (response.code == 206) {
                        // Prefer the full size from a header such as "bytes 100-999/1000".
                        response.header("Content-Range")
                            ?.let { CONTENT_RANGE_TOTAL.find(it) }
                            ?.groupValues?.get(1)?.toLongOrNull()
                            ?: (downloadedBytes + contentLength)
                    } else {
                        contentLength
                    }
                    saveProgress(task.id, downloadedBytes, totalBytes)
                }

                FileOutputStream(file, downloadedBytes > 0).use { output ->
                    val buffer = ByteArray(8192)
                    val input = body.byteStream()
                    var lastProgressUpdate = 0L
                    while (true) {
                        val bytesRead = input.read(buffer)
                        if (bytesRead == -1) break

                        ensureActive()
                        if (!networkMonitor.isNetworkAvailable()) {
                            throw IOException("Network connection lost (网络连接已断开)")
                        }
                        if (worker.isStopped) {
                            throw InterruptedException("下载已停止")
                        }

                        output.write(buffer, 0, bytesRead)
                        downloadedBytes += bytesRead

                        // Throttle persistence / events / notification to one update per 500 ms.
                        val now = System.currentTimeMillis()
                        if (now - lastProgressUpdate > 500) {
                            lastProgressUpdate = now
                            val progress = if (totalBytes > 0) {
                                (downloadedBytes * 100 / totalBytes).toInt()
                            } else {
                                0
                            }
                            saveProgress(task.id, downloadedBytes, totalBytes)
                            _downloadEvents.emit(
                                DownloadEvent.Progress(task.id, progress, downloadedBytes, totalBytes)
                            )
                            worker.setForeground(worker.createForegroundInfo("下载中...", progress))
                        }
                    }
                }
            }

            // Finished. When the size was never announced, the bytes written are the size.
            if (totalBytes <= 0) {
                totalBytes = downloadedBytes
            }
            val latest = downloadTaskDao.getTaskById(task.id) ?: task
            downloadTaskDao.updateTask(
                latest.copy(
                    totalBytes = totalBytes,
                    downloadedBytes = downloadedBytes,
                    status = DownloadStatus.COMPLETED,
                    lastUpdated = System.currentTimeMillis()
                )
            )
            _downloadEvents.emit(DownloadEvent.Complete(task.id, file.absolutePath))
        }

    /** Stops network monitoring, cancels background work and drops the shared instance. */
    fun release() {
        networkMonitor.removeListener(this)
        networkMonitor.stopMonitoring()
        downloadScope.cancel()
        // A released manager has a cancelled scope; make getInstance() build a fresh one.
        synchronized(Companion) {
            if (INSTANCE === this) {
                INSTANCE = null
            }
        }
    }

    /** Re-schedules every task currently in WAITING_FOR_NETWORK. */
    private suspend fun resumeTasksWaitingForNetwork() {
        downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.WAITING_FOR_NETWORK)).first().forEach { task ->
            resumeDownloadWithNetwork(task.id)
        }
    }

    /** Marks a RUNNING / PENDING task as WAITING_FOR_NETWORK. */
    private suspend fun pauseDownloadDueToNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.RUNNING || task.status == DownloadStatus.PENDING) {
            updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
        }
    }

    /** Moves a WAITING_FOR_NETWORK task back to PENDING and schedules it. */
    private suspend fun resumeDownloadWithNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        if (task.status == DownloadStatus.WAITING_FOR_NETWORK) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        }
    }

    /**
     * Writes the byte counters onto the row currently stored for [taskId], leaving
     * its other columns (notably `status`) untouched. No-op when the task is gone.
     */
    private suspend fun saveProgress(taskId: String, downloadedBytes: Long, totalBytes: Long) {
        val stored = downloadTaskDao.getTaskById(taskId) ?: return
        downloadTaskDao.updateTask(
            stored.copy(
                downloadedBytes = downloadedBytes,
                totalBytes = totalBytes,
                lastUpdated = System.currentTimeMillis()
            )
        )
    }

    companion object {
        // Volatile so the unsynchronized fast-path read sees a fully published instance.
        @Volatile
        private var INSTANCE: NetworkAwareDownloadManager? = null

        // Captures the total size from a Content-Range value such as "bytes 100-999/1000".
        private val CONTENT_RANGE_TOTAL = Regex("bytes \\d+-\\d+/(\\d+)")

        /** Returns the shared manager, creating it on first use; only the application context is retained. */
        fun getInstance(context: Context): NetworkAwareDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: NetworkAwareDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }
}