RxJS 7 in Depth

Sébastien Dubois / August 05, 2020

12 min read

RxJS

RxJS is one of the coolest libraries in the JavaScript/TypeScript ecosystem. It revolutionized the way we handle data flows within our applications.

In this article, we’ll discover the changes that RxJS 7 will bring. We will look at the upcoming features, the bug fixes, and the breaking changes.

Let’s get started!

New lastValueFrom and firstValueFrom methods

When I first learned about RxJS back in 2016, I immediately knew that it rendered Promises completely bogus. So far, unfortunately, Observables still didn’t make it into the EcmaScript specification and there are thousands of promise-based libraries out there. Also, many people still don’t know about observables and are only learning about Promises. This is a sad state of things! So, for the time being, we still have to deal with that and go from Observables to Promises and vice-versa.

In RxJS 7 two new methods will become available: lastValueFrom and firstValueFrom. They’re being added because, as it stands, the toPromise method that we’re all familiar with is flawed.

Promises provide a strict guarantee that an operation will either result in a value, or will error out. With toPromise, we don’t know for sure what happened. Here’s are two examples, taken from the issue:

In the first case, we cannot distinguish between the situation where no value was emitted before completion and the situation where an undefined value was emitted last.

With RxJS 7, we will be able to more clearly state which value we want to extract out of the source observable.

The firstValueFrom method converts an observable to a promise by subscribing to the observable and returning a promise that will resolve as soon as the first value arrives from the observable. Then, the subscription gets closed. If the observable stream completes before any values were emitted, then the returned promise will get rejected with an EmptyError. If the observable stream emits an error, then the returned promise will reject with that error instead.

Here’s an example:

The lastValueFrom works very much the same, except that it waits for the observable to complete and resolves the returned promise with the last value from the observed stream. Again, if the observable stream completes before any values were emitted, then the returned promise will reject with an EmptyError and if the observable stream emits an error, then the returned promise will reject with that error.

Take a look at the merged PR for more examples.

This will hopefully make our lives easier. Note that toPromise will probably be deprecated.

Renamed operators

This “new” combineLatestWith operator is actually a renamed version of the now deprecated combineLatest operator.

This new name indeed makes it clearer: it creates an observable that combines the latest values from all passed observables and the source into arrays and emits them. When you subscribe to the observable returned by this operator, a subscription is made to the source observable as well as all the sources provided as arguments. Once all sources emit at least one value, then all of the latest values will be emitted as an array.

Another operator that has been renamed is the zip operator, which is now called zipWith. The same goes for race, which is now called raceWith.

Refactoring our code to use the new operators should be easy enough ;-)

Retry operator configuration

We can now pass a configuration option to the retry operator in order to reset the counter upon success:

Neat!

Selector support for fromFetch

The fromFetch operator now lets us define a selector function to extract the information that we want out of the response (or arbitrary data if we want to).

If no selector is defined, then fromFetch simply returns the response as-is (just like before):

Now, we can also extract what we want:

Finally, we can also return arbitrary data:

Type guards support for groupBy

We can now use TypeScript type guards with the groupBy operator:

This is super powerful for type inference as it allows to use custom type guards like the isNumber guard above. Note that it is of course also possible to define inline type guards.

Support for observable dictionaries with combineLatest

Starting with RxJS 7, we will be able to pass in a dictionary to the combineLatest operator. When we do so, we will receive the same dictionary structure back:

This is a super useful addition and matches what is also possible with the forkJoin operator. Thanks to this, we will be able to write more readable code, but also to combine combineLatest with the pluck operator.

One timeout to rule them all

With RxJS 7, we will get improved support for timeouts.

First of all, we will be able to pass it a more powerful TimeoutConfig :

As you can see, this config supports different options to finely control the timeouts:

  • each: time allowed between the emissions from the source, before a timeout is triggered
  • first: a deadline for the first emission from the source
  • with: a factory that we can pass in to create the observable to switch to when a timeout occurs. This allows the timeout operator to be used exactly like the other(now deprecated) timeoutWith operator

Thanks to this new configuration type, the timeout operator can be configured quite precisely. Before, it was not possible to configure timeout to have a different “first” timeout check and subsequent “each” timeout checks, or to timeout if only the first value did not arrive in time, etc.

Check out this commit for more details.

TestScheduler improvements

The TestScheduler now has an animate “run mode” helper, which can be used to specify when requested animation frames will be “painted”.

The animate helper accepts a marble diagram and each value emission in the diagram indicates when a “paint” occurs.

You can learn more about it here.

Better memory usage

Memory usage should be improved in RxJS 7 because the majority of operators no longer retain outer values.

For example, every inner subscription in mergeMap previously retained the outer value and, if the outer value was a large array, that could quickly be problematic for memory usage.

Bug fixes galore

RxJS 7 brings a ton of bug fixes. I won’t go over each in detail as it would take quite a while, but here’s the list so far:

  • ajax: Partial observers passed to progressSubscriber will no longer error (25d279f)
  • ajax: Unparsable responses will no longer prevent full AjaxError from being thrown (605ee55)
  • animationFrames: emit the timestamp from the rAF’s callback (#5438) (c980ae6)
  • Ensure unsubscriptions/teardowns on internal subscribers are idempotent (#5465) (3e39749), closes #5464
  • timeout: defer error creation until timeout occurs (#5497) (3be9840), closes #5491
  • perf: Ensure unsubscriptions/teardowns on internal subscribers are idempotent (#5465) (3e39749), closes #5464
  • timeout: defer error creation until timeout occurs (#5497) (3be9840), closes #5491
  • dependencies: Move accidental dependency on typedoc to dev-dependencies. (#5566) (45702bf)
  • pluck: operator breaks with null/undefined inputs. (#5524) (c5f6550)
  • shareReplay: no longer misses synchronous values from source (92452cc)
  • interop: chain interop/safe subscriber unsubscriptions correctly (#5472) (98ad0eb), closes #5469 #5311 #2675
  • finalize: chain subscriptions for interop with finalize (#5239) (04ba662), closes #5237 #5237
  • animationFrameScheduler: don’t execute rescheduled animation frame and asap actions in flush (#5399) (33c9c8c), closes #4972 #5397
  • iterables: errors thrown from iterables now properly propagated (#5444) (75d4c2f)
  • finalize: callback will be called after the source observable is torn down. (0d7b7c1), closes #5357
  • Notification: typing improvements (#5478) (96868ac)
  • TestScheduler: support empty subscription marbles (#5502) (e65696e), closes #5499
  • expand: now works properly with asynchronous schedulers (294b27e)
  • subscribeOn: allow Infinity as valid delay (#5500) (cd7d649)
  • Subject: resolve issue where Subject constructor errantly allowed an argument (#5476) (e1d35dc)
  • Subject: no default generic (e678e81)
  • defer: No longer allows () => undefined to observableFactory (#5449) (1ae937a), closes #5449
  • single: Corrected behavior for single(() => false) on empty observables. (#5325) (27931bc), closes #5325
  • take/takeLast: Properly assert number types at runtime (#5326) (5efc474), closes #5326
  • mergeMapTo: remove redundant/unused generic (#5299) (d67b7da)
  • ajax: AjaxTimeoutErrorImpl extends AjaxError (#5226) (a8da8dc)
  • delay: emit complete notification as soon as possible (63b8797), closes #4249
  • endWith: will properly type N arguments (#5246) (81ee1f7)
  • fetch: don’t leak event listeners added to passed-in signals (#5305) (d4d6c47)
  • TestScheduler: Subclassing TestScheduler needs RunHelpers (#5138) (927d5d9)
  • pipe: Special handling for 0-arg case. (#4936) (290fa51)
  • pluck: fix pluck’s catch-all signature for better type safety (#5192) (e0c5b7c)
  • pluck: param type now accepts number and symbol (9697b69)
  • startWith: accepts N arguments and returns correct type (#5247) (150ed8b)
  • combineLatestWith: and zipWith infer types from n-arguments (#5257) (3e282a5)
  • race: support N args in static race and ensure observable returned (#5286) (6d901cb)
  • toPromise: correct toPromise return type (#5072) (b1c3573)
  • fromFetch: don’t reassign closed-over parameter in fromFetch (#5234) (37d2d99), closes #5233 #5233

I haven’t been hit by those (or did not realize it), but if you have then you’ll be glad to see that those are now fixed. Kudos to the team for the awesome work!

By the way, there’s one “fix” that I didn’t see mentioned and couldn’t find back, but I believe that type inference for the filter operator will work better with RxJS 7:

source$.pipe(
  filter((user) => isNotNullOrUndefined(user)),
)
...

With RxJS 6, after the filter, the inferred type still includes null | undefined even though we filtered those values out. With RxJS 7, it should be fine.

Breaking changes

On to the less awesome news.

As you can guess, since this is a major release, there are… breaking changes! Here’s a rundown of those:

  • The toPromise operator now returns T | undefined. This is more in line with reality but could probably break some apps (gently)
  • The lift method is no longer exposed. It was an internal implementation detail of RxJS that was exposed and thus accessible from user-land code. It has multiple issues and should not be used anyways. Workarounds include rewriting your operators so that they return new Observable or cast your observable as any and access lift anyway, but of course option #1 is preferred. Option 2 might be useful for a quick fix but will probably break in version 8 anyways
  • The startWith operator returns incorrect types when called with more than 7 arguments and a scheduler. Also, passing a scheduler to this operator is deprecated
  • The timestamp operator now accepts a TimestampProvider, which is any object with a now() method that returns a number. This might cause issues with the TestScheduler run mode
  • The ReplaySubject no longer schedules emissions when a scheduler is provided. If you currently rely on that behavior, then you need to combine it with the observeOn operator: new ReplaySubject(2, 3000).pipe(observeOn(asap))
  • The takeLast operator now throws TypeError for invalid arguments. For instance, calling it without arguments or with NaN will throw a TypeError
  • The take operator now throws a runtime error for arguments that are negative or NaN
  • The single operator will now throw for scenarios where values coming in are either not present, or do not match the provided predicate
  • defer does not allow factories to return void or undefined anymore. All factories passed to defer must now return a proper ObservableInput (e.g., Observable or Promise ). To get the same behavior as before, you can use return EMPTY or return of() from the factory
  • Notification and dematerialize have better type signatures, which might break existing code
  • Notification.createNext(undefined) will no longer return the exact same reference everytime
  • ajax has dropped support for IE10 and lower. Time to let go of the past!

No interoperability with AsyncIterables after all

The first beta of RxJS 7 introduced first-class interoperability for AsyncIterables. Unfortunately, this support was removed in the most recent beta versions because there were too many edge cases. Still, if you’re interested in this feature, you should take a look at the rxjs-for-await library of Ben Lesh.

For the record, here’s a bit of background about what that feature could’ve allowed.

As their name indicates, async iterables are things that we can iterate on.. asynchronously. Sounds good? Well it should! Since ES2018, we can write loops like this:

for **await** (variable of iterable) {
  ...
}

This is possible thanks to asynchronous iterators and asynchronous iterables. The next() method of asynchronous iterators returns a promise, which we can consume using the await keyword. Neat.

But what about observables? Well with rxjs-for-await, we can do the same, using different strategies; each having pros and cons. Those pros and cons are certainly the reason why this feature will not make it into RxJS for the time being…

Conclusion

In this article, I’ve listed the different things I could learn by looking at the changelog for the different RxJS 7 beta versions.

Hopefully, the final release should come soon. There are a few cool new features and I can’t wait to try those. Unfortunately, the support for interop with async iterables was abandoned, which is a bit sad, but I do understand why; better safe than sorry!

Again, kudos to Ben and the awesome RxJS team for making our code feel so much nicer.

That’s it for today!

PS: check out my Dev Concepts books, join the Software Crafters community, the Personal Knowledge Management community, and come say hi on Twitter!
Discuss on TwitterEdit on GitHub