Reactive Extensions pro .NET (Rx.NET)


Reactive Extensions (Rx) je knihovna pro tvoření asynchronních, na událostech založených aplikací, která využívá observable kolekce a LINQ dotazy.

Před použitím Rx v projektu je nutné nejprve tuto knihovnu stáhnout zde. Poté je potřeba přidat do projektu příslušné reference – většinou stačí přidat jen System.Reactive.

IObservable<T> a IObserver<T>

V Rx jsou asynchronní datové toky (z různých zdrojů – klasické události jako je například vstup od uživatele, tweety, změny na burze atd.) reprezentovány jako observable posloupnosti (tedy posloupnosti, které se dají sledovat). Tyto observable posloupnosti implementují rozhraní IObservable<T>. K těmto posloupnostem je možné se přihlásit a sledovat je – metoda Subscribe definovaná rozhraním IObservable<T>. Metoda Subscribe přebírá jako parametr objekt implementující rozhraní IObserver<T> a vrací objekt implementující rozhraní IDisposable, pomocí tohoto objektu je možné zrušit přihlášení (zavolání metody Dispose).

Vždy, když dojde ke změně zdroje, zdroj (IObservable<T>) odešle upozornění všem přihlášeným posluchačům (IObserver<T>), což zaručuje, že aplikace není blokována čekáním na aktualizaci zdroje. Zdroj také posílá upozornění vždy, když dojde k chybě nebo ve chvíli, kdy už nejsou k dispozici žádná data.

Jako příklad fungování může posloužit RSS čtečka – přihlásíme se k odběru z určitého zdroje (celý web, pouze určité téma atd.) a vždy, když vznikne nový článek, jsme o něm automaticky informováni a nemusíme se zdržovat tím, že půjdeme na web a budeme tam hledat nové články.

Rozhraní IObservable<T> je velmi podobné rozhraní IEnumerable<T>, protože také reprezentuje posloupnost dat, ale také udržuje seznam implementací IObserver<T>, kterým musí zasílat upozornění. IObservable<T> je možné chápat jako kolekci objektů typu T – například tedy IObservable<int> je možné chápat jako kolekci celých čísel, kde jsou celá čísla odesílána všem přihlášeným posluchačům.

Zjednodušení práce s IObservable<T> a IObserver<T>

Aby nebylo nutné vždy implementovat rozhraní IObserver<T>, je možné využít extension metodu Subscribe, které stačí předat delegáty pro jednotlivé události (OnNext, OnError, OnCompleted) a tyto delegáty poté budou volány. Když pro nějakou událost není nastaven delegát, použije se defaultní chování.

Příklad použití metody Subscribe bez vlastní nutnosti implementovat IObserver<T>:

IDisposable subscription = source.Subscribe
    (
        onNext: x => Console.WriteLine("OnNext: {0}", x),
        onError: ex => Console.WriteLine("OnError: {0}", ex.Message),
        onCompleted: () => Console.WriteLine("OnCompleted")
    );

Také je možné využít třídy Observable a Observer, které jsou součástí Rx. Tyto třídy umožňují využívat množství metod, což se týká hlavně třídy Observable, která umožňuje vytvořit různé zdroje – z existující události, kterou převede na IObservable<T> (metody FromEvent nebo FromEventPattern), ze zadaného rozsahu (Range) atd.

Subject

Další možností, jak si zjednodušit práci, je použití třídy Subject<T> jako zdroje. Tato třída implementuje rozhraní IObservable<T> a je tedy možné jí jednoduše použít jako zdroj. Implementuje také IObserver<T> a je možné jí použít jako posluchače.

Třída Subject<T> implementuje ale také rozhraní ISubject<T>, jehož jedinou funkcí je to, že vynucuje implementaci IObservable<T> a současně IObserver<T>.

Třída Subject<T> funguje tak, že všechny zprávy, které dostane (zavolání metody OnNext – ručně nebo ze zdroje, kterému naslouchá) přeposílá všem přihlášeným posluchačům.

Subjekty (třídy implementující ISubject<T>) se dají velmi zajímavě využit. Například se dají použít jako proxy, když subjekt přihlásíme k požadovanému zdroji jako posluchače a všechny posluchače připojíme na subjekt. Takto je možné implementovat nějaké další chování.

V Rx už je, kromě základního Subject<T>, několik dalších implementací rozhraní ISubject<T> – například ReplaySubject, BehaviorSubject a AsyncSubject. Například ReplaySubject uchovává všechny hodnoty, které již odeslal a tak je možné přistupovat i k historii odeslaných hodnot.

Hlavní rozdíly proti klasickým událostem

Zápis – vytvoření, přihlášení a vyvolání

Samozřejmě na první pohled je zřejmý rozdíl proti klasickým událostem v zápisu, který je přímočařejší.

Vytvoření klasické události:

public event EventHandler CustomMouseEvent;

Přihlášení ke klasické události:

CustomMouseEvent += (sender, eargs) => this.EventHandler(eargs);

Vyvolání klasické události a odeslání dat:

if (this.CustomMouseEvent != null)
    this.CustomMouseEvent(this, eargs);

A proti tomu událost pomocí Rx – nejprve vytvoření události:

public ISubject<MouseEventArgs> CustomMouseEvent = new Subject<MouseEventArgs>();

Poté opět ukázka přihlášení k události:

CustomMouseEvent.Subscribe(eargs => this.EventHandler(eargs));

A vyvolání události – odeslání dat:

this.CustomMouseEvent.OnNext(eargs);

Filtrování

Při práci s klasickými událostmi není jednoduše možné reagovat jen na požadované události – například nás zajímá jen kliknutí v určité oblasti. Vždy, když uživatel klikne, dojde k zavolání handleru události a teprve v něm řešíme, zda se jedná o kliknutí v požadované části.

Při použití Rx toto vůbec není problém. Aby kód byl co nejvíce přehledný, použiju podstatně jednodušší příklad – máme událost, která poskytuje celá čísla (IntegerEvent), ale do konzole chceme vypisovat jen sudá čísla:

IntegerEvent.Where(x => x % 2 == 0).Subscribe(x => Console.WriteLine(x));

Na tomto příkladu a na filtrování samotném je konečně vidět, že se dají využívat extension metody, které jsou známé z IEnumerable<T>. V tomto příkladu konkrétně metoda Where.

Projekce

Typickým příkladem projekce je například to, že po převedení klasické události kliknutí myši získáme IObservable<MouseEventArgs>, jenže samozřejmě lépe by vypadalo a lépe by se používalo IObservable<Point>, odkud bychom získávali pozice kliknutí. To se dá udělat opět snadno:

void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    var mouseDown = Observable.FromEventPattern(this, "MouseDown");
    this.NewMouseDown = mouseDown.Select(m => m.EventArgs.GetPosition(this));
}

public IObservable NewMouseDown;

Kombinování různých zdrojů

Jedná se například o to, že snadno můžeme zkombinovat události stisknutí tlačítka myši a pohybu myši a získat událost tažení myší. V tomto případě kód asi není potřeba, protože by ho bylo o něco více než v předchozích případech a navíc jde hlavně o přístup, který je i z předchozích příkladů zřejmý.

Důležité je to, že události pomocí Rx se chovají jako kolekce dat a dá se s nimi podobným způsobem pracovat.

Závěr

Myslím, že výhody Rx jsou jasně patrné a rozhodně stojí za to je vyzkoušet a dozvědět se o nich něco víc. Tento článek by měl sloužit jako úvod, protože některé oblasti byly popsány málo a některé důležité části nebyly popsány vůbec – např. plánovače. Takže pro další informace viz zdroje.

Zdroje

, , , ,

Komentáře jsou uzavřeny.