commit caccd8a3c5637d3daf69229e459b19e4a407ffde Author: Sander Hautvast Date: Tue Oct 11 20:24:38 2016 +0200 initial commit diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..6c68559 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml new file mode 100644 index 0000000..e7bedf3 --- /dev/null +++ b/.idea/copyright/profiles_settings.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/.idea/libraries/Maven__io_reactivex_rxjava2_rxjava_2_0_0_RC4.xml b/.idea/libraries/Maven__io_reactivex_rxjava2_rxjava_2_0_0_RC4.xml new file mode 100644 index 0000000..97edab2 --- /dev/null +++ b/.idea/libraries/Maven__io_reactivex_rxjava2_rxjava_2_0_0_RC4.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_0.xml b/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_0.xml new file mode 100644 index 0000000..afef071 --- /dev/null +++ b/.idea/libraries/Maven__org_reactivestreams_reactive_streams_1_0_0.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..dfca8c4 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + 1.8 + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..2c79013 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..cc0e388 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,1643 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + DEFINITION_ORDER + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + project + + + + + + + + + + + + + + + + project + + + true + + + + DIRECTORY + + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1474223457710 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/Observable.class + 2163 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/internal/operators/observable/ObservableError.class + 17 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/Observable.class + 2183 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/Observable.class + 2193 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/internal/operators/observable/ObservableError.class + 23 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/internal/disposables/EmptyDisposable.class + 38 + + + + jar://$MAVEN_REPOSITORY$/io/reactivex/rxjava2/rxjava/2.0.0-RC2/rxjava-2.0.0-RC2.jar!/io/reactivex/plugins/RxJavaPlugins.class + 147 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/JavaRx2.iml b/JavaRx2.iml new file mode 100644 index 0000000..fff71da --- /dev/null +++ b/JavaRx2.iml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..952c792 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# rxjava2examples diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..572e74b --- /dev/null +++ b/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + nl.kadaster.exp + exp + 0.1-SNAPSHOT + jar + + + + io.reactivex.rxjava2 + rxjava + 2.0.0-RC4 + + + org.reactivestreams + reactive-streams + 1.0.0 + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.5.1 + + 1.8 + 1.8 + + + + + diff --git a/rx.iml b/rx.iml new file mode 100644 index 0000000..a390dd4 --- /dev/null +++ b/rx.iml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/rx/combine/Zip.java b/src/main/java/rx/combine/Zip.java new file mode 100644 index 0000000..56dd247 --- /dev/null +++ b/src/main/java/rx/combine/Zip.java @@ -0,0 +1,15 @@ +package rx.combine; + +import io.reactivex.Observable; + +public class Zip { + + public static void main(String[] args) { + Observable strings = Observable.fromArray(new String[]{"a", "b", "c"}); + Observable integers = Observable.fromArray(new Integer[]{1, 2, 3}); + + strings.zipWith(integers, (string, integer) -> { + return string + integer; + }).subscribe(value -> System.out.println(value)); + } +} diff --git a/src/main/java/rx/conditional/All.java b/src/main/java/rx/conditional/All.java new file mode 100644 index 0000000..8c775fc --- /dev/null +++ b/src/main/java/rx/conditional/All.java @@ -0,0 +1,16 @@ +package rx.conditional; + +import io.reactivex.Observable; + +import static java.lang.System.out; + +public class All { + + public static void main(String[] args) { + + Observable.range(1, 5).all(v -> { + return v % 2 == 0; + }).subscribe(b -> out.println(b)); + + } +} diff --git a/src/main/java/rx/conditional/Amb.java b/src/main/java/rx/conditional/Amb.java new file mode 100644 index 0000000..cdb2402 --- /dev/null +++ b/src/main/java/rx/conditional/Amb.java @@ -0,0 +1,22 @@ +package rx.conditional; + +import io.reactivex.Observable; + +import static io.reactivex.Observable.amb; +import static io.reactivex.Observable.timer; +import static java.lang.System.out; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class Amb { + + public static void main(String[] args) throws InterruptedException { + Observable timer1 = timer(1, SECONDS).map(t->{return 1;}); + Observable timer2 = timer(2, SECONDS).map(t->{return 2;}); + Observable timer3 = timer(3, SECONDS).map(t->{return 3;}); + + amb(asList(timer1,timer2,timer3)).subscribe(v-> out.println(v)); + + SECONDS.sleep(5); + } +} diff --git a/src/main/java/rx/conditional/Contains.java b/src/main/java/rx/conditional/Contains.java new file mode 100644 index 0000000..fc3ced2 --- /dev/null +++ b/src/main/java/rx/conditional/Contains.java @@ -0,0 +1,14 @@ +package rx.conditional; + +import io.reactivex.Observable; + +import static java.lang.System.out; + +public class Contains { + + public static void main(String[] args) { + + Observable.range(1, 5).contains(1).subscribe(b -> out.println(b)); + + } +} diff --git a/src/main/java/rx/conditional/SkipUntil.java b/src/main/java/rx/conditional/SkipUntil.java new file mode 100644 index 0000000..8290d99 --- /dev/null +++ b/src/main/java/rx/conditional/SkipUntil.java @@ -0,0 +1,21 @@ +package rx.conditional; + +import io.reactivex.Observable; + +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class SkipUntil { + + public static void main(String[] args) throws InterruptedException { + out.println("start"); + + Observable range = Observable.intervalRange(0, 10, 0, 1, SECONDS); + Observable timer = Observable.timer(5, SECONDS); + +// range.subscribe(v -> out.println(v)); + range.skipUntil(timer).subscribe(v -> out.println(v)); + + SECONDS.sleep(10); + } +} diff --git a/src/main/java/rx/create/Create.java b/src/main/java/rx/create/Create.java new file mode 100644 index 0000000..9edbd25 --- /dev/null +++ b/src/main/java/rx/create/Create.java @@ -0,0 +1,22 @@ +package rx.create; + +import io.reactivex.Observable; + +import java.util.Arrays; +import java.util.stream.Stream; + +/* + * create an Observable from any + */ +public class Create { + public static void main(String[] args) { + Observable observable = Observable.create(emitter -> { + emitter.onNext(System.nanoTime()); + emitter.onNext(System.nanoTime()); + }); + observable.subscribe(l -> System.out.println(l)); + + System.out.println("java 8:"); + Stream.of(new Long[]{System.nanoTime(), System.nanoTime()}).forEach(t -> System.out.println(t)); + } +} diff --git a/src/main/java/rx/create/Defer.java b/src/main/java/rx/create/Defer.java new file mode 100644 index 0000000..70b8014 --- /dev/null +++ b/src/main/java/rx/create/Defer.java @@ -0,0 +1,28 @@ +package rx.create; + +import io.reactivex.Observable; + +/* + * defers Observable creation until a subscriber is found + */ +public class Defer { + private static String[] animals; + + public static void main(String[] args) { + Observable deferredAnimals = Observable.defer(() -> { + return Observable.fromArray(animals); //Observable is created when subscribe is called + }); + + animals = new String[]{"bear", "lion", "dog", "parrot"}; + + deferredAnimals.subscribe(animal -> System.out.println(animal)); + } + + public static void wontWork(){ + Observable deferredAnimals = Observable.fromArray(animals); //Observable is created when animals is uninitialized + + animals = new String[]{"bear", "lion", "dog", "parrot"}; + + deferredAnimals.subscribe(animal -> System.out.println(animal)); + } +} diff --git a/src/main/java/rx/create/From.java b/src/main/java/rx/create/From.java new file mode 100644 index 0000000..5b782d1 --- /dev/null +++ b/src/main/java/rx/create/From.java @@ -0,0 +1,17 @@ +package rx.create; + + +import io.reactivex.Observable; +import java.util.Arrays; + +public class From { + + public static void main(String[] args) { + String[] animals = {"bear", "lion", "dog", "parrot"}; + + Observable.fromArray(animals).subscribe(animal -> System.out.println(animal)); + + System.out.println("java 8:"); + Arrays.asList(animals).stream().forEach(animal -> System.out.println(animal)); + } +} diff --git a/src/main/java/rx/create/Interval.java b/src/main/java/rx/create/Interval.java new file mode 100644 index 0000000..21dae4d --- /dev/null +++ b/src/main/java/rx/create/Interval.java @@ -0,0 +1,14 @@ +package rx.create; + +import io.reactivex.Observable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class Interval { + public static void main(String[] args) throws InterruptedException { + Observable.interval(0, 100, MILLISECONDS).subscribe(value -> System.out.println(System.currentTimeMillis() + ": " + value)); + + SECONDS.sleep(1); + } +} diff --git a/src/main/java/rx/create/Just.java b/src/main/java/rx/create/Just.java new file mode 100644 index 0000000..70c3278 --- /dev/null +++ b/src/main/java/rx/create/Just.java @@ -0,0 +1,21 @@ +package rx.create; + +import io.reactivex.Observable; + +import java.util.Arrays; +import java.util.stream.Stream; + +/* + * creates an Observable from any + */ +public class Just { + + public static void main(String[] args) { + Observable.just("hello").subscribe(v -> System.out.println(v)); + + System.out.println(); + + System.out.println("java 8:"); + Stream.of(Arrays.asList("hello", " ", "world")).forEach(t -> System.out.println(t)); + } +} diff --git a/src/main/java/rx/create/Never.java b/src/main/java/rx/create/Never.java new file mode 100644 index 0000000..b67c78a --- /dev/null +++ b/src/main/java/rx/create/Never.java @@ -0,0 +1,12 @@ +package rx.create; + +import io.reactivex.Observable; + +/* + * Does nothing + */ +public class Never { + public static void main(String[] args) { + Observable.never().subscribe(n -> System.out.println(n)); + } +} diff --git a/src/main/java/rx/create/Range.java b/src/main/java/rx/create/Range.java new file mode 100644 index 0000000..17458b9 --- /dev/null +++ b/src/main/java/rx/create/Range.java @@ -0,0 +1,24 @@ +package rx.create; + + +import io.reactivex.Observable; + +import java.util.stream.IntStream; + +/* + * creates a range + */ +public class Range { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 10).subscribe(v -> System.out.println(v)); + + System.out.println("\njava 8:"); + IntStream.rangeClosed(1, 10).forEach(v -> System.out.println(v)); + + System.out.println("\nfactorial(5) using reduce:"); + System.out.println(IntStream.rangeClosed(1, 5).reduce(1, (x, f) -> { + return x * f; + })); + + } +} diff --git a/src/main/java/rx/create/Throw.java b/src/main/java/rx/create/Throw.java new file mode 100644 index 0000000..bdc8b12 --- /dev/null +++ b/src/main/java/rx/create/Throw.java @@ -0,0 +1,19 @@ +package rx.create; + +import io.reactivex.Observable; + +/* + * throws an exception... + */ +public class Throw { + + public static void main(String[] args) { + try { + Observable.error(new Exception("something went wrong")).subscribe(e -> System.out.println("oops: " + e)); + } catch (Throwable e) { + System.out.println("hold your horses, we're catching an exception:"); + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/rx/create/ThrowExplained.java b/src/main/java/rx/create/ThrowExplained.java new file mode 100644 index 0000000..72b2080 --- /dev/null +++ b/src/main/java/rx/create/ThrowExplained.java @@ -0,0 +1,43 @@ +package rx.create; + +import io.reactivex.Observable; + +/* + * throws an exception... + * + * seems simple enough, but wait, where's the catch? + */ +public class ThrowExplained { + + public static void main(String[] args) { + error(); + try { + the_trick(); + } catch (Throwable t) { + System.out.println("can I catch it?"); + } + } + + public static void error() { + try { + Observable.error(new Exception("something went wrong")).subscribe(e -> System.out.println("oops: " + e)); + } catch (Throwable e) { + System.out.println("hold your horses, we're catching an exception:"); + e.printStackTrace(); + } + } + + public static void the_trick() { + try { + Thread currentThread = Thread.currentThread(); + + Thread.UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler(); + handler.uncaughtException(currentThread, new Exception("looks like something went wrong here too")); + } catch (Throwable e) { + System.out.println("trying to catch this exception"); + + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/rx/create/Timer.java b/src/main/java/rx/create/Timer.java new file mode 100644 index 0000000..fe528de --- /dev/null +++ b/src/main/java/rx/create/Timer.java @@ -0,0 +1,19 @@ +package rx.create; + +import io.reactivex.Observable; + +import static java.lang.System.currentTimeMillis; +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class Timer { + public static void main(String[] args) throws InterruptedException { + out.println(currentTimeMillis()); + + Observable.timer(1000, MILLISECONDS).map(t -> { + return "peekaboo"; + }).subscribe(v -> out.println(currentTimeMillis()+" "+v)); + + MILLISECONDS.sleep(1200); + } +} diff --git a/src/main/java/rx/filter/Debounce.java b/src/main/java/rx/filter/Debounce.java new file mode 100644 index 0000000..5176b5f --- /dev/null +++ b/src/main/java/rx/filter/Debounce.java @@ -0,0 +1,38 @@ +package rx.filter; + +import io.reactivex.Observable; +import java.util.Random; + +import static io.reactivex.Observable.create; +import static java.lang.System.currentTimeMillis; +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class Debounce { + public static void main(String[] args) throws InterruptedException { + Observable keys = create(observer -> { + Random random = new Random(); + + for (int i = 0; i < 50; i++) { + long t = currentTimeMillis(); + int w = random.nextInt(500); + + out.println("time " + t + ", wait " + w); + + observer.onNext(t); + + sleep(w); + } + }); + + keys.debounce(400, MILLISECONDS).subscribe(key -> out.println("event " + key)); + } + + public static void sleep(int w) { + try { + Thread.sleep(w); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/rx/filter/Debounce2.java b/src/main/java/rx/filter/Debounce2.java new file mode 100644 index 0000000..55cd292 --- /dev/null +++ b/src/main/java/rx/filter/Debounce2.java @@ -0,0 +1,28 @@ +package rx.filter; + +import io.reactivex.Observable; +import java.util.Random; + +import static java.lang.System.currentTimeMillis; +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static rx.filter.Debounce.sleep; + +public class Debounce2 { + public static void main(String[] args) { + Random random = new Random(); + + Observable range = Observable.range(1, 50); + + Observable times = range.map(i -> { + long t = currentTimeMillis(); + int w = random.nextInt(500); + + out.println("time " + t + ", wait " + w); + sleep(w); + return t; + }); + + times.debounce(400, MILLISECONDS).subscribe(key -> out.println("event " + key)); + } +} \ No newline at end of file diff --git a/src/main/java/rx/filter/Debounce3.java b/src/main/java/rx/filter/Debounce3.java new file mode 100644 index 0000000..d0be93a --- /dev/null +++ b/src/main/java/rx/filter/Debounce3.java @@ -0,0 +1,29 @@ +package rx.filter; + +import io.reactivex.Observable; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static java.lang.System.currentTimeMillis; +import static java.lang.System.out; +import static rx.filter.Debounce.sleep; + +public class Debounce3 { + public static void main(String[] args) { + Random random = new Random(); + + Observable waittimes = Observable.range(1, 50).map(r -> { + return random.nextInt(500); + }); + + Observable timestamps = waittimes.map(waittime -> { + long t = currentTimeMillis(); + + out.println("timestamp " + t + ", wait " + waittime); + sleep(waittime); + return t; + }); + + timestamps.debounce(400, TimeUnit.MILLISECONDS).subscribe(key -> out.println("event " + key)); + } +} diff --git a/src/main/java/rx/filter/Debounce4.java b/src/main/java/rx/filter/Debounce4.java new file mode 100644 index 0000000..64591f0 --- /dev/null +++ b/src/main/java/rx/filter/Debounce4.java @@ -0,0 +1,31 @@ +package rx.filter; + +import io.reactivex.Observable; + +import java.util.Random; + +import static io.reactivex.Observable.range; +import static java.lang.System.currentTimeMillis; +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static rx.filter.Debounce.sleep; + +public class Debounce4 { + public static void main(String[] args) { + Random random = new Random(); + + range(1, 50) + .map(r -> { + return random.nextInt(500); + }) + .map(waittime -> { + long t = currentTimeMillis(); + + out.println("timestamp " + t + ", wait " + waittime); + sleep(waittime); + return t; + }) + .debounce(400, MILLISECONDS) + .subscribe(key -> out.println("event " + key)); + } +} diff --git a/src/main/java/rx/filter/Distinct.java b/src/main/java/rx/filter/Distinct.java new file mode 100644 index 0000000..2db43ce --- /dev/null +++ b/src/main/java/rx/filter/Distinct.java @@ -0,0 +1,20 @@ +package rx.filter; + +import java.util.Random; + +import static io.reactivex.Observable.interval; +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class Distinct { + public static void main(String[] args) throws InterruptedException { + Random random = new Random(); + + interval(0, 100, MILLISECONDS).map(t -> { + return random.nextInt(20); + }).distinct().subscribe(w -> out.println(w)); + + SECONDS.sleep(10); + } +} diff --git a/src/main/java/rx/filter/Take.java b/src/main/java/rx/filter/Take.java new file mode 100644 index 0000000..6211040 --- /dev/null +++ b/src/main/java/rx/filter/Take.java @@ -0,0 +1,17 @@ +package rx.filter; + +import io.reactivex.Observable; + +import static io.reactivex.Observable.range; +import static java.lang.System.out; + +/* + * Emits only the first x elements of an observable. + */ +public class Take { + + public static void main(String[] args) { + range(1, 10).take(2).subscribe(w -> out.println(w)); + } + +} diff --git a/src/main/java/rx/transform/Buffer.java b/src/main/java/rx/transform/Buffer.java new file mode 100644 index 0000000..80004e1 --- /dev/null +++ b/src/main/java/rx/transform/Buffer.java @@ -0,0 +1,44 @@ +package rx.transform; + +import io.reactivex.Observable; +import java.awt.*; +import java.awt.event.KeyAdapter; +import java.awt.event.KeyEvent; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; +import javax.swing.*; + +/* + * Buffers until a criterium is met and the emits all the buffered values in one emission. + */ +public class Buffer { + public static void main(String[] args) { + Frame frame = getFrame(); + JTextField textfield = new JTextField(); + frame.add(textfield); + frame.pack(); + frame.setVisible(true); + + final Observable keys = Observable.create(observableEmitter -> { + textfield.addKeyListener(new KeyAdapter() { + @Override + public void keyTyped(KeyEvent keyEvent) { + observableEmitter.onNext(new String(new char[]{keyEvent.getKeyChar()})); + } + }); + }); + + keys.buffer(3).subscribe(key -> System.out.println("event " + key)); + } + + private static Frame getFrame() { + Frame f = new JFrame(""); + f.addWindowListener(new WindowAdapter() { + @Override + public void windowClosing(WindowEvent e) { + System.exit(0); + } + }); + return f; + } +} diff --git a/src/main/java/rx/transform/FlatMap.java b/src/main/java/rx/transform/FlatMap.java new file mode 100644 index 0000000..0181449 --- /dev/null +++ b/src/main/java/rx/transform/FlatMap.java @@ -0,0 +1,19 @@ +package rx.transform; + +import io.reactivex.Observable; + + +/* + * flattens an observable of observables + */ +public class FlatMap { + + public static void main(String[] args) { + Observable observable = Observable.range(1, 4).flatMap(r -> { + return Observable.range(1, r).map(i -> { + return "" + i; + }); + }); + observable.toList().subscribe(s -> System.out.println(s)); + } +} diff --git a/src/main/java/rx/transform/GroupBy.java b/src/main/java/rx/transform/GroupBy.java new file mode 100644 index 0000000..b282ddc --- /dev/null +++ b/src/main/java/rx/transform/GroupBy.java @@ -0,0 +1,22 @@ +package rx.transform; + + +import io.reactivex.Observable; + +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.SECONDS; + +/* + * splits an observable in new observables + */ +public class GroupBy { + + public static void main(String[] args) throws InterruptedException { + + Observable.range(0, 30) + .groupBy(v -> v % 3) + .subscribe(v -> + v.toList().subscribe(w -> out.println(w)) + ); + } +} diff --git a/src/main/java/rx/transform/Map.java b/src/main/java/rx/transform/Map.java new file mode 100644 index 0000000..ca146ec --- /dev/null +++ b/src/main/java/rx/transform/Map.java @@ -0,0 +1,14 @@ +package rx.transform; + +import io.reactivex.Observable; + +/* + * Applies a mapping function to an observable returning a new Observable + */ +public class Map { + public static void main(String[] args) { + Observable.range(1, 10).map(value -> { + return value + " is " + (value % 2 == 0 ? "even" : "uneven"); + }).subscribe(output -> System.out.println(output)); + } +} diff --git a/src/main/java/rx/transform/Scan.java b/src/main/java/rx/transform/Scan.java new file mode 100644 index 0000000..b7bd673 --- /dev/null +++ b/src/main/java/rx/transform/Scan.java @@ -0,0 +1,15 @@ +package rx.transform; + +import io.reactivex.Observable; + +/* + * Applies a mapping function 'with state', ie the previous output. + * The output reenters together with the new emission from the original observable. + */ +public class Scan { + public static void main(String[] args) { + Observable.range(1, 10).scan((sum, item) -> { + return sum + item; + }).subscribe(value -> System.out.println(value)); + } +} diff --git a/src/main/java/rx/transform/Window1.java b/src/main/java/rx/transform/Window1.java new file mode 100644 index 0000000..f4dff7a --- /dev/null +++ b/src/main/java/rx/transform/Window1.java @@ -0,0 +1,17 @@ +package rx.transform; + +import io.reactivex.Observable; + +/* + * Splits an observable in multiple, using some criterium + */ +public class Window1 { + static int counter = 1; + + public static void main(String[] args) { + Observable.range(0, 100).window(10).forEach(window -> { + System.out.println("\n" + counter++); + window.forEach(value -> System.out.print(value + " ")); + }); + } +} diff --git a/src/main/java/rx/transform/Window2.java b/src/main/java/rx/transform/Window2.java new file mode 100644 index 0000000..39c77e3 --- /dev/null +++ b/src/main/java/rx/transform/Window2.java @@ -0,0 +1,21 @@ +package rx.transform; + +import io.reactivex.Observable; + +import java.util.concurrent.TimeUnit; + +/* + * Emits a new window based on elapsed time + */ +public class Window2 { + public static void main(String[] args) throws InterruptedException { + Observable.interval(0, 500, TimeUnit.MILLISECONDS) + .window(5, TimeUnit.SECONDS) + .subscribe(observable -> { + System.out.println("new window"); + observable.subscribe(value -> System.out.println(value)); + }); + + TimeUnit.SECONDS.sleep(60); + } +} diff --git a/src/main/java/rx/transform/Window3.java b/src/main/java/rx/transform/Window3.java new file mode 100644 index 0000000..3dc73e7 --- /dev/null +++ b/src/main/java/rx/transform/Window3.java @@ -0,0 +1,58 @@ +package rx.transform; + +import io.reactivex.Observable; + +import javax.swing.*; +import java.awt.event.KeyAdapter; +import java.awt.event.KeyEvent; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Buffers observed values until a criterium is met and the emits the buffered values as a new observable. + */ +public class Window3 { + static AtomicBoolean newWindow = new AtomicBoolean(false); + + public static void main(String[] args) { + JFrame frame = new JFrame("app"); + frame.addWindowListener(new WindowAdapter() { + @Override + public void windowClosing(WindowEvent e) { + System.exit(0); + } + }); + JTextField textField = new JTextField(); + + textField.addKeyListener(new KeyAdapter() { + @Override + public void keyTyped(KeyEvent e) { + newWindow.set(true); + } + }); + + frame.getContentPane().add(textField); + frame.pack(); + frame.setVisible(true); + + Observable.interval(1, TimeUnit.SECONDS).window(() -> { + return Observable.fromCallable(new Callable>() { + @Override + public Observable call() throws Exception { + return Observable.create(emitter -> { + if (newWindow.get()) { + newWindow.set(false); + emitter.onNext(""); + } + }); + } + }); + }).subscribe(observable -> { + System.out.println("interrupt"); + observable.subscribe(value -> System.out.println(value)); + }); + } +} diff --git a/src/main/java/rx/utility/Delay.java b/src/main/java/rx/utility/Delay.java new file mode 100644 index 0000000..487bf4f --- /dev/null +++ b/src/main/java/rx/utility/Delay.java @@ -0,0 +1,18 @@ +package rx.utility; + +import io.reactivex.Observable; + +import static java.lang.System.out; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class Delay { + public static void main(String[] args) throws InterruptedException { + out.println("start"); + + Observable rangeFrom1To5 = Observable.range(1,5); + + rangeFrom1To5.delay(3, SECONDS).subscribe(v -> out.println("receive "+v)); + + SECONDS.sleep(5); + } +} diff --git a/target/classes/lambda/NullCheck$NotNullExpression.class b/target/classes/lambda/NullCheck$NotNullExpression.class new file mode 100644 index 0000000..0279cea Binary files /dev/null and b/target/classes/lambda/NullCheck$NotNullExpression.class differ diff --git a/target/classes/lambda/NullCheck$NullExpression.class b/target/classes/lambda/NullCheck$NullExpression.class new file mode 100644 index 0000000..6d1a2b7 Binary files /dev/null and b/target/classes/lambda/NullCheck$NullExpression.class differ diff --git a/target/classes/lambda/NullCheck.class b/target/classes/lambda/NullCheck.class new file mode 100644 index 0000000..a243619 Binary files /dev/null and b/target/classes/lambda/NullCheck.class differ diff --git a/target/classes/rx/combine/Zip.class b/target/classes/rx/combine/Zip.class new file mode 100644 index 0000000..7adfb88 Binary files /dev/null and b/target/classes/rx/combine/Zip.class differ diff --git a/target/classes/rx/conditional/All.class b/target/classes/rx/conditional/All.class new file mode 100644 index 0000000..46495de Binary files /dev/null and b/target/classes/rx/conditional/All.class differ diff --git a/target/classes/rx/conditional/Amb.class b/target/classes/rx/conditional/Amb.class new file mode 100644 index 0000000..a91d59d Binary files /dev/null and b/target/classes/rx/conditional/Amb.class differ diff --git a/target/classes/rx/conditional/Contains.class b/target/classes/rx/conditional/Contains.class new file mode 100644 index 0000000..221fef2 Binary files /dev/null and b/target/classes/rx/conditional/Contains.class differ diff --git a/target/classes/rx/conditional/SkipUntil.class b/target/classes/rx/conditional/SkipUntil.class new file mode 100644 index 0000000..8e85e23 Binary files /dev/null and b/target/classes/rx/conditional/SkipUntil.class differ diff --git a/target/classes/rx/create/Create.class b/target/classes/rx/create/Create.class new file mode 100644 index 0000000..82dd43d Binary files /dev/null and b/target/classes/rx/create/Create.class differ diff --git a/target/classes/rx/create/Defer.class b/target/classes/rx/create/Defer.class new file mode 100644 index 0000000..000a4e0 Binary files /dev/null and b/target/classes/rx/create/Defer.class differ diff --git a/target/classes/rx/create/From.class b/target/classes/rx/create/From.class new file mode 100644 index 0000000..2389eb7 Binary files /dev/null and b/target/classes/rx/create/From.class differ diff --git a/target/classes/rx/create/Interval.class b/target/classes/rx/create/Interval.class new file mode 100644 index 0000000..862e5bf Binary files /dev/null and b/target/classes/rx/create/Interval.class differ diff --git a/target/classes/rx/create/Just.class b/target/classes/rx/create/Just.class new file mode 100644 index 0000000..a1b4ab6 Binary files /dev/null and b/target/classes/rx/create/Just.class differ diff --git a/target/classes/rx/create/Never.class b/target/classes/rx/create/Never.class new file mode 100644 index 0000000..f322a4d Binary files /dev/null and b/target/classes/rx/create/Never.class differ diff --git a/target/classes/rx/create/Range.class b/target/classes/rx/create/Range.class new file mode 100644 index 0000000..c42125c Binary files /dev/null and b/target/classes/rx/create/Range.class differ diff --git a/target/classes/rx/create/Throw.class b/target/classes/rx/create/Throw.class new file mode 100644 index 0000000..01cc3ee Binary files /dev/null and b/target/classes/rx/create/Throw.class differ diff --git a/target/classes/rx/create/ThrowExplained.class b/target/classes/rx/create/ThrowExplained.class new file mode 100644 index 0000000..924bb05 Binary files /dev/null and b/target/classes/rx/create/ThrowExplained.class differ diff --git a/target/classes/rx/create/Timer.class b/target/classes/rx/create/Timer.class new file mode 100644 index 0000000..e1c0236 Binary files /dev/null and b/target/classes/rx/create/Timer.class differ diff --git a/target/classes/rx/filter/Debounce.class b/target/classes/rx/filter/Debounce.class new file mode 100644 index 0000000..57ddfd9 Binary files /dev/null and b/target/classes/rx/filter/Debounce.class differ diff --git a/target/classes/rx/filter/Debounce2.class b/target/classes/rx/filter/Debounce2.class new file mode 100644 index 0000000..761eb00 Binary files /dev/null and b/target/classes/rx/filter/Debounce2.class differ diff --git a/target/classes/rx/filter/Debounce3.class b/target/classes/rx/filter/Debounce3.class new file mode 100644 index 0000000..510a807 Binary files /dev/null and b/target/classes/rx/filter/Debounce3.class differ diff --git a/target/classes/rx/filter/Debounce4.class b/target/classes/rx/filter/Debounce4.class new file mode 100644 index 0000000..d159bc1 Binary files /dev/null and b/target/classes/rx/filter/Debounce4.class differ diff --git a/target/classes/rx/filter/Distinct.class b/target/classes/rx/filter/Distinct.class new file mode 100644 index 0000000..2a6945e Binary files /dev/null and b/target/classes/rx/filter/Distinct.class differ diff --git a/target/classes/rx/filter/Take.class b/target/classes/rx/filter/Take.class new file mode 100644 index 0000000..08662b2 Binary files /dev/null and b/target/classes/rx/filter/Take.class differ diff --git a/target/classes/rx/transform/Buffer$1.class b/target/classes/rx/transform/Buffer$1.class new file mode 100644 index 0000000..77ce0ca Binary files /dev/null and b/target/classes/rx/transform/Buffer$1.class differ diff --git a/target/classes/rx/transform/Buffer$2.class b/target/classes/rx/transform/Buffer$2.class new file mode 100644 index 0000000..0b5805e Binary files /dev/null and b/target/classes/rx/transform/Buffer$2.class differ diff --git a/target/classes/rx/transform/Buffer.class b/target/classes/rx/transform/Buffer.class new file mode 100644 index 0000000..1967ff1 Binary files /dev/null and b/target/classes/rx/transform/Buffer.class differ diff --git a/target/classes/rx/transform/FlatMap.class b/target/classes/rx/transform/FlatMap.class new file mode 100644 index 0000000..b5fa47b Binary files /dev/null and b/target/classes/rx/transform/FlatMap.class differ diff --git a/target/classes/rx/transform/GroupBy.class b/target/classes/rx/transform/GroupBy.class new file mode 100644 index 0000000..3233f1f Binary files /dev/null and b/target/classes/rx/transform/GroupBy.class differ diff --git a/target/classes/rx/transform/Map.class b/target/classes/rx/transform/Map.class new file mode 100644 index 0000000..3f27a94 Binary files /dev/null and b/target/classes/rx/transform/Map.class differ diff --git a/target/classes/rx/transform/Scan.class b/target/classes/rx/transform/Scan.class new file mode 100644 index 0000000..4b14556 Binary files /dev/null and b/target/classes/rx/transform/Scan.class differ diff --git a/target/classes/rx/transform/Window1.class b/target/classes/rx/transform/Window1.class new file mode 100644 index 0000000..01e8c6c Binary files /dev/null and b/target/classes/rx/transform/Window1.class differ diff --git a/target/classes/rx/transform/Window2.class b/target/classes/rx/transform/Window2.class new file mode 100644 index 0000000..bb63fcf Binary files /dev/null and b/target/classes/rx/transform/Window2.class differ diff --git a/target/classes/rx/transform/Window3$1.class b/target/classes/rx/transform/Window3$1.class new file mode 100644 index 0000000..a3408b7 Binary files /dev/null and b/target/classes/rx/transform/Window3$1.class differ diff --git a/target/classes/rx/transform/Window3$2.class b/target/classes/rx/transform/Window3$2.class new file mode 100644 index 0000000..f104c06 Binary files /dev/null and b/target/classes/rx/transform/Window3$2.class differ diff --git a/target/classes/rx/transform/Window3$3.class b/target/classes/rx/transform/Window3$3.class new file mode 100644 index 0000000..2b97a7f Binary files /dev/null and b/target/classes/rx/transform/Window3$3.class differ diff --git a/target/classes/rx/transform/Window3.class b/target/classes/rx/transform/Window3.class new file mode 100644 index 0000000..b994efd Binary files /dev/null and b/target/classes/rx/transform/Window3.class differ diff --git a/target/classes/rx/utility/Delay.class b/target/classes/rx/utility/Delay.class new file mode 100644 index 0000000..11dbaaf Binary files /dev/null and b/target/classes/rx/utility/Delay.class differ