musikr.pipeline: parallelize cache writes

This commit is contained in:
Alexander Capehart 2024-12-20 22:21:24 -05:00
parent 2842bd57b1
commit c4a4b69cd1
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47

View file

@ -73,9 +73,9 @@ private class ExtractStepImpl(
val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) }
val distributedAudioNodes = audioNodes.distribute(8)
val readDistributedFlow = audioNodes.distribute(8)
val cacheResults =
distributedAudioNodes.flows
readDistributedFlow.flows
.map { flow ->
flow
.map { wrap(it) { file -> cache.read(file, storedCovers) } }
@ -132,19 +132,26 @@ private class ExtractStepImpl(
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
val writeDistributedFlow = extractedSongs.distribute(8)
val writtenSongs =
extractedSongs
writeDistributedFlow.flows
.map { flow ->
flow
.map {
wrap(it, cache::write)
ExtractedMusic.Song(it)
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
}
.flattenMerge()
return merge(
filterFlow.manager,
distributedAudioNodes.manager,
readDistributedFlow.manager,
cacheFlow.manager,
cachedSongs,
writeDistributedFlow.manager,
writtenSongs,
playlistNodes)
}