musikr.pipeline: redo extract pipeline
Try to separate opening FDs, extracting metadata, parsing tags/writing covers, and cache writes. This makes it slower, but now I know the bottleneck is covers. Gotta figure out how to offload that work.
This commit is contained in:
parent
7e8764d6d4
commit
a77dd3ff7a
4 changed files with 56 additions and 39 deletions
|
@ -19,16 +19,10 @@
|
|||
package org.oxycblt.musikr.metadata
|
||||
|
||||
import android.content.Context
|
||||
import android.net.Uri
|
||||
import java.io.FileInputStream
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
internal class AndroidInputStream(context: Context, uri: Uri) : NativeInputStream {
|
||||
private val fd =
|
||||
requireNotNull(context.contentResolver.openFileDescriptor(uri, "r")) {
|
||||
"Failed to open file descriptor for $uri"
|
||||
}
|
||||
private val fis = FileInputStream(fd.fileDescriptor)
|
||||
internal class AndroidInputStream(context: Context, fis: FileInputStream) : NativeInputStream {
|
||||
private val channel = fis.channel
|
||||
|
||||
override fun readBlock(length: Long): ByteArray {
|
||||
|
@ -63,7 +57,5 @@ internal class AndroidInputStream(context: Context, uri: Uri) : NativeInputStrea
|
|||
|
||||
fun close() {
|
||||
channel.close()
|
||||
fis.close()
|
||||
fd.close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,13 +19,13 @@
|
|||
package org.oxycblt.musikr.metadata
|
||||
|
||||
import android.content.Context
|
||||
import android.os.ParcelFileDescriptor
|
||||
import java.io.FileInputStream
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.oxycblt.musikr.fs.DeviceFile
|
||||
import org.oxycblt.musikr.util.unlikelyToBeNull
|
||||
|
||||
internal interface MetadataExtractor {
|
||||
suspend fun extract(file: DeviceFile): Metadata?
|
||||
suspend fun extract(fd: ParcelFileDescriptor): Metadata?
|
||||
|
||||
companion object {
|
||||
fun from(context: Context): MetadataExtractor = MetadataExtractorImpl(context)
|
||||
|
@ -33,8 +33,9 @@ internal interface MetadataExtractor {
|
|||
}
|
||||
|
||||
private class MetadataExtractorImpl(private val context: Context) : MetadataExtractor {
|
||||
override suspend fun extract(file: DeviceFile) =
|
||||
override suspend fun extract(fd: ParcelFileDescriptor) =
|
||||
withContext(Dispatchers.IO) {
|
||||
TagLibJNI.open(context, file.uri)
|
||||
val fis = FileInputStream(fd.fileDescriptor)
|
||||
TagLibJNI.open(context, fis).also { fis.close() }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.oxycblt.musikr.metadata
|
||||
|
||||
import android.content.Context
|
||||
import android.net.Uri
|
||||
import java.io.FileInputStream
|
||||
|
||||
internal object TagLibJNI {
|
||||
init {
|
||||
|
@ -31,8 +31,8 @@ internal object TagLibJNI {
|
|||
*
|
||||
* Note: This method is blocking and should be handled as such if calling from a coroutine.
|
||||
*/
|
||||
fun open(context: Context, uri: Uri): Metadata? {
|
||||
val inputStream = AndroidInputStream(context, uri)
|
||||
fun open(context: Context, fis: FileInputStream): Metadata? {
|
||||
val inputStream = AndroidInputStream(context, fis)
|
||||
val tag = openNative(inputStream)
|
||||
inputStream.close()
|
||||
return tag
|
||||
|
|
|
@ -27,6 +27,7 @@ import kotlinx.coroutines.flow.flowOn
|
|||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.mapNotNull
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.oxycblt.musikr.Storage
|
||||
import org.oxycblt.musikr.cache.Cache
|
||||
import org.oxycblt.musikr.cache.CacheResult
|
||||
|
@ -45,6 +46,7 @@ internal interface ExtractStep {
|
|||
companion object {
|
||||
fun from(context: Context, storage: Storage): ExtractStep =
|
||||
ExtractStepImpl(
|
||||
context,
|
||||
MetadataExtractor.from(context),
|
||||
TagParser.new(),
|
||||
storage.cache,
|
||||
|
@ -53,6 +55,7 @@ internal interface ExtractStep {
|
|||
}
|
||||
|
||||
private class ExtractStepImpl(
|
||||
private val context: Context,
|
||||
private val metadataExtractor: MetadataExtractor,
|
||||
private val tagParser: TagParser,
|
||||
private val cache: Cache,
|
||||
|
@ -83,37 +86,58 @@ private class ExtractStepImpl(
|
|||
}
|
||||
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Song(it) }
|
||||
val uncachedSongs = cacheFlow.right
|
||||
val distributedFlow = uncachedSongs.distribute(16)
|
||||
val extractedSongs =
|
||||
Array(distributedFlow.flows.size) { i ->
|
||||
distributedFlow.flows[i]
|
||||
.mapNotNull { it ->
|
||||
|
||||
val fds =
|
||||
uncachedSongs
|
||||
.mapNotNull {
|
||||
wrap(it) { file ->
|
||||
val metadata = metadataExtractor.extract(file) ?: return@wrap null
|
||||
val tags = tagParser.parse(file, metadata)
|
||||
val cover = metadata.cover?.let { storedCovers.write(it) }
|
||||
RawSong(file, metadata.properties, tags, cover)
|
||||
withContext(Dispatchers.IO) {
|
||||
context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd ->
|
||||
FileWith(file, fd)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
.buffer(Channel.UNLIMITED)
|
||||
|
||||
val metadata =
|
||||
fds.mapNotNull { fileWith ->
|
||||
wrap(fileWith.file) { _ ->
|
||||
metadataExtractor
|
||||
.extract(fileWith.with)
|
||||
?.let { FileWith(fileWith.file, it) }
|
||||
.also { withContext(Dispatchers.IO) { fileWith.with.close() } }
|
||||
}
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
// Covers are pretty big, so cap the amount of parsed metadata in-memory to at most
|
||||
// 8 to minimize GCs.
|
||||
.buffer(8)
|
||||
|
||||
val extractedSongs =
|
||||
metadata
|
||||
.mapNotNull { fileWith ->
|
||||
val tags = tagParser.parse(fileWith.file, fileWith.with)
|
||||
val cover = fileWith.with.cover?.let { storedCovers.write(it) }
|
||||
RawSong(fileWith.file, fileWith.with.properties, tags, cover)
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
.buffer(Channel.UNLIMITED)
|
||||
|
||||
val writtenSongs =
|
||||
merge(*extractedSongs)
|
||||
extractedSongs
|
||||
.map {
|
||||
wrap(it, cache::write)
|
||||
ExtractedMusic.Song(it)
|
||||
}
|
||||
.flowOn(Dispatchers.IO)
|
||||
.buffer(Channel.UNLIMITED)
|
||||
|
||||
return merge(
|
||||
filterFlow.manager,
|
||||
cacheFlow.manager,
|
||||
cachedSongs,
|
||||
distributedFlow.manager,
|
||||
writtenSongs,
|
||||
playlistNodes)
|
||||
filterFlow.manager, cacheFlow.manager, cachedSongs, writtenSongs, playlistNodes)
|
||||
}
|
||||
|
||||
private data class FileWith<T>(val file: DeviceFile, val with: T)
|
||||
}
|
||||
|
||||
data class RawSong(
|
||||
|
|
Loading…
Reference in a new issue