Update error handling in controllers

This commit is contained in:
Ming Ming 2024-10-31 23:50:54 +08:00
parent 17916a8434
commit d24e595d8f
14 changed files with 176 additions and 131 deletions

View file

@ -16,7 +16,6 @@ import 'package:nc_photos/entity/collection_item/new_item.dart';
import 'package:nc_photos/entity/file_descriptor.dart';
import 'package:nc_photos/entity/file_util.dart' as file_util;
import 'package:nc_photos/exception_event.dart';
import 'package:nc_photos/object_extension.dart';
import 'package:nc_photos/rx_extension.dart';
import 'package:nc_photos/use_case/collection/add_file_to_collection.dart';
import 'package:nc_photos/use_case/collection/list_collection_item.dart';
@ -25,6 +24,7 @@ import 'package:nc_photos/use_case/collection/update_collection_post_load.dart';
import 'package:nc_photos/use_case/remove.dart';
import 'package:np_codegen/np_codegen.dart';
import 'package:np_collection/np_collection.dart';
import 'package:np_common/object_util.dart';
import 'package:rxdart/rxdart.dart';
part 'collection_items_controller.g.dart';
@ -87,6 +87,8 @@ class CollectionItemsController {
return _dataStreamController.stream;
}
Stream<ExceptionEvent> get errorStream => _dataErrorStreamController.stream;
/// Peek the stream and return the current value
CollectionItemStreamData peekStream() => _dataStreamController.stream.value;
@ -142,8 +144,7 @@ class CollectionItemsController {
);
if (isInited) {
error
?.run((e) => _dataStreamController.addError(e.error, e.stackTrace));
error?.also(_dataErrorStreamController.add);
var finalize = _dataStreamController.value.items.toList();
if (failed.isNotEmpty) {
// remove failed items
@ -216,8 +217,7 @@ class CollectionItemsController {
);
if (isInited) {
error
?.run((e) => _dataStreamController.addError(e.error, e.stackTrace));
error?.also(_dataErrorStreamController.add);
if (failed.isNotEmpty) {
_dataStreamController.addWithValue((value) => value.copyWith(
items: [...value.items, ...failed],
@ -276,8 +276,7 @@ class CollectionItemsController {
);
if (isInited) {
error
?.run((e) => _dataStreamController.addError(e.error, e.stackTrace));
error?.also(_dataErrorStreamController.add);
if (failed.isNotEmpty) {
_dataStreamController.addWithValue((value) => value.copyWith(
items: [...value.items, ...failed],
@ -347,9 +346,8 @@ class CollectionItemsController {
}
}
} catch (e, stackTrace) {
_dataStreamController
..addError(e, stackTrace)
..addWithValue((v) => v.copyWith(hasNext: false));
_dataErrorStreamController.add(ExceptionEvent(e, stackTrace));
_dataStreamController.addWithValue((v) => v.copyWith(hasNext: false));
}
}
@ -401,6 +399,8 @@ class CollectionItemsController {
hasNext: true,
),
);
final _dataErrorStreamController =
StreamController<ExceptionEvent>.broadcast();
late final BehaviorSubject<int?> _countStreamController;
final _mutex = Mutex();

View file

@ -18,6 +18,7 @@ import 'package:nc_photos/entity/collection_item/util.dart';
import 'package:nc_photos/entity/file_descriptor.dart';
import 'package:nc_photos/entity/sharee.dart';
import 'package:nc_photos/exception.dart';
import 'package:nc_photos/exception_event.dart';
import 'package:nc_photos/rx_extension.dart';
import 'package:nc_photos/use_case/collection/create_collection.dart';
import 'package:nc_photos/use_case/collection/edit_collection.dart';
@ -98,6 +99,8 @@ class CollectionsController {
/// Peek the stream and return the current value
CollectionStreamEvent peekStream() => _dataStreamController.stream.value;
Stream<ExceptionEvent> get errorStream => _dataErrorStreamController.stream;
/// Reload the data
///
/// The data will be loaded automatically when the stream is first listened so
@ -109,7 +112,7 @@ class CollectionsController {
(c) {
results = c;
},
onError: _dataStreamController.addError,
onError: _dataErrorStreamController.add,
onDone: () => completer.complete(),
);
await completer.future;
@ -225,7 +228,7 @@ class CollectionsController {
});
_updateCollection(c, newItems?.whereNotNull().toList());
} catch (e, stackTrace) {
_dataStreamController.addError(e, stackTrace);
_dataErrorStreamController.add(ExceptionEvent(e, stackTrace));
}
}
@ -246,11 +249,11 @@ class CollectionsController {
_updateCollection(newCollection!);
}
if (result == CollectionShareResult.partial) {
_dataStreamController
.addError(CollectionPartialShareException(sharee.shareWith.raw));
_dataErrorStreamController.add(ExceptionEvent(
CollectionPartialShareException(sharee.shareWith.raw)));
}
} catch (e, stackTrace) {
_dataStreamController.addError(e, stackTrace);
_dataErrorStreamController.add(ExceptionEvent(e, stackTrace));
}
}
@ -271,11 +274,11 @@ class CollectionsController {
_updateCollection(newCollection!);
}
if (result == CollectionShareResult.partial) {
_dataStreamController
.addError(CollectionPartialUnshareException(share.username));
_dataErrorStreamController.add(
ExceptionEvent(CollectionPartialUnshareException(share.username)));
}
} catch (e, stackTrace) {
_dataStreamController.addError(e, stackTrace);
_dataErrorStreamController.add(ExceptionEvent(e, stackTrace));
}
}
@ -293,7 +296,7 @@ class CollectionsController {
));
return newCollection;
} catch (e, stackTrace) {
_dataStreamController.addError(e, stackTrace);
_dataErrorStreamController.add(ExceptionEvent(e, stackTrace));
return null;
}
}
@ -312,7 +315,7 @@ class CollectionsController {
);
_dataStreamController.add(lastData);
},
onError: _dataStreamController.addError,
onError: _dataErrorStreamController.add,
onDone: () => completer.complete(),
);
await completer.future;
@ -376,6 +379,8 @@ class CollectionsController {
hasNext: true,
),
);
final _dataErrorStreamController =
StreamController<ExceptionEvent>.broadcast();
final _itemControllers = <_CollectionKey, CollectionItemsController>{};

View file

@ -6,6 +6,7 @@ import 'package:nc_photos/account.dart';
import 'package:nc_photos/controller/account_pref_controller.dart';
import 'package:nc_photos/di_container.dart';
import 'package:nc_photos/entity/person.dart';
import 'package:nc_photos/exception_event.dart';
import 'package:nc_photos/use_case/person/list_person.dart';
import 'package:np_codegen/np_codegen.dart';
import 'package:rxdart/rxdart.dart';
@ -58,6 +59,8 @@ class PersonsController {
return _personStreamContorller.stream;
}
Stream<ExceptionEvent> get errorStream => _personErrorStreamController.stream;
Future<void> reload() async {
if (_isPersonStreamInited) {
return _load();
@ -80,7 +83,7 @@ class PersonsController {
);
_personStreamContorller.add(lastData);
},
onError: _personStreamContorller.addError,
onError: _personErrorStreamController.add,
onDone: () => completer.complete(),
);
await completer.future;
@ -97,4 +100,6 @@ class PersonsController {
final _personStreamContorller = BehaviorSubject.seeded(
const PersonStreamEvent(data: [], hasNext: true),
);
final _personErrorStreamController =
StreamController<ExceptionEvent>.broadcast();
}

View file

@ -4,6 +4,7 @@ import 'package:copy_with/copy_with.dart';
import 'package:logging/logging.dart';
import 'package:nc_photos/account.dart';
import 'package:nc_photos/di_container.dart';
import 'package:nc_photos/exception_event.dart';
import 'package:nc_photos/use_case/list_location_group.dart';
import 'package:np_codegen/np_codegen.dart';
import 'package:rxdart/rxdart.dart';
@ -47,6 +48,8 @@ class PlacesController {
return _placeStreamContorller.stream;
}
Stream<ExceptionEvent> get errorStream => _placeErrorStreamController.stream;
Future<void> reload() async {
if (_isPlaceStreamInited) {
return _load();
@ -67,7 +70,7 @@ class PlacesController {
);
_placeStreamContorller.add(lastData);
},
onError: _placeStreamContorller.addError,
onError: _placeErrorStreamController.add,
onDone: () => completer.complete(),
);
await completer.future;
@ -84,4 +87,6 @@ class PlacesController {
hasNext: true,
),
);
final _placeErrorStreamController =
StreamController<ExceptionEvent>.broadcast();
}

View file

@ -10,6 +10,7 @@ import 'package:nc_photos/entity/file.dart';
import 'package:nc_photos/entity/file_descriptor.dart';
import 'package:nc_photos/entity/share.dart';
import 'package:nc_photos/event/event.dart';
import 'package:nc_photos/exception_event.dart';
import 'package:nc_photos/remote_storage_util.dart' as remote_storage_util;
import 'package:nc_photos/rx_extension.dart';
import 'package:nc_photos/use_case/list_share_with_me.dart';
@ -90,6 +91,9 @@ class SharingsController {
return _sharingStreamContorller.stream;
}
Stream<ExceptionEvent> get errorStream =>
_sharingErrorStreamController.stream;
/// In the future we need to get rid of the listeners and this reload function
/// and move all manipulations to this controller
Future<void> reload() async {
@ -114,7 +118,7 @@ class SharingsController {
_sharingStreamContorller.add(lastData);
}
},
onError: _sharingStreamContorller.addError,
onError: _sharingErrorStreamController.add,
onDone: () => completer.complete(),
);
await completer.future;
@ -187,6 +191,8 @@ class SharingsController {
final _sharingStreamContorller = BehaviorSubject.seeded(
const SharingStreamEvent(data: [], hasNext: true),
);
final _sharingErrorStreamController =
StreamController<ExceptionEvent>.broadcast();
AppEventListener<ShareRemovedEvent>? _shareRemovedListener;
AppEventListener<FileMovedEvent>? _fileMovedEventListener;

View file

@ -73,7 +73,7 @@ class _Bloc extends Bloc<_Event, _State>
filesController.errorStream,
onData: (data) => state.copyWith(
isLoading: _itemTransformerQueue.isProcessing,
error: ExceptionEvent(data.error, data.stackTrace),
error: data,
),
),
]);

View file

@ -131,13 +131,14 @@ class _Bloc extends Bloc<_Event, _State>
rawItems: data.items,
isLoading: data.hasNext,
),
onError: (e, stackTrace) {
_log.severe("[_onLoad] Uncaught exception", e, stackTrace);
return state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
),
forEach(
emit,
itemsController.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: data,
),
),
// do we need this as we already filter files in filesController?
forEach(

View file

@ -30,23 +30,26 @@ class _Bloc extends Bloc<_Event, _State>
super.onError(error, stackTrace);
}
Future<void> _onLoad(_LoadCollections ev, Emitter<_State> emit) async {
Future<void> _onLoad(_LoadCollections ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
controller.stream,
onData: (data) => state.copyWith(
collections: data.data.map((e) => e.collection).toList(),
isLoading: data.hasNext,
return Future.wait([
forEach(
emit,
controller.stream,
onData: (data) => state.copyWith(
collections: data.data.map((e) => e.collection).toList(),
isLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoad] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
controller.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
void _onTransformItems(_TransformItems ev, Emitter<_State> emit) {

View file

@ -75,23 +75,26 @@ class _Bloc extends Bloc<_Event, _State>
super.onError(error, stackTrace);
}
Future<void> _onLoad(_LoadCollections ev, Emitter<_State> emit) async {
Future<void> _onLoad(_LoadCollections ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
controller.stream,
onData: (data) => state.copyWith(
collections: data.data.map((d) => d.collection).toList(),
isLoading: data.hasNext,
return Future.wait([
forEach(
emit,
controller.stream,
onData: (data) => state.copyWith(
collections: data.data.map((d) => d.collection).toList(),
isLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoad] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
controller.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
void _onReload(_ReloadCollections ev, Emitter<_State> emit) {

View file

@ -17,21 +17,24 @@ class _Bloc extends Bloc<_Event, _State>
Future<void> _onLoad(_LoadPersons ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
personsController.stream,
onData: (data) => state.copyWith(
persons: data.data,
isLoading: data.hasNext,
return Future.wait([
forEach(
emit,
personsController.stream,
onData: (data) => state.copyWith(
persons: data.data,
isLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoad] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
personsController.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
void _onReload(_Reload ev, Emitter<_State> emit) {

View file

@ -17,21 +17,24 @@ class _Bloc extends Bloc<_Event, _State>
Future<void> _onLoad(_LoadPlaces ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
placesController.stream,
onData: (data) => state.copyWith(
places: data.data,
isLoading: data.hasNext,
return Future.wait([
forEach(
emit,
placesController.stream,
onData: (data) => state.copyWith(
places: data.data,
isLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoad] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
placesController.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
void _onReload(_Reload ev, Emitter<_State> emit) {

View file

@ -19,21 +19,24 @@ class _Bloc extends Bloc<_Event, _State>
Future<void> _onLoadPersons(_LoadPersons ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
personsController.stream,
onData: (data) => state.copyWith(
persons: data.data,
isPersonsLoading: data.hasNext,
return Future.wait([
forEach(
emit,
personsController.stream,
onData: (data) => state.copyWith(
persons: data.data,
isPersonsLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoadPersons] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
personsController.errorStream,
onData: (data) => state.copyWith(
isPersonsLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
Future<void> _onTransformPersonItems(
@ -56,21 +59,24 @@ class _Bloc extends Bloc<_Event, _State>
Future<void> _onLoadPlaces(_LoadPlaces ev, Emitter<_State> emit) {
_log.info(ev);
return forEach(
emit,
placesController.stream,
onData: (data) => state.copyWith(
places: data.data,
isPlacesLoading: data.hasNext,
return Future.wait([
forEach(
emit,
placesController.stream,
onData: (data) => state.copyWith(
places: data.data,
isPlacesLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onLoadPlaces] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
placesController.errorStream,
onData: (data) => state.copyWith(
isPlacesLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
Future<void> _onTransformPlaceItems(

View file

@ -27,7 +27,7 @@ class _Bloc extends Bloc<_Event, _State> with BlocLogger {
on<_SetError>(_onSetError);
_collectionControllerSubscription = collectionsController.stream.listen(
_subscriptions.add(collectionsController.stream.listen(
(event) {
final c = event.data
.firstWhere((d) => state.collection.compareIdentity(d.collection));
@ -35,10 +35,10 @@ class _Bloc extends Bloc<_Event, _State> with BlocLogger {
add(_UpdateCollection(c.collection));
}
},
onError: (e, stackTrace) {
add(_SetError(e, stackTrace));
},
);
));
_subscriptions.add(collectionsController.errorStream.listen((ev) {
add(_SetError(ev.error, ev.stackTrace));
}));
}
@override
@ -46,7 +46,9 @@ class _Bloc extends Bloc<_Event, _State> with BlocLogger {
@override
Future<void> close() {
_collectionControllerSubscription?.cancel();
for (final s in _subscriptions) {
s.cancel();
}
return super.close();
}
@ -159,6 +161,6 @@ class _Bloc extends Bloc<_Event, _State> with BlocLogger {
final Account account;
final CollectionsController collectionsController;
StreamSubscription? _collectionControllerSubscription;
final _subscriptions = <StreamSubscription>[];
var _isHandlingError = false;
}

View file

@ -21,21 +21,24 @@ class _Bloc extends Bloc<_Event, _State> with BlocForEachMixin<_Event, _State> {
"[_onInit] Failed while _importPotentialSharedAlbum", e, stackTrace);
}
unawaited(sharingsController.reload());
return forEach(
emit,
sharingsController.stream,
onData: (data) => state.copyWith(
items: data.data,
isLoading: data.hasNext,
await Future.wait([
forEach(
emit,
sharingsController.stream,
onData: (data) => state.copyWith(
items: data.data,
isLoading: data.hasNext,
),
),
onError: (e, stackTrace) {
_log.severe("[_onInit] Uncaught exception", e, stackTrace);
return state.copyWith(
forEach(
emit,
sharingsController.errorStream,
onData: (data) => state.copyWith(
isLoading: false,
error: ExceptionEvent(e, stackTrace),
);
},
);
error: data,
),
),
]);
}
Future<void> _onTransformItems(