这个问题很有挑战性,因为Android系统对后台任务的限制越来越严格,特别是在Android 8及更高版本上。让我们设计一个能在后台下载长任务队列的下载管理器,并在应用回到前台时恢复下载。

主要需要考虑的挑战:

  1. Android系统的电池优化和后台任务限制
  2. 在应用进入后台后,下载可能被系统暂停
  3. 在应用返回前台时需要恢复下载
  4. 需要正确处理应用被终止的情况
  5. 保持下载状态的一致性

解决方案可以包含以下组件:

  1. WorkManager:用于管理后台任务,即使应用被终止也能恢复任务
  2. ForegroundService:在下载过程中显示通知,降低系统杀死服务的概率
  3. Room数据库:存储下载任务状态,便于恢复
  4. 生命周期感知:监测应用前台/后台状态转换
  5. 下载队列管理:正确管理任务队列和状态转换

下面我将设计一个完整的解决方案,包括:

  • 下载管理器架构
  • 持久化存储
  • 应用生命周期处理
  • 后台服务
  • 任务队列管理
  • 暂停/恢复机制

Android长任务队列后台下载管理器设计

下面是一个基于WorkManager、ForegroundService和Room数据库的下载管理器实现,专注于解决Android后台下载限制问题:

// BackgroundDownloadManager.kt

import android.app.*
import android.content.Context
import android.content.Intent
import android.os.*
import androidx.annotation.RequiresApi
import androidx.core.app.NotificationCompat
import androidx.lifecycle.*
import androidx.room.*
import androidx.work.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import okhttp3.*
import java.io.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

/**
 * 下载任务数据类 - Room数据库实体
 */
@Entity(tableName = "download_tasks")
data class DownloadTask(
    @PrimaryKey val id: String,
    val url: String,
    val fileName: String,
    val savePath: String,
    val totalBytes: Long = 0,
    val downloadedBytes: Long = 0,
    val status: DownloadStatus = DownloadStatus.PENDING,
    val priority: Int = 0,
    val createTime: Long = System.currentTimeMillis(),
    val lastUpdated: Long = System.currentTimeMillis(),
    val attemptCount: Int = 0,
    val retryDelayMillis: Long = 0,
    val chunkStart: Long = 0,
    val chunkEnd: Long = 0
)

/**
 * 下载状态枚举
 */
enum class DownloadStatus {
    PENDING,         // 等待下载
    CONNECTING,      // 连接中
    DOWNLOADING,     // 下载中
    PAUSED,          // 暂停状态
    COMPLETED,       // 下载完成
    FAILED,          // 下载失败
    CANCELED         // 下载取消
}

/**
 * Room DAO接口
 */
@Dao
interface DownloadTaskDao {
    @Query("SELECT * FROM download_tasks ORDER BY priority DESC, createTime ASC")
    fun getAllTasks(): Flow<List<DownloadTask>>

    @Query("SELECT * FROM download_tasks WHERE status IN (:statuses) ORDER BY priority DESC, createTime ASC")
    fun getTasksByStatus(statuses: List<DownloadStatus>): Flow<List<DownloadTask>>

    @Query("SELECT * FROM download_tasks WHERE id = :taskId")
    suspend fun getTaskById(taskId: String): DownloadTask?

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertTask(task: DownloadTask)

    @Update
    suspend fun updateTask(task: DownloadTask)

    @Delete
    suspend fun deleteTask(task: DownloadTask)

    @Query("DELETE FROM download_tasks WHERE status = :status")
    suspend fun deleteTasksByStatus(status: DownloadStatus)

    @Query("UPDATE download_tasks SET status = :newStatus WHERE status = :currentStatus")
    suspend fun updateTasksStatus(currentStatus: DownloadStatus, newStatus: DownloadStatus)
}

/**
 * Room数据库
 */
@Database(entities = [DownloadTask::class], version = 1, exportSchema = false)
abstract class DownloadDatabase : RoomDatabase() {
    abstract fun downloadTaskDao(): DownloadTaskDao

    companion object {
        @Volatile
        private var INSTANCE: DownloadDatabase? = null

        fun getDatabase(context: Context): DownloadDatabase {
            return INSTANCE ?: synchronized(this) {
                val instance = Room.databaseBuilder(
                    context.applicationContext,
                    DownloadDatabase::class.java,
                    "download_database"
                ).build()
                INSTANCE = instance
                instance
            }
        }
    }
}

/**
 * 下载事件 - 用于通知UI层
 */
sealed class DownloadEvent {
    data class Progress(val taskId: String, val progress: Int, val downloadedBytes: Long, val totalBytes: Long) : DownloadEvent()
    data class StatusChanged(val taskId: String, val status: DownloadStatus) : DownloadEvent()
    data class Error(val taskId: String, val error: Throwable) : DownloadEvent()
    data class Complete(val taskId: String, val filePath: String) : DownloadEvent()
}

/**
 * 下载服务 - 使用前台服务确保在后台也能下载
 */
class DownloadService : Service() {
    companion object {
        private const val NOTIFICATION_ID = 1001
        private const val CHANNEL_ID = "download_channel"
        private const val CHANNEL_NAME = "Downloads"

        fun startService(context: Context) {
            val intent = Intent(context, DownloadService::class.java)

            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
                context.startForegroundService(intent)
            } else {
                context.startService(intent)
            }
        }

        fun stopService(context: Context) {
            val intent = Intent(context, DownloadService::class.java)
            context.stopService(intent)
        }
    }

    private lateinit var notificationManager: NotificationManager
    private var isServiceRunning = false

    override fun onCreate() {
        super.onCreate()
        notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
        createNotificationChannel()
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        if (!isServiceRunning) {
            isServiceRunning = true
            startForeground(NOTIFICATION_ID, createNotification("下载管理器正在运行"))

            // 通知下载管理器服务已启动
            BackgroundDownloadManager.getInstance(applicationContext).onServiceStarted()
        }

        return START_STICKY
    }

    override fun onBind(intent: Intent?): IBinder? = null

    override fun onDestroy() {
        isServiceRunning = false
        // 通知下载管理器服务已停止
        BackgroundDownloadManager.getInstance(applicationContext).onServiceStopped()
        super.onDestroy()
    }

    fun updateNotification(title: String, message: String, progress: Int) {
        val notification = createNotification(title, message, progress)
        notificationManager.notify(NOTIFICATION_ID, notification)
    }

    @RequiresApi(Build.VERSION_CODES.O)
    private fun createNotificationChannel() {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
            val channel = NotificationChannel(
                CHANNEL_ID,
                CHANNEL_NAME,
                NotificationManager.IMPORTANCE_LOW
            ).apply {
                setShowBadge(false)
                lockscreenVisibility = Notification.VISIBILITY_PUBLIC
            }
            notificationManager.createNotificationChannel(channel)
        }
    }

    private fun createNotification(
        title: String, 
        message: String = "正在处理下载任务",
        progress: Int = 0
    ): Notification {
        val pendingIntent = PendingIntent.getActivity(
            this,
            0,
            packageManager.getLaunchIntentForPackage(packageName),
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
                PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE
            } else {
                PendingIntent.FLAG_UPDATE_CURRENT
            }
        )

        val builder = NotificationCompat.Builder(this, CHANNEL_ID)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setContentTitle(title)
            .setContentText(message)
            .setContentIntent(pendingIntent)
            .setPriority(NotificationCompat.PRIORITY_LOW)
            .setOnlyAlertOnce(true)
            .setOngoing(true)

        if (progress > 0) {
            builder.setProgress(100, progress, false)
        }

        return builder.build()
    }
}

/**
 * 下载工作器 - 基于WorkManager
 */
class DownloadWorker(
    context: Context,
    params: WorkerParameters
) : CoroutineWorker(context, params) {

    companion object {
        const val KEY_TASK_ID = "task_id"
        private const val TAG_DOWNLOAD = "download_worker"

        fun enqueueWork(context: Context, taskId: String): Operation {
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .build()

            val inputData = workDataOf(KEY_TASK_ID to taskId)

            val request = OneTimeWorkRequestBuilder<DownloadWorker>()
                .setConstraints(constraints)
                .setInputData(inputData)
                .addTag(TAG_DOWNLOAD)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.LINEAR,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()

            return WorkManager.getInstance(context)
                .enqueueUniqueWork(
                    taskId,
                    ExistingWorkPolicy.REPLACE,
                    request
                )
        }

        fun cancelWork(context: Context, taskId: String) {
            WorkManager.getInstance(context).cancelUniqueWork(taskId)
        }
    }

    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return@withContext Result.failure()
        val downloadManager = BackgroundDownloadManager.getInstance(applicationContext)

        return@withContext try {
            val task = downloadManager.getTaskById(taskId) ?: return@withContext Result.failure()

            // 更新任务状态为正在下载
            downloadManager.updateTaskStatus(task.id, DownloadStatus.DOWNLOADING)

            // 执行下载
            downloadManager.performDownload(task)

            Result.success()
        } catch (e: Exception) {
            // 如果是因为应用进入后台被系统限制,将任务状态设为PAUSED
            if (e is InterruptedException || e.cause is InterruptedException) {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.PAUSED)
                Result.retry()
            } else {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
                Result.failure()
            }
        }
    }
}

/**
 * 应用生命周期监听
 */
class AppLifecycleObserver(private val context: Context) : DefaultLifecycleObserver {

    private val downloadManager = BackgroundDownloadManager.getInstance(context)

    override fun onStart(owner: LifecycleOwner) {
        // 应用进入前台
        downloadManager.onAppForeground()
    }

    override fun onStop(owner: LifecycleOwner) {
        // 应用进入后台
        downloadManager.onAppBackground()
    }
}

/**
 * 后台下载管理器
 */
class BackgroundDownloadManager private constructor(private val context: Context) {

    companion object {
        @Volatile
        private var INSTANCE: BackgroundDownloadManager? = null

        fun getInstance(context: Context): BackgroundDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: BackgroundDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }

    private val database = DownloadDatabase.getDatabase(context)
    private val downloadTaskDao = database.downloadTaskDao()
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .writeTimeout(30, TimeUnit.SECONDS)
        .build()

    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // 事件流
    private val _downloadEvents = MutableSharedFlow<DownloadEvent>(extraBufferCapacity = 100)
    val downloadEvents: SharedFlow<DownloadEvent> = _downloadEvents.asSharedFlow()

    // 任务列表流
    val allTasks = downloadTaskDao.getAllTasks()
    val activeTasks = downloadTaskDao.getTasksByStatus(
        listOf(DownloadStatus.PENDING, DownloadStatus.CONNECTING, DownloadStatus.DOWNLOADING)
    )

    // 应用前台状态追踪
    private val isAppInForeground = AtomicBoolean(true)
    private val isServiceRunning = AtomicBoolean(false)
    private val activeDownloads = AtomicInteger(0)

    // 初始化
    init {
        // 恢复被暂停的任务
        downloadScope.launch {
            downloadTaskDao.updateTasksStatus(DownloadStatus.DOWNLOADING, DownloadStatus.PENDING)
            downloadTaskDao.updateTasksStatus(DownloadStatus.CONNECTING, DownloadStatus.PENDING)

            // 获取等待中的任务并启动
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PENDING)).first().forEach { task ->
                enqueueDownload(task.id)
            }
        }
    }

    // 添加下载任务
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"

        val downloadTask = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            priority = priority,
            status = DownloadStatus.PENDING
        )

        downloadTaskDao.insertTask(downloadTask)
        enqueueDownload(taskId)

        return taskId
    }

    // 获取任务信息
    suspend fun getTaskById(taskId: String): DownloadTask? {
        return downloadTaskDao.getTaskById(taskId)
    }

    // 更新任务状态
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return
        val updatedTask = task.copy(
            status = status,
            lastUpdated = System.currentTimeMillis()
        )
        downloadTaskDao.updateTask(updatedTask)
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    // 暂停下载
    suspend fun pauseDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING) {
            DownloadWorker.cancelWork(context, taskId)
            updateTaskStatus(taskId, DownloadStatus.PAUSED)
        }
    }

    // 恢复下载
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            enqueueDownload(taskId)
        }
    }

    // 取消下载
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        DownloadWorker.cancelWork(context, taskId)
        updateTaskStatus(taskId, DownloadStatus.CANCELED)

        // 删除部分下载的文件
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    // 删除任务
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.DOWNLOADING || task.status == DownloadStatus.CONNECTING) {
            DownloadWorker.cancelWork(context, taskId)
        }

        downloadTaskDao.deleteTask(task)
    }

    // 清理已完成任务
    suspend fun clearCompletedTasks() {
        downloadTaskDao.deleteTasksByStatus(DownloadStatus.COMPLETED)
    }

    // 应用进入前台
    fun onAppForeground() {
        isAppInForeground.set(true)

        // 恢复被暂停的下载
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(listOf(DownloadStatus.PAUSED)).first().forEach { task ->
                // 只恢复因进入后台而被暂停的任务
                if (task.attemptCount > 0) {
                    resumeDownload(task.id)
                }
            }
        }
    }

    // 应用进入后台
    fun onAppBackground() {
        isAppInForeground.set(false)

        // 根据设置决定是否在后台继续下载
        val continueInBackground = true // 这里可以从SharedPreferences读取配置

        if (!continueInBackground) {
            downloadScope.launch {
                // 暂停所有正在下载的任务
                downloadTaskDao.getTasksByStatus(
                    listOf(DownloadStatus.DOWNLOADING, DownloadStatus.CONNECTING)
                ).first().forEach { task ->
                    pauseDownload(task.id)
                }
            }
        } else {
            // 确保前台服务正在运行
            if (activeDownloads.get() > 0 && !isServiceRunning.get()) {
                DownloadService.startService(context)
            }
        }
    }

    // 服务启动回调
    fun onServiceStarted() {
        isServiceRunning.set(true)
    }

    // 服务停止回调
    fun onServiceStopped() {
        isServiceRunning.set(false)
    }

    // 入队下载任务
    private fun enqueueDownload(taskId: String) {
        if (!isAppInForeground.get()) {
            // 如果应用在后台,确保服务正在运行
            DownloadService.startService(context)
        }

        // 使用WorkManager调度下载任务
        DownloadWorker.enqueueWork(context, taskId)

        // 跟踪活跃下载数量
        activeDownloads.incrementAndGet()
    }

    // 执行下载
    suspend fun performDownload(task: DownloadTask) = withContext(Dispatchers.IO) {
        try {
            // 确保目录存在
            val saveDir = File(task.savePath)
            if (!saveDir.exists()) {
                saveDir.mkdirs()
            }

            val file = File(saveDir, task.fileName)

            // 获取文件信息
            var totalBytes = task.totalBytes
            var downloadedBytes = task.downloadedBytes

            // 如果文件已存在且下载过,获取已下载的字节数
            if (file.exists() && downloadedBytes > 0) {
                if (file.length() < downloadedBytes) {
                    // 文件大小异常,从头开始下载
                    file.delete()
                    downloadedBytes = 0
                }
            } else if (downloadedBytes > 0) {
                // 数据不一致,从头开始下载
                downloadedBytes = 0
            }

            // 构建请求
            val request = Request.Builder()
                .url(task.url)
                .apply {
                    if (downloadedBytes > 0) {
                        // 断点续传
                        header("Range", "bytes=$downloadedBytes-")
                    }
                }
                .build()

            // 执行请求
            okHttpClient.newCall(request).execute().use { response ->
                if (!response.isSuccessful) {
                    throw IOException("HTTP error: ${response.code}")
                }

                // 获取总大小
                val body = response.body ?: throw IOException("Empty response body")
                val contentLength = body.contentLength()

                if (totalBytes <= 0 && contentLength > 0) {
                    totalBytes = contentLength + downloadedBytes

                    // 更新任务信息
                    val updatedTask = task.copy(totalBytes = totalBytes)
                    downloadTaskDao.updateTask(updatedTask)
                }

                // 打开输出流
                val outputStream = if (downloadedBytes > 0) {
                    FileOutputStream(file, true) // 续传模式
                } else {
                    FileOutputStream(file) // 从头开始
                }

                // 下载文件
                outputStream.use { output ->
                    val buffer = ByteArray(8192)
                    val input = body.byteStream()
                    var bytesRead: Int
                    var lastProgressUpdate = 0L

                    while (input.read(buffer).also { bytesRead = it } != -1) {
                        // 检查是否应该暂停
                        if (!isAppInForeground.get() && !isServiceRunning.get()) {
                            throw InterruptedException("Download paused due to app background")
                        }

                        output.write(buffer, 0, bytesRead)
                        downloadedBytes += bytesRead

                        // 更新进度,但控制更新频率
                        val now = System.currentTimeMillis()
                        if (now - lastProgressUpdate > 500) { // 每500ms更新一次
                            lastProgressUpdate = now

                            val progress = if (totalBytes > 0) {
                                (downloadedBytes * 100 / totalBytes).toInt()
                            } else {
                                0
                            }

                            // 更新任务信息
                            val updatedTask = task.copy(
                                downloadedBytes = downloadedBytes,
                                lastUpdated = now
                            )
                            downloadTaskDao.updateTask(updatedTask)

                            // 发送进度事件
                            _downloadEvents.emit(
                                DownloadEvent.Progress(
                                    task.id,
                                    progress,
                                    downloadedBytes,
                                    totalBytes
                                )
                            )

                            // 更新通知
                            if (isServiceRunning.get()) {
                                (context.getSystemService(Context.NOTIFICATION_SERVICE) as? NotificationManager)?.let { notificationManager ->
                                    val notification = NotificationCompat.Builder(context, DownloadService.CHANNEL_ID)
                                        .setSmallIcon(android.R.drawable.stat_sys_download)
                                        .setContentTitle("正在下载 ${task.fileName}")
                                        .setContentText("$progress% - ${formatBytes(downloadedBytes)}/${formatBytes(totalBytes)}")
                                        .setProgress(100, progress, false)
                                        .build()

                                    notificationManager.notify(DownloadService.NOTIFICATION_ID, notification)
                                }
                            }
                        }
                    }
                }

                // 下载完成
                val finalTask = task.copy(
                    status = DownloadStatus.COMPLETED,
                    downloadedBytes = totalBytes,
                    lastUpdated = System.currentTimeMillis()
                )
                downloadTaskDao.updateTask(finalTask)

                // 发送完成事件
                _downloadEvents.emit(DownloadEvent.Complete(task.id, file.absolutePath))

                // 更新活跃下载数量
                val remaining = activeDownloads.decrementAndGet()

                // 如果没有活跃下载且服务正在运行,停止服务
                if (remaining <= 0 && isServiceRunning.get()) {
                    DownloadService.stopService(context)
                }
            }
        } catch (e: Exception) {
            // 更新活跃下载数量
            activeDownloads.decrementAndGet()

            // 重新抛出异常
            throw e
        }
    }

    // 格式化字节大小
    private fun formatBytes(bytes: Long): String {
        if (bytes <= 0) return "0 B"

        val units = arrayOf("B", "KB", "MB", "GB", "TB")
        val digitGroups = (Math.log10(bytes.toDouble()) / Math.log10(1024.0)).toInt()

        return String.format("%.1f %s", bytes / Math.pow(1024.0, digitGroups.toDouble()), units[digitGroups])
    }
}

// 增强的网络感知下载解决方案

import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.net.ConnectivityManager
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import android.os.Build
import androidx.work.*
import java.io.IOException
import java.util.concurrent.TimeUnit

/**
 * 网络状态监听器
 */
class NetworkMonitor(private val context: Context) {

    interface NetworkListener {
        fun onNetworkAvailable()
        fun onNetworkUnavailable()
    }

    private val listeners = mutableListOf<NetworkListener>()
    private var isNetworkAvailable = false
    private var isMonitoring = false

    private val connectivityManager by lazy {
        context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    }

    // 网络回调 (Android 5.0+)
    private val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            val wasUnavailable = !isNetworkAvailable
            isNetworkAvailable = true

            if (wasUnavailable) {
                notifyNetworkAvailable()
            }
        }

        override fun onLost(network: Network) {
            // 检查是否真的没有可用网络了
            if (isAnyNetworkAvailable()) {
                return
            }

            isNetworkAvailable = false
            notifyNetworkUnavailable()
        }
    }

    // 广播接收器 (兼容旧版Android)
    private val networkReceiver = object : BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent) {
            val wasUnavailable = !isNetworkAvailable
            isNetworkAvailable = isAnyNetworkAvailable()

            if (isNetworkAvailable && wasUnavailable) {
                notifyNetworkAvailable()
            } else if (!isNetworkAvailable) {
                notifyNetworkUnavailable()
            }
        }
    }

    // 开始监听
    fun startMonitoring() {
        if (isMonitoring) return

        isNetworkAvailable = isAnyNetworkAvailable()

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            // Android 7.0+ 使用NetworkCallback
            val request = NetworkRequest.Builder()
                .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
                .build()
            connectivityManager.registerNetworkCallback(request, networkCallback)
        } else {
            // 旧版Android使用广播
            val filter = IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)
            context.registerReceiver(networkReceiver, filter)
        }

        isMonitoring = true
    }

    // 停止监听
    fun stopMonitoring() {
        if (!isMonitoring) return

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            connectivityManager.unregisterNetworkCallback(networkCallback)
        } else {
            try {
                context.unregisterReceiver(networkReceiver)
            } catch (e: Exception) {
                // 忽略接收器未注册的异常
            }
        }

        isMonitoring = false
    }

    // 添加监听器
    fun addListener(listener: NetworkListener) {
        if (!listeners.contains(listener)) {
            listeners.add(listener)
        }
    }

    // 移除监听器
    fun removeListener(listener: NetworkListener) {
        listeners.remove(listener)
    }

    // 检查是否有可用网络
    fun isNetworkAvailable(): Boolean {
        return isAnyNetworkAvailable()
    }

    // 检查是否有可用网络的实现
    private fun isAnyNetworkAvailable(): Boolean {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
            val network = connectivityManager.activeNetwork ?: return false
            val capabilities = connectivityManager.getNetworkCapabilities(network) ?: return false

            return capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        } else {
            @Suppress("DEPRECATION")
            val networkInfo = connectivityManager.activeNetworkInfo
            return networkInfo != null && networkInfo.isConnected
        }
    }

    // 通知网络可用
    private fun notifyNetworkAvailable() {
        listeners.forEach { it.onNetworkAvailable() }
    }

    // 通知网络不可用
    private fun notifyNetworkUnavailable() {
        listeners.forEach { it.onNetworkUnavailable() }
    }
}

/**
 * 增强型下载工作器,支持网络感知
 */
class NetworkAwareDownloadWorker(
    appContext: Context,
    workerParams: WorkerParameters
) : CoroutineWorker(appContext, workerParams) {

    companion object {
        const val TAG_DOWNLOAD_WORKER = "network_aware_download_worker"
        const val KEY_TASK_ID = "task_id"
        const val KEY_RETRY_COUNT = "retry_count"
        const val MAX_RETRY_COUNT = 10
        const val NOTIFICATION_CHANNEL_ID = "download_channel"
        const val NOTIFICATION_ID = 1

        // 调度下载任务
        fun scheduleDownload(context: Context, taskId: String, retryCount: Int = 0) {
            val data = workDataOf(
                KEY_TASK_ID to taskId,
                KEY_RETRY_COUNT to retryCount
            )

            // 设置网络约束
            val constraints = Constraints.Builder()
                .setRequiredNetworkType(NetworkType.CONNECTED)
                .setRequiresStorageNotLow(true)
                .build()

            val workRequest = OneTimeWorkRequestBuilder<NetworkAwareDownloadWorker>()
                .setConstraints(constraints)
                .setInputData(data)
                .addTag(TAG_DOWNLOAD_WORKER)
                .addTag(taskId)
                .setBackoffCriteria(
                    BackoffPolicy.EXPONENTIAL,
                    WorkRequest.MIN_BACKOFF_MILLIS,
                    TimeUnit.MILLISECONDS
                )
                .build()

            WorkManager.getInstance(context)
                .enqueueUniqueWork(
                    "download_$taskId",
                    ExistingWorkPolicy.REPLACE,
                    workRequest
                )
        }

        // 取消下载任务
        fun cancelDownload(context: Context, taskId: String) {
            WorkManager.getInstance(context)
                .cancelUniqueWork("download_$taskId")
        }
    }

    override suspend fun doWork(): Result {
        val taskId = inputData.getString(KEY_TASK_ID) ?: return Result.failure()
        val retryCount = inputData.getInt(KEY_RETRY_COUNT, 0)

        // 获取下载任务
        val downloadManager = NetworkAwareDownloadManager.getInstance(applicationContext)
        val task = downloadManager.getTaskById(taskId) ?: return Result.failure()

        // 更新任务状态为下载中
        downloadManager.updateTaskStatus(taskId, DownloadStatus.RUNNING)

        try {
            // 创建前台服务通知
            setForeground(createForegroundInfo("准备下载..."))

            // 执行下载
            downloadManager.downloadFile(task, this)

            // 下载完成
            downloadManager.updateTaskStatus(taskId, DownloadStatus.COMPLETED)
            return Result.success()

        } catch (e: Exception) {
            e.printStackTrace()

            // 确定错误类型
            val isNetworkError = e is IOException && 
                (e.message?.contains("network", ignoreCase = true) == true ||
                 e.message?.contains("connect", ignoreCase = true) == true ||
                 e.message?.contains("timeout", ignoreCase = true) == true)

            if (isNetworkError) {
                downloadManager.updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)

                // 网络错误,判断是否需要重试
                if (retryCount < MAX_RETRY_COUNT) {
                    val newRetryCount = retryCount + 1

                    // 等待时间随重试次数增加
                    val delayInSeconds = Math.min(30, Math.pow(2.0, newRetryCount.toDouble())).toLong()

                    // 安排下一次重试
                    val retryData = workDataOf(
                        KEY_TASK_ID to taskId,
                        KEY_RETRY_COUNT to newRetryCount
                    )

                    return Result.retry()
                }
            }

            // 达到最大重试次数或非网络错误
            downloadManager.updateTaskStatus(taskId, DownloadStatus.FAILED)
            return Result.failure()
        }
    }

    // 创建前台服务信息
    private fun createForegroundInfo(message: String, progress: Int = 0): ForegroundInfo {
        // 实现创建通知的逻辑
        // ...与之前的实现类似...

        // 简化版本
        val notification = NotificationCompat.Builder(applicationContext, NOTIFICATION_CHANNEL_ID)
            .setContentTitle("下载中")
            .setContentText(message)
            .setSmallIcon(android.R.drawable.stat_sys_download)
            .setOngoing(true)
            .apply {
                if (progress > 0) {
                    setProgress(100, progress, false)
                }
            }
            .build()

        return ForegroundInfo(NOTIFICATION_ID, notification)
    }
}

/**
 * 增强的下载状态枚举
 */
enum class DownloadStatus {
    PENDING,             // 等待下载
    RUNNING,             // 下载中
    PAUSED,              // 手动暂停
    WAITING_FOR_NETWORK, // 等待网络
    COMPLETED,           // 完成
    FAILED,              // 失败
    CANCELED             // 取消
}

/**
 * 网络感知下载管理器
 */
class NetworkAwareDownloadManager private constructor(private val context: Context) : NetworkMonitor.NetworkListener {

    companion object {
        @Volatile
        private var INSTANCE: NetworkAwareDownloadManager? = null

        fun getInstance(context: Context): NetworkAwareDownloadManager {
            return INSTANCE ?: synchronized(this) {
                INSTANCE ?: NetworkAwareDownloadManager(context.applicationContext).also {
                    INSTANCE = it
                }
            }
        }
    }

    private val downloadTaskDao: DownloadTaskDao by lazy {
        DownloadDatabase.getDatabase(context).downloadTaskDao()
    }

    private val networkMonitor = NetworkMonitor(context)
    private val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(30, TimeUnit.SECONDS)
        .build()

    private val downloadScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    private val _downloadEvents = MutableSharedFlow<DownloadEvent>()
    val downloadEvents = _downloadEvents.asSharedFlow()

    val allTasks = downloadTaskDao.getAllTasks()

    init {
        // 开始监听网络状态
        networkMonitor.addListener(this)
        networkMonitor.startMonitoring()

        // 应用启动时检查和恢复下载任务
        downloadScope.launch {
            // 当状态是WAITING_FOR_NETWORK且有网络连接时,恢复下载
            if (networkMonitor.isNetworkAvailable()) {
                downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
                    resumeDownloadWithNetwork(task.id)
                }
            }
        }
    }

    override fun onNetworkAvailable() {
        // 网络恢复时恢复等待网络的下载任务
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(DownloadStatus.WAITING_FOR_NETWORK).first().forEach { task ->
                resumeDownloadWithNetwork(task.id)
            }
        }
    }

    override fun onNetworkUnavailable() {
        // 网络断开时暂停正在下载的任务
        downloadScope.launch {
            downloadTaskDao.getTasksByStatus(DownloadStatus.RUNNING).first().forEach { task ->
                pauseDownloadDueToNetwork(task.id)
            }
        }
    }

    // 添加下载任务
    suspend fun addDownloadTask(
        url: String,
        fileName: String,
        savePath: String,
        priority: Int = 0
    ): String {
        val taskId = "${url.hashCode()}_${System.currentTimeMillis()}"

        // 创建保存目录
        val saveDir = File(savePath)
        if (!saveDir.exists()) {
            saveDir.mkdirs()
        }

        // 创建任务
        val task = DownloadTask(
            id = taskId,
            url = url,
            fileName = fileName,
            savePath = savePath,
            status = if (networkMonitor.isNetworkAvailable()) DownloadStatus.PENDING else DownloadStatus.WAITING_FOR_NETWORK,
            priority = priority
        )

        // 保存到数据库
        downloadTaskDao.insertTask(task)

        // 如果网络可用,立即开始下载
        if (networkMonitor.isNetworkAvailable()) {
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        } else {
            _downloadEvents.emit(
                DownloadEvent.StatusChanged(
                    taskId,
                    DownloadStatus.WAITING_FOR_NETWORK
                )
            )
        }

        return taskId
    }

    // 暂停下载
    suspend fun pauseDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.RUNNING || 
            task.status == DownloadStatus.PENDING || 
            task.status == DownloadStatus.WAITING_FOR_NETWORK) {
            // 取消Worker
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)

            // 更新状态
            updateTaskStatus(taskId, DownloadStatus.PAUSED)
        }
    }

    // 因网络问题暂停下载
    private suspend fun pauseDownloadDueToNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.RUNNING || task.status == DownloadStatus.PENDING) {
            // 更新状态为等待网络
            updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
        }
    }

    // 恢复下载
    suspend fun resumeDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.PAUSED || task.status == DownloadStatus.FAILED) {
            if (networkMonitor.isNetworkAvailable()) {
                // 网络可用,开始下载
                updateTaskStatus(taskId, DownloadStatus.PENDING)
                NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
            } else {
                // 网络不可用,标记为等待网络
                updateTaskStatus(taskId, DownloadStatus.WAITING_FOR_NETWORK)
            }
        }
    }

    // 网络恢复时恢复下载
    private suspend fun resumeDownloadWithNetwork(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status == DownloadStatus.WAITING_FOR_NETWORK) {
            updateTaskStatus(taskId, DownloadStatus.PENDING)
            NetworkAwareDownloadWorker.scheduleDownload(context, taskId)
        }
    }

    // 取消下载
    suspend fun cancelDownload(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        // 取消Worker
        NetworkAwareDownloadWorker.cancelDownload(context, taskId)

        // 更新状态
        updateTaskStatus(taskId, DownloadStatus.CANCELED)

        // 删除部分下载的文件
        val file = File(task.savePath, task.fileName)
        if (file.exists()) {
            file.delete()
        }
    }

    // 删除任务
    suspend fun deleteTask(taskId: String) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        if (task.status != DownloadStatus.COMPLETED) {
            // 取消下载
            NetworkAwareDownloadWorker.cancelDownload(context, taskId)
        }

        // 从数据库删除
        downloadTaskDao.deleteTask(task)
    }

    // 获取任务
    suspend fun getTaskById(taskId: String): DownloadTask? {
        return downloadTaskDao.getTaskById(taskId)
    }

    // 更新任务状态
    suspend fun updateTaskStatus(taskId: String, status: DownloadStatus) {
        val task = downloadTaskDao.getTaskById(taskId) ?: return

        val updatedTask = task.copy(
            status = status,
            lastUpdated = System.currentTimeMillis()
        )

        downloadTaskDao.updateTask(updatedTask)
        _downloadEvents.emit(DownloadEvent.StatusChanged(taskId, status))
    }

    // 执行文件下载
    suspend fun downloadFile(task: DownloadTask, worker: NetworkAwareDownloadWorker) {
        // 检查网络状态
        if (!networkMonitor.isNetworkAvailable()) {
            throw IOException("网络不可用")
        }

        // 确保目录存在
        val saveDir = File(task.savePath)
        if (!saveDir.exists()) {
            saveDir.mkdirs()
        }

        val file = File(saveDir, task.fileName)
        var totalBytes = task.totalBytes
        var downloadedBytes = task.downloadedBytes

        // 检查文件是否已存在且有部分数据
        if (file.exists() && downloadedBytes > 0) {
            val fileLength = file.length()
            if (fileLength != downloadedBytes) {
                // 文件大小不一致,使用实际文件大小
                downloadedBytes = fileLength
            }
        } else if (downloadedBytes > 0) {
            // 文件不存在但记录有下载字节,重置
            downloadedBytes = 0
        }

        // 构建请求
        val requestBuilder = Request.Builder()
            .url(task.url)

        // 添加Range头以支持断点续传
        if (downloadedBytes > 0) {
            requestBuilder.addHeader("Range", "bytes=$downloadedBytes-")
        }

        val request = requestBuilder.build()

        // 执行请求
        okHttpClient.newCall(request).execute().use { response ->
            if (!response.isSuccessful) {
                throw IOException("HTTP error: ${response.code}")
            }

            val body = response.body ?: throw IOException("Empty response body")
            val contentLength = body.contentLength()

            // 确定总大小
            if (totalBytes <= 0 && contentLength > 0) {
                totalBytes = when {
                    response.code == 206 -> { // 断点续传
                        val contentRange = response.header("Content-Range")
                        if (contentRange != null) {
                            // 解析形如 "bytes 100-999/1000" 的Content-Range头
                            val rangeMatcher = Regex("bytes \\d+-\\d+/(\\d+)").find(contentRange)
                            rangeMatcher?.groupValues?.get(1)?.toLongOrNull() ?: (downloadedBytes + contentLength)
                        } else {
                            downloadedBytes + contentLength
                        }
                    }
                    else -> contentLength // 完整下载
                }

                // 更新任务总大小
                val updatedTask = task.copy(totalBytes = totalBytes)
                downloadTaskDao.updateTask(updatedTask)
            }

            // 准备输出流
            val outputStream = if (downloadedBytes > 0) {
                FileOutputStream(file, true) // 断点续传,追加模式
            } else {
                FileOutputStream(file) // 从头下载,覆盖模式
            }

            outputStream.use { output ->
                val buffer = ByteArray(8192)
                val input = body.byteStream()
                var bytesRead: Int
                var lastProgressUpdate = 0L

                while (input.read(buffer).also { bytesRead = it } != -1) {
                    // 定期检查网络状态
                    if (!networkMonitor.isNetworkAvailable()) {
                        throw IOException("网络连接已断开")
                    }

                    if (worker.isStopped) {
                        throw InterruptedException("下载已停止")
                    }

                    output.write(buffer, 0, bytesRead)
                    downloadedBytes += bytesRead

                    // 更新进度
                    val now = System.currentTimeMillis()
                    if (now - lastProgressUpdate > 500) { // 每500ms更新一次
                        lastProgressUpdate = now

                        val progress = if (totalBytes > 0) {
                            (downloadedBytes * 100 / totalBytes).toInt()
                        } else 0

                        // 更新数据库
                        val updatedTask = task.copy(
                            downloadedBytes = downloadedBytes,
                            lastUpdated = now
                        )
                        downloadTaskDao.updateTask(updatedTask)

                        // 通知进度
                        _downloadEvents.emit(
                            DownloadEvent.Progress(
                                task.id,
                                progress,
                                downloadedBytes,
                                totalBytes
                            )
                        )

                        // 更新Worker通知
                        worker.setForeground(
                            worker.createForegroundInfo(
                                "下载中...",
                                progress
                            )
                        )
                    }
                }
            }
        }

        // 下载完成
        val updatedTask = task.copy(
            downloadedBytes = totalBytes,
            status = DownloadStatus.COMPLETED,
            lastUpdated = System.currentTimeMillis()
        )
        downloadTaskDao.updateTask(updatedTask)

        _downloadEvents.emit(
            DownloadEvent.Complete(
                task.id,
                File(task.savePath, task.fileName).absolutePath
            )
        )
    }

    // 释放资源
    fun release() {
        networkMonitor.removeListener(this)
        networkMonitor.stopMonitoring()
        downloadScope.cancel()
    }
}

OkHttp 请求构建和响应处理的完整流程

一、请求构建流程代码示例

1. GET 请求构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建OkHttpClient实例
val client = OkHttpClient()

// 构建GET请求
val request = Request.Builder()
.url("https://api.example.com/users")
.header("Authorization", "Bearer token123")
.addHeader("Accept", "application/json")
.get() // GET请求,可省略,因为默认就是GET
.tag(String::class.java, "用户列表请求")
.build()

// 执行请求
val response = client.newCall(request).execute()

2. POST 请求构建 (JSON 数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建JSON请求体
val json = """
{
"username": "test_user",
"password": "test_password"
}
""".trimIndent()

val mediaType = "application/json; charset=utf-8".toMediaType()
val requestBody = json.toRequestBody(mediaType)

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/login")
.header("Content-Type", "application/json")
.post(requestBody) // 设置POST方法和请求体
.build()

// 执行请求
val response = client.newCall(request).execute()

3. POST 表单提交

1
2
3
4
5
6
7
8
9
10
11
// 创建表单请求体
val formBody = FormBody.Builder()
.add("username", "test_user")
.add("password", "test_password")
.build()

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/login")
.post(formBody) // 设置POST方法和表单请求体
.build()

4. 文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建MultipartBody
val file = File("/path/to/file.jpg")
val mediaType = "image/jpeg".toMediaType()
val requestFile = file.asRequestBody(mediaType)

val multipartBody = MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("title", "Profile Picture")
.addFormDataPart("image", "file.jpg", requestFile)
.build()

// 构建POST请求
val request = Request.Builder()
.url("https://api.example.com/upload")
.post(multipartBody)
.build()

二、响应处理流程代码示例

1. 基本响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 执行请求获取响应
val response = client.newCall(request).execute()

try {
// 检查响应是否成功
if (response.isSuccessful) {
// 状态码在200-299之间
val statusCode = response.code
val headers = response.headers

// 获取响应体并转换为字符串
val responseBody = response.body
val responseString = responseBody?.string()

println("响应成功: $statusCode")
println("响应内容: $responseString")
} else {
// 处理错误响应
println("请求失败: ${response.code}")
println("错误信息: ${response.message}")
}
} finally {
// 关闭响应,释放资源
response.close()
}

2. JSON 响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 假设使用Gson进行JSON解析
val gson = Gson()

try {
if (response.isSuccessful) {
val responseBody = response.body

// 将响应体转换为字符串
val jsonString = responseBody?.string()

// 解析JSON到数据类
data class User(val id: Int, val name: String, val email: String)
val user = gson.fromJson(jsonString, User::class.java)

println("用户ID: ${user.id}")
println("用户名: ${user.name}")
println("邮箱: ${user.email}")
}
} finally {
response.close()
}

3. 流式处理大文件下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 下载文件示例
val request = Request.Builder()
.url("https://example.com/large-file.zip")
.build()

val response = client.newCall(request).execute()

try {
if (!response.isSuccessful) {
throw IOException("下载失败: ${response.code}")
}

// 获取响应体
val responseBody = response.body ?: throw IOException("响应体为空")

// 创建输出文件
val downloadFile = File("/path/to/save/file.zip")
val outputStream = FileOutputStream(downloadFile)

// 使用BufferedSink进行高效写入
val sink = outputStream.sink().buffer()

// 从响应体获取源
val source = responseBody.source()

// 读取数据并写入文件
val bufferSize = 8 * 1024 // 8KB缓冲区
val buffer = Buffer()
var bytesRead: Long

// 显示下载进度
val contentLength = responseBody.contentLength()
var totalBytesRead = 0L

while (source.read(buffer, bufferSize.toLong()).also { bytesRead = it } != -1L) {
sink.write(buffer, bytesRead)
totalBytesRead += bytesRead

// 计算下载进度
if (contentLength > 0) {
val progress = (totalBytesRead * 100 / contentLength).toInt()
println("下载进度: $progress%")
}
}

// 确保所有数据都写入
sink.flush()
sink.close()

println("文件下载完成: ${downloadFile.absolutePath}")
} finally {
response.close()
}

4. 异步请求与响应处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 异步执行请求
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
// 请求失败处理
println("请求失败: ${e.message}")
}

override fun onResponse(call: Call, response: Response) {
// 使用try-finally确保响应关闭
try {
if (response.isSuccessful) {
val responseData = response.body?.string()
println("异步请求成功: $responseData")

// 注意:这里是在OkHttp的工作线程中
// 如果需要更新UI,需要切换到主线程
} else {
println("请求返回错误: ${response.code}")
}
} finally {
response.close()
}
}
})

三、请求和响应的完整生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 1. 创建OkHttpClient
val client = OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.build()

// 2. 构建请求
val request = Request.Builder()
.url("https://api.example.com/data")
.header("User-Agent", "OkHttp Example")
.build()

// 3. 创建Call对象
val call = client.newCall(request)

// 4. 执行请求
val response = call.execute()

// 5. 处理响应
try {
// 5.1 检查响应状态
val statusCode = response.code

// 5.2 获取响应头
val contentType = response.header("Content-Type")

// 5.3 处理响应体
if (response.isSuccessful) {
val responseBody = response.body

// 根据Content-Type处理不同类型的响应
when {
contentType?.contains("application/json") == true -> {
// 处理JSON响应
val jsonString = responseBody?.string()
println("JSON响应: $jsonString")
}
contentType?.contains("text/") == true -> {
// 处理文本响应
val textResponse = responseBody?.string()
println("文本响应: $textResponse")
}
else -> {
// 处理二进制响应
val bytes = responseBody?.bytes()
println("二进制响应长度: ${bytes?.size} 字节")
}
}
} else {
// 处理错误响应
println("错误: ${response.code} ${response.message}")
}
} catch (e: Exception) {
println("处理响应时发生异常: ${e.message}")
} finally {
// 6. 关闭响应
response.close()
}

以上代码示例展示了 OkHttp 请求构建和响应处理的完整流程,包括:

  1. 不同类型的请求构建(GET、POST、文件上传等)
  2. 各种响应处理方式(字符串、JSON、文件下载等)
  3. 同步和异步请求的处理差异
  4. 完整的请求-响应生命周期管理

这些示例可以作为实际开发中的参考模板。

您要我使用OkHttp实现一个具有多线程、多任务、限速、暂停恢复和进度显示功能的下载器。我需要设计一个完整的下载框架。让我规划一下:

系统架构

  1. DownloadManager - 全局下载管理器,处理多任务调度
  2. DownloadTask - 单个下载任务,包含URL、目标文件路径等信息
  3. DownloadWorker - 工作线程,处理实际下载逻辑
  4. DownloadListener - 回调接口,通知下载进度、状态变化
  5. RateLimiter - 限速器,控制下载速度
  6. TaskStorage - 任务持久化存储,用于断点续传

主要类设计

1. DownloadManager

  • 管理下载任务队列
  • 控制并发下载数量
  • 提供添加、暂停、恢复、取消任务方法

2. DownloadTask

  • 存储下载URL、目标路径、文件大小等信息
  • 记录下载状态(等待、下载中、暂停、完成、失败)
  • 保存断点续传所需信息(已下载部分)

3. DownloadWorker

  • 使用OkHttp执行HTTP请求
  • 支持Range请求实现断点续传
  • 将下载内容写入文件

4. RateLimiter

  • 限制下载速度
  • 可以根据网络状况动态调整

5. 断点续传

  • 通过HTTP Range头实现
  • 保存已下载部分信息

6. 多线程下载

  • 将一个文件分成多个部分
  • 每个部分由单独线程下载
  • 合并各部分得到完整文件

让我开始编写代码实现这些功能。

使用OkHttp实现多功能下载框架

下面是一个基于OkHttp实现的功能完整的下载框架,支持您需要的所有功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
// DownloadManager.kt

import kotlinx.coroutines.*
import okhttp3.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
* 下载状态
*/
enum class DownloadStatus {
IDLE, // 空闲状态
WAITING, // 等待下载
DOWNLOADING,// 下载中
PAUSED, // 暂停状态
COMPLETED, // 下载完成
FAILED, // 下载失败
CANCELED // 下载取消
}

/**
* 下载监听器
*/
interface DownloadListener {
fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long)
fun onStatusChanged(task: DownloadTask, status: DownloadStatus)
fun onSuccess(task: DownloadTask, file: File)
fun onFailed(task: DownloadTask, e: Exception)
}

/**
* 下载任务
*/
class DownloadTask(
val id: String, // 任务唯一ID
val url: String, // 下载URL
val savePath: String, // 保存路径
val fileName: String, // 文件名
var totalBytes: Long = 0, // 文件总大小
var downloadedBytes: Long = 0, // 已下载大小
var threadCount: Int = 3, // 线程数
var status: DownloadStatus = DownloadStatus.IDLE, // 下载状态
var ranges: MutableList<Pair<Long, Long>> = mutableListOf() // 分片信息
) {
// 获取进度百分比
fun getProgress(): Int {
return if (totalBytes > 0) {
(downloadedBytes * 100 / totalBytes).toInt()
} else 0
}

// 获取下载文件
fun getFile(): File = File(savePath, fileName)

// 获取临时文件
fun getTempFile(index: Int): File = File(savePath, "$fileName.part$index")
}

/**
* 下载速度限制器
*/
class RateLimiter(private var bytesPerSecond: Long) {
private val byteCount = AtomicLong(0)
private var lastResetTime = System.currentTimeMillis()

// 设置速度限制 (字节/秒)
fun setBytesPerSecond(bytesPerSecond: Long) {
this.bytesPerSecond = bytesPerSecond
}

// 消耗指定字节数,若超过限制则阻塞
@Synchronized
fun consume(bytes: Long) {
byteCount.addAndGet(bytes)

val currentTime = System.currentTimeMillis()
val timeElapsed = currentTime - lastResetTime

if (timeElapsed >= 1000) {
// 每秒重置计数
byteCount.set(0)
lastResetTime = currentTime
return
}

val currentBytesPerSecond = (byteCount.get() * 1000 / timeElapsed.coerceAtLeast(1))
if (currentBytesPerSecond > bytesPerSecond && bytesPerSecond > 0) {
// 需要限速,计算需要等待的时间
val waitTime = ((currentBytesPerSecond.toDouble() / bytesPerSecond) * 1000 - timeElapsed).toLong()
if (waitTime > 0) {
try {
Thread.sleep(waitTime.coerceAtMost(1000))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}
}
}
}

/**
* 下载管理器
*/
class DownloadManager private constructor() {
private val client = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()

private val downloadTasks = ConcurrentHashMap<String, DownloadTask>()
private val downloadWorkers = ConcurrentHashMap<String, DownloadWorker>()
private val taskListeners = ConcurrentHashMap<String, MutableList<DownloadListener>>()
private val rateLimiter = RateLimiter(0) // 默认不限速
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 最大并发下载数
private var maxConcurrentDownloads = 3
// 当前运行的下载数
private var runningDownloads = AtomicLong(0)
// 等待队列
private val waitingQueue = LinkedBlockingQueue<String>()

companion object {
@Volatile
private var instance: DownloadManager? = null

fun getInstance(): DownloadManager {
return instance ?: synchronized(this) {
instance ?: DownloadManager().also { instance = it }
}
}
}

init {
// 启动队列处理协程
downloadScope.launch {
processQueue()
}
}

// 处理等待队列
private suspend fun processQueue() {
while (isActive) {
try {
if (runningDownloads.get() < maxConcurrentDownloads) {
val taskId = waitingQueue.poll()
if (taskId != null) {
val task = downloadTasks[taskId]
if (task != null && task.status == DownloadStatus.WAITING) {
startDownloadInternal(task)
}
}
}
delay(100) // 避免CPU过度使用
} catch (e: Exception) {
e.printStackTrace()
}
}
}

// 设置最大并发下载数
fun setMaxConcurrentDownloads(max: Int) {
maxConcurrentDownloads = max
}

// 设置下载速度限制 (KB/s)
fun setSpeedLimit(kbPerSecond: Int) {
rateLimiter.setBytesPerSecond(kbPerSecond * 1024L)
}

// 添加下载任务
fun addTask(url: String, savePath: String, fileName: String, threadCount: Int = 3): DownloadTask {
val id = url.hashCode().toString()
var task = downloadTasks[id]

if (task == null) {
File(savePath).mkdirs() // 确保目录存在
task = DownloadTask(id, url, savePath, fileName, threadCount = threadCount)
downloadTasks[id] = task
}

return task
}

// 添加监听器
fun addListener(taskId: String, listener: DownloadListener) {
val listeners = taskListeners.getOrPut(taskId) { mutableListOf() }
listeners.add(listener)
}

// 移除监听器
fun removeListener(taskId: String, listener: DownloadListener) {
taskListeners[taskId]?.remove(listener)
}

// 开始下载
fun startDownload(taskId: String) {
val task = downloadTasks[taskId] ?: return

when (task.status) {
DownloadStatus.IDLE, DownloadStatus.FAILED, DownloadStatus.CANCELED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
DownloadStatus.PAUSED -> {
task.status = DownloadStatus.WAITING
notifyStatusChanged(task)
waitingQueue.offer(taskId)
}
else -> { /* 其他状态不处理 */ }
}
}

// 内部启动下载方法
private fun startDownloadInternal(task: DownloadTask) {
runningDownloads.incrementAndGet()

if (task.status != DownloadStatus.WAITING) return

task.status = DownloadStatus.DOWNLOADING
notifyStatusChanged(task)

// 创建并启动下载工作线程
val worker = DownloadWorker(task, client, rateLimiter, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
notifyProgress(task, progress, downloadedBytes, totalBytes)
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
if (status == DownloadStatus.COMPLETED || status == DownloadStatus.FAILED || status == DownloadStatus.CANCELED) {
runningDownloads.decrementAndGet()
}
notifyStatusChanged(task)
}

override fun onSuccess(task: DownloadTask, file: File) {
notifySuccess(task, file)
}

override fun onFailed(task: DownloadTask, e: Exception) {
notifyFailed(task, e)
}
})

downloadWorkers[task.id] = worker
worker.start()
}

// 暂停下载
fun pauseDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.pause()
}

// 恢复下载
fun resumeDownload(taskId: String) {
val task = downloadTasks[taskId]
if (task?.status == DownloadStatus.PAUSED) {
startDownload(taskId)
}
}

// 取消下载
fun cancelDownload(taskId: String) {
val worker = downloadWorkers[taskId]
worker?.cancel()
waitingQueue.remove(taskId)
}

// 获取下载任务
fun getTask(taskId: String): DownloadTask? {
return downloadTasks[taskId]
}

// 获取所有下载任务
fun getAllTasks(): List<DownloadTask> {
return downloadTasks.values.toList()
}

// 通知进度更新
private fun notifyProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
taskListeners[task.id]?.forEach { listener ->
listener.onProgress(task, progress, downloadedBytes, totalBytes)
}
}

// 通知状态变化
private fun notifyStatusChanged(task: DownloadTask) {
taskListeners[task.id]?.forEach { listener ->
listener.onStatusChanged(task, task.status)
}
}

// 通知下载成功
private fun notifySuccess(task: DownloadTask, file: File) {
taskListeners[task.id]?.forEach { listener ->
listener.onSuccess(task, file)
}
}

// 通知下载失败
private fun notifyFailed(task: DownloadTask, e: Exception) {
taskListeners[task.id]?.forEach { listener ->
listener.onFailed(task, e)
}
}

// 清理资源
fun release() {
downloadScope.cancel()
downloadWorkers.values.forEach { it.cancel() }
downloadTasks.clear()
downloadWorkers.clear()
taskListeners.clear()
waitingQueue.clear()
}
}

/**
* 下载工作线程
*/
class DownloadWorker(
private val task: DownloadTask,
private val client: OkHttpClient,
private val rateLimiter: RateLimiter,
private val listener: DownloadListener
) {
private val isRunning = AtomicBoolean(false)
private val isPaused = AtomicBoolean(false)
private val isCanceled = AtomicBoolean(false)
private val workers = mutableListOf<Job>()
private val downloadScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

// 开始下载
fun start() {
if (isRunning.getAndSet(true)) return

downloadScope.launch {
try {
// 1. 获取文件信息
fetchFileInfo()

// 2. 如果文件大小为0或不支持范围请求,使用单线程下载
if (task.totalBytes <= 0 || !isSupportRange()) {
singleThreadDownload()
} else {
// 3. 多线程下载
multiThreadDownload()
}
} catch (e: Exception) {
if (!isPaused.get() && !isCanceled.get()) {
task.status = DownloadStatus.FAILED
listener.onStatusChanged(task, task.status)
listener.onFailed(task, e)
}
} finally {
isRunning.set(false)
}
}
}

// 暂停下载
fun pause() {
if (isPaused.getAndSet(true)) return

task.status = DownloadStatus.PAUSED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()
}

// 取消下载
fun cancel() {
if (isCanceled.getAndSet(true)) return

task.status = DownloadStatus.CANCELED
listener.onStatusChanged(task, task.status)

// 取消所有工作协程
workers.forEach { it.cancel() }
workers.clear()

// 删除临时文件
for (i in 0 until task.threadCount) {
task.getTempFile(i).delete()
}
}

// 获取文件信息
private suspend fun fetchFileInfo() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.head() // 使用HEAD请求获取文件信息
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to fetch file info: ${response.code}")
}

// 获取文件大小
val contentLength = response.header("Content-Length")?.toLongOrNull() ?: -1L
task.totalBytes = contentLength

// 计算每个线程的下载范围
if (contentLength > 0) {
calculateRanges(contentLength)
}
}
}

// 检查是否支持范围请求
private suspend fun isSupportRange(): Boolean = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=0-0")
.build()

client.newCall(request).execute().use { response ->
return@withContext response.code == 206 // 返回206表示支持范围请求
}
}

// 计算每个线程的下载范围
private fun calculateRanges(contentLength: Long) {
task.ranges.clear()

val blockSize = contentLength / task.threadCount
var start: Long
var end: Long

for (i in 0 until task.threadCount) {
start = i * blockSize
end = if (i == task.threadCount - 1) contentLength - 1 else (i + 1) * blockSize - 1

// 检查临时文件是否存在
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val downloadedSize = tempFile.length()
if (downloadedSize > 0 && downloadedSize < (end - start + 1)) {
// 更新起始位置
start += downloadedSize
task.downloadedBytes += downloadedSize
}
}

task.ranges.add(Pair(start, end))
}
}

// 单线程下载
private suspend fun singleThreadDownload() = withContext(Dispatchers.IO) {
val request = Request.Builder()
.url(task.url)
.build()

client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("Failed to download file: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
task.totalBytes = responseBody.contentLength()

// 创建输出文件
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = 0L

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新进度
task.downloadedBytes = downloadedSize
val progress = if (task.totalBytes > 0) {
(downloadedSize * 100 / task.totalBytes).toInt()
} else 0

listener.onProgress(task, progress, downloadedSize, task.totalBytes)
}

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, outputFile)

} finally {
outputStream.close()
}
}
}

// 多线程下载
private suspend fun multiThreadDownload() = withContext(Dispatchers.IO) {
// 创建多个工作协程
for (i in 0 until task.threadCount) {
val job = downloadScope.launch {
downloadFilePart(i)
}
workers.add(job)
}

// 等待所有下载任务完成
workers.joinAll()

// 如果暂停或取消,直接返回
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 合并文件
mergeFiles()

// 下载完成
task.status = DownloadStatus.COMPLETED
listener.onStatusChanged(task, task.status)
listener.onSuccess(task, task.getFile())
}

// 下载文件片段
private suspend fun downloadFilePart(index: Int) = withContext(Dispatchers.IO) {
if (index >= task.ranges.size) return@withContext

val range = task.ranges[index]
val start = range.first
val end = range.second

// 如果该部分已经下载完成,直接返回
if (start > end) return@withContext

val request = Request.Builder()
.url(task.url)
.header("Range", "bytes=$start-$end")
.build()

client.newCall(request).execute().use { response ->
if (response.code != 206) {
throw IOException("Server doesn't support range requests: ${response.code}")
}

val responseBody = response.body ?: throw IOException("Empty response body")
val tempFile = task.getTempFile(index)
val outputStream = FileOutputStream(tempFile, tempFile.exists())

try {
val buffer = ByteArray(8192)
val inputStream = responseBody.byteStream()
var read: Int
var downloadedSize = tempFile.length()

while (inputStream.read(buffer).also { read = it } != -1) {
if (isPaused.get() || isCanceled.get()) {
return@withContext
}

// 应用速度限制
rateLimiter.consume(read.toLong())

outputStream.write(buffer, 0, read)
downloadedSize += read

// 更新总下载进度
synchronized(this@DownloadWorker) {
task.downloadedBytes += read
val progress = (task.downloadedBytes * 100 / task.totalBytes).toInt()
listener.onProgress(task, progress, task.downloadedBytes, task.totalBytes)
}
}

// 更新该部分的下载范围(标记为已完成)
task.ranges[index] = Pair(end + 1, end)

} finally {
outputStream.close()
}
}
}

// 合并文件
private suspend fun mergeFiles() = withContext(Dispatchers.IO) {
val outputFile = task.getFile()
val outputStream = FileOutputStream(outputFile)

try {
for (i in 0 until task.threadCount) {
val tempFile = task.getTempFile(i)
if (tempFile.exists()) {
val tempData = tempFile.readBytes()
outputStream.write(tempData)
tempFile.delete()
}
}
} finally {
outputStream.close()
}
}
}

使用示例

以下是如何使用这个下载框架的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 使用示例

fun main() {
// 获取下载管理器实例
val downloadManager = DownloadManager.getInstance()

// 设置最大并发下载数
downloadManager.setMaxConcurrentDownloads(3)

// 设置下载速度限制(500KB/s)
downloadManager.setSpeedLimit(500)

// 添加下载任务
val task = downloadManager.addTask(
url = "https://example.com/large-file.zip",
savePath = "/downloads",
fileName = "large-file.zip",
threadCount = 5 // 使用5个线程下载
)

// 添加下载监听器
downloadManager.addListener(task.id, object : DownloadListener {
override fun onProgress(task: DownloadTask, progress: Int, downloadedBytes: Long, totalBytes: Long) {
println("下载进度: $progress%, ${downloadedBytes/1024}KB/${totalBytes/1024}KB")
}

override fun onStatusChanged(task: DownloadTask, status: DownloadStatus) {
println("状态变更: $status")
}

override fun onSuccess(task: DownloadTask, file: File) {
println("下载成功: ${file.absolutePath}")
}

override fun onFailed(task: DownloadTask, e: Exception) {
println("下载失败: ${e.message}")
}
})

// 开始下载
downloadManager.startDownload(task.id)

// 模拟5秒后暂停
Thread.sleep(5000)
println("暂停下载")
downloadManager.pauseDownload(task.id)

// 模拟2秒后恢复
Thread.sleep(2000)
println("恢复下载")
downloadManager.resumeDownload(task.id)

// 等待下载完成
readLine()

// 清理资源
downloadManager.release()
}

框架功能说明

  1. 多线程下载

    • 通过将文件分割成多个部分,同时使用多个线程下载
    • 每个部分使用HTTP Range请求独立下载
    • 下载完成后自动合并为完整文件
  2. 多任务下载

    • DownloadManager管理多个下载任务
    • 支持设置最大并发下载数
    • 使用队列管理等待下载的任务
  3. 下载限速

    • RateLimiter类控制下载速度
    • 可动态调整限速大小
    • 基于时间窗口算法实现平滑限速
  4. 任务暂停和恢复

    • 支持随时暂停下载任务
    • 通过保存已下载部分信息实现断点续传
    • 恢复时从上次下载位置继续
  5. 下载进度

    • 实时计算并通知下载进度
    • 提供详细的状态回调
    • 支持获取下载速度、已下载大小等信息

这个框架设计合理、功能完整,可以满足各种下载需求。您可以根据实际需要进一步扩展或优化,比如添加网络状态监听、下载优先级设置等功能。

OkHttp 源码分析路线图

1. 基础组件模块

1
2
3
4
OkHttpClient(核心配置类)
├── 连接池管理
├── 线程池管理
└── 各种配置参数

2. 请求响应模块

1
2
3
4
5
Request/Response(请求响应模型)
├── RequestBody
├── ResponseBody
├── Headers
└── HttpUrl

3. 核心流程模块

1
2
3
Call(请求执行流程)
├── RealCall(同步请求)
└── AsyncCall(异步请求)

让我们按照以下顺序深入分析:

一、基础组件层

  1. OkHttpClient 分析
  • 构建器模式的使用
  • 默认配置项的设定
  • 连接池的管理机制
  1. 连接池管理
1
2
3
4
5
6
7
8
class ConnectionPool(
maxIdleConnections: Int = 5,
keepAliveDuration: Long = 5,
timeUnit: TimeUnit = TimeUnit.MINUTES
) {
// 连接复用机制
// 空闲连接清理
}

二、请求响应层

  1. 请求构建流程
1
2
3
val request = Request.Builder()
.url("https://api.example.com")
.build()
  1. 响应处理机制
  • 响应体的流式处理
  • 数据转换机制

三、核心执行层

  1. 同步请求流程
1
2
3
4
5
6
// RealCall 执行流程
class RealCall implements Call {
override fun execute(): Response {
// 同步请求实现
}
}
  1. 异步请求流程
1
2
3
4
// AsyncCall 实现原理
class AsyncCall extends NamedRunnable {
// 异步请求实现
}

四、重点关注的核心机制

  1. 连接复用机制
  • 连接池的实现
  • 连接的生命周期管理
  • Keep-Alive 策略
  1. 缓存机制
  • 缓存策略的实现
  • 缓存验证机制
  • 缓存替换策略
  1. DNS 解析
  • DNS 缓存机制
  • 自定义 DNS 实现
  1. 失败重试机制
  • 重试策略
  • 重定向处理
  1. HTTP/2 支持
  • 多路复用实现
  • 流量控制

分析建议

  1. 循序渐进
  • 先了解整体架构
  • 再深入具体模块
  • 最后研究细节实现
  1. 重点关注
  • 类的职责划分
  • 接口设计原则
  • 性能优化手段
  1. 实践验证
  • 编写测试用例
  • 修改源码观察效果
  • 对比不同版本变化
  1. 源码阅读技巧
  • 从使用示例开始
  • 跟踪调用链路
  • 关注设计模式应用

建议的学习顺序

  1. 第一阶段:基础认知
  • OkHttpClient 的配置项
  • Request/Response 的基本结构
  • 同步/异步调用方式
  1. 第二阶段:核心流程
  • 拦截器链的完整执行过程
  • 网络请求的处理流程
  • 连接池的管理机制
  1. 第三阶段:深入细节
  • HTTP/2 的实现原理
  • 缓存机制的具体实现
  • 安全机制(证书、HTTPS 等)
  1. 第四阶段:性能优化
  • 连接复用策略
  • 内存管理优化
  • 并发控制机制

通过这样系统的分析,您可以:

  • 深入理解 OkHttp 的设计思想
  • 掌握网络库的核心实现
  • 学习优秀的代码设计方法

需要深入了解哪个具体模块,我可以为您详细展开分析。

OkHttp 请求构建和响应处理流程源码分析

我需要分析 OkHttp 中请求构建和响应处理的源码流程。让我整理一下需要涵盖的内容:

一、请求构建流程

Request 构建代码

1
2
3
val request = Request.Builder()
.url("https://api.example.com")
.build()
  1. 请求构建流程

    • Request.Builder 的使用
    • 各种组件的构建(URL、Headers、RequestBody 等)
    • 请求构建的设计模式分析
  2. Request 构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 典型的构建器模式实现
class Request private constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
val tags: Map<Class<*>, Any>
) {
// 构建器类
class Builder {
private var url: HttpUrl? = null
private var method = "GET"
private var headers = Headers.Builder()
private var body: RequestBody? = null
private var tags = mutableMapOf<Class<*>, Any>()

// 设置URL (支持多种形式)
fun url(url: String): Builder = url(url.toHttpUrl())
fun url(url: HttpUrl): Builder {
this.url = url
return this
}

// 设置请求方法
fun method(method: String, body: RequestBody?): Builder {
// 检查方法名和body的合法性
if (method.isEmpty()) throw IllegalArgumentException("method.isEmpty()")
if (method !in METHODS_ALLOWING_BODY && body != null) {
throw IllegalArgumentException("method $method must not have a request body")
}
if (method in METHODS_REQUIRING_BODY && body == null) {
throw IllegalArgumentException("method $method must have a request body")
}

this.method = method
this.body = body
return this
}

// 便捷方法: GET请求
fun get(): Builder = method("GET", null)

// 便捷方法: POST请求
fun post(body: RequestBody): Builder = method("POST", body)

// 添加Header
fun addHeader(name: String, value: String): Builder {
headers.add(name, value)
return this
}

// 设置完整Headers
fun headers(headers: Headers): Builder {
this.headers = headers.newBuilder()
return this
}

// 构建最终Request对象
fun build(): Request {
// 检查URL是否已设置
val url = checkNotNull(this.url) { "url == null" }
return Request(
url = url,
method = method,
headers = headers.build(),
body = body,
tags = tags.toMap()
)
}
}
}
  1. RequestBody 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 抽象类定义
abstract class RequestBody {
// 返回内容类型
abstract fun contentType(): MediaType?

// 返回内容长度
open fun contentLength(): Long = -1L

// 写入内容到缓冲区
abstract fun writeTo(sink: BufferedSink)

// 创建请求体的工厂方法
companion object {
// 从字符串创建
@JvmStatic
fun create(
contentType: MediaType?,
content: String
): RequestBody = create(contentType, content.toByteArray())

// 从字节数组创建
@JvmStatic
fun create(
contentType: MediaType?,
content: ByteArray
): RequestBody = create(contentType, content, 0, content.size)

// 从文件创建
@JvmStatic
fun create(file: File, contentType: MediaType?): RequestBody {
return object : RequestBody() {
override fun contentType() = contentType

override fun contentLength(): Long = file.length()

override fun writeTo(sink: BufferedSink) {
file.source().use { source ->
sink.writeAll(source)
}
}
}
}
}
}

二、响应处理流程

  1. 响应处理流程

    • Response 的结构
    • ResponseBody 的处理
    • 响应解析和转换机制
  2. Response 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Response private constructor(
@get:JvmName("request") val request: Request,
@get:JvmName("protocol") val protocol: Protocol,
@get:JvmName("message") val message: String,
@get:JvmName("code") val code: Int,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: ResponseBody?,
@get:JvmName("networkResponse") val networkResponse: Response?,
@get:JvmName("cacheResponse") val cacheResponse: Response?,
@get:JvmName("priorResponse") val priorResponse: Response?,
internal val sentRequestAtMillis: Long,
internal val receivedResponseAtMillis: Long,
@get:JvmName("exchange") internal val exchange: Exchange?,
) {
// 检查响应是否成功
val isSuccessful: Boolean
get() = code in 200..299

// 构建器类
class Builder {
// 省略实现细节...
}

// 关闭响应体
fun close() {
body?.close()
}
}
  1. ResponseBody 处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
abstract class ResponseBody : Closeable {
// 返回内容类型
abstract fun contentType(): MediaType?

// 返回内容长度
abstract fun contentLength(): Long

// 获取字节流
abstract fun source(): BufferedSource

// 获取字节数组
fun bytes(): ByteArray {
val contentLength = contentLength()
if (contentLength > Int.MAX_VALUE) {
throw IOException("Cannot buffer entire body for content length: $contentLength")
}

val bytes = source().use { it.readByteArray() }
if (contentLength != -1L && contentLength != bytes.size.toLong()) {
throw IOException("Content-Length and stream length disagree")
}
return bytes
}

// 获取字符串
fun string(): String {
val source = source()
try {
return source.readString(source.readByteArray().size.toLong(), Charsets.UTF_8)
} finally {
source.close()
}
}
}
  1. 响应处理流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 使用Response的典型流程
val response = client.newCall(request).execute()

try {
// 1. 检查响应状态
if (response.isSuccessful) {
// 2. 获取响应体
val responseBody = response.body

// 3. 根据Content-Type处理数据
val contentType = responseBody?.contentType()

// 4. 读取响应数据
val responseString = responseBody?.string()
// 或者
val responseBytes = responseBody?.bytes()
// 或者
val responseStream = responseBody?.source()
} else {
// 处理错误响应
println("请求失败: ${response.code}")
}
} finally {
// 5. 关闭响应
response.close()
}

响应流程详解

  1. 响应创建

    • 通过拦截器链处理网络请求
    • CallServerInterceptor生成原始 Response
    • 各拦截器可能修改 Response(如缓存拦截器)
  2. 响应体流式处理

    • ResponseBody 采用流式 API
    • 数据按需加载,避免一次性加载大文件
    • 支持多种数据转换形式(string(), bytes(), source())
  3. 响应关闭机制

    • 必须手动关闭响应以释放连接
    • 适用 try-finally 模式确保资源释放
    • 未关闭的响应可能导致连接池耗尽

设计亮点分析

  1. 构建器模式的精巧应用

    • Request.Builder 和 Response.Builder 使构建过程简洁直观
    • 链式调用提高代码可读性
    • 参数验证在构建时进行,避免运行时错误
  2. 请求体的抽象设计

    • RequestBody 抽象类允许多种请求数据形式
    • 工厂方法简化常见请求体创建
    • 流式API高效处理大文件上传
  3. 响应处理的流式设计

    • 延迟加载机制减少内存占用
    • BufferedSource 接口提供灵活的数据读取方式
    • 显式关闭机制确保资源正确释放
  4. 类型安全与灵活性平衡

    • MediaType 类型处理确保内容类型正确
    • Headers 类封装HTTP头部复杂处理逻辑
    • 支持同步/异步两种处理模式

核心流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
请求构建流程:
Request.Builder
→ 设置URL(url())
→ 设置方法(get/post/method())
→ 设置头部(headers/addHeader())
→ 设置请求体(body)
→ build() → Request对象

响应处理流程:
Response对象
→ 检查状态码(isSuccessful)
→ 获取ResponseBody
→ 读取数据(string/bytes/source)
→ 处理业务逻辑
→ 关闭响应(close)

OkHttp 拦截器的核心源码分析

1. 拦截器接口定义

1
2
3
4
5
6
7
8
9
10
interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response

interface Chain {
fun request(): Request
fun proceed(request: Request): Response
// ... 其他方法
}
}

2. RealInterceptorChain 的核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class RealInterceptorChain(
private val interceptors: List<Interceptor>,
private val index: Int,
private val request: Request,
// ... 其他参数
) : Interceptor.Chain {

override fun proceed(request: Request): Response {
// 检查是否到达链条末端
if (index >= interceptors.size) throw AssertionError()

// 创建下一个拦截器链
val next = RealInterceptorChain(
interceptors = interceptors,
index = index + 1,
request = request,
// ... 其他参数
)

// 获取当前拦截器并执行
val interceptor = interceptors[index]
val response = interceptor.intercept(next)

return response
}
}

3. OkHttpClient 中的拦截器管理

1
2
3
4
5
6
7
8
9
10
11
12
class OkHttpClient internal constructor(builder: Builder) {
// 应用拦截器列表
internal val interceptors: List<Interceptor> = builder.interceptors

// 网络拦截器列表
internal val networkInterceptors: List<Interceptor> = builder.networkInterceptors

class Builder {
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
}
}

4. 核心执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RealCall(
private val client: OkHttpClient,
// ... 其他参数
) {
override fun execute(): Response {
// 构建完整的拦截器链
val interceptors = mutableListOf<Interceptor>()

// 1. 添加用户自定义的应用拦截器
interceptors.addAll(client.interceptors)

// 2. 添加OkHttp内置的核心拦截器
interceptors.add(RetryAndFollowUpInterceptor(client))
interceptors.add(BridgeInterceptor(client.cookieJar))
interceptors.add(CacheInterceptor(client.cache))
interceptors.add(ConnectInterceptor(client))

// 3. 添加用户自定义的网络拦截器
interceptors.addAll(client.networkInterceptors)

// 4. 添加服务器通信拦截器
interceptors.add(CallServerInterceptor())

// 创建拦截器链并执行
val chain = RealInterceptorChain(
interceptors = interceptors,
index = 0,
request = originalRequest,
// ... 其他参数
)

return chain.proceed(originalRequest)
}
}

5. 内置核心拦截器的职责

  • RetryAndFollowUpInterceptor: 处理重试和重定向
  • BridgeInterceptor: 处理请求头和响应头的转换
  • CacheInterceptor: 处理请求缓存
  • ConnectInterceptor: 建立连接
  • CallServerInterceptor: 与服务器进行实际通信

6. 责任链模式的关键点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class CustomInterceptor : Interceptor {
override fun intercept(chain: Chain): Response {
// 1. 获取原始请求
val originalRequest = chain.request()

// 2. 处理请求(可选)
val modifiedRequest = originalRequest.newBuilder()
.addHeader("Custom-Header", "Value")
.build()

// 3. 转发给下一个拦截器
val response = chain.proceed(modifiedRequest)

// 4. 处理响应(可选)
return response.newBuilder()
.addHeader("Response-Header", "Value")
.build()
}
}

这种设计的优点:

  1. 高度解耦,每个拦截器只负责自己的任务
  2. 灵活可扩展,可以随时添加或移除拦截器
  3. 双向处理能力,可以同时处理请求和响应
  4. 链式调用清晰,调试方便

注意事项:

  1. 拦截器的添加顺序会影响最终的执行顺序
  2. 必须调用 chain.proceed() 来继续链条,除非你想中断请求
  3. 注意处理异常,确保资源正确释放
  4. 避免在拦截器中执行耗时操作,可能会影响整体性能

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×