hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
23/// A single Rust value that can asynchronously change over time.
24///
25/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
26/// [`Unbounded`], the value will asynchronously change over time.
27///
28/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
29/// a single number that will asynchronously change as events are processed. Singletons also appear
30/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
31/// such as getting the length of a batch of requests.
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this singleton
35/// - `Loc`: the [`Location`] where the singleton is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Singleton<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46    L: Location<'a>,
47{
48    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49        Singleton::new(singleton.location, singleton.ir_node.into_inner())
50    }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61            location.clone(),
62            HydroNode::DeferTick {
63                input: Box::new(HydroNode::CycleSource {
64                    ident,
65                    metadata: location.new_node_metadata(Self::collection_kind()),
66                }),
67                metadata: location
68                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69            },
70        );
71
72        from_previous_tick.unwrap_or(initial)
73    }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78    L: Location<'a>,
79{
80    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81        assert_eq!(
82            Location::id(&self.location),
83            expected_location,
84            "locations do not match"
85        );
86        self.location
87            .flow_state()
88            .borrow_mut()
89            .push_root(HydroRoot::CycleSink {
90                ident,
91                input: Box::new(self.ir_node.into_inner()),
92                op_metadata: HydroIrOpMetadata::new(),
93            });
94    }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104        Singleton::new(
105            location.clone(),
106            HydroNode::CycleSource {
107                ident,
108                metadata: location.new_node_metadata(Self::collection_kind()),
109            },
110        )
111    }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119        assert_eq!(
120            Location::id(&self.location),
121            expected_location,
122            "locations do not match"
123        );
124        self.location
125            .flow_state()
126            .borrow_mut()
127            .push_root(HydroRoot::CycleSink {
128                ident,
129                input: Box::new(self.ir_node.into_inner()),
130                op_metadata: HydroIrOpMetadata::new(),
131            });
132    }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137    L: Location<'a> + NoTick,
138{
139    type Location = L;
140
141    fn create_source(ident: syn::Ident, location: L) -> Self {
142        Singleton::new(
143            location.clone(),
144            HydroNode::CycleSource {
145                ident,
146                metadata: location.new_node_metadata(Self::collection_kind()),
147            },
148        )
149    }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(self.ir_node.into_inner()),
168                op_metadata: HydroIrOpMetadata::new(),
169            });
170    }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175    T: Clone,
176    L: Location<'a>,
177{
178    fn clone(&self) -> Self {
179        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181            *self.ir_node.borrow_mut() = HydroNode::Tee {
182                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183                metadata: self.location.new_node_metadata(Self::collection_kind()),
184            };
185        }
186
187        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188            Singleton {
189                location: self.location.clone(),
190                ir_node: HydroNode::Tee {
191                    inner: TeeNode(inner.0.clone()),
192                    metadata: metadata.clone(),
193                }
194                .into(),
195                _phantom: PhantomData,
196            }
197        } else {
198            unreachable!()
199        }
200    }
201}
202
203#[cfg(stageleft_runtime)]
204fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
205    me: Singleton<T, Tick<L>, B>,
206    other: O,
207) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
208where
209    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
210{
211    check_matching_location(
212        &me.location,
213        &Singleton::<T, Tick<L>, B>::other_location(&other),
214    );
215
216    Singleton::<T, Tick<L>, B>::make(
217        me.location.clone(),
218        HydroNode::CrossSingleton {
219            left: Box::new(me.ir_node.into_inner()),
220            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
221            metadata: me.location.new_node_metadata(CollectionKind::Singleton {
222                bound: B::BOUND_KIND,
223                element_type: stageleft::quote_type::<
224                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
225                >()
226                .into(),
227            }),
228        },
229    )
230}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234    L: Location<'a>,
235{
236    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239        Singleton {
240            location,
241            ir_node: RefCell::new(ir_node),
242            _phantom: PhantomData,
243        }
244    }
245
246    pub(crate) fn collection_kind() -> CollectionKind {
247        CollectionKind::Singleton {
248            bound: B::BOUND_KIND,
249            element_type: stageleft::quote_type::<T>().into(),
250        }
251    }
252
253    /// Returns the [`Location`] where this singleton is being materialized.
254    pub fn location(&self) -> &L {
255        &self.location
256    }
257
258    /// Transforms the singleton value by applying a function `f` to it,
259    /// continuously as the input is updated.
260    ///
261    /// # Example
262    /// ```rust
263    /// # use hydro_lang::prelude::*;
264    /// # use futures::StreamExt;
265    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
266    /// let tick = process.tick();
267    /// let singleton = tick.singleton(q!(5));
268    /// singleton.map(q!(|v| v * 2)).all_ticks()
269    /// # }, |mut stream| async move {
270    /// // 10
271    /// # assert_eq!(stream.next().await.unwrap(), 10);
272    /// # }));
273    /// ```
274    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
275    where
276        F: Fn(T) -> U + 'a,
277    {
278        let f = f.splice_fn1_ctx(&self.location).into();
279        Singleton::new(
280            self.location.clone(),
281            HydroNode::Map {
282                f,
283                input: Box::new(self.ir_node.into_inner()),
284                metadata: self
285                    .location
286                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
287            },
288        )
289    }
290
291    /// Transforms the singleton value by applying a function `f` to it and then flattening
292    /// the result into a stream, preserving the order of elements.
293    ///
294    /// The function `f` is applied to the singleton value to produce an iterator, and all items
295    /// from that iterator are emitted in the output stream in deterministic order.
296    ///
297    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
298    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
299    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
300    ///
301    /// # Example
302    /// ```rust
303    /// # use hydro_lang::prelude::*;
304    /// # use futures::StreamExt;
305    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
306    /// let tick = process.tick();
307    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
308    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
309    /// # }, |mut stream| async move {
310    /// // 1, 2, 3
311    /// # for w in vec![1, 2, 3] {
312    /// #     assert_eq!(stream.next().await.unwrap(), w);
313    /// # }
314    /// # }));
315    /// ```
316    pub fn flat_map_ordered<U, I, F>(
317        self,
318        f: impl IntoQuotedMut<'a, F, L>,
319    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
320    where
321        I: IntoIterator<Item = U>,
322        F: Fn(T) -> I + 'a,
323    {
324        let f = f.splice_fn1_ctx(&self.location).into();
325        Stream::new(
326            self.location.clone(),
327            HydroNode::FlatMap {
328                f,
329                input: Box::new(self.ir_node.into_inner()),
330                metadata: self.location.new_node_metadata(
331                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
332                ),
333            },
334        )
335    }
336
337    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
338    /// for the output type `I` to produce items in any order.
339    ///
340    /// The function `f` is applied to the singleton value to produce an iterator, and all items
341    /// from that iterator are emitted in the output stream in non-deterministic order.
342    ///
343    /// # Example
344    /// ```rust
345    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
346    /// # use futures::StreamExt;
347    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
348    /// let tick = process.tick();
349    /// let singleton = tick.singleton(q!(
350    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
351    /// ));
352    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
353    /// # }, |mut stream| async move {
354    /// // 1, 2, 3, but in no particular order
355    /// # let mut results = Vec::new();
356    /// # for _ in 0..3 {
357    /// #     results.push(stream.next().await.unwrap());
358    /// # }
359    /// # results.sort();
360    /// # assert_eq!(results, vec![1, 2, 3]);
361    /// # }));
362    /// ```
363    pub fn flat_map_unordered<U, I, F>(
364        self,
365        f: impl IntoQuotedMut<'a, F, L>,
366    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
367    where
368        I: IntoIterator<Item = U>,
369        F: Fn(T) -> I + 'a,
370    {
371        let f = f.splice_fn1_ctx(&self.location).into();
372        Stream::new(
373            self.location.clone(),
374            HydroNode::FlatMap {
375                f,
376                input: Box::new(self.ir_node.into_inner()),
377                metadata: self
378                    .location
379                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
380            },
381        )
382    }
383
384    /// Flattens the singleton value into a stream, preserving the order of elements.
385    ///
386    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
387    /// are emitted in the output stream in deterministic order.
388    ///
389    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
390    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
391    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
392    ///
393    /// # Example
394    /// ```rust
395    /// # use hydro_lang::prelude::*;
396    /// # use futures::StreamExt;
397    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
398    /// let tick = process.tick();
399    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
400    /// singleton.flatten_ordered().all_ticks()
401    /// # }, |mut stream| async move {
402    /// // 1, 2, 3
403    /// # for w in vec![1, 2, 3] {
404    /// #     assert_eq!(stream.next().await.unwrap(), w);
405    /// # }
406    /// # }));
407    /// ```
408    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
409    where
410        T: IntoIterator<Item = U>,
411    {
412        self.flat_map_ordered(q!(|x| x))
413    }
414
415    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
416    /// for the element type `T` to produce items in any order.
417    ///
418    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
419    /// are emitted in the output stream in non-deterministic order.
420    ///
421    /// # Example
422    /// ```rust
423    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
424    /// # use futures::StreamExt;
425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
426    /// let tick = process.tick();
427    /// let singleton = tick.singleton(q!(
428    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
429    /// ));
430    /// singleton.flatten_unordered().all_ticks()
431    /// # }, |mut stream| async move {
432    /// // 1, 2, 3, but in no particular order
433    /// # let mut results = Vec::new();
434    /// # for _ in 0..3 {
435    /// #     results.push(stream.next().await.unwrap());
436    /// # }
437    /// # results.sort();
438    /// # assert_eq!(results, vec![1, 2, 3]);
439    /// # }));
440    /// ```
441    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
442    where
443        T: IntoIterator<Item = U>,
444    {
445        self.flat_map_unordered(q!(|x| x))
446    }
447
448    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
449    ///
450    /// If the predicate returns `true`, the output optional contains the same value.
451    /// If the predicate returns `false`, the output optional is empty.
452    ///
453    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
454    /// not modify or take ownership of the value. If you need to modify the value while filtering
455    /// use [`Singleton::filter_map`] instead.
456    ///
457    /// # Example
458    /// ```rust
459    /// # use hydro_lang::prelude::*;
460    /// # use futures::StreamExt;
461    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
462    /// let tick = process.tick();
463    /// let singleton = tick.singleton(q!(5));
464    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
465    /// # }, |mut stream| async move {
466    /// // 5
467    /// # assert_eq!(stream.next().await.unwrap(), 5);
468    /// # }));
469    /// ```
470    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
471    where
472        F: Fn(&T) -> bool + 'a,
473    {
474        let f = f.splice_fn1_borrow_ctx(&self.location).into();
475        Optional::new(
476            self.location.clone(),
477            HydroNode::Filter {
478                f,
479                input: Box::new(self.ir_node.into_inner()),
480                metadata: self
481                    .location
482                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
483            },
484        )
485    }
486
487    /// An operator that both filters and maps. It yields the value only if the supplied
488    /// closure `f` returns `Some(value)`.
489    ///
490    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
491    /// If the closure returns `None`, the output optional is empty.
492    ///
493    /// # Example
494    /// ```rust
495    /// # use hydro_lang::prelude::*;
496    /// # use futures::StreamExt;
497    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
498    /// let tick = process.tick();
499    /// let singleton = tick.singleton(q!("42"));
500    /// singleton
501    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
502    ///     .all_ticks()
503    /// # }, |mut stream| async move {
504    /// // 42
505    /// # assert_eq!(stream.next().await.unwrap(), 42);
506    /// # }));
507    /// ```
508    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
509    where
510        F: Fn(T) -> Option<U> + 'a,
511    {
512        let f = f.splice_fn1_ctx(&self.location).into();
513        Optional::new(
514            self.location.clone(),
515            HydroNode::FilterMap {
516                f,
517                input: Box::new(self.ir_node.into_inner()),
518                metadata: self
519                    .location
520                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
521            },
522        )
523    }
524
525    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
526    ///
527    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
528    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
529    /// non-null. This is useful for combining several pieces of state together.
530    ///
531    /// # Example
532    /// ```rust
533    /// # use hydro_lang::prelude::*;
534    /// # use futures::StreamExt;
535    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536    /// let tick = process.tick();
537    /// let numbers = process
538    ///   .source_iter(q!(vec![123, 456]))
539    ///   .batch(&tick, nondet!(/** test */));
540    /// let count = numbers.clone().count(); // Singleton
541    /// let max = numbers.max(); // Optional
542    /// count.zip(max).all_ticks()
543    /// # }, |mut stream| async move {
544    /// // [(2, 456)]
545    /// # for w in vec![(2, 456)] {
546    /// #     assert_eq!(stream.next().await.unwrap(), w);
547    /// # }
548    /// # }));
549    /// ```
550    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
551    where
552        Self: ZipResult<'a, O, Location = L>,
553    {
554        check_matching_location(&self.location, &Self::other_location(&other));
555
556        if L::is_top_level()
557            && let Some(tick) = self.location.try_tick()
558        {
559            let out = zip_inside_tick(
560                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
561                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
562                    Self::other_location(&other),
563                    Self::other_ir_node(other),
564                )
565                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
566            )
567            .latest();
568
569            Self::make(out.location, out.ir_node.into_inner())
570        } else {
571            Self::make(
572                self.location.clone(),
573                HydroNode::CrossSingleton {
574                    left: Box::new(self.ir_node.into_inner()),
575                    right: Box::new(Self::other_ir_node(other)),
576                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
577                        bound: B::BOUND_KIND,
578                        element_type: stageleft::quote_type::<
579                            <Self as ZipResult<'a, O>>::ElementType,
580                        >()
581                        .into(),
582                    }),
583                },
584            )
585        }
586    }
587
588    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
589    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
590    ///
591    /// Useful for conditionally processing, such as only emitting a singleton's value outside
592    /// a tick if some other condition is satisfied.
593    ///
594    /// # Example
595    /// ```rust
596    /// # use hydro_lang::prelude::*;
597    /// # use futures::StreamExt;
598    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
599    /// let tick = process.tick();
600    /// // ticks are lazy by default, forces the second tick to run
601    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
602    ///
603    /// let batch_first_tick = process
604    ///   .source_iter(q!(vec![1]))
605    ///   .batch(&tick, nondet!(/** test */));
606    /// let batch_second_tick = process
607    ///   .source_iter(q!(vec![1, 2, 3]))
608    ///   .batch(&tick, nondet!(/** test */))
609    ///   .defer_tick(); // appears on the second tick
610    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
611    /// batch_first_tick.chain(batch_second_tick).count()
612    ///   .filter_if_some(some_on_first_tick)
613    ///   .all_ticks()
614    /// # }, |mut stream| async move {
615    /// // [1]
616    /// # for w in vec![1] {
617    /// #     assert_eq!(stream.next().await.unwrap(), w);
618    /// # }
619    /// # }));
620    /// ```
621    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
622        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
623            .map(q!(|(d, _signal)| d))
624    }
625
626    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
627    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
628    ///
629    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
630    /// the condition.
631    ///
632    /// # Example
633    /// ```rust
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// let tick = process.tick();
638    /// // ticks are lazy by default, forces the second tick to run
639    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640    ///
641    /// let batch_first_tick = process
642    ///   .source_iter(q!(vec![1]))
643    ///   .batch(&tick, nondet!(/** test */));
644    /// let batch_second_tick = process
645    ///   .source_iter(q!(vec![1, 2, 3]))
646    ///   .batch(&tick, nondet!(/** test */))
647    ///   .defer_tick(); // appears on the second tick
648    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
649    /// batch_first_tick.chain(batch_second_tick).count()
650    ///   .filter_if_none(some_on_first_tick)
651    ///   .all_ticks()
652    /// # }, |mut stream| async move {
653    /// // [3]
654    /// # for w in vec![3] {
655    /// #     assert_eq!(stream.next().await.unwrap(), w);
656    /// # }
657    /// # }));
658    /// ```
659    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
660        self.filter_if_some(
661            other
662                .map(q!(|_| ()))
663                .into_singleton()
664                .filter(q!(|o| o.is_none())),
665        )
666    }
667
668    /// An operator which allows you to "name" a `HydroNode`.
669    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
670    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
671        {
672            let mut node = self.ir_node.borrow_mut();
673            let metadata = node.metadata_mut();
674            metadata.tag = Some(name.to_string());
675        }
676        self
677    }
678}
679
680impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
681where
682    L: Location<'a> + NoTick,
683{
684    /// Returns a singleton value corresponding to the latest snapshot of the singleton
685    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
686    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
687    /// all snapshots of this singleton into the atomic-associated tick will observe the
688    /// same value each tick.
689    ///
690    /// # Non-Determinism
691    /// Because this picks a snapshot of a singleton whose value is continuously changing,
692    /// the output singleton has a non-deterministic value since the snapshot can be at an
693    /// arbitrary point in time.
694    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
695        Singleton::new(
696            self.location.clone().tick,
697            HydroNode::Batch {
698                inner: Box::new(self.ir_node.into_inner()),
699                metadata: self
700                    .location
701                    .tick
702                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
703            },
704        )
705    }
706
707    /// Returns this singleton back into a top-level, asynchronous execution context where updates
708    /// to the value will be asynchronously propagated.
709    pub fn end_atomic(self) -> Singleton<T, L, B> {
710        Singleton::new(
711            self.location.tick.l.clone(),
712            HydroNode::EndAtomic {
713                inner: Box::new(self.ir_node.into_inner()),
714                metadata: self
715                    .location
716                    .tick
717                    .l
718                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
719            },
720        )
721    }
722}
723
724impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
725where
726    L: Location<'a>,
727{
728    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
729    /// will observe the same version of the value and will be executed synchronously before any
730    /// outputs are yielded (in [`Optional::end_atomic`]).
731    ///
732    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
733    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
734    /// a different version).
735    ///
736    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
737    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
738    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
739    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
740        let out_location = Atomic { tick: tick.clone() };
741        Singleton::new(
742            out_location.clone(),
743            HydroNode::BeginAtomic {
744                inner: Box::new(self.ir_node.into_inner()),
745                metadata: out_location
746                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
747            },
748        )
749    }
750
751    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
752    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
753    /// relevant data that contributed to the snapshot at tick `t`.
754    ///
755    /// # Non-Determinism
756    /// Because this picks a snapshot of a singleton whose value is continuously changing,
757    /// the output singleton has a non-deterministic value since the snapshot can be at an
758    /// arbitrary point in time.
759    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
760        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
761        Singleton::new(
762            tick.clone(),
763            HydroNode::Batch {
764                inner: Box::new(self.ir_node.into_inner()),
765                metadata: tick
766                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
767            },
768        )
769    }
770
771    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
772    /// with order corresponding to increasing prefixes of data contributing to the singleton.
773    ///
774    /// # Non-Determinism
775    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
776    /// to non-deterministic batching and arrival of inputs, the output stream is
777    /// non-deterministic.
778    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
779    where
780        L: NoTick,
781    {
782        let tick = self.location.tick();
783        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
784    }
785
786    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
787    /// value taken at various points in time. Because the input singleton may be
788    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
789    /// represent the value of the singleton given some prefix of the streams leading up to
790    /// it.
791    ///
792    /// # Non-Determinism
793    /// The output stream is non-deterministic in which elements are sampled, since this
794    /// is controlled by a clock.
795    pub fn sample_every(
796        self,
797        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
798        nondet: NonDet,
799    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
800    where
801        L: NoTick + NoAtomic,
802    {
803        let samples = self.location.source_interval(interval, nondet);
804        let tick = self.location.tick();
805
806        self.snapshot(&tick, nondet)
807            .filter_if_some(samples.batch(&tick, nondet).first())
808            .all_ticks()
809            .weakest_retries()
810    }
811}
812
813impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
814where
815    L: Location<'a>,
816{
817    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
818    /// which will stream the value computed in _each_ tick as a separate stream element.
819    ///
820    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
821    /// producing one element in the output for each tick. This is useful for batched computations,
822    /// where the results from each tick must be combined together.
823    ///
824    /// # Example
825    /// ```rust
826    /// # use hydro_lang::prelude::*;
827    /// # use futures::StreamExt;
828    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
829    /// let tick = process.tick();
830    /// # // ticks are lazy by default, forces the second tick to run
831    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
832    /// # let batch_first_tick = process
833    /// #   .source_iter(q!(vec![1]))
834    /// #   .batch(&tick, nondet!(/** test */));
835    /// # let batch_second_tick = process
836    /// #   .source_iter(q!(vec![1, 2, 3]))
837    /// #   .batch(&tick, nondet!(/** test */))
838    /// #   .defer_tick(); // appears on the second tick
839    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
840    /// input_batch // first tick: [1], second tick: [1, 2, 3]
841    ///     .count()
842    ///     .all_ticks()
843    /// # }, |mut stream| async move {
844    /// // [1, 3]
845    /// # for w in vec![1, 3] {
846    /// #     assert_eq!(stream.next().await.unwrap(), w);
847    /// # }
848    /// # }));
849    /// ```
850    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
851        self.into_stream().all_ticks()
852    }
853
854    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
855    /// which will stream the value computed in _each_ tick as a separate stream element.
856    ///
857    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
858    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
859    /// singleton's [`Tick`] context.
860    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
861        self.into_stream().all_ticks_atomic()
862    }
863
864    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
865    /// be asynchronously updated with the latest value of the singleton inside the tick.
866    ///
867    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
868    /// tick that tracks the inner value. This is useful for getting the value as of the
869    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
870    ///
871    /// # Example
872    /// ```rust
873    /// # use hydro_lang::prelude::*;
874    /// # use futures::StreamExt;
875    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
876    /// let tick = process.tick();
877    /// # // ticks are lazy by default, forces the second tick to run
878    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
879    /// # let batch_first_tick = process
880    /// #   .source_iter(q!(vec![1]))
881    /// #   .batch(&tick, nondet!(/** test */));
882    /// # let batch_second_tick = process
883    /// #   .source_iter(q!(vec![1, 2, 3]))
884    /// #   .batch(&tick, nondet!(/** test */))
885    /// #   .defer_tick(); // appears on the second tick
886    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
887    /// input_batch // first tick: [1], second tick: [1, 2, 3]
888    ///     .count()
889    ///     .latest()
890    /// # .sample_eager(nondet!(/** test */))
891    /// # }, |mut stream| async move {
892    /// // asynchronously changes from 1 ~> 3
893    /// # for w in vec![1, 3] {
894    /// #     assert_eq!(stream.next().await.unwrap(), w);
895    /// # }
896    /// # }));
897    /// ```
898    pub fn latest(self) -> Singleton<T, L, Unbounded> {
899        Singleton::new(
900            self.location.outer().clone(),
901            HydroNode::YieldConcat {
902                inner: Box::new(self.ir_node.into_inner()),
903                metadata: self
904                    .location
905                    .outer()
906                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
907            },
908        )
909    }
910
911    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
912    /// be updated with the latest value of the singleton inside the tick.
913    ///
914    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
915    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
916    /// singleton's [`Tick`] context.
917    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
918        let out_location = Atomic {
919            tick: self.location.clone(),
920        };
921        Singleton::new(
922            out_location.clone(),
923            HydroNode::YieldConcat {
924                inner: Box::new(self.ir_node.into_inner()),
925                metadata: out_location
926                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
927            },
928        )
929    }
930
931    #[deprecated(note = "use .into_stream().persist()")]
932    #[expect(missing_docs, reason = "deprecated")]
933    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
934        Stream::new(
935            self.location.clone(),
936            HydroNode::Persist {
937                inner: Box::new(self.ir_node.into_inner()),
938                metadata: self.location.new_node_metadata(Stream::<
939                    T,
940                    Tick<L>,
941                    Bounded,
942                    TotalOrder,
943                    ExactlyOnce,
944                >::collection_kind()),
945            },
946        )
947    }
948
949    /// Converts this singleton into a [`Stream`] containing a single element, the value.
950    ///
951    /// # Example
952    /// ```rust
953    /// # use hydro_lang::prelude::*;
954    /// # use futures::StreamExt;
955    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
956    /// let tick = process.tick();
957    /// let batch_input = process
958    ///   .source_iter(q!(vec![123, 456]))
959    ///   .batch(&tick, nondet!(/** test */));
960    /// batch_input.clone().chain(
961    ///   batch_input.count().into_stream()
962    /// ).all_ticks()
963    /// # }, |mut stream| async move {
964    /// // [123, 456, 2]
965    /// # for w in vec![123, 456, 2] {
966    /// #     assert_eq!(stream.next().await.unwrap(), w);
967    /// # }
968    /// # }));
969    /// ```
970    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
971        Stream::new(
972            self.location.clone(),
973            HydroNode::Cast {
974                inner: Box::new(self.ir_node.into_inner()),
975                metadata: self.location.new_node_metadata(Stream::<
976                    T,
977                    Tick<L>,
978                    Bounded,
979                    TotalOrder,
980                    ExactlyOnce,
981                >::collection_kind()),
982            },
983        )
984    }
985}
986
987#[doc(hidden)]
988/// Helper trait that determines the output collection type for [`Singleton::zip`].
989///
990/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
991/// [`Singleton`].
992#[sealed::sealed]
993pub trait ZipResult<'a, Other> {
994    /// The output collection type.
995    type Out;
996    /// The type of the tupled output value.
997    type ElementType;
998    /// The type of the other collection's value.
999    type OtherType;
1000    /// The location where the tupled result will be materialized.
1001    type Location: Location<'a>;
1002
1003    /// The location of the second input to the `zip`.
1004    fn other_location(other: &Other) -> Self::Location;
1005    /// The IR node of the second input to the `zip`.
1006    fn other_ir_node(other: Other) -> HydroNode;
1007
1008    /// Constructs the output live collection given an IR node containing the zip result.
1009    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1010}
1011
1012#[sealed::sealed]
1013impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1014where
1015    L: Location<'a>,
1016{
1017    type Out = Singleton<(T, U), L, B>;
1018    type ElementType = (T, U);
1019    type OtherType = U;
1020    type Location = L;
1021
1022    fn other_location(other: &Singleton<U, L, B>) -> L {
1023        other.location.clone()
1024    }
1025
1026    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1027        other.ir_node.into_inner()
1028    }
1029
1030    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1031        Singleton::new(
1032            location.clone(),
1033            HydroNode::Cast {
1034                inner: Box::new(ir_node),
1035                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1036            },
1037        )
1038    }
1039}
1040
1041#[sealed::sealed]
1042impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1043where
1044    L: Location<'a>,
1045{
1046    type Out = Optional<(T, U), L, B>;
1047    type ElementType = (T, U);
1048    type OtherType = U;
1049    type Location = L;
1050
1051    fn other_location(other: &Optional<U, L, B>) -> L {
1052        other.location.clone()
1053    }
1054
1055    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1056        other.ir_node.into_inner()
1057    }
1058
1059    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1060        Optional::new(location, ir_node)
1061    }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    #[cfg(feature = "deploy")]
1067    use futures::{SinkExt, StreamExt};
1068    #[cfg(feature = "deploy")]
1069    use hydro_deploy::Deployment;
1070    use stageleft::q;
1071
1072    use crate::compile::builder::FlowBuilder;
1073    #[cfg(feature = "deploy")]
1074    use crate::live_collections::stream::ExactlyOnce;
1075    use crate::location::Location;
1076    use crate::nondet::nondet;
1077
1078    #[cfg(feature = "deploy")]
1079    #[tokio::test]
1080    async fn tick_cycle_cardinality() {
1081        let mut deployment = Deployment::new();
1082
1083        let flow = FlowBuilder::new();
1084        let node = flow.process::<()>();
1085        let external = flow.external::<()>();
1086
1087        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1088
1089        let node_tick = node.tick();
1090        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1091        let counts = singleton
1092            .clone()
1093            .into_stream()
1094            .count()
1095            .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1096            .all_ticks()
1097            .send_bincode_external(&external);
1098        complete_cycle.complete_next_tick(singleton);
1099
1100        let nodes = flow
1101            .with_process(&node, deployment.Localhost())
1102            .with_external(&external, deployment.Localhost())
1103            .deploy(&mut deployment);
1104
1105        deployment.deploy().await.unwrap();
1106
1107        let mut tick_trigger = nodes.connect(input_send).await;
1108        let mut external_out = nodes.connect(counts).await;
1109
1110        deployment.start().await.unwrap();
1111
1112        tick_trigger.send(()).await.unwrap();
1113
1114        assert_eq!(external_out.next().await.unwrap(), 1);
1115
1116        tick_trigger.send(()).await.unwrap();
1117
1118        assert_eq!(external_out.next().await.unwrap(), 1);
1119    }
1120
1121    #[test]
1122    #[should_panic]
1123    fn sim_fold_intermediate_states() {
1124        let flow = FlowBuilder::new();
1125        let external = flow.external::<()>();
1126        let node = flow.process::<()>();
1127
1128        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1129        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1130
1131        let tick = node.tick();
1132        let batch = folded.snapshot(&tick, nondet!(/** test */));
1133        let out_port = batch.all_ticks().send_bincode_external(&external);
1134
1135        flow.sim().exhaustive(async |mut compiled| {
1136            let mut out_recv = compiled.connect(&out_port);
1137            compiled.launch();
1138            assert_eq!(out_recv.next().await.unwrap(), 10);
1139        });
1140    }
1141
1142    #[test]
1143    fn sim_fold_intermediate_state_count() {
1144        let flow = FlowBuilder::new();
1145        let external = flow.external::<()>();
1146        let node = flow.process::<()>();
1147
1148        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1149        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1150
1151        let tick = node.tick();
1152        let batch = folded.snapshot(&tick, nondet!(/** test */));
1153        let out_port = batch.all_ticks().send_bincode_external(&external);
1154
1155        let instance_count = flow.sim().exhaustive(async |mut compiled| {
1156            let out_recv = compiled.connect(&out_port);
1157            compiled.launch();
1158
1159            let out = out_recv.collect::<Vec<_>>().await;
1160            assert_eq!(out.last(), Some(&10));
1161        });
1162
1163        assert_eq!(
1164            instance_count,
1165            16 // 2^4 possible subsets of intermediates (including initial state)
1166        )
1167    }
1168
1169    #[test]
1170    fn sim_fold_no_repeat_initial() {
1171        // check that we don't repeat the initial state of the fold in autonomous decisions
1172
1173        let flow = FlowBuilder::new();
1174        let external = flow.external::<()>();
1175        let node = flow.process::<()>();
1176
1177        let (in_port, input) = node.source_external_bincode(&external);
1178        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1179
1180        let tick = node.tick();
1181        let batch = folded.snapshot(&tick, nondet!(/** test */));
1182        let out_port = batch.all_ticks().send_bincode_external(&external);
1183
1184        flow.sim().exhaustive(async |mut compiled| {
1185            let in_send = compiled.connect(&in_port);
1186            let mut out_recv = compiled.connect(&out_port);
1187            compiled.launch();
1188            assert_eq!(out_recv.next().await.unwrap(), 0);
1189
1190            in_send.send(123);
1191
1192            assert_eq!(out_recv.next().await.unwrap(), 123);
1193        });
1194    }
1195
1196    #[test]
1197    #[should_panic]
1198    fn sim_fold_repeats_snapshots() {
1199        // when the tick is driven by a snapshot AND something else, the snapshot can
1200        // "stutter" and repeat the same state multiple times
1201
1202        let flow = FlowBuilder::new();
1203        let external = flow.external::<()>();
1204        let node = flow.process::<()>();
1205
1206        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1207        let folded = source_iter.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1208
1209        let tick = node.tick();
1210        let batch = source_iter
1211            .batch(&tick, nondet!(/** test */))
1212            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1213        let out_port = batch.all_ticks().send_bincode_external(&external);
1214
1215        flow.sim().exhaustive(async |mut compiled| {
1216            let mut out_recv = compiled.connect(&out_port);
1217            compiled.launch();
1218
1219            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1220            {
1221                panic!("repeated snapshot");
1222            }
1223        });
1224    }
1225
1226    #[test]
1227    fn sim_fold_repeats_snapshots_count() {
1228        // check the number of instances
1229        let flow = FlowBuilder::new();
1230        let external = flow.external::<()>();
1231        let node = flow.process::<()>();
1232
1233        let source_iter = node.source_iter(q!(vec![1, 2]));
1234        let folded = source_iter.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1235
1236        let tick = node.tick();
1237        let batch = source_iter
1238            .batch(&tick, nondet!(/** test */))
1239            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1240        let out_port = batch.all_ticks().send_bincode_external(&external);
1241
1242        let count = flow.sim().exhaustive(async |mut compiled| {
1243            let out_recv = compiled.connect(&out_port);
1244            compiled.launch();
1245
1246            let _ = out_recv.collect::<Vec<_>>().await;
1247        });
1248
1249        assert_eq!(count, 52);
1250        // don't have a combinatorial explanation for this number yet, but checked via logs
1251    }
1252
1253    #[test]
1254    fn sim_top_level_singleton_exhaustive() {
1255        // ensures that top-level singletons have only one snapshot
1256        let flow = FlowBuilder::new();
1257        let external = flow.external::<()>();
1258        let node = flow.process::<()>();
1259
1260        let singleton = node.singleton(q!(1));
1261        let tick = node.tick();
1262        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1263        let out_port = batch.all_ticks().send_bincode_external(&external);
1264
1265        let count = flow.sim().exhaustive(async |mut compiled| {
1266            let out_recv = compiled.connect(&out_port);
1267            compiled.launch();
1268
1269            let _ = out_recv.collect::<Vec<_>>().await;
1270        });
1271
1272        assert_eq!(count, 1);
1273    }
1274
1275    #[test]
1276    fn sim_top_level_singleton_join_count() {
1277        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1278        // exploration
1279
1280        let flow = FlowBuilder::new();
1281        let external = flow.external::<()>();
1282        let node = flow.process::<()>();
1283
1284        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1285        let tick = node.tick();
1286        let batch = source_iter
1287            .batch(&tick, nondet!(/** test */))
1288            .cross_singleton(
1289                node.singleton(q!(123))
1290                    .snapshot(&tick, nondet!(/** test */)),
1291            );
1292        let out_port = batch.all_ticks().send_bincode_external(&external);
1293
1294        let instance_count = flow.sim().exhaustive(async |mut compiled| {
1295            let out_recv = compiled.connect(&out_port);
1296            compiled.launch();
1297
1298            let _ = out_recv.collect::<Vec<_>>().await;
1299        });
1300
1301        assert_eq!(
1302            instance_count,
1303            16 // 2^4 ways to split up (including a possibly empty first batch)
1304        )
1305    }
1306}