aves/lib/services/service_policy.dart
2021-02-19 18:55:15 +09:00

135 lines
3.9 KiB
Dart

import 'dart:async';
import 'dart:collection';
import 'package:flutter/foundation.dart';
import 'package:tuple/tuple.dart';
/// Shared [ServicePolicy] instance, created through the class's only
/// (library-private) constructor.
final ServicePolicy servicePolicy = ServicePolicy._private();
/// Schedules asynchronous platform calls through per-priority queues.
///
/// Pending tasks are stored by priority value in a [SplayTreeMap], which is
/// scanned from its first entry, so queues with smaller priority values are
/// served first. A new task is only started while fewer than
/// [concurrentTaskMax] entries are in the running map. Tasks that are still
/// queued (not yet started) can be cancelled or paused by key, and paused
/// tasks can be put back in their queue later.
class ServicePolicy {
  final StreamController<QueueState> _queueStreamController = StreamController<QueueState>.broadcast();

  // tasks removed from their queue by [pause], with the priority they had
  final Map<Object, Tuple2<int, _Task>> _paused = {};

  // pending tasks, grouped by priority value and keyed by task key
  final SplayTreeMap<int, LinkedHashMap<Object, _Task>> _queues = SplayTreeMap();

  // tasks whose callback has been started and has not finished yet
  final LinkedHashMap<Object, _Task> _runningQueue = LinkedHashMap();

  // magic number
  static const concurrentTaskMax = 4;

  /// Broadcast stream of [QueueState] snapshots.
  ///
  /// Snapshots are only built and emitted while the stream has a listener.
  Stream<QueueState> get queueStream => _queueStreamController.stream;

  ServicePolicy._private();

  /// Queues [platformCall] at [priority] and returns a future for its outcome.
  ///
  /// [key] identifies the task for [cancel], [pause], [resume] and
  /// [isPaused]; it defaults to the hash code of [platformCall]. If a task is
  /// currently paused under [key], that task is put back in the queue of the
  /// priority it had when paused, and the [platformCall] and [priority]
  /// provided here are ignored.
  ///
  /// The returned future completes with the value of the call, with the error
  /// raised by the call, or with a [CancelledException] if the task is
  /// cancelled while it is still queued.
  ///
  /// NOTE(review): queuing under a key that is already pending at the same
  /// priority replaces the pending task, and the future of the replaced task
  /// is then never completed by this class. Confirm callers use distinct keys.
  Future<T> call<T>(
    Future<T> Function() platformCall, {
    int priority = ServiceCallPriority.normal,
    Object key,
  }) {
    Completer<T> completer;
    _Task task;
    key ??= platformCall.hashCode;
    final toResume = _paused.remove(key);
    if (toResume != null) {
      priority = toResume.item1;
      task = toResume.item2;
      completer = task.completer;
    } else {
      completer = Completer<T>();
      task = _Task(
        () async {
          try {
            completer.complete(await platformCall());
          } catch (error, stack) {
            completer.completeError(error, stack);
          } finally {
            // always free the running slot, even if completing the completer
            // throws, otherwise the slot would be lost for good
            _runningQueue.remove(key);
            _pickNext();
          }
        },
        completer,
      );
    }
    _getQueue(priority)[key] = task;
    _pickNext();
    return completer.future;
  }

  /// Puts the task paused under [key] back in the queue of its priority.
  ///
  /// Returns the future of the resumed task, or `null` when no task is paused
  /// under [key].
  Future<T> resume<T>(Object key) {
    final toResume = _paused.remove(key);
    if (toResume != null) {
      final priority = toResume.item1;
      final task = toResume.item2;
      _getQueue(priority)[key] = task;
      _pickNext();
      return task.completer.future;
    } else {
      return null;
    }
  }

  // returns the queue for `priority`, creating it when it does not exist yet
  LinkedHashMap<Object, _Task> _getQueue(int priority) => _queues.putIfAbsent(priority, () => LinkedHashMap());

  // starts at most one pending task, taken from the head of the first
  // non-empty queue, provided the running map has room for it
  void _pickNext() {
    _notifyQueueState();
    if (_runningQueue.length >= concurrentTaskMax) return;
    final queue = _queues.entries.firstWhere((kv) => kv.value.isNotEmpty, orElse: () => null)?.value;
    if (queue != null && queue.isNotEmpty) {
      final key = queue.keys.first;
      final task = queue.remove(key);
      _runningQueue[key] = task;
      task.callback();
    }
  }

  // removes the task stored under `key` from the queue of each of the given
  // `priorities` and hands every removed task to `action`;
  // returns whether at least one task was removed
  bool _takeOut(Object key, Iterable<int> priorities, void Function(int priority, _Task task) action) {
    var out = false;
    for (final priority in priorities) {
      final task = _getQueue(priority).remove(key);
      if (task != null) {
        out = true;
        action(priority, task);
      }
    }
    // queue sizes (and possibly the paused count) changed without going
    // through `_pickNext`, so listeners need a fresh snapshot
    if (out) _notifyQueueState();
    return out;
  }

  /// Removes the task queued under [key] from the queues of [priorities] and
  /// completes its future with a [CancelledException].
  ///
  /// Only tasks still waiting in a queue are affected. Returns whether a task
  /// was removed.
  bool cancel(Object key, Iterable<int> priorities) {
    return _takeOut(key, priorities, (priority, task) => task.completer.completeError(CancelledException()));
  }

  /// Moves the task queued under [key] from the queues of [priorities] to the
  /// paused set, remembering its priority for [resume].
  ///
  /// Only tasks still waiting in a queue are affected. If a task is already
  /// paused under [key], the existing paused entry is kept. Returns whether a
  /// task was removed from a queue.
  bool pause(Object key, Iterable<int> priorities) {
    return _takeOut(key, priorities, (priority, task) => _paused.putIfAbsent(key, () => Tuple2(priority, task)));
  }

  /// Whether a task is currently paused under [key].
  bool isPaused(Object key) => _paused.containsKey(key);

  // emits a snapshot of the queue sizes, running count and paused count,
  // skipping the work when nobody listens
  void _notifyQueueState() {
    if (!_queueStreamController.hasListener) return;
    final queueByPriority = Map.fromEntries(_queues.entries.map((kv) => MapEntry(kv.key, kv.value.length)));
    _queueStreamController.add(QueueState(queueByPriority, _runningQueue.length, _paused.length));
  }
}
/// A unit of scheduled work: the closure to start and the completer through
/// which its outcome is reported.
class _Task {
  const _Task(this.callback, this.completer);

  /// Completer whose future is handed to the code that requested the task.
  final Completer completer;

  /// Closure invoked to start the task.
  final VoidCallback callback;
}
/// Error used by [ServicePolicy.cancel] to complete the future of a task that
/// was removed from its queue.
///
/// Implements [Exception] so that it is a regular exception object, and
/// handlers written as `on Exception catch (e)` also see cancellations.
class CancelledException implements Exception {
  const CancelledException();

  // explicit name, so that logs do not show `Instance of '...'`
  @override
  String toString() => 'CancelledException';
}
/// Priority values accepted by [ServicePolicy.call].
///
/// [ServicePolicy] keeps its queues in a map sorted by these values and picks
/// from the first non-empty one, so smaller values are served first.
class ServiceCallPriority {
  static const int getFastThumbnail = 100, getRegion = 150, getSizedThumbnail = 200;

  /// Default priority of [ServicePolicy.call].
  static const int normal = 500;

  // both share the same queue
  static const int getMetadata = 1000, getLocation = 1000;
}
/// Snapshot of the [ServicePolicy] load, as emitted on
/// [ServicePolicy.queueStream].
class QueueState {
  /// Number of pending tasks, keyed by priority value.
  final Map<int, int> queueByPriority;

  /// Number of entries in the running map when the snapshot was taken.
  final int runningCount;

  /// Number of paused tasks when the snapshot was taken.
  final int pausedCount;

  const QueueState(this.queueByPriority, this.runningCount, this.pausedCount);
}