Skip to content

Commit

Permalink
fix(store): complete action results on destroy
Browse files Browse the repository at this point in the history
In this commit, we perform some minor refactoring by replacing the implementation of the `OnDestroy`
interface with the use of `ApplicationRef` in the constructor to avoid creating redundant methods.
Additionally, we complete the `InternalDispatchedActionResults` once the application is destroyed
to ensure there are no active subscribers after resources have been cleaned up.
  • Loading branch information
arturovt committed Dec 22, 2024
1 parent 4499a53 commit 9a8bd8e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ $ npm install @ngxs/store@dev
### To become next patch version

- Fix(store): Add root store initializer guard [#2278](https://github.com/ngxs/store/pull/2278)
- Fix(store): Complete action results on destroy [#2282](https://github.com/ngxs/store/pull/2282)

### 19.0.0 2024-12-3

Expand Down
18 changes: 10 additions & 8 deletions packages/store/internals/src/state-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, OnDestroy, Signal, untracked } from '@angular/core';
import { ApplicationRef, inject, Injectable, Signal, untracked } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';

import { ɵwrapObserverCalls } from './custom-rxjs-operators';
Expand All @@ -10,22 +10,24 @@ import { ɵPlainObject } from './symbols';
* @ignore
*/
@Injectable({ providedIn: 'root' })
export class ɵStateStream extends ɵOrderedBehaviorSubject<ɵPlainObject> implements OnDestroy {
export class ɵStateStream extends ɵOrderedBehaviorSubject<ɵPlainObject> {
readonly state: Signal<ɵPlainObject> = toSignal(this.pipe(ɵwrapObserverCalls(untracked)), {
manualCleanup: true,
requireSync: true
});

constructor() {
super({});
}

ngOnDestroy(): void {
// The StateStream should never emit values once the root view is removed,
// such as when the `NgModuleRef.destroy()` method is called. This is crucial
// for preventing memory leaks in server-side rendered apps, where a new StateStream
// Complete the subject once the root injector is destroyed to ensure
// there are no active subscribers that would receive events or perform
// any actions after the application is destroyed.
// The `StateStream` should never emit values once the root view is removed,
// such as when the `ApplicationRef.destroy()` method is called. This is crucial
// for preventing memory leaks in server-side rendered apps, where a new `StateStream`
// is created for each HTTP request. If users forget to unsubscribe from `store.select`
// or `store.subscribe`, it can result in significant memory leaks in SSR apps.
this.complete();
const appRef = inject(ApplicationRef);
appRef.onDestroy(() => this.complete());
}
}
12 changes: 7 additions & 5 deletions packages/store/src/actions-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, Injectable, OnDestroy } from '@angular/core';
import { ApplicationRef, inject, Injectable } from '@angular/core';
import { ɵOrderedSubject } from '@ngxs/store/internals';
import { Observable, Subject, filter, share } from 'rxjs';

Expand All @@ -25,7 +25,7 @@ export interface ActionContext<T = any> {
* Internal Action stream that is emitted anytime an action is dispatched.
*/
@Injectable({ providedIn: 'root' })
export class InternalActions extends ɵOrderedSubject<ActionContext> implements OnDestroy {
export class InternalActions extends ɵOrderedSubject<ActionContext> {
// This subject will be the first to know about the dispatched action, its purpose is for
// any logic that must be executed before action handlers are invoked (i.e., cancelation).
readonly dispatched$ = new Subject<ActionContext>();
Expand All @@ -36,10 +36,12 @@ export class InternalActions extends ɵOrderedSubject<ActionContext> implements
this.pipe(filter(ctx => ctx.status === ActionStatus.Dispatched)).subscribe(ctx => {
this.dispatched$.next(ctx);
});
}

ngOnDestroy(): void {
this.complete();
// Complete the subject once the root injector is destroyed to ensure
// there are no active subscribers that would receive events or perform
// any actions after the application is destroyed.
const appRef = inject(ApplicationRef);
appRef.onDestroy(() => this.complete());
}
}

Expand Down
23 changes: 23 additions & 0 deletions packages/store/src/internal/action-results.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ApplicationRef, inject, Injectable } from '@angular/core';
import { Subject } from 'rxjs';

import type { ActionContext } from '../actions-stream';

/**
* Internal Action result stream that is emitted when an action is completed.
* This is used as a method of returning the action result to the dispatcher
* for the observable returned by the dispatch(...) call.
* The dispatcher then asynchronously pushes the result from this stream onto the main action stream as a result.
*/
@Injectable({ providedIn: 'root' })
export class InternalDispatchedActionResults extends Subject<ActionContext> {
constructor() {
super();

// Complete the subject once the root injector is destroyed to ensure
// there are no active subscribers that would receive events or perform
// any actions after the application is destroyed.
const appRef = inject(ApplicationRef);
appRef.onDestroy(() => this.complete());
}
}
16 changes: 4 additions & 12 deletions packages/store/src/internal/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import { inject, Injectable, Injector, NgZone, runInInjectionContext } from '@angular/core';
import { forkJoin, Observable, of, Subject, throwError } from 'rxjs';
import { forkJoin, Observable, of, throwError } from 'rxjs';
import { filter, map, mergeMap, shareReplay, take } from 'rxjs/operators';

import { getActionTypeFromInstance } from '@ngxs/store/plugins';
import { ɵPlainObject, ɵStateStream } from '@ngxs/store/internals';

import { ActionContext, ActionStatus, InternalActions } from '../actions-stream';
import { PluginManager } from '../plugin-manager';
import { InternalNgxsExecutionStrategy } from '../execution/internal-ngxs-execution-strategy';
import { leaveNgxs } from '../operators/leave-ngxs';
import { fallbackSubscriber } from './fallback-subscriber';

/**
* Internal Action result stream that is emitted when an action is completed.
* This is used as a method of returning the action result to the dispatcher
* for the observable returned by the dispatch(...) call.
* The dispatcher then asynchronously pushes the result from this stream onto the main action stream as a result.
*/
@Injectable({ providedIn: 'root' })
export class InternalDispatchedActionResults extends Subject<ActionContext> {}
import { InternalDispatchedActionResults } from './action-results';
import { ActionContext, ActionStatus, InternalActions } from '../actions-stream';
import { InternalNgxsExecutionStrategy } from '../execution/internal-ngxs-execution-strategy';

@Injectable({ providedIn: 'root' })
export class InternalDispatcher {
Expand Down
2 changes: 1 addition & 1 deletion packages/store/src/internal/state-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import {
} from './internals';
import { NgxsActionRegistry } from '../actions/action-registry';
import { ActionContext, ActionStatus, InternalActions } from '../actions-stream';
import { InternalDispatchedActionResults } from '../internal/dispatcher';
import { InternalDispatchedActionResults } from '../internal/action-results';
import { ensureStateNameIsUnique, ensureStatesAreDecorated } from '../utils/store-validators';
import { ensureStateClassIsInjectable } from '../ivy/ivy-enabled-in-dev-mode';
import { NgxsUnhandledActionsLogger } from '../dev-features/ngxs-unhandled-actions-logger';
Expand Down

0 comments on commit 9a8bd8e

Please sign in to comment.