musikr.pipeline: parallelize cache reads
This commit is contained in:
parent
9f68f59504
commit
6bad9e719d
2 changed files with 31 additions and 15 deletions
|
@ -23,6 +23,8 @@ import kotlinx.coroutines.Dispatchers
|
|||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.flatMapMerge
|
||||
import kotlinx.coroutines.flow.flattenMerge
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.mapNotNull
|
||||
|
@ -51,7 +53,8 @@ internal interface ExtractStep {
|
|||
MetadataExtractor.from(context),
|
||||
TagParser.new(),
|
||||
storage.cache,
|
||||
storage.storedCovers)
|
||||
storage.storedCovers
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,10 +76,17 @@ private class ExtractStepImpl(
|
|||
val audioNodes = filterFlow.right
|
||||
val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) }
|
||||
|
||||
val distributedAudioNodes = audioNodes.distribute(8)
|
||||
val cacheResults =
|
||||
audioNodes
|
||||
.map { wrap(it) { file -> cache.read(file, storedCovers)} }
|
||||
.flowOn(Dispatchers.IO)
|
||||
distributedAudioNodes.flows
|
||||
.map { flow ->
|
||||
flow.map {
|
||||
wrap(it) { file -> cache.read(file, storedCovers) }
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
.buffer(Channel.UNLIMITED)
|
||||
}
|
||||
.flattenMerge()
|
||||
.buffer(Channel.UNLIMITED)
|
||||
val cacheFlow =
|
||||
cacheResults.divert {
|
||||
|
@ -104,13 +114,13 @@ private class ExtractStepImpl(
|
|||
|
||||
val metadata =
|
||||
fds.mapNotNull { fileWith ->
|
||||
wrap(fileWith.file) { _ ->
|
||||
metadataExtractor
|
||||
.extract(fileWith.with)
|
||||
?.let { FileWith(fileWith.file, it) }
|
||||
.also { withContext(Dispatchers.IO) { fileWith.with.close() } }
|
||||
}
|
||||
wrap(fileWith.file) { _ ->
|
||||
metadataExtractor
|
||||
.extract(fileWith.with)
|
||||
?.let { FileWith(fileWith.file, it) }
|
||||
.also { withContext(Dispatchers.IO) { fileWith.with.close() } }
|
||||
}
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
// Covers are pretty big, so cap the amount of parsed metadata in-memory to at most
|
||||
// 8 to minimize GCs.
|
||||
|
@ -135,7 +145,13 @@ private class ExtractStepImpl(
|
|||
.flowOn(Dispatchers.IO)
|
||||
|
||||
return merge(
|
||||
filterFlow.manager, cacheFlow.manager, cachedSongs, writtenSongs, playlistNodes)
|
||||
filterFlow.manager,
|
||||
distributedAudioNodes.manager,
|
||||
cacheFlow.manager,
|
||||
cachedSongs,
|
||||
writtenSongs,
|
||||
playlistNodes
|
||||
)
|
||||
}
|
||||
|
||||
private data class FileWith<T>(val file: DeviceFile, val with: T)
|
||||
|
|
|
@ -59,7 +59,7 @@ internal inline fun <T, L, R> Flow<T>.divert(
|
|||
return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow())
|
||||
}
|
||||
|
||||
internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Array<Flow<T>>)
|
||||
internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Flow<Flow<T>>)
|
||||
|
||||
/**
|
||||
* Equally "distributes" the values of some flow across n new flows.
|
||||
|
@ -68,7 +68,7 @@ internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Array<F
|
|||
* order to function. Without this, all of the newly split flows will simply block.
|
||||
*/
|
||||
internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
|
||||
val posChannels = Array(n) { Channel<T>(Channel.UNLIMITED) }
|
||||
val posChannels = List(n) { Channel<T>(Channel.UNLIMITED) }
|
||||
val managerFlow =
|
||||
flow<Nothing> {
|
||||
withIndex().collect {
|
||||
|
@ -79,6 +79,6 @@ internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
|
|||
channel.close()
|
||||
}
|
||||
}
|
||||
val hotFlows = posChannels.map { it.receiveAsFlow() }.toTypedArray()
|
||||
val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() }
|
||||
return DistributedFlow(managerFlow, hotFlows)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue