musikr.pipeline: shuffle songs to extract

This helps avoid the entire tag parsing flow from getting blocked up
by several tracks that are blocking trying to write a single cover.
This commit is contained in:
Alexander Capehart 2024-12-19 16:13:16 -05:00
parent a77dd3ff7a
commit 249d2fad67
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
2 changed files with 16 additions and 0 deletions

View file

@ -89,6 +89,7 @@ private class ExtractStepImpl(
val fds = val fds =
uncachedSongs uncachedSongs
.shuffle()
.mapNotNull { .mapNotNull {
wrap(it) { file -> wrap(it) { file ->
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {

View file

@ -20,9 +20,12 @@ package org.oxycblt.musikr.pipeline
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.withIndex import kotlinx.coroutines.flow.withIndex
internal sealed interface Divert<L, R> { internal sealed interface Divert<L, R> {
@ -79,3 +82,15 @@ internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
val hotFlows = posChannels.map { it.receiveAsFlow() }.toTypedArray() val hotFlows = posChannels.map { it.receiveAsFlow() }.toTypedArray()
return DistributedFlow(managerFlow, hotFlows) return DistributedFlow(managerFlow, hotFlows)
} }
internal fun <T> Flow<T>.shuffle() = flow {
// As far as I'm aware, the only way to get a truly normal distribution
// on a flow is by evaluating it. I tried a bunch of different strategies
// on lazily shuffling a flow and it simply doesn't produce a good enough
// distribution since you need to emit late stuff early and early stuff
// late. It's best to just eval and re-emit.
val output = mutableListOf<T>()
toList(output)
output.shuffle()
emitAll(output.asFlow())
}