using System; using System.Collections.Generic; using System.Collections.Specialized; using System.Threading; using R3; namespace ObservableCollections; public readonly record struct CollectionAddEvent(int Index, T Value); public readonly record struct CollectionRemoveEvent(int Index, T Value); public readonly record struct CollectionReplaceEvent(int Index, T OldValue, T NewValue); public readonly record struct CollectionMoveEvent(int OldIndex, int NewIndex, T Value); public readonly record struct DictionaryAddEvent(TKey Key, TValue Value); public readonly record struct DictionaryRemoveEvent(TKey Key, TValue Value); public readonly record struct DictionaryReplaceEvent(TKey Key, TValue OldValue, TValue NewValue); public static class ObservableCollectionR3Extensions { public static Observable> ObserveAdd(this IObservableCollection source, CancellationToken cancellationToken = default) { return new ObservableCollectionAdd(source, cancellationToken); } public static Observable> ObserveRemove(this IObservableCollection source, CancellationToken cancellationToken = default) { return new ObservableCollectionRemove(source, cancellationToken); } public static Observable> ObserveReplace(this IObservableCollection source, CancellationToken cancellationToken = default) { return new ObservableCollectionReplace(source, cancellationToken); } public static Observable> ObserveMove(this IObservableCollection source, CancellationToken cancellationToken = default) { return new ObservableCollectionMove(source, cancellationToken); } public static Observable ObserveReset(this IObservableCollection source, CancellationToken cancellationToken = default) { return new ObservableCollectionReset(source, cancellationToken); } public static Observable ObserveCountChanged(this IObservableCollection source, bool notifyCurrentCount = false, CancellationToken cancellationToken = default) { return new ObservableCollectionCountChanged(source, notifyCurrentCount, cancellationToken); } } public static class ObservableDictionaryR3Extensions { public static Observable> ObserveDictionaryAdd(this IReadOnlyObservableDictionary source, CancellationToken cancellationToken = default) { return new ObservableDictionaryAdd(source, cancellationToken); } public static Observable> ObserveDictionaryRemove(this IReadOnlyObservableDictionary source, CancellationToken cancellationToken = default) { return new ObservableDictionaryRemove(source, cancellationToken); } public static Observable> ObserveDictionaryReplace(this IReadOnlyObservableDictionary source, CancellationToken cancellationToken = default) { return new ObservableDictionaryReplace(source, cancellationToken); } } sealed class ObservableCollectionAdd(IObservableCollection collection, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _ObservableCollectionAdd(collection, observer, cancellationToken); } sealed class _ObservableCollectionAdd( IObservableCollection collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Add) { if (eventArgs.IsSingleItem) { observer.OnNext(new CollectionAddEvent(eventArgs.NewStartingIndex, eventArgs.NewItem)); } else { var i = eventArgs.NewStartingIndex; foreach (var item in eventArgs.NewItems) { observer.OnNext(new CollectionAddEvent(i++, item)); } } } } } } sealed class ObservableCollectionRemove(IObservableCollection collection, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _ObservableCollectionRemove(collection, observer, cancellationToken); } sealed class _ObservableCollectionRemove( IObservableCollection collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Remove) { if (eventArgs.IsSingleItem) { observer.OnNext(new CollectionRemoveEvent(eventArgs.OldStartingIndex, eventArgs.OldItem)); } else { var i = eventArgs.OldStartingIndex; foreach (var item in eventArgs.OldItems) { observer.OnNext(new CollectionRemoveEvent(i++, item)); } } } } } } sealed class ObservableCollectionReplace(IObservableCollection collection, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _ObservableCollectionReplace(collection, observer, cancellationToken); } sealed class _ObservableCollectionReplace( IObservableCollection collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Replace) { observer.OnNext(new CollectionReplaceEvent(eventArgs.NewStartingIndex, eventArgs.OldItem, eventArgs.NewItem)); } } } } sealed class ObservableCollectionMove(IObservableCollection collection, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _ObservableCollectionMove(collection, observer, cancellationToken); } sealed class _ObservableCollectionMove( IObservableCollection collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Move) { observer.OnNext(new CollectionMoveEvent(eventArgs.OldStartingIndex, eventArgs.NewStartingIndex, eventArgs.NewItem)); } } } } sealed class ObservableCollectionReset(IObservableCollection collection, CancellationToken cancellationToken) : Observable { protected override IDisposable SubscribeCore(Observer observer) { return new _ObservableCollectionReset(collection, observer, cancellationToken); } sealed class _ObservableCollectionReset( IObservableCollection collection, Observer observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Reset) { observer.OnNext(Unit.Default); } } } } sealed class ObservableCollectionCountChanged(IObservableCollection collection, bool notifyCurrentCount, CancellationToken cancellationToken) : Observable { protected override IDisposable SubscribeCore(Observer observer) { return new _ObservableCollectionCountChanged(collection, notifyCurrentCount, observer, cancellationToken); } sealed class _ObservableCollectionCountChanged : ObservableCollectionObserverBase { int countPrev; public _ObservableCollectionCountChanged( IObservableCollection collection, bool notifyCurrentCount, Observer observer, CancellationToken cancellationToken) : base(collection, observer, cancellationToken) { this.countPrev = collection.Count; if (notifyCurrentCount) { observer.OnNext(collection.Count); } } protected override void Handler(in NotifyCollectionChangedEventArgs eventArgs) { switch (eventArgs.Action) { case NotifyCollectionChangedAction.Add: case NotifyCollectionChangedAction.Remove: case NotifyCollectionChangedAction.Reset when countPrev != collection.Count: observer.OnNext(collection.Count); break; } countPrev = collection.Count; } } } sealed class ObservableDictionaryAdd( IReadOnlyObservableDictionary dictionary, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _DictionaryCollectionAdd(dictionary, observer, cancellationToken); } sealed class _DictionaryCollectionAdd( IObservableCollection> collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase, DictionaryAddEvent>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs> eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Add) { if (eventArgs.IsSingleItem) { observer.OnNext( new DictionaryAddEvent(eventArgs.NewItem.Key, eventArgs.NewItem.Value)); } else { var i = eventArgs.NewStartingIndex; foreach (var item in eventArgs.NewItems) { observer.OnNext(new DictionaryAddEvent(item.Key, item.Value)); } } } } } } sealed class ObservableDictionaryRemove( IReadOnlyObservableDictionary dictionary, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _DictionaryCollectionRemove(dictionary, observer, cancellationToken); } sealed class _DictionaryCollectionRemove( IObservableCollection> collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase, DictionaryRemoveEvent>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs> eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Remove) { if (eventArgs.IsSingleItem) { observer.OnNext( new DictionaryRemoveEvent(eventArgs.OldItem.Key, eventArgs.OldItem.Value)); } else { var i = eventArgs.NewStartingIndex; foreach (var item in eventArgs.NewItems) { observer.OnNext(new DictionaryRemoveEvent(item.Key, item.Value)); } } } } } } sealed class ObservableDictionaryReplace( IReadOnlyObservableDictionary dictionary, CancellationToken cancellationToken) : Observable> { protected override IDisposable SubscribeCore(Observer> observer) { return new _DictionaryCollectionReplace(dictionary, observer, cancellationToken); } sealed class _DictionaryCollectionReplace( IObservableCollection> collection, Observer> observer, CancellationToken cancellationToken) : ObservableCollectionObserverBase, DictionaryReplaceEvent>(collection, observer, cancellationToken) { protected override void Handler(in NotifyCollectionChangedEventArgs> eventArgs) { if (eventArgs.Action == NotifyCollectionChangedAction.Replace) { observer.OnNext(new DictionaryReplaceEvent( eventArgs.NewItem.Key, eventArgs.OldItem.Value, eventArgs.NewItem.Value)); } } } } abstract class ObservableCollectionObserverBase : IDisposable { protected readonly IObservableCollection collection; protected readonly Observer observer; readonly CancellationTokenRegistration cancellationTokenRegistration; readonly NotifyCollectionChangedEventHandler handlerDelegate; public ObservableCollectionObserverBase(IObservableCollection collection, Observer observer, CancellationToken cancellationToken) { this.collection = collection; this.observer = observer; this.handlerDelegate = Handler; collection.CollectionChanged += handlerDelegate; if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state => { var s = (ObservableCollectionObserverBase)state!; s.observer.OnCompleted(); s.Dispose(); }, this); } } public void Dispose() { collection.CollectionChanged -= handlerDelegate; cancellationTokenRegistration.Dispose(); } protected abstract void Handler(in NotifyCollectionChangedEventArgs eventArgs); }