Core concepts

Subscriptions and Watchers

We covered how Signalium handles symmetric (call-response) style operations in the last section, but what about asymmetric async?

And before we answer that, what even is asymmetric async?

Asymmetric async refers to any async operation where you may send one or more requests and receive one or more responses. Some common examples include:

  • Subscribing to a topic on a message bus
  • Sending messages back and forth between separate threads
  • Adding a listener to an external library, like Tanstack Query
  • Setting up a regular polling job or other interval based task

Subscriptions are a type of signal that specifically handles these sorts of operations. When combined with Watchers, they allow you to setup and manage the full lifecycle of long-live effects and resources.


What are subscriptions?

The core idea for subscriptions comes from the observation that the following combination of hooks in React is a very common pattern:

const useCounter = (ms) => {
  const [count, setCount] = useState(0);

  useEffect(() => {
    const id = setInterval(() => {
      setCount((count) => count + 1);
    }, ms);

    return () => clearInterval(id);
  }, [ms]);

  return count;
};

What we have here is an effect paired with some state, and in such a way where that effect and only that effect manages this state. To callers of useCounter, this is a completely opaque process. They just get the latest count, and they rerun when the count updates.

This is notable because it preserves functional purity for everything outside of useCounter. While useCounter is managing and mutating state regularly, any hook calling it is just getting the latest value and using it to derive the result. It could be a static value or a dynamic one, and the code would be the same.

Subscriptions formalize this pattern by combining a managed-effect with a slot for state that by design is only accessible internally. From the perspective of the rest of the dependency graph, that subscription node is just like any other reactive value or state, and functional purity is maintained.

Creating subscriptions

Subscriptions are created much like tasks, as individual instances rather than functions:

Like reactive tasks and promises, subscriptions are also a superset of promises and they have the same interface as standard reactive promises:

interface ReactiveSubscription<T> extends Promise<T> {
  value: T | undefined;
  error: unknown;
  isPending: boolean;
  isResolved: boolean;
  isRejected: boolean;
  isSettled: boolean;
  isReady: boolean;

  rerun(): void;
}

Subscriptions define a constructor function that runs when they become watched (detailed below). The function receives a state signal as the first parameter, and should setup a side-effect and (optionally) return a destructor. Like with reactive functions, any reactive state that is used during this constructor will become a dependency of the subscription, and if that state updates, the destructor function will be called and the subscription will be recreated.

Subscriptions as promises

Subscriptions implement the promise interface, but promises are modeled for symmetric async - one request sent, one response received. So, why do subscriptions act like promises, and how do they handle asymmetric async differently?

The primary reason is that subscriptions often have an initialization step while they wait for the first event they want to receive. For instance, let's say you want to load a Post model and poll for real time updates for it as long as we're on that page. When we first load the page, we don't have any data, so we want to show a loading spinner. After the first message is received, we can show the cached data and continue polling in the background.

const usePostData = reactive((id) => {
  return subscription((state) => {
    let currentTimeout;

    const fetchPost = async () => {
      const res = await fetch(`https://examples.com/api/posts/${id}`);
      const { post } = await res.json();

      state.set(post);

      // schedule the next fetch in 10s
      currentTimeout = setTimeout(fetchPost, 10000)
    }

    // initialize
    fetchPost();

    return () => clearTimeout(currentTimeout);
  });
});

export const usePostTitle(id) {
  // subscription can be awaited just like a standard promise
  const data = await usePostData(id);

  return data.title;
}

Subscriptions "resolve" the first time their state is set. If you pass an initial value via the initValue option, they will initialize resolved. Every time after that, everything that consumes the subscription will be notified of changes and updates, but they will resolve immediately without needing to wait for async or triggering the isPending state.

If you need to reset the loading state for any reason, e.g. if you navigate back to a page that was already active and you want to refetch the value eagerly, you can set the value to a new promise, and the promise state will be reflected on the subscription until it completes.

const usePostData = reactive((id) => {
  return subscription((state) => {
    let currentTimeout;

    const fetchPost = async () => {
      const res = await fetch(`https://examples.com/api/posts/${id}`);
      const { post } = await res.json();

      state.set(post);

      // schedule the next fetch in 10s
      currentTimeout = setTimeout(fetchPost, 10000);
    };

    // Setting the value to initial promise will cause the subscription to go
    // back into a pending state, causing everything else to wait for it.
    state.set(fetchPost());

    return () => clearTimeout(currentTimeout);
  });
});

Fine-grained updates

Subscription constructors can also return an object with the following signature:

interface SignalSubscription {
  update?(): void;
  unsubscribe?(): void;
}

This form of subscription is for cases where you may want more fine-grained control over how the subscription is updated. For instance, it might be fairly expensive to teardown a subscription and recreate it each time, and there might be a cheaper way to update it.

const currentTopic = state('foo');

const messageBus = subscription((state) => {
  const id = bus.subscribe(currentTopic.get(), (msg) => state.set(msg));

  return {
    update() {
      bus.update(id, currentTopic.get());
    },

    unsubscribe() {
      bus.unsubscribe(id);
    },
  };
});

One thing to note about this form is that it tracks the initial construction function, then tracks the update function on each update. Tracking is based on the last update only, so if you access something in subscribe but not in updates, it will not trigger again.

This covers the ways that subscriptions can update reactively when in use. However, we also need to setup subscriptions when they are first accessed, and tear them down when they're no longer needed. For that, we need to introduce watchers.

Watchers

With watchers, you listen to updates from signals externally. This is how signals are ultimately consumed by your framework of choice, and by your larger application.

const value = state(0);

const plusOne = reactive(() => {
  return value.get() + 1;
});

const w = watcher(() => {
  return plusOne();
});

const removeListener = w.addListener((val) => {
  console.log(val);
});
// logs 1 after timeout

value.set(5);
// logs 6 after timeout

removeListener();

value.set(10);
// no longer logs

Watchers are typically handled by the framework integration that you are using. For instance, @signalium/react automatically detects if you are using a reactive value inside of a React component, and sets up a watcher if needed.

const value = state(0);

const plusOne = reactive(() => {
  return value.get() + 1;
});

const plusTwo = reactive(() => {
  // plusOne() is called inside another reactive function,
  // does not setup a watcher
  return plusOne() + 1;
});

export function Component() {
  // plusTwo() is called inside a React component,
  // sets up a watcher and synchronizes it with React
  // state so it rerenders whenever the watcher updates.
  const valuePlusTwo = plusTwo();

  return <div>{valuePlusTwo}</div>;
}

In general, you shouldn't need to worry about managing watchers yourself because of this, but they are very important conceptually which is why they are included in the core concepts. In addition, they'll be necessary if you ever do need to create your own integration of some kind.

Note

Watchers should never be created or managed inside reactive functions or subscriptions. They are meant to be terminal nodes that pull on the graph of dependencies and make it "live". Subscriptions generally work like "internal watchers" (i.e. they will also update automatically while they're live via an external watcher), so there should never be a reason to create a watcher inside of one.

This is a very strong recommendation; Any current behavior is considered undefined, and it is not guaranteed or covered under semver.

Watcher scheduling

Watchers have to run at some point, but for performance and consistency they do not run immediately after a change. Instead, they get scheduled to run later at some point. When exactly is globally configurable, but defaults to the next macro task (e.g. setTimeout(flush, 0)).

Scheduled watchers essentially act like if you manually ran a reactive function, only later. You can imagine it as something like this:

const myFn = reactive(() => {
  // ...
}):

function handleClickEvent() {
  // change some state

  setTimeout(() => myFn(), 0);
}

When we flush watchers, we do them together in the same task in a way that minimizes the number of scheduled tasks and any thrashing that might occur. They are automatically scheduled if they have any listeners, and if any value in their dependency tree has changed.

That said, the call order for watchers is still from changed state outward, toward the watcher. This means that the watcher will only rerun if any of its direct dependencies have also changed, following the same rules discussed in the Reactive Functions and State section. In addition, listeners added with addListener will not run if the value returned from the watcher itself has not updated.

Timing, caching, and immediacy

On occasion, you might want to write to a state signal and then immediately read from a reactive function that consumed that signal. As noted in the previous section on reactive functions and state, this is perfectly valid and will work as expected.

const valueSignal = state(0);

const useDerived = reactive(() => {
  return valueSignal.get() + 1;
});

function updateValue(value) {
  valueSignal.set(value);

  useDerived(); // value + 1
}

Watcher scheduling does not affect this behavior. Scheduled watchers do pull automatically at some point later, and if nothing else reads a watched reactive function, it will run when the watcher flushes. BUT, if the value is read earlier, it will run on-demand and cache the result, which will then be read by the watcher. In effect, watchers act as a guarantee that the reactives will rerun automatically eventually, but if you need to speed that process up, you can at any time!

Active Watchers and Subscriptions

By default, without introducing watchers, subscriptions are inert. If you access a subscription it will not subscribe and start updating, it will just return its current value.

import { subscription } from 'signalium';

const logger = subscription(() => {
  console.log('subscribed');

  return () => console.log('unsubscribed');
});

logger(); // logs nothing

This value will still be tracked by any reactive functions that use it, but the subscription itself will never activate. The reason for this comes down to resource management - that is to say, we want to only consume system resources when we need them, and we want to free them up when they're no longer needed.

With standard and even async values, this is not really an issue because they mostly use memory, and that will mostly naturally be cleaned up by garbage collection (ignoring promise lifecycle, abort signals, etc. for simplicity here). Most types of subscriptions, however, necessarily consume resources until they are torn down. Background threads, websockets, polling - all things that need some external signal that says they are no longer needed.

Watchers conceptually represent the parts of the app that are active: They are "in use", and should be updating or running background tasks and so on. These are the exit points where your signals are writing to something external, and that something is what is driving the lifecycle of your signal graph.

This leads us to active status. Watchers become active when 1 or more event listeners are added to them. When a node (a state, reactive function, or subscription) is connected directly OR indirectly to an active watcher, it also becomes active. It remains active until it is disconnected from all active consumers, at which point it is said to be inactive. Essentially, if you're directly or indirectly connected to a watcher, you are active, and if you're not then you're inactive.

And last but not least: a subscriptions lifecycle is tied directly to whether or not its active. They run their setup upon activating, and run their unsubscribe function upon deactivating.

Additional Info

This whole setup might seem a bit convoluted - why do we need to do this dance with watchers and subscriptions? Why not just expose an unsubscribe method on subscriptions and call that when they're no longer needed?

There are two main reasons for this. One is that this would leak some of the statefulness of subscriptions. Remember, one of the main benefits of subscriptions is that they are indistinguishable from standard async values. If these implementation details were exposed, you would need to manage it, and drill that management deeply from your components through the reactivity graph to every place it was used.

The other is related, but more conceptual. It comes back to what we want to do here - activate subscriptions if they are in use, and deactivate them if they are no longer needed. "In use" is doing a lot of the heavy lifting here, how do we determine that?

Signalium defines a value as "in use" IFF it is connected to an active graph. This is important because the shape of that graph is dynamic with signals, since we can use values conditionally. So you might connect to a websocket initially in some part of a reactivity tree, but then disconnect on the next update.

This dynamism makes manual subscription management intractably hard. You would need to maintain references to all previous reactives that had subscriptions, track whether or not they were reused, and call their destructors if not, all manually. This would be a pervasive pattern and would quickly infect an entire codebase and add mountains of complexity. It doesn't help that subscription data sources tend to be leaves that could be deeply nested in layers of reactives.

For all these reasons, subscription management and active status is considered a core part of signal lifecycle in Signalium. You can't have subscriptions without active status, and you can't have asymmetric async without subscriptions.

Summary

And that covers the last major types of signals in Signalium! To summarize:

  • Subscriptions
    • Manage side-effects in a single, self-contained node with its own state
    • Implementation details are hidden, externally it works just like any other state
    • Primarily used for asymmetric async (think UDP vs TCP), but also implement the ReactivePromise API for initial load and pending states
    • Activate when connected to an active watcher, and deactivate when disconnected from all active watchers
  • Watchers
    • Represent the active parts of the app
    • How state gets read from Signalium to external consumers
    • Schedules and "pulls" asynchronously
    • Activates when listener added with addListener

Now we just have one last core feature left: Contexts.

Previous
Reactive Promises