diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt index 12762e22a..d4c830f6a 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt @@ -21,12 +21,12 @@ package org.oxycblt.musikr.pipeline import android.content.Context import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map @@ -35,6 +35,7 @@ import org.oxycblt.musikr.Interpretation import org.oxycblt.musikr.Storage import org.oxycblt.musikr.cache.Cache import org.oxycblt.musikr.cache.CacheResult +import org.oxycblt.musikr.cache.CachedSong import org.oxycblt.musikr.covers.Cover import org.oxycblt.musikr.covers.CoverResult import org.oxycblt.musikr.covers.Covers @@ -71,38 +72,54 @@ private class ExploreStepImpl( locations.asFlow(), ) .filter { it.mimeType.startsWith("audio/") || it.mimeType == M3U.MIME_TYPE } - .distribute(8) - .distributedMap { file -> - val cachedSong = - when (val cacheResult = cache.read(file)) { - is CacheResult.Hit -> cacheResult.song - is CacheResult.Stale -> - return@distributedMap NewSong(cacheResult.file, cacheResult.addedMs) - is CacheResult.Miss -> - return@distributedMap NewSong(cacheResult.file, addingMs) - } - val cover = - cachedSong.coverId?.let { coverId -> - when (val coverResult = covers.obtain(coverId)) { - is CoverResult.Hit -> coverResult.cover - else -> - return@distributedMap NewSong( - cachedSong.file, cachedSong.addedMs) + .distributedMap(n = 8, on = Dispatchers.IO, buffer = Channel.UNLIMITED) { file -> + when (val cacheResult = cache.read(file)) { + is CacheResult.Hit -> NeedsCover(cacheResult.song) + is CacheResult.Stale -> + Finalized(NewSong(cacheResult.file, cacheResult.addedMs)) + is CacheResult.Miss -> Finalized(NewSong(cacheResult.file, addingMs)) + } + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + .distributedMap(n = 8, on = Dispatchers.IO, buffer = Channel.UNLIMITED) { + when (it) { + is Finalized -> it + is NeedsCover -> { + when (val coverResult = it.song.coverId?.let { covers.obtain(it) }) { + is CoverResult.Hit -> + Finalized( + RawSong( + it.song.file, + it.song.properties, + it.song.tags, + coverResult.cover, + it.song.addedMs)) + null -> + Finalized( + RawSong( + it.song.file, + it.song.properties, + it.song.tags, + null, + it.song.addedMs)) + else -> Finalized(NewSong(it.song.file, it.song.addedMs)) } } - RawSong( - cachedSong.file, - cachedSong.properties, - cachedSong.tags, - cover, - cachedSong.addedMs) + } } - .flattenMerge() + .map { it.explored } .flowOn(Dispatchers.IO) - .buffer(), + .buffer(Channel.UNLIMITED), flow { emitAll(storedPlaylists.read().asFlow()) } .map { RawPlaylist(it) } .flowOn(Dispatchers.IO) .buffer()) } + + private sealed interface InternalExploreItem + + private data class NeedsCover(val song: CachedSong) : InternalExploreItem + + private data class Finalized(val explored: Explored) : InternalExploreItem } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index c6d347577..d6a2cb214 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -19,9 +19,12 @@ package org.oxycblt.musikr.pipeline import android.content.Context -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flattenMerge +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onCompletion import org.oxycblt.musikr.Storage import org.oxycblt.musikr.cache.CachedSong @@ -29,6 +32,7 @@ import org.oxycblt.musikr.cache.MutableCache import org.oxycblt.musikr.covers.Cover import org.oxycblt.musikr.covers.CoverResult import org.oxycblt.musikr.covers.MutableCovers +import org.oxycblt.musikr.metadata.Metadata import org.oxycblt.musikr.metadata.MetadataExtractor import org.oxycblt.musikr.tag.parse.TagParser @@ -48,34 +52,71 @@ private class ExtractStepImpl( private val cache: MutableCache, private val covers: MutableCovers ) : ExtractStep { - @OptIn(ExperimentalCoroutinesApi::class) override fun extract(nodes: Flow): Flow { val exclude = mutableListOf() return nodes - .distribute(8) - .distributedMap { + // Cover art is huge, so we have to kneecap the concurrency here to avoid excessive + // GCs. We still reap the concurrency benefits here, just not as much as we could. + .distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.RENDEZVOUS) { when (it) { - is RawSong -> it - is RawPlaylist -> it + is RawSong -> Finalized(it) + is RawPlaylist -> Finalized(it) is NewSong -> { - val metadata = - metadataExtractor.extract(it.file) ?: return@distributedMap InvalidSong - val tags = tagParser.parse(metadata) - val cover = - when (val result = covers.create(it.file, metadata)) { - is CoverResult.Hit -> result.cover - else -> null - } - val cachedSong = - CachedSong(it.file, metadata.properties, tags, cover?.id, it.addedMs) - cache.write(cachedSong) - exclude.add(cachedSong) - val rawSong = RawSong(it.file, metadata.properties, tags, cover, it.addedMs) - rawSong + val metadata = metadataExtractor.extract(it.file) + if (metadata != null) NeedsParsing(it, metadata) else Finalized(InvalidSong) } } } - .flattenMerge() + .flowOn(Dispatchers.IO) + .buffer(Channel.RENDEZVOUS) + .distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.UNLIMITED) { + when (it) { + is Finalized -> it + is NeedsParsing -> { + val tags = tagParser.parse(it.metadata) + val cover = + when (val result = covers.create(it.song.file, it.metadata)) { + is CoverResult.Hit -> result.cover + else -> null + } + NeedsCaching( + RawSong( + it.song.file, it.metadata.properties, tags, cover, it.song.addedMs)) + } + } + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + .distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.UNLIMITED) { + when (it) { + is Finalized -> it + is NeedsCaching -> { + val cachedSong = + CachedSong( + it.song.file, + it.song.properties, + it.song.tags, + it.song.cover?.id, + it.song.addedMs) + cache.write(cachedSong) + exclude.add(cachedSong) + Finalized(it.song) + } + } + } + .map { it.extracted } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) .onCompletion { cache.cleanup(exclude) } } + + private sealed interface ParsedExtractItem + + private data class NeedsParsing(val song: NewSong, val metadata: Metadata) : ParsedExtractItem + + private sealed interface ParsedCachingItem + + private data class NeedsCaching(val song: RawSong) : ParsedCachingItem + + private data class Finalized(val extracted: Extracted) : ParsedExtractItem, ParsedCachingItem } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt index 29eef2735..4ca845246 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt @@ -18,10 +18,16 @@ package org.oxycblt.musikr.pipeline +import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.withIndex @@ -32,7 +38,13 @@ import kotlinx.coroutines.flow.withIndex * Note that this function requires the "manager" flow to be consumed alongside the split flows in * order to function. Without this, all of the newly split flows will simply block. */ -internal fun Flow.distribute(n: Int): Flow> { +@OptIn(ExperimentalCoroutinesApi::class) +internal fun Flow.distributedMap( + n: Int, + on: CoroutineContext = Dispatchers.Main, + buffer: Int = Channel.UNLIMITED, + block: suspend (T) -> R, +): Flow { val posChannels = List(n) { Channel(Channel.UNLIMITED) } val managerFlow = flow { @@ -44,14 +56,12 @@ internal fun Flow.distribute(n: Int): Flow> { channel.close() } } - return (posChannels.map { it.receiveAsFlow() } + managerFlow).asFlow() + return (posChannels.map { it.receiveAsFlow() } + managerFlow) + .asFlow() + .map { it.tryMap(block).flowOn(on).buffer(buffer) } + .flattenMerge() } -internal fun Flow>.distributedMap(transform: suspend (T) -> R): Flow> = - flow { - collect { innerFlow -> emit(innerFlow.tryMap(transform)) } - } - internal fun Flow.tryMap(transform: suspend (T) -> R): Flow = flow { collect { value -> try {