music: re-add fallible async execution
Re-add the tryAsync wrapper function to the loading process to properly handle music loading errors. Turns out there is basically no other way to properly do this except for the insane exception -> Result -> exception hack.
This commit is contained in:
parent
aa24ea00ea
commit
c98ca8712f
4 changed files with 40 additions and 23 deletions
|
@ -25,15 +25,19 @@ import java.util.*
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import org.oxycblt.auxio.R
|
||||||
import org.oxycblt.auxio.music.cache.CacheRepository
|
import org.oxycblt.auxio.music.cache.CacheRepository
|
||||||
import org.oxycblt.auxio.music.device.DeviceLibrary
|
import org.oxycblt.auxio.music.device.DeviceLibrary
|
||||||
import org.oxycblt.auxio.music.device.RawSong
|
import org.oxycblt.auxio.music.device.RawSong
|
||||||
import org.oxycblt.auxio.music.fs.MediaStoreExtractor
|
import org.oxycblt.auxio.music.fs.MediaStoreExtractor
|
||||||
import org.oxycblt.auxio.music.metadata.TagExtractor
|
import org.oxycblt.auxio.music.metadata.TagExtractor
|
||||||
import org.oxycblt.auxio.music.user.UserLibrary
|
import org.oxycblt.auxio.music.user.UserLibrary
|
||||||
|
import org.oxycblt.auxio.util.fallible
|
||||||
import org.oxycblt.auxio.util.logD
|
import org.oxycblt.auxio.util.logD
|
||||||
import org.oxycblt.auxio.util.logE
|
import org.oxycblt.auxio.util.logE
|
||||||
import org.oxycblt.auxio.util.logW
|
import org.oxycblt.auxio.util.logW
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Primary manager of music information and loading.
|
* Primary manager of music information and loading.
|
||||||
|
@ -272,14 +276,14 @@ constructor(
|
||||||
|
|
||||||
// Do the initial query of the cache and media databases in parallel.
|
// Do the initial query of the cache and media databases in parallel.
|
||||||
logD("Starting queries")
|
logD("Starting queries")
|
||||||
val mediaStoreQueryJob = worker.scope.async { mediaStoreExtractor.query() }
|
val mediaStoreQueryJob = worker.scope.tryAsync { mediaStoreExtractor.query() }
|
||||||
val cache =
|
val cache =
|
||||||
if (withCache) {
|
if (withCache) {
|
||||||
cacheRepository.readCache()
|
cacheRepository.readCache()
|
||||||
} else {
|
} else {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
val query = mediaStoreQueryJob.await()
|
val query = mediaStoreQueryJob.await().getOrThrow()
|
||||||
|
|
||||||
// Now start processing the queried song information in parallel. Songs that can't be
|
// Now start processing the queried song information in parallel. Songs that can't be
|
||||||
// received from the cache are consisted incomplete and pushed to a separate channel
|
// received from the cache are consisted incomplete and pushed to a separate channel
|
||||||
|
@ -288,11 +292,15 @@ constructor(
|
||||||
val completeSongs = Channel<RawSong>(Channel.UNLIMITED)
|
val completeSongs = Channel<RawSong>(Channel.UNLIMITED)
|
||||||
val incompleteSongs = Channel<RawSong>(Channel.UNLIMITED)
|
val incompleteSongs = Channel<RawSong>(Channel.UNLIMITED)
|
||||||
val mediaStoreJob =
|
val mediaStoreJob =
|
||||||
worker.scope.async {
|
worker.scope.tryAsync {
|
||||||
mediaStoreExtractor.consume(query, cache, incompleteSongs, completeSongs)
|
mediaStoreExtractor.consume(query, cache, incompleteSongs, completeSongs) }.also {
|
||||||
|
incompleteSongs.close()
|
||||||
}
|
}
|
||||||
val metadataJob =
|
val metadataJob =
|
||||||
worker.scope.async { tagExtractor.consume(incompleteSongs, completeSongs) }
|
worker.scope.tryAsync {
|
||||||
|
tagExtractor.consume(incompleteSongs, completeSongs)
|
||||||
|
completeSongs.close()
|
||||||
|
}
|
||||||
|
|
||||||
// Await completed raw songs as they are processed.
|
// Await completed raw songs as they are processed.
|
||||||
val rawSongs = LinkedList<RawSong>()
|
val rawSongs = LinkedList<RawSong>()
|
||||||
|
@ -301,8 +309,8 @@ constructor(
|
||||||
emitLoading(IndexingProgress.Songs(rawSongs.size, query.projectedTotal))
|
emitLoading(IndexingProgress.Songs(rawSongs.size, query.projectedTotal))
|
||||||
}
|
}
|
||||||
// These should be no-ops
|
// These should be no-ops
|
||||||
mediaStoreJob.await()
|
mediaStoreJob.await().getOrThrow()
|
||||||
metadataJob.await()
|
metadataJob.await().getOrThrow()
|
||||||
|
|
||||||
if (rawSongs.isEmpty()) {
|
if (rawSongs.isEmpty()) {
|
||||||
logE("Music library was empty")
|
logE("Music library was empty")
|
||||||
|
@ -315,21 +323,33 @@ constructor(
|
||||||
emitLoading(IndexingProgress.Indeterminate)
|
emitLoading(IndexingProgress.Indeterminate)
|
||||||
val deviceLibraryChannel = Channel<DeviceLibrary>()
|
val deviceLibraryChannel = Channel<DeviceLibrary>()
|
||||||
val deviceLibraryJob =
|
val deviceLibraryJob =
|
||||||
worker.scope.async(Dispatchers.Main) {
|
worker.scope.tryAsync(Dispatchers.Main) {
|
||||||
deviceLibraryFactory.create(rawSongs).also { deviceLibraryChannel.send(it) }
|
deviceLibraryFactory.create(rawSongs).also { deviceLibraryChannel.send(it) }
|
||||||
}
|
}
|
||||||
val userLibraryJob = worker.scope.async { userLibraryFactory.read(deviceLibraryChannel) }
|
val userLibraryJob = worker.scope.tryAsync { userLibraryFactory.read(deviceLibraryChannel) }
|
||||||
if (cache == null || cache.invalidated) {
|
if (cache == null || cache.invalidated) {
|
||||||
cacheRepository.writeCache(rawSongs)
|
cacheRepository.writeCache(rawSongs)
|
||||||
}
|
}
|
||||||
val deviceLibrary = deviceLibraryJob.await()
|
val deviceLibrary = deviceLibraryJob.await().getOrThrow()
|
||||||
val userLibrary = userLibraryJob.await()
|
val userLibrary = userLibraryJob.await().getOrThrow()
|
||||||
withContext(Dispatchers.Main) {
|
withContext(Dispatchers.Main) {
|
||||||
emitComplete(null)
|
emitComplete(null)
|
||||||
emitData(deviceLibrary, userLibrary)
|
emitData(deviceLibrary, userLibrary)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private inline fun <R> CoroutineScope.tryAsync(
|
||||||
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
crossinline block: suspend () -> R
|
||||||
|
) =
|
||||||
|
async(context) {
|
||||||
|
try {
|
||||||
|
Result.success(block())
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Result.failure(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun emitLoading(progress: IndexingProgress) {
|
private suspend fun emitLoading(progress: IndexingProgress) {
|
||||||
yield()
|
yield()
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
|
|
|
@ -210,7 +210,6 @@ private abstract class BaseMediaStoreExtractor(
|
||||||
// Free the cursor and signal that no more incomplete songs will be produced by
|
// Free the cursor and signal that no more incomplete songs will be produced by
|
||||||
// this extractor.
|
// this extractor.
|
||||||
query.close()
|
query.close()
|
||||||
incompleteSongs.close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -87,8 +87,6 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (ongoingTasks)
|
} while (ongoingTasks)
|
||||||
|
|
||||||
completeSongs.close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private companion object {
|
private companion object {
|
||||||
|
|
|
@ -35,15 +35,15 @@ fun <T> unlikelyToBeNull(value: T?) =
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Require that the given data is a specific type [T].
|
* Maps a try expression to a [Result].
|
||||||
*
|
* @param block The code to execute
|
||||||
* @param data The data to check.
|
* @return A [Result] representing the outcome of [block]'s execution.
|
||||||
* @return A data casted to [T].
|
|
||||||
* @throws IllegalStateException If the data cannot be casted to [T].
|
|
||||||
*/
|
*/
|
||||||
inline fun <reified T> requireIs(data: Any?): T {
|
inline fun <reified R> fallible(block: () -> R) =
|
||||||
check(data is T) { "Unexpected datatype: ${data?.let { it::class.simpleName }}" }
|
try {
|
||||||
return data
|
Result.success(block())
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Result.failure(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue