Revert "musikr: use channel-based pipeline"

This reverts commit 7c8863bd3a.
This commit is contained in:
Alexander Capehart 2025-01-30 09:30:38 -07:00
parent 7880c777ba
commit e78fde44e0
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
6 changed files with 365 additions and 283 deletions

View file

@ -19,15 +19,16 @@
package org.oxycblt.musikr
import android.content.Context
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import org.oxycblt.musikr.fs.MusicLocation
import org.oxycblt.musikr.pipeline.EvaluateStep
import org.oxycblt.musikr.pipeline.ExploreNode
import org.oxycblt.musikr.pipeline.ExploreStep
import org.oxycblt.musikr.pipeline.ExtractStep
import org.oxycblt.musikr.pipeline.ExtractedMusic
/**
* A highly opinionated, multi-threaded device music library.
@ -118,57 +119,21 @@ private class MusikrImpl(
locations: List<MusicLocation>,
onProgress: suspend (IndexingProgress) -> Unit
) = coroutineScope {
var explored = 0
var loaded = 0
val intermediateNodes = Channel<ExploreNode>(Channel.UNLIMITED)
val nodes = Channel<ExploreNode>(Channel.UNLIMITED)
val exploreTask = exploreStep.explore(locations, intermediateNodes)
val exploreMonitor = async {
try {
onProgress(IndexingProgress.Songs(loaded, explored))
for (node in intermediateNodes) {
explored++
onProgress(IndexingProgress.Songs(loaded, explored))
nodes.send(node)
}
nodes.close()
Result.success(Unit)
} catch (e: Exception) {
nodes.close(e)
Result.failure(e)
}
}
val intermediateExtracted = Channel<ExtractedMusic>(Channel.UNLIMITED)
val extracted = Channel<ExtractedMusic>(Channel.UNLIMITED)
val extractTask = extractStep.extract(nodes, intermediateExtracted)
val extractMonitor = async {
try {
onProgress(IndexingProgress.Songs(loaded, explored))
for (music in intermediateExtracted) {
loaded++
onProgress(IndexingProgress.Songs(loaded, explored))
extracted.send(music)
}
extracted.close()
Result.success(Unit)
} catch (e: Exception) {
extracted.close(e)
Result.failure(e)
}
}
val libraryTask = evaluateStep.evaluate(extracted)
exploreTask.await().getOrThrow()
exploreMonitor.await().getOrThrow()
extractTask.await().getOrThrow()
extractMonitor.await().getOrThrow()
val library = libraryTask.await().getOrThrow()
var exploredCount = 0
var extractedCount = 0
val explored =
exploreStep
.explore(locations)
.buffer(Channel.UNLIMITED)
.onStart { onProgress(IndexingProgress.Songs(0, 0)) }
.onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) }
val extracted =
extractStep
.extract(explored)
.buffer(Channel.UNLIMITED)
.onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) }
.onCompletion { onProgress(IndexingProgress.Indeterminate) }
val library = evaluateStep.evaluate(extracted)
LibraryResultImpl(storage, library)
}
}

View file

@ -18,10 +18,16 @@
package org.oxycblt.musikr.pipeline
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import org.oxycblt.musikr.Interpretation
import org.oxycblt.musikr.MutableLibrary
import org.oxycblt.musikr.Storage
@ -32,7 +38,7 @@ import org.oxycblt.musikr.playlist.interpret.PlaylistInterpreter
import org.oxycblt.musikr.tag.interpret.TagInterpreter
internal interface EvaluateStep {
suspend fun evaluate(extractedMusic: Channel<ExtractedMusic>): Deferred<Result<MutableLibrary>>
suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary
companion object {
fun new(storage: Storage, interpretation: Interpretation): EvaluateStep =
@ -50,30 +56,33 @@ private class EvaluateStepImpl(
private val storedPlaylists: StoredPlaylists,
private val libraryFactory: LibraryFactory
) : EvaluateStep {
override suspend fun evaluate(
extractedMusic: Channel<ExtractedMusic>
): Deferred<Result<MutableLibrary>> = coroutineScope {
async {
try {
override suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary {
val filterFlow =
extractedMusic.filterIsInstance<ExtractedMusic.Valid>().divert {
when (it) {
is ExtractedMusic.Valid.Song -> Divert.Right(it.song)
is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file)
}
}
val rawSongs = filterFlow.right
val preSongs =
rawSongs
.map { wrap(it, tagInterpreter::interpret) }
.flowOn(Dispatchers.Default)
.buffer(Channel.UNLIMITED)
val prePlaylists =
filterFlow.left
.map { wrap(it, playlistInterpreter::interpret) }
.flowOn(Dispatchers.Default)
.buffer(Channel.UNLIMITED)
val graphBuilder = MusicGraph.builder()
for (music in extractedMusic) {
when (music) {
is ExtractedMusic.Valid.Song ->
graphBuilder.add(tagInterpreter.interpret(music.song))
is ExtractedMusic.Valid.Playlist ->
graphBuilder.add(playlistInterpreter.interpret(music.file))
is ExtractedMusic.Invalid -> {}
}
}
val graphBuild =
merge(
filterFlow.manager,
preSongs.onEach { wrap(it, graphBuilder::add) },
prePlaylists.onEach { wrap(it, graphBuilder::add) })
graphBuild.collect()
val graph = graphBuilder.build()
val library = libraryFactory.create(graph, storedPlaylists, playlistInterpreter)
extractedMusic.close()
Result.success(library)
} catch (e: Exception) {
extractedMusic.close(e)
Result.failure(e)
}
}
return libraryFactory.create(graph, storedPlaylists, playlistInterpreter)
}
}

View file

@ -19,12 +19,16 @@
package org.oxycblt.musikr.pipeline
import android.content.Context
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.fs.MusicLocation
@ -34,10 +38,7 @@ import org.oxycblt.musikr.playlist.db.StoredPlaylists
import org.oxycblt.musikr.playlist.m3u.M3U
internal interface ExploreStep {
suspend fun explore(
locations: List<MusicLocation>,
explored: SendChannel<ExploreNode>
): Deferred<Result<Unit>>
fun explore(locations: List<MusicLocation>): Flow<ExploreNode>
companion object {
fun from(context: Context, storage: Storage): ExploreStep =
@ -49,14 +50,8 @@ private class ExploreStepImpl(
private val deviceFiles: DeviceFiles,
private val storedPlaylists: StoredPlaylists
) : ExploreStep {
override suspend fun explore(
locations: List<MusicLocation>,
explored: SendChannel<ExploreNode>
) = coroutineScope {
async {
try {
val audioTask = async {
try {
override fun explore(locations: List<MusicLocation>): Flow<ExploreNode> {
val audios =
deviceFiles
.explore(locations.asFlow())
.mapNotNull {
@ -66,32 +61,14 @@ private class ExploreStepImpl(
else -> null
}
}
.collect { explored.send(it) }
Result.success(Unit)
} catch (e: Exception) {
Result.failure(e)
}
}
val playlistTask = async {
try {
storedPlaylists.read().forEach { explored.send(ExploreNode.Playlist(it)) }
Result.success(Unit)
} catch (e: Exception) {
Result.failure(e)
}
}
audioTask.await().getOrThrow()
playlistTask.await().getOrThrow()
explored.close()
Result.success(Unit)
} catch (e: Exception) {
explored.close(e)
Result.failure(e)
}
}
.flowOn(Dispatchers.IO)
.buffer()
val playlists =
flow { emitAll(storedPlaylists.read().asFlow()) }
.map { ExploreNode.Playlist(it) }
.flowOn(Dispatchers.IO)
.buffer()
return merge(audios, playlists)
}
}

View file

@ -19,15 +19,17 @@
package org.oxycblt.musikr.pipeline
import android.content.Context
import android.os.ParcelFileDescriptor
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.withContext
import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.cache.Cache
@ -35,7 +37,6 @@ import org.oxycblt.musikr.cache.CacheResult
import org.oxycblt.musikr.cover.Cover
import org.oxycblt.musikr.cover.MutableCovers
import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.metadata.Metadata
import org.oxycblt.musikr.metadata.MetadataExtractor
import org.oxycblt.musikr.metadata.Properties
import org.oxycblt.musikr.playlist.PlaylistFile
@ -43,10 +44,7 @@ import org.oxycblt.musikr.tag.parse.ParsedTags
import org.oxycblt.musikr.tag.parse.TagParser
internal interface ExtractStep {
suspend fun extract(
explored: ReceiveChannel<ExploreNode>,
extracted: SendChannel<ExtractedMusic>
): Deferred<Result<Unit>>
fun extract(nodes: Flow<ExploreNode>): Flow<ExtractedMusic>
companion object {
fun from(context: Context, storage: Storage): ExtractStep =
@ -66,155 +64,118 @@ private class ExtractStepImpl(
private val cacheFactory: Cache.Factory,
private val storedCovers: MutableCovers
) : ExtractStep {
override suspend fun extract(
explored: ReceiveChannel<ExploreNode>,
extracted: SendChannel<ExtractedMusic>
) = coroutineScope {
async {
try {
@OptIn(ExperimentalCoroutinesApi::class)
override fun extract(nodes: Flow<ExploreNode>): Flow<ExtractedMusic> {
val cache = cacheFactory.open()
val addingMs = System.currentTimeMillis()
val filterFlow =
nodes.divert {
when (it) {
is ExploreNode.Audio -> Divert.Right(it.file)
is ExploreNode.Playlist -> Divert.Left(it.file)
}
}
val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) }
val read = Channel<DeviceFile>(Channel.UNLIMITED)
val open = Channel<DeviceFile>(Channel.UNLIMITED)
val extract = Channel<FileWith<ParcelFileDescriptor>>(Channel.UNLIMITED)
val readDistributedFlow = audioNodes.distribute(8)
val cacheResults =
readDistributedFlow.flows
.map { flow ->
flow
.map { wrap(it) { file -> cache.read(file, storedCovers) } }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
}
.flattenMerge()
.buffer(Channel.UNLIMITED)
val cacheFlow =
cacheResults.divert {
when (it) {
is CacheResult.Hit -> Divert.Left(it.song)
is CacheResult.Miss -> Divert.Right(it.file)
}
}
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) }
val uncachedSongs = cacheFlow.right
val fds =
uncachedSongs
.mapNotNull {
wrap(it) { file ->
withContext(Dispatchers.IO) {
context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd ->
FileWith(file, fd)
}
}
}
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
val metadata =
fds.mapNotNull { fileWith ->
wrap(fileWith.file) { _ ->
metadataExtractor
.extract(fileWith.file, fileWith.with)
.let { FileWith(fileWith.file, it) }
.also { withContext(Dispatchers.IO) { fileWith.with.close() } }
}
}
.flowOn(Dispatchers.IO)
// Covers are pretty big, so cap the amount of parsed metadata in-memory to at most
// 8 to minimize GCs.
val parse = Channel<FileWith<Metadata>>(8)
val write = Channel<RawSong>(Channel.UNLIMITED)
.buffer(8)
val exploreAssortTask = async {
try {
for (node in explored) {
when (node) {
is ExploreNode.Audio -> read.send(node.file)
is ExploreNode.Playlist ->
extracted.send(ExtractedMusic.Valid.Playlist(node.file))
}
}
read.close()
Result.success(Unit)
} catch (e: Exception) {
read.close(e)
Result.failure(e)
}
}
val readTasks =
List(8) {
async {
try {
for (file in read) {
when (val result = cache.read(file, storedCovers)) {
is CacheResult.Hit ->
extracted.send(ExtractedMusic.Valid.Song(result.song))
is CacheResult.Miss -> open.send(result.file)
}
}
Result.success(Unit)
} catch (e: Exception) {
Result.failure(e)
}
}
}
val readTask = async {
try {
readTasks.awaitAll().forEach { it.getOrThrow() }
open.close()
Result.success(Unit)
} catch (e: Exception) {
open.close(e)
Result.failure(e)
}
}
val openTask = async {
try {
for (file in open) {
withContext(Dispatchers.IO) {
val fd = context.contentResolver.openFileDescriptor(file.uri, "r")
if (fd != null) {
extract.send(FileWith(file, fd))
} else {
extracted.send(ExtractedMusic.Invalid)
}
}
}
extract.close()
Result.success(Unit)
} catch (e: Exception) {
extract.close(e)
Result.failure(e)
}
}
val extractTask = async {
try {
for (fileWith in extract) {
val metadata = metadataExtractor.extract(fileWith.file, fileWith.with)
if (metadata != null) {
parse.send(FileWith(fileWith.file, metadata))
} else {
extracted.send(ExtractedMusic.Invalid)
}
fileWith.with.close()
}
parse.close()
Result.success(Unit)
} catch (e: Exception) {
parse.close(e)
Result.failure(e)
}
}
val parseTask = async {
try {
for (fileWith in parse) {
val extractedSongs =
metadata
.map { fileWith ->
if (fileWith.with != null) {
val tags = tagParser.parse(fileWith.file, fileWith.with)
val cover = fileWith.with.cover?.let { storedCovers.write(it) }
write.send(
RawSong(
fileWith.file, fileWith.with.properties, tags, cover, addingMs))
RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs)
} else {
null
}
write.close()
Result.success(Unit)
} catch (e: Exception) {
write.close(e)
Result.failure(e)
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
val extractedFilter =
extractedSongs.divert {
if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid)
}
val writeTasks =
List(8) {
async {
try {
for (song in write) {
cache.write(song)
extracted.send(ExtractedMusic.Valid.Song(song))
}
Result.success(Unit)
} catch (e: Exception) {
Result.failure(e)
}
}
}
val write = extractedFilter.left
val invalid = extractedFilter.right
exploreAssortTask.await().getOrThrow()
readTask.await().getOrThrow()
openTask.await().getOrThrow()
extractTask.await().getOrThrow()
parseTask.await().getOrThrow()
writeTasks.awaitAll().forEach { it.getOrThrow() }
cache.finalize()
val writeDistributedFlow = write.distribute(8)
val writtenSongs =
writeDistributedFlow.flows
.map { flow ->
flow
.map {
wrap(it, cache::write)
ExtractedMusic.Valid.Song(it)
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
}
.flattenMerge()
extracted.close()
Result.success(Unit)
} catch (e: Exception) {
extracted.close(e)
Result.failure(e)
}
}
val merged =
merge(
filterFlow.manager,
readDistributedFlow.manager,
cacheFlow.manager,
cachedSongs,
extractedFilter.manager,
writeDistributedFlow.manager,
writtenSongs,
invalid,
playlistNodes)
return merged.onCompletion { cache.finalize() }
}
private data class FileWith<T>(val file: DeviceFile, val with: T)

View file

@ -0,0 +1,82 @@
/*
* Copyright (c) 2024 Auxio Project
* FlowUtil.kt is part of Auxio.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.oxycblt.musikr.pipeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.withIndex
internal sealed interface Divert<L, R> {
data class Left<L, R>(val value: L) : Divert<L, R>
data class Right<L, R>(val value: R) : Divert<L, R>
}
internal class DivertedFlow<L, R>(
val manager: Flow<Nothing>,
val left: Flow<L>,
val right: Flow<R>
)
internal inline fun <T, L, R> Flow<T>.divert(
crossinline predicate: (T) -> Divert<L, R>
): DivertedFlow<L, R> {
val leftChannel = Channel<L>(Channel.UNLIMITED)
val rightChannel = Channel<R>(Channel.UNLIMITED)
val managedFlow =
flow<Nothing> {
collect {
when (val result = predicate(it)) {
is Divert.Left -> leftChannel.send(result.value)
is Divert.Right -> rightChannel.send(result.value)
}
}
leftChannel.close()
rightChannel.close()
}
return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow())
}
internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Flow<Flow<T>>)
/**
* Equally "distributes" the values of some flow across n new flows.
*
* Note that this function requires the "manager" flow to be consumed alongside the split flows in
* order to function. Without this, all of the newly split flows will simply block.
*/
internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
val posChannels = List(n) { Channel<T>(Channel.UNLIMITED) }
val managerFlow =
flow<Nothing> {
withIndex().collect {
val index = it.index % n
posChannels[index].send(it.value)
}
for (channel in posChannels) {
channel.close()
}
}
val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() }
return DistributedFlow(managerFlow, hotFlows)
}

View file

@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 Auxio Project
* PipelineException.kt is part of Auxio.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.oxycblt.musikr.pipeline
import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.playlist.PlaylistFile
import org.oxycblt.musikr.playlist.interpret.PrePlaylist
import org.oxycblt.musikr.tag.interpret.PreSong
class PipelineException(val processing: WhileProcessing, val error: Exception) : Exception() {
override val cause = error
override val message = "Error while processing ${processing}: ${error.stackTraceToString()}"
}
sealed interface WhileProcessing {
class AFile internal constructor(private val file: DeviceFile) : WhileProcessing {
override fun toString() = "File @ ${file.path}"
}
class ARawSong internal constructor(private val rawSong: RawSong) : WhileProcessing {
override fun toString() = "Raw Song @ ${rawSong.file.path}"
}
class APlaylistFile internal constructor(private val playlist: PlaylistFile) : WhileProcessing {
override fun toString() = "Playlist File @ ${playlist.name}"
}
class APreSong internal constructor(private val preSong: PreSong) : WhileProcessing {
override fun toString() = "Pre Song @ ${preSong.path}"
}
class APrePlaylist internal constructor(private val prePlaylist: PrePlaylist) :
WhileProcessing {
override fun toString() = "Pre Playlist @ ${prePlaylist.name}"
}
}
internal suspend fun <R> wrap(file: DeviceFile, block: suspend (DeviceFile) -> R): R =
try {
block(file)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.AFile(file), e)
}
internal suspend fun <R> wrap(song: RawSong, block: suspend (RawSong) -> R): R =
try {
block(song)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.ARawSong(song), e)
}
internal suspend fun <R> wrap(file: PlaylistFile, block: suspend (PlaylistFile) -> R): R =
try {
block(file)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APlaylistFile(file), e)
}
internal suspend fun <R> wrap(song: PreSong, block: suspend (PreSong) -> R): R =
try {
block(song)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APreSong(song), e)
}
internal suspend fun <R> wrap(playlist: PrePlaylist, block: suspend (PrePlaylist) -> R): R =
try {
block(playlist)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APrePlaylist(playlist), e)
}