hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
14#[cfg(stageleft_runtime)]
15use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
16use crate::forward_handle::{ForwardRef, TickCycle};
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::{DynLocation, LocationId};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{Location, NoTick, Tick, check_matching_location};
21use crate::nondet::{NonDet, nondet};
22
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location this singleton is materialized at; exposed through [`Singleton::location`].
    pub(crate) location: Loc,
    /// The IR node producing this singleton's value. It lives in a [`RefCell`] because
    /// `Clone::clone` rewrites it into a shared `Tee` node while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the type parameters to the struct without storing them.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
44impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
45where
46 L: Location<'a>,
47{
48 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
49 Singleton::new(singleton.location, singleton.ir_node.into_inner())
50 }
51}
52
53impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
60 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
61 location.clone(),
62 HydroNode::DeferTick {
63 input: Box::new(HydroNode::CycleSource {
64 ident,
65 metadata: location.new_node_metadata(Self::collection_kind()),
66 }),
67 metadata: location
68 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
69 },
70 );
71
72 from_previous_tick.unwrap_or(initial)
73 }
74}
75
76impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
77where
78 L: Location<'a>,
79{
80 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
81 assert_eq!(
82 Location::id(&self.location),
83 expected_location,
84 "locations do not match"
85 );
86 self.location
87 .flow_state()
88 .borrow_mut()
89 .push_root(HydroRoot::CycleSink {
90 ident,
91 input: Box::new(self.ir_node.into_inner()),
92 op_metadata: HydroIrOpMetadata::new(),
93 });
94 }
95}
96
97impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
104 Singleton::new(
105 location.clone(),
106 HydroNode::CycleSource {
107 ident,
108 metadata: location.new_node_metadata(Self::collection_kind()),
109 },
110 )
111 }
112}
113
114impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
119 assert_eq!(
120 Location::id(&self.location),
121 expected_location,
122 "locations do not match"
123 );
124 self.location
125 .flow_state()
126 .borrow_mut()
127 .push_root(HydroRoot::CycleSink {
128 ident,
129 input: Box::new(self.ir_node.into_inner()),
130 op_metadata: HydroIrOpMetadata::new(),
131 });
132 }
133}
134
135impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
136where
137 L: Location<'a> + NoTick,
138{
139 type Location = L;
140
141 fn create_source(ident: syn::Ident, location: L) -> Self {
142 Singleton::new(
143 location.clone(),
144 HydroNode::CycleSource {
145 ident,
146 metadata: location.new_node_metadata(Self::collection_kind()),
147 },
148 )
149 }
150}
151
152impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(self.ir_node.into_inner()),
168 op_metadata: HydroIrOpMetadata::new(),
169 });
170 }
171}
172
173impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
174where
175 T: Clone,
176 L: Location<'a>,
177{
178 fn clone(&self) -> Self {
179 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
180 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
181 *self.ir_node.borrow_mut() = HydroNode::Tee {
182 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
183 metadata: self.location.new_node_metadata(Self::collection_kind()),
184 };
185 }
186
187 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
188 Singleton {
189 location: self.location.clone(),
190 ir_node: HydroNode::Tee {
191 inner: TeeNode(inner.0.clone()),
192 metadata: metadata.clone(),
193 }
194 .into(),
195 _phantom: PhantomData,
196 }
197 } else {
198 unreachable!()
199 }
200 }
201}
202
/// Zips a tick-local singleton with `other` by emitting a `CrossSingleton` node at the
/// singleton's location. Both inputs are checked to be at matching locations first.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    check_matching_location(
        &me.location,
        &Singleton::<T, Tick<L>, B>::other_location(&other),
    );

    let out_location = me.location.clone();
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other));
    let metadata = me.location.new_node_metadata(CollectionKind::Singleton {
        bound: B::BOUND_KIND,
        element_type: stageleft::quote_type::<
            <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
        >()
        .into(),
    });

    Singleton::<T, Tick<L>, B>::make(
        out_location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
231
232impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
233where
234 L: Location<'a>,
235{
236 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
237 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
238 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
239 Singleton {
240 location,
241 ir_node: RefCell::new(ir_node),
242 _phantom: PhantomData,
243 }
244 }
245
246 pub(crate) fn collection_kind() -> CollectionKind {
247 CollectionKind::Singleton {
248 bound: B::BOUND_KIND,
249 element_type: stageleft::quote_type::<T>().into(),
250 }
251 }
252
253 /// Returns the [`Location`] where this singleton is being materialized.
254 pub fn location(&self) -> &L {
255 &self.location
256 }
257
258 /// Transforms the singleton value by applying a function `f` to it,
259 /// continuously as the input is updated.
260 ///
261 /// # Example
262 /// ```rust
263 /// # use hydro_lang::prelude::*;
264 /// # use futures::StreamExt;
265 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
266 /// let tick = process.tick();
267 /// let singleton = tick.singleton(q!(5));
268 /// singleton.map(q!(|v| v * 2)).all_ticks()
269 /// # }, |mut stream| async move {
270 /// // 10
271 /// # assert_eq!(stream.next().await.unwrap(), 10);
272 /// # }));
273 /// ```
274 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
275 where
276 F: Fn(T) -> U + 'a,
277 {
278 let f = f.splice_fn1_ctx(&self.location).into();
279 Singleton::new(
280 self.location.clone(),
281 HydroNode::Map {
282 f,
283 input: Box::new(self.ir_node.into_inner()),
284 metadata: self
285 .location
286 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
287 },
288 )
289 }
290
291 /// Transforms the singleton value by applying a function `f` to it and then flattening
292 /// the result into a stream, preserving the order of elements.
293 ///
294 /// The function `f` is applied to the singleton value to produce an iterator, and all items
295 /// from that iterator are emitted in the output stream in deterministic order.
296 ///
297 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
298 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
299 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
300 ///
301 /// # Example
302 /// ```rust
303 /// # use hydro_lang::prelude::*;
304 /// # use futures::StreamExt;
305 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
306 /// let tick = process.tick();
307 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
308 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
309 /// # }, |mut stream| async move {
310 /// // 1, 2, 3
311 /// # for w in vec![1, 2, 3] {
312 /// # assert_eq!(stream.next().await.unwrap(), w);
313 /// # }
314 /// # }));
315 /// ```
316 pub fn flat_map_ordered<U, I, F>(
317 self,
318 f: impl IntoQuotedMut<'a, F, L>,
319 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
320 where
321 I: IntoIterator<Item = U>,
322 F: Fn(T) -> I + 'a,
323 {
324 let f = f.splice_fn1_ctx(&self.location).into();
325 Stream::new(
326 self.location.clone(),
327 HydroNode::FlatMap {
328 f,
329 input: Box::new(self.ir_node.into_inner()),
330 metadata: self.location.new_node_metadata(
331 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
332 ),
333 },
334 )
335 }
336
337 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
338 /// for the output type `I` to produce items in any order.
339 ///
340 /// The function `f` is applied to the singleton value to produce an iterator, and all items
341 /// from that iterator are emitted in the output stream in non-deterministic order.
342 ///
343 /// # Example
344 /// ```rust
345 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
346 /// # use futures::StreamExt;
347 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
348 /// let tick = process.tick();
349 /// let singleton = tick.singleton(q!(
350 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
351 /// ));
352 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
353 /// # }, |mut stream| async move {
354 /// // 1, 2, 3, but in no particular order
355 /// # let mut results = Vec::new();
356 /// # for _ in 0..3 {
357 /// # results.push(stream.next().await.unwrap());
358 /// # }
359 /// # results.sort();
360 /// # assert_eq!(results, vec![1, 2, 3]);
361 /// # }));
362 /// ```
363 pub fn flat_map_unordered<U, I, F>(
364 self,
365 f: impl IntoQuotedMut<'a, F, L>,
366 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
367 where
368 I: IntoIterator<Item = U>,
369 F: Fn(T) -> I + 'a,
370 {
371 let f = f.splice_fn1_ctx(&self.location).into();
372 Stream::new(
373 self.location.clone(),
374 HydroNode::FlatMap {
375 f,
376 input: Box::new(self.ir_node.into_inner()),
377 metadata: self
378 .location
379 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
380 },
381 )
382 }
383
384 /// Flattens the singleton value into a stream, preserving the order of elements.
385 ///
386 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
387 /// are emitted in the output stream in deterministic order.
388 ///
389 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
390 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
391 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
392 ///
393 /// # Example
394 /// ```rust
395 /// # use hydro_lang::prelude::*;
396 /// # use futures::StreamExt;
397 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
398 /// let tick = process.tick();
399 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
400 /// singleton.flatten_ordered().all_ticks()
401 /// # }, |mut stream| async move {
402 /// // 1, 2, 3
403 /// # for w in vec![1, 2, 3] {
404 /// # assert_eq!(stream.next().await.unwrap(), w);
405 /// # }
406 /// # }));
407 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the iterable.
        self.flat_map_ordered(q!(|x| x))
    }
414
415 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
416 /// for the element type `T` to produce items in any order.
417 ///
418 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
419 /// are emitted in the output stream in non-deterministic order.
420 ///
421 /// # Example
422 /// ```rust
423 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
424 /// # use futures::StreamExt;
425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
426 /// let tick = process.tick();
427 /// let singleton = tick.singleton(q!(
428 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
429 /// ));
430 /// singleton.flatten_unordered().all_ticks()
431 /// # }, |mut stream| async move {
432 /// // 1, 2, 3, but in no particular order
433 /// # let mut results = Vec::new();
434 /// # for _ in 0..3 {
435 /// # results.push(stream.next().await.unwrap());
436 /// # }
437 /// # results.sort();
438 /// # assert_eq!(results, vec![1, 2, 3]);
439 /// # }));
440 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the iterable.
        self.flat_map_unordered(q!(|x| x))
    }
447
448 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
449 ///
450 /// If the predicate returns `true`, the output optional contains the same value.
451 /// If the predicate returns `false`, the output optional is empty.
452 ///
453 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
454 /// not modify or take ownership of the value. If you need to modify the value while filtering
455 /// use [`Singleton::filter_map`] instead.
456 ///
457 /// # Example
458 /// ```rust
459 /// # use hydro_lang::prelude::*;
460 /// # use futures::StreamExt;
461 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
462 /// let tick = process.tick();
463 /// let singleton = tick.singleton(q!(5));
464 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
465 /// # }, |mut stream| async move {
466 /// // 5
467 /// # assert_eq!(stream.next().await.unwrap(), 5);
468 /// # }));
469 /// ```
470 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
471 where
472 F: Fn(&T) -> bool + 'a,
473 {
474 let f = f.splice_fn1_borrow_ctx(&self.location).into();
475 Optional::new(
476 self.location.clone(),
477 HydroNode::Filter {
478 f,
479 input: Box::new(self.ir_node.into_inner()),
480 metadata: self
481 .location
482 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
483 },
484 )
485 }
486
487 /// An operator that both filters and maps. It yields the value only if the supplied
488 /// closure `f` returns `Some(value)`.
489 ///
490 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
491 /// If the closure returns `None`, the output optional is empty.
492 ///
493 /// # Example
494 /// ```rust
495 /// # use hydro_lang::prelude::*;
496 /// # use futures::StreamExt;
497 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
498 /// let tick = process.tick();
499 /// let singleton = tick.singleton(q!("42"));
500 /// singleton
501 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
502 /// .all_ticks()
503 /// # }, |mut stream| async move {
504 /// // 42
505 /// # assert_eq!(stream.next().await.unwrap(), 42);
506 /// # }));
507 /// ```
508 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
509 where
510 F: Fn(T) -> Option<U> + 'a,
511 {
512 let f = f.splice_fn1_ctx(&self.location).into();
513 Optional::new(
514 self.location.clone(),
515 HydroNode::FilterMap {
516 f,
517 input: Box::new(self.ir_node.into_inner()),
518 metadata: self
519 .location
520 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
521 },
522 )
523 }
524
525 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
526 ///
527 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
528 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
529 /// non-null. This is useful for combining several pieces of state together.
530 ///
531 /// # Example
532 /// ```rust
533 /// # use hydro_lang::prelude::*;
534 /// # use futures::StreamExt;
535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536 /// let tick = process.tick();
537 /// let numbers = process
538 /// .source_iter(q!(vec![123, 456]))
539 /// .batch(&tick, nondet!(/** test */));
540 /// let count = numbers.clone().count(); // Singleton
541 /// let max = numbers.max(); // Optional
542 /// count.zip(max).all_ticks()
543 /// # }, |mut stream| async move {
544 /// // [(2, 456)]
545 /// # for w in vec![(2, 456)] {
546 /// # assert_eq!(stream.next().await.unwrap(), w);
547 /// # }
548 /// # }));
549 /// ```
550 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
551 where
552 Self: ZipResult<'a, O, Location = L>,
553 {
554 check_matching_location(&self.location, &Self::other_location(&other));
555
556 if L::is_top_level()
557 && let Some(tick) = self.location.try_tick()
558 {
559 let out = zip_inside_tick(
560 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
561 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
562 Self::other_location(&other),
563 Self::other_ir_node(other),
564 )
565 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
566 )
567 .latest();
568
569 Self::make(out.location, out.ir_node.into_inner())
570 } else {
571 Self::make(
572 self.location.clone(),
573 HydroNode::CrossSingleton {
574 left: Box::new(self.ir_node.into_inner()),
575 right: Box::new(Self::other_ir_node(other)),
576 metadata: self.location.new_node_metadata(CollectionKind::Optional {
577 bound: B::BOUND_KIND,
578 element_type: stageleft::quote_type::<
579 <Self as ZipResult<'a, O>>::ElementType,
580 >()
581 .into(),
582 }),
583 },
584 )
585 }
586 }
587
588 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
590 ///
591 /// Useful for conditionally processing, such as only emitting a singleton's value outside
592 /// a tick if some other condition is satisfied.
593 ///
594 /// # Example
595 /// ```rust
596 /// # use hydro_lang::prelude::*;
597 /// # use futures::StreamExt;
598 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
599 /// let tick = process.tick();
600 /// // ticks are lazy by default, forces the second tick to run
601 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
602 ///
603 /// let batch_first_tick = process
604 /// .source_iter(q!(vec![1]))
605 /// .batch(&tick, nondet!(/** test */));
606 /// let batch_second_tick = process
607 /// .source_iter(q!(vec![1, 2, 3]))
608 /// .batch(&tick, nondet!(/** test */))
609 /// .defer_tick(); // appears on the second tick
610 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
611 /// batch_first_tick.chain(batch_second_tick).count()
612 /// .filter_if_some(some_on_first_tick)
613 /// .all_ticks()
614 /// # }, |mut stream| async move {
615 /// // [1]
616 /// # for w in vec![1] {
617 /// # assert_eq!(stream.next().await.unwrap(), w);
618 /// # }
619 /// # }));
620 /// ```
621 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
622 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
623 .map(q!(|(d, _signal)| d))
624 }
625
626 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
628 ///
629 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
630 /// the condition.
631 ///
632 /// # Example
633 /// ```rust
634 /// # use hydro_lang::prelude::*;
635 /// # use futures::StreamExt;
636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637 /// let tick = process.tick();
638 /// // ticks are lazy by default, forces the second tick to run
639 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640 ///
641 /// let batch_first_tick = process
642 /// .source_iter(q!(vec![1]))
643 /// .batch(&tick, nondet!(/** test */));
644 /// let batch_second_tick = process
645 /// .source_iter(q!(vec![1, 2, 3]))
646 /// .batch(&tick, nondet!(/** test */))
647 /// .defer_tick(); // appears on the second tick
648 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
649 /// batch_first_tick.chain(batch_second_tick).count()
650 /// .filter_if_none(some_on_first_tick)
651 /// .all_ticks()
652 /// # }, |mut stream| async move {
653 /// // [3]
654 /// # for w in vec![3] {
655 /// # assert_eq!(stream.next().await.unwrap(), w);
656 /// # }
657 /// # }));
658 /// ```
659 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
660 self.filter_if_some(
661 other
662 .map(q!(|_| ()))
663 .into_singleton()
664 .filter(q!(|o| o.is_none())),
665 )
666 }
667
668 /// An operator which allows you to "name" a `HydroNode`.
669 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
670 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
671 {
672 let mut node = self.ir_node.borrow_mut();
673 let metadata = node.metadata_mut();
674 metadata.tag = Some(name.to_string());
675 }
676 self
677 }
678}
679
680impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
681where
682 L: Location<'a> + NoTick,
683{
684 /// Returns a singleton value corresponding to the latest snapshot of the singleton
685 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
686 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
687 /// all snapshots of this singleton into the atomic-associated tick will observe the
688 /// same value each tick.
689 ///
690 /// # Non-Determinism
691 /// Because this picks a snapshot of a singleton whose value is continuously changing,
692 /// the output singleton has a non-deterministic value since the snapshot can be at an
693 /// arbitrary point in time.
694 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
695 Singleton::new(
696 self.location.clone().tick,
697 HydroNode::Batch {
698 inner: Box::new(self.ir_node.into_inner()),
699 metadata: self
700 .location
701 .tick
702 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
703 },
704 )
705 }
706
707 /// Returns this singleton back into a top-level, asynchronous execution context where updates
708 /// to the value will be asynchronously propagated.
709 pub fn end_atomic(self) -> Singleton<T, L, B> {
710 Singleton::new(
711 self.location.tick.l.clone(),
712 HydroNode::EndAtomic {
713 inner: Box::new(self.ir_node.into_inner()),
714 metadata: self
715 .location
716 .tick
717 .l
718 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
719 },
720 )
721 }
722}
723
724impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
725where
726 L: Location<'a>,
727{
728 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
729 /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
731 ///
732 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
733 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
734 /// a different version).
735 ///
736 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
738 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
739 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
740 let out_location = Atomic { tick: tick.clone() };
741 Singleton::new(
742 out_location.clone(),
743 HydroNode::BeginAtomic {
744 inner: Box::new(self.ir_node.into_inner()),
745 metadata: out_location
746 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
747 },
748 )
749 }
750
751 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
752 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
753 /// relevant data that contributed to the snapshot at tick `t`.
754 ///
755 /// # Non-Determinism
756 /// Because this picks a snapshot of a singleton whose value is continuously changing,
757 /// the output singleton has a non-deterministic value since the snapshot can be at an
758 /// arbitrary point in time.
759 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
760 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
761 Singleton::new(
762 tick.clone(),
763 HydroNode::Batch {
764 inner: Box::new(self.ir_node.into_inner()),
765 metadata: tick
766 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
767 },
768 )
769 }
770
771 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
772 /// with order corresponding to increasing prefixes of data contributing to the singleton.
773 ///
774 /// # Non-Determinism
775 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
776 /// to non-deterministic batching and arrival of inputs, the output stream is
777 /// non-deterministic.
778 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
779 where
780 L: NoTick,
781 {
782 let tick = self.location.tick();
783 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
784 }
785
786 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
787 /// value taken at various points in time. Because the input singleton may be
788 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
789 /// represent the value of the singleton given some prefix of the streams leading up to
790 /// it.
791 ///
792 /// # Non-Determinism
793 /// The output stream is non-deterministic in which elements are sampled, since this
794 /// is controlled by a clock.
795 pub fn sample_every(
796 self,
797 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
798 nondet: NonDet,
799 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
800 where
801 L: NoTick + NoAtomic,
802 {
803 let samples = self.location.source_interval(interval, nondet);
804 let tick = self.location.tick();
805
806 self.snapshot(&tick, nondet)
807 .filter_if_some(samples.batch(&tick, nondet).first())
808 .all_ticks()
809 .weakest_retries()
810 }
811}
812
813impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
814where
815 L: Location<'a>,
816{
817 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
818 /// which will stream the value computed in _each_ tick as a separate stream element.
819 ///
820 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
821 /// producing one element in the output for each tick. This is useful for batched computations,
822 /// where the results from each tick must be combined together.
823 ///
824 /// # Example
825 /// ```rust
826 /// # use hydro_lang::prelude::*;
827 /// # use futures::StreamExt;
828 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
829 /// let tick = process.tick();
830 /// # // ticks are lazy by default, forces the second tick to run
831 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
832 /// # let batch_first_tick = process
833 /// # .source_iter(q!(vec![1]))
834 /// # .batch(&tick, nondet!(/** test */));
835 /// # let batch_second_tick = process
836 /// # .source_iter(q!(vec![1, 2, 3]))
837 /// # .batch(&tick, nondet!(/** test */))
838 /// # .defer_tick(); // appears on the second tick
839 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
840 /// input_batch // first tick: [1], second tick: [1, 2, 3]
841 /// .count()
842 /// .all_ticks()
843 /// # }, |mut stream| async move {
844 /// // [1, 3]
845 /// # for w in vec![1, 3] {
846 /// # assert_eq!(stream.next().await.unwrap(), w);
847 /// # }
848 /// # }));
849 /// ```
850 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
851 self.into_stream().all_ticks()
852 }
853
854 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
855 /// which will stream the value computed in _each_ tick as a separate stream element.
856 ///
857 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
858 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
859 /// singleton's [`Tick`] context.
860 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
861 self.into_stream().all_ticks_atomic()
862 }
863
864 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
865 /// be asynchronously updated with the latest value of the singleton inside the tick.
866 ///
867 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
868 /// tick that tracks the inner value. This is useful for getting the value as of the
869 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
870 ///
871 /// # Example
872 /// ```rust
873 /// # use hydro_lang::prelude::*;
874 /// # use futures::StreamExt;
875 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
876 /// let tick = process.tick();
877 /// # // ticks are lazy by default, forces the second tick to run
878 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
879 /// # let batch_first_tick = process
880 /// # .source_iter(q!(vec![1]))
881 /// # .batch(&tick, nondet!(/** test */));
882 /// # let batch_second_tick = process
883 /// # .source_iter(q!(vec![1, 2, 3]))
884 /// # .batch(&tick, nondet!(/** test */))
885 /// # .defer_tick(); // appears on the second tick
886 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
887 /// input_batch // first tick: [1], second tick: [1, 2, 3]
888 /// .count()
889 /// .latest()
890 /// # .sample_eager(nondet!(/** test */))
891 /// # }, |mut stream| async move {
892 /// // asynchronously changes from 1 ~> 3
893 /// # for w in vec![1, 3] {
894 /// # assert_eq!(stream.next().await.unwrap(), w);
895 /// # }
896 /// # }));
897 /// ```
898 pub fn latest(self) -> Singleton<T, L, Unbounded> {
899 Singleton::new(
900 self.location.outer().clone(),
901 HydroNode::YieldConcat {
902 inner: Box::new(self.ir_node.into_inner()),
903 metadata: self
904 .location
905 .outer()
906 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
907 },
908 )
909 }
910
911 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
912 /// be updated with the latest value of the singleton inside the tick.
913 ///
914 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
915 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
916 /// singleton's [`Tick`] context.
917 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
918 let out_location = Atomic {
919 tick: self.location.clone(),
920 };
921 Singleton::new(
922 out_location.clone(),
923 HydroNode::YieldConcat {
924 inner: Box::new(self.ir_node.into_inner()),
925 metadata: out_location
926 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
927 },
928 )
929 }
930
931 #[deprecated(note = "use .into_stream().persist()")]
932 #[expect(missing_docs, reason = "deprecated")]
933 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
934 Stream::new(
935 self.location.clone(),
936 HydroNode::Persist {
937 inner: Box::new(self.ir_node.into_inner()),
938 metadata: self.location.new_node_metadata(Stream::<
939 T,
940 Tick<L>,
941 Bounded,
942 TotalOrder,
943 ExactlyOnce,
944 >::collection_kind()),
945 },
946 )
947 }
948
949 /// Converts this singleton into a [`Stream`] containing a single element, the value.
950 ///
951 /// # Example
952 /// ```rust
953 /// # use hydro_lang::prelude::*;
954 /// # use futures::StreamExt;
955 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
956 /// let tick = process.tick();
957 /// let batch_input = process
958 /// .source_iter(q!(vec![123, 456]))
959 /// .batch(&tick, nondet!(/** test */));
960 /// batch_input.clone().chain(
961 /// batch_input.count().into_stream()
962 /// ).all_ticks()
963 /// # }, |mut stream| async move {
964 /// // [123, 456, 2]
965 /// # for w in vec![123, 456, 2] {
966 /// # assert_eq!(stream.next().await.unwrap(), w);
967 /// # }
968 /// # }));
969 /// ```
970 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
971 Stream::new(
972 self.location.clone(),
973 HydroNode::Cast {
974 inner: Box::new(self.ir_node.into_inner()),
975 metadata: self.location.new_node_metadata(Stream::<
976 T,
977 Tick<L>,
978 Bounded,
979 TotalOrder,
980 ExactlyOnce,
981 >::collection_kind()),
982 },
983 )
984 }
985}
986
987#[doc(hidden)]
988/// Helper trait that determines the output collection type for [`Singleton::zip`].
989///
990/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
991/// [`Singleton`].
992#[sealed::sealed]
993pub trait ZipResult<'a, Other> {
994 /// The output collection type.
995 type Out;
996 /// The type of the tupled output value.
997 type ElementType;
998 /// The type of the other collection's value.
999 type OtherType;
1000 /// The location where the tupled result will be materialized.
1001 type Location: Location<'a>;
1002
1003 /// The location of the second input to the `zip`.
1004 fn other_location(other: &Other) -> Self::Location;
1005 /// The IR node of the second input to the `zip`.
1006 fn other_ir_node(other: Other) -> HydroNode;
1007
1008 /// Constructs the output live collection given an IR node containing the zip result.
1009 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1010}
1011
1012#[sealed::sealed]
1013impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1014where
1015 L: Location<'a>,
1016{
1017 type Out = Singleton<(T, U), L, B>;
1018 type ElementType = (T, U);
1019 type OtherType = U;
1020 type Location = L;
1021
1022 fn other_location(other: &Singleton<U, L, B>) -> L {
1023 other.location.clone()
1024 }
1025
1026 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1027 other.ir_node.into_inner()
1028 }
1029
1030 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1031 Singleton::new(
1032 location.clone(),
1033 HydroNode::Cast {
1034 inner: Box::new(ir_node),
1035 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1036 },
1037 )
1038 }
1039}
1040
1041#[sealed::sealed]
1042impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1043where
1044 L: Location<'a>,
1045{
1046 type Out = Optional<(T, U), L, B>;
1047 type ElementType = (T, U);
1048 type OtherType = U;
1049 type Location = L;
1050
1051 fn other_location(other: &Optional<U, L, B>) -> L {
1052 other.location.clone()
1053 }
1054
1055 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1056 other.ir_node.into_inner()
1057 }
1058
1059 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1060 Optional::new(location, ir_node)
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 #[cfg(feature = "deploy")]
1067 use futures::{SinkExt, StreamExt};
1068 #[cfg(feature = "deploy")]
1069 use hydro_deploy::Deployment;
1070 use stageleft::q;
1071
1072 use crate::compile::builder::FlowBuilder;
1073 #[cfg(feature = "deploy")]
1074 use crate::live_collections::stream::ExactlyOnce;
1075 use crate::location::Location;
1076 use crate::nondet::nondet;
1077
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    // A singleton fed back through a tick cycle must still hold exactly one value per tick:
    // counting its stream form has to give 1 for every triggered tick.
    let mut deployment = Deployment::new();

    let flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let external = flow.external::<()>();

    let (trigger_port, triggers) =
        process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let tick = process.tick();
    let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));
    let counts_port = cycled
        .clone()
        .into_stream()
        .count()
        .filter_if_some(triggers.batch(&tick, nondet!(/** testing */)).first())
        .all_ticks()
        .send_bincode_external(&external);
    cycle_handle.complete_next_tick(cycled);

    let nodes = flow
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut trigger_sink = nodes.connect(trigger_port).await;
    let mut counts = nodes.connect(counts_port).await;

    deployment.start().await.unwrap();

    // Two triggers in a row, each of which must be answered with a count of 1.
    for _ in 0..2 {
        trigger_sink.send(()).await.unwrap();
        assert_eq!(counts.next().await.unwrap(), 1);
    }
}
1120
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    // Expected to panic: the assertion below demands that the first snapshot is already the
    // final sum, and the test passes only when that assertion fails.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let out_port = running_sum
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .send_bincode_external(&external);

    flow.sim().exhaustive(async |mut compiled| {
        let mut snapshots = compiled.connect(&out_port);
        compiled.launch();
        assert_eq!(snapshots.next().await.unwrap(), 10);
    });
}
1141
#[test]
fn sim_fold_intermediate_state_count() {
    // Counts the instances explored when snapshotting a fold over four elements; every
    // instance must end on the final sum.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let out_port = running_sum
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .send_bincode_external(&external);

    let explored = flow.sim().exhaustive(async |mut compiled| {
        let snapshots = compiled.connect(&out_port);
        compiled.launch();

        let observed = snapshots.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    // 2^4 possible subsets of intermediates (including initial state)
    assert_eq!(explored, 16);
}
1168
#[test]
fn sim_fold_no_repeat_initial() {
    // The initial state of the fold must not be repeated by autonomous decisions: after the
    // initial 0 is observed and one input is sent, the next observed snapshot is that input.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let (in_port, inputs) = process.source_external_bincode(&external);
    let running_sum = inputs.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let out_port = running_sum
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .send_bincode_external(&external);

    flow.sim().exhaustive(async |mut compiled| {
        let sender = compiled.connect(&in_port);
        let mut snapshots = compiled.connect(&out_port);
        compiled.launch();

        let initial = snapshots.next().await.unwrap();
        assert_eq!(initial, 0);

        sender.send(123);

        let after_send = snapshots.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1195
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    // When a tick is driven by a snapshot AND something else, the snapshot can "stutter" and
    // show the same state more than once. The test passes when some explored instance hits
    // the `panic!` below.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let paired = numbers
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(running_sum.snapshot(&tick, nondet!(/** test */)));
    let out_port = paired.all_ticks().send_bincode_external(&external);

    flow.sim().exhaustive(async |mut compiled| {
        let mut pairs = compiled.connect(&out_port);
        compiled.launch();

        // The second element is only awaited when the first one matched.
        let first = pairs.next().await.unwrap();
        if first == (1, 3) {
            let second = pairs.next().await.unwrap();
            if second == (2, 3) {
                panic!("repeated snapshot");
            }
        }
    });
}
1225
#[test]
fn sim_fold_repeats_snapshots_count() {
    // Pins the number of instances explored when a batch is crossed with a fold snapshot.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2]));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let paired = numbers
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(running_sum.snapshot(&tick, nondet!(/** test */)));
    let out_port = paired.all_ticks().send_bincode_external(&external);

    let explored = flow.sim().exhaustive(async |mut compiled| {
        let pairs = compiled.connect(&out_port);
        compiled.launch();

        // Drain the output; only the number of explored instances matters here.
        drop(pairs.collect::<Vec<_>>().await);
    });

    // No combinatorial explanation for this number yet; it was checked via logs.
    assert_eq!(explored, 52);
}
1252
#[test]
fn sim_top_level_singleton_exhaustive() {
    // A top-level singleton has only one snapshot, so exactly one instance is explored.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let constant = process.singleton(q!(1));
    let tick = process.tick();
    let out_port = constant
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .send_bincode_external(&external);

    let explored = flow.sim().exhaustive(async |mut compiled| {
        let snapshots = compiled.connect(&out_port);
        compiled.launch();

        // Drain the output; only the number of explored instances matters here.
        drop(snapshots.collect::<Vec<_>>().await);
    });

    assert_eq!(explored, 1);
}
1274
#[test]
fn sim_top_level_singleton_join_count() {
    // If a tick consumes a static snapshot and a stream batch, only the batch requires
    // state-space exploration.
    let flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let process = flow.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = process.tick();
    let paired = numbers.batch(&tick, nondet!(/** test */)).cross_singleton(
        process
            .singleton(q!(123))
            .snapshot(&tick, nondet!(/** test */)),
    );
    let out_port = paired.all_ticks().send_bincode_external(&external);

    let explored = flow.sim().exhaustive(async |mut compiled| {
        let pairs = compiled.connect(&out_port);
        compiled.launch();

        // Drain the output; only the number of explored instances matters here.
        drop(pairs.collect::<Vec<_>>().await);
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(explored, 16);
}
1306}