hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{
15 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode,
16};
17#[cfg(stageleft_runtime)]
18use crate::forward_handle::{CycleCollection, ReceiverComplete};
19use crate::forward_handle::{ForwardRef, TickCycle};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    // The location this optional is materialized at.
    pub(crate) location: Loc,
    // The IR node that produces this optional. Kept in a `RefCell` because `Clone::clone`
    // swaps the node for a `Tee` while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
/// Lets a bounded, tick-local [`Optional`] be used wherever a [`DeferTick`] collection is
/// required.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to an inherent `Optional::defer_tick` defined
        // elsewhere in this file rather than recursing into this trait method; confirm.
        Optional::defer_tick(self)
    }
}
54
55impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
56where
57 L: Location<'a>,
58{
59 type Location = Tick<L>;
60
61 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62 Optional::new(
63 location.clone(),
64 HydroNode::CycleSource {
65 ident,
66 metadata: location.new_node_metadata(Self::collection_kind()),
67 },
68 )
69 }
70}
71
72impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
73where
74 L: Location<'a>,
75{
76 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77 assert_eq!(
78 Location::id(&self.location),
79 expected_location,
80 "locations do not match"
81 );
82 self.location
83 .flow_state()
84 .borrow_mut()
85 .push_root(HydroRoot::CycleSink {
86 ident,
87 input: Box::new(self.ir_node.into_inner()),
88 op_metadata: HydroIrOpMetadata::new(),
89 });
90 }
91}
92
93impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 type Location = Tick<L>;
98
99 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
100 Optional::new(
101 location.clone(),
102 HydroNode::CycleSource {
103 ident,
104 metadata: location.new_node_metadata(Self::collection_kind()),
105 },
106 )
107 }
108}
109
110impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
111where
112 L: Location<'a>,
113{
114 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
115 assert_eq!(
116 Location::id(&self.location),
117 expected_location,
118 "locations do not match"
119 );
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133 L: Location<'a> + NoTick,
134{
135 type Location = L;
136
137 fn create_source(ident: syn::Ident, location: L) -> Self {
138 Optional::new(
139 location.clone(),
140 HydroNode::CycleSource {
141 ident,
142 metadata: location.new_node_metadata(Self::collection_kind()),
143 },
144 )
145 }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150 L: Location<'a> + NoTick,
151{
152 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153 assert_eq!(
154 Location::id(&self.location),
155 expected_location,
156 "locations do not match"
157 );
158 self.location
159 .flow_state()
160 .borrow_mut()
161 .push_root(HydroRoot::CycleSink {
162 ident,
163 input: Box::new(self.ir_node.into_inner()),
164 op_metadata: HydroIrOpMetadata::new(),
165 });
166 }
167}
168
169impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
170where
171 L: Location<'a>,
172{
173 fn from(singleton: Optional<T, L, Bounded>) -> Self {
174 Optional::new(singleton.location, singleton.ir_node.into_inner())
175 }
176}
177
178impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
179where
180 L: Location<'a>,
181{
182 fn from(singleton: Singleton<T, L, B>) -> Self {
183 Optional::new(
184 singleton.location.clone(),
185 HydroNode::Cast {
186 inner: Box::new(singleton.ir_node.into_inner()),
187 metadata: singleton
188 .location
189 .new_node_metadata(Self::collection_kind()),
190 },
191 )
192 }
193}
194
/// Tuples the values of `me` and `other` by emitting a [`HydroNode::CrossSingleton`] directly
/// at their shared location.
///
/// Both optionals must be at the same location (checked via [`check_matching_location`]).
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());
    let crossed = HydroNode::CrossSingleton {
        left: Box::new(me.ir_node.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location, crossed)
}
213
/// Combines `me` and `other` by emitting a [`HydroNode::ChainFirst`] directly at their shared
/// location, with `me` as the first input and `other` as the second.
///
/// Both optionals must be at the same location (checked via [`check_matching_location`]).
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());
    let chained = HydroNode::ChainFirst {
        first: Box::new(me.ir_node.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location, chained)
}
232
233impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
234where
235 T: Clone,
236 L: Location<'a>,
237{
238 fn clone(&self) -> Self {
239 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
240 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
241 *self.ir_node.borrow_mut() = HydroNode::Tee {
242 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
243 metadata: self.location.new_node_metadata(Self::collection_kind()),
244 };
245 }
246
247 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
248 Optional {
249 location: self.location.clone(),
250 ir_node: HydroNode::Tee {
251 inner: TeeNode(inner.0.clone()),
252 metadata: metadata.clone(),
253 }
254 .into(),
255 _phantom: PhantomData,
256 }
257 } else {
258 unreachable!()
259 }
260 }
261}
262
263impl<'a, T, L, B: Boundedness> Optional<T, L, B>
264where
265 L: Location<'a>,
266{
267 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
268 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
269 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
270 Optional {
271 location,
272 ir_node: RefCell::new(ir_node),
273 _phantom: PhantomData,
274 }
275 }
276
277 pub(crate) fn collection_kind() -> CollectionKind {
278 CollectionKind::Optional {
279 bound: B::BOUND_KIND,
280 element_type: stageleft::quote_type::<T>().into(),
281 }
282 }
283
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The location is borrowed; clone it if an owned handle is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
288
289 /// Transforms the optional value by applying a function `f` to it,
290 /// continuously as the input is updated.
291 ///
292 /// Whenever the optional is empty, the output optional is also empty.
293 ///
294 /// # Example
295 /// ```rust
296 /// # use hydro_lang::prelude::*;
297 /// # use futures::StreamExt;
298 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
299 /// let tick = process.tick();
300 /// let optional = tick.optional_first_tick(q!(1));
301 /// optional.map(q!(|v| v + 1)).all_ticks()
302 /// # }, |mut stream| async move {
303 /// // 2
304 /// # assert_eq!(stream.next().await.unwrap(), 2);
305 /// # }));
306 /// ```
307 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308 where
309 F: Fn(T) -> U + 'a,
310 {
311 let f = f.splice_fn1_ctx(&self.location).into();
312 Optional::new(
313 self.location.clone(),
314 HydroNode::Map {
315 f,
316 input: Box::new(self.ir_node.into_inner()),
317 metadata: self
318 .location
319 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320 },
321 )
322 }
323
324 /// Transforms the optional value by applying a function `f` to it and then flattening
325 /// the result into a stream, preserving the order of elements.
326 ///
327 /// If the optional is empty, the output stream is also empty. If the optional contains
328 /// a value, `f` is applied to produce an iterator, and all items from that iterator
329 /// are emitted in the output stream in deterministic order.
330 ///
331 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334 ///
335 /// # Example
336 /// ```rust
337 /// # use hydro_lang::prelude::*;
338 /// # use futures::StreamExt;
339 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340 /// let tick = process.tick();
341 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
342 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
343 /// # }, |mut stream| async move {
344 /// // 1, 2, 3
345 /// # for w in vec![1, 2, 3] {
346 /// # assert_eq!(stream.next().await.unwrap(), w);
347 /// # }
348 /// # }));
349 /// ```
350 pub fn flat_map_ordered<U, I, F>(
351 self,
352 f: impl IntoQuotedMut<'a, F, L>,
353 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
354 where
355 I: IntoIterator<Item = U>,
356 F: Fn(T) -> I + 'a,
357 {
358 let f = f.splice_fn1_ctx(&self.location).into();
359 Stream::new(
360 self.location.clone(),
361 HydroNode::FlatMap {
362 f,
363 input: Box::new(self.ir_node.into_inner()),
364 metadata: self.location.new_node_metadata(
365 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
366 ),
367 },
368 )
369 }
370
371 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
372 /// for the output type `I` to produce items in any order.
373 ///
374 /// If the optional is empty, the output stream is also empty. If the optional contains
375 /// a value, `f` is applied to produce an iterator, and all items from that iterator
376 /// are emitted in the output stream in non-deterministic order.
377 ///
378 /// # Example
379 /// ```rust
380 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
381 /// # use futures::StreamExt;
382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
383 /// let tick = process.tick();
384 /// let optional = tick.optional_first_tick(q!(
385 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
386 /// ));
387 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
388 /// # }, |mut stream| async move {
389 /// // 1, 2, 3, but in no particular order
390 /// # let mut results = Vec::new();
391 /// # for _ in 0..3 {
392 /// # results.push(stream.next().await.unwrap());
393 /// # }
394 /// # results.sort();
395 /// # assert_eq!(results, vec![1, 2, 3]);
396 /// # }));
397 /// ```
398 pub fn flat_map_unordered<U, I, F>(
399 self,
400 f: impl IntoQuotedMut<'a, F, L>,
401 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
402 where
403 I: IntoIterator<Item = U>,
404 F: Fn(T) -> I + 'a,
405 {
406 let f = f.splice_fn1_ctx(&self.location).into();
407 Stream::new(
408 self.location.clone(),
409 HydroNode::FlatMap {
410 f,
411 input: Box::new(self.ir_node.into_inner()),
412 metadata: self
413 .location
414 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
415 },
416 )
417 }
418
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_ordered(q!(|v| v))
    }
450
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity function.
        self.flat_map_unordered(q!(|v| v))
    }
484
485 /// Creates an optional containing only the value if it satisfies a predicate `f`.
486 ///
487 /// If the optional is empty, the output optional is also empty. If the optional contains
488 /// a value and the predicate returns `true`, the output optional contains the same value.
489 /// If the predicate returns `false`, the output optional is empty.
490 ///
491 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
492 /// not modify or take ownership of the value. If you need to modify the value while filtering
493 /// use [`Optional::filter_map`] instead.
494 ///
495 /// # Example
496 /// ```rust
497 /// # use hydro_lang::prelude::*;
498 /// # use futures::StreamExt;
499 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
500 /// let tick = process.tick();
501 /// let optional = tick.optional_first_tick(q!(5));
502 /// optional.filter(q!(|&x| x > 3)).all_ticks()
503 /// # }, |mut stream| async move {
504 /// // 5
505 /// # assert_eq!(stream.next().await.unwrap(), 5);
506 /// # }));
507 /// ```
508 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
509 where
510 F: Fn(&T) -> bool + 'a,
511 {
512 let f = f.splice_fn1_borrow_ctx(&self.location).into();
513 Optional::new(
514 self.location.clone(),
515 HydroNode::Filter {
516 f,
517 input: Box::new(self.ir_node.into_inner()),
518 metadata: self.location.new_node_metadata(Self::collection_kind()),
519 },
520 )
521 }
522
523 /// An operator that both filters and maps. It yields only the value if the supplied
524 /// closure `f` returns `Some(value)`.
525 ///
526 /// If the optional is empty, the output optional is also empty. If the optional contains
527 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
528 /// If the closure returns `None`, the output optional is empty.
529 ///
530 /// # Example
531 /// ```rust
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let tick = process.tick();
536 /// let optional = tick.optional_first_tick(q!("42"));
537 /// optional
538 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
539 /// .all_ticks()
540 /// # }, |mut stream| async move {
541 /// // 42
542 /// # assert_eq!(stream.next().await.unwrap(), 42);
543 /// # }));
544 /// ```
545 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
546 where
547 F: Fn(T) -> Option<U> + 'a,
548 {
549 let f = f.splice_fn1_ctx(&self.location).into();
550 Optional::new(
551 self.location.clone(),
552 HydroNode::FilterMap {
553 f,
554 input: Box::new(self.ir_node.into_inner()),
555 metadata: self
556 .location
557 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
558 },
559 )
560 }
561
562 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
563 ///
564 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
565 /// non-null. This is useful for combining several pieces of state together.
566 ///
567 /// # Example
568 /// ```rust
569 /// # use hydro_lang::prelude::*;
570 /// # use futures::StreamExt;
571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572 /// let tick = process.tick();
573 /// let numbers = process
574 /// .source_iter(q!(vec![123, 456, 789]))
575 /// .batch(&tick, nondet!(/** test */));
576 /// let min = numbers.clone().min(); // Optional
577 /// let max = numbers.max(); // Optional
578 /// min.zip(max).all_ticks()
579 /// # }, |mut stream| async move {
580 /// // [(123, 789)]
581 /// # for w in vec![(123, 789)] {
582 /// # assert_eq!(stream.next().await.unwrap(), w);
583 /// # }
584 /// # }));
585 /// ```
586 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
587 where
588 O: Clone,
589 {
590 let other: Optional<O, L, B> = other.into();
591 check_matching_location(&self.location, &other.location);
592
593 if L::is_top_level()
594 && let Some(tick) = self.location.try_tick()
595 {
596 let out = zip_inside_tick(
597 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
598 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
599 )
600 .latest();
601
602 Optional::new(out.location, out.ir_node.into_inner())
603 } else {
604 zip_inside_tick(self, other)
605 }
606 }
607
608 /// Passes through `self` when it has a value, otherwise passes through `other`.
609 ///
610 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
611 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
612 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
613 ///
614 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
615 /// of the inputs change (including to/from null states).
616 ///
617 /// # Example
618 /// ```rust
619 /// # use hydro_lang::prelude::*;
620 /// # use futures::StreamExt;
621 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
622 /// let tick = process.tick();
623 /// // ticks are lazy by default, forces the second tick to run
624 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
625 ///
626 /// let some_first_tick = tick.optional_first_tick(q!(123));
627 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
628 /// some_first_tick.or(some_second_tick).all_ticks()
629 /// # }, |mut stream| async move {
630 /// // [123 /* first tick */, 456 /* second tick */]
631 /// # for w in vec![123, 456] {
632 /// # assert_eq!(stream.next().await.unwrap(), w);
633 /// # }
634 /// # }));
635 /// ```
636 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
637 check_matching_location(&self.location, &other.location);
638
639 if L::is_top_level()
640 && let Some(tick) = self.location.try_tick()
641 {
642 let out = or_inside_tick(
643 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
644 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
645 )
646 .latest();
647
648 Optional::new(out.location, out.ir_node.into_inner())
649 } else {
650 Optional::new(
651 self.location.clone(),
652 HydroNode::ChainFirst {
653 first: Box::new(self.ir_node.into_inner()),
654 second: Box::new(other.ir_node.into_inner()),
655 metadata: self.location.new_node_metadata(Self::collection_kind()),
656 },
657 )
658 }
659 }
660
661 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
662 ///
663 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
664 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
665 ///
666 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
667 /// of the inputs change (including to/from null states).
668 ///
669 /// # Example
670 /// ```rust
671 /// # use hydro_lang::prelude::*;
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674 /// let tick = process.tick();
675 /// // ticks are lazy by default, forces the later ticks to run
676 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
677 ///
678 /// let some_first_tick = tick.optional_first_tick(q!(123));
679 /// some_first_tick
680 /// .unwrap_or(tick.singleton(q!(456)))
681 /// .all_ticks()
682 /// # }, |mut stream| async move {
683 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
684 /// # for w in vec![123, 456, 456, 456] {
685 /// # assert_eq!(stream.next().await.unwrap(), w);
686 /// # }
687 /// # }));
688 /// ```
689 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
690 let res_option = self.or(other.into());
691 Singleton::new(
692 res_option.location.clone(),
693 HydroNode::Cast {
694 inner: Box::new(res_option.ir_node.into_inner()),
695 metadata: res_option
696 .location
697 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
698 },
699 )
700 }
701
702 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
703 ///
704 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
705 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
706 /// so that Hydro can skip any computation on null values.
707 ///
708 /// # Example
709 /// ```rust
710 /// # use hydro_lang::prelude::*;
711 /// # use futures::StreamExt;
712 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
713 /// let tick = process.tick();
714 /// // ticks are lazy by default, forces the later ticks to run
715 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
716 ///
717 /// let some_first_tick = tick.optional_first_tick(q!(123));
718 /// some_first_tick.into_singleton().all_ticks()
719 /// # }, |mut stream| async move {
720 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
721 /// # for w in vec![Some(123), None, None, None] {
722 /// # assert_eq!(stream.next().await.unwrap(), w);
723 /// # }
724 /// # }));
725 /// ```
726 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
727 where
728 T: Clone,
729 {
730 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
731 let core_ir = HydroNode::Source {
732 source: HydroSource::Iter(none.into()),
733 metadata: self
734 .location
735 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
736 };
737
738 let none_singleton = if L::is_top_level() {
739 Singleton::new(
740 self.location.clone(),
741 HydroNode::Persist {
742 inner: Box::new(core_ir),
743 metadata: self
744 .location
745 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
746 },
747 )
748 } else {
749 Singleton::new(self.location.clone(), core_ir)
750 };
751
752 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
753 }
754
755 /// An operator which allows you to "name" a `HydroNode`.
756 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
757 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
758 {
759 let mut node = self.ir_node.borrow_mut();
760 let metadata = node.metadata_mut();
761 metadata.tag = Some(name.to_string());
762 }
763 self
764 }
765}
766
767impl<'a, T, L> Optional<T, L, Bounded>
768where
769 L: Location<'a>,
770{
771 /// Filters this optional, passing through the optional value if it is non-null **and** the
772 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
773 ///
774 /// Useful for conditionally processing, such as only emitting an optional's value outside
775 /// a tick if some other condition is satisfied.
776 ///
777 /// # Example
778 /// ```rust
779 /// # use hydro_lang::prelude::*;
780 /// # use futures::StreamExt;
781 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782 /// let tick = process.tick();
783 /// // ticks are lazy by default, forces the second tick to run
784 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
785 ///
786 /// let batch_first_tick = process
787 /// .source_iter(q!(vec![]))
788 /// .batch(&tick, nondet!(/** test */));
789 /// let batch_second_tick = process
790 /// .source_iter(q!(vec![456]))
791 /// .batch(&tick, nondet!(/** test */))
792 /// .defer_tick(); // appears on the second tick
793 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
794 /// batch_first_tick.chain(batch_second_tick).first()
795 /// .filter_if_some(some_on_first_tick)
796 /// .unwrap_or(tick.singleton(q!(789)))
797 /// .all_ticks()
798 /// # }, |mut stream| async move {
799 /// // [789, 789]
800 /// # for w in vec![789, 789] {
801 /// # assert_eq!(stream.next().await.unwrap(), w);
802 /// # }
803 /// # }));
804 /// ```
805 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
806 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
807 }
808
809 /// Filters this optional, passing through the optional value if it is non-null **and** the
810 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
811 ///
812 /// Useful for conditionally processing, such as only emitting an optional's value outside
813 /// a tick if some other condition is satisfied.
814 ///
815 /// # Example
816 /// ```rust
817 /// # use hydro_lang::prelude::*;
818 /// # use futures::StreamExt;
819 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820 /// let tick = process.tick();
821 /// // ticks are lazy by default, forces the second tick to run
822 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
823 ///
824 /// let batch_first_tick = process
825 /// .source_iter(q!(vec![]))
826 /// .batch(&tick, nondet!(/** test */));
827 /// let batch_second_tick = process
828 /// .source_iter(q!(vec![456]))
829 /// .batch(&tick, nondet!(/** test */))
830 /// .defer_tick(); // appears on the second tick
831 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
832 /// batch_first_tick.chain(batch_second_tick).first()
833 /// .filter_if_none(some_on_first_tick)
834 /// .unwrap_or(tick.singleton(q!(789)))
835 /// .all_ticks()
836 /// # }, |mut stream| async move {
837 /// // [789, 789]
838 /// # for w in vec![789, 456] {
839 /// # assert_eq!(stream.next().await.unwrap(), w);
840 /// # }
841 /// # }));
842 /// ```
843 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
844 self.filter_if_some(
845 other
846 .map(q!(|_| ()))
847 .into_singleton()
848 .filter(q!(|o| o.is_none())),
849 )
850 }
851
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
    ///
    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
    /// having a value, such as only releasing a piece of state if the node is the leader.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// some_on_first_tick
    ///     .if_some_then(tick.singleton(q!(456)))
    ///     .unwrap_or(tick.singleton(q!(123)))
    /// # .all_ticks()
    /// # }, |mut stream| async move {
    /// // 456 (first tick) ~> 123 (second tick onwards)
    /// # for w in vec![456, 123, 123] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
        // Delegates to the singleton's `filter_if_some`, using `self` as the gating signal.
        value.filter_if_some(self)
    }
881}
882
883impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
884where
885 L: Location<'a> + NoTick,
886{
887 /// Returns an optional value corresponding to the latest snapshot of the optional
888 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
889 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
890 /// all snapshots of this optional into the atomic-associated tick will observe the
891 /// same value each tick.
892 ///
893 /// # Non-Determinism
894 /// Because this picks a snapshot of a optional whose value is continuously changing,
895 /// the output optional has a non-deterministic value since the snapshot can be at an
896 /// arbitrary point in time.
897 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
898 Optional::new(
899 self.location.clone().tick,
900 HydroNode::Batch {
901 inner: Box::new(self.ir_node.into_inner()),
902 metadata: self
903 .location
904 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
905 },
906 )
907 }
908
909 /// Returns this optional back into a top-level, asynchronous execution context where updates
910 /// to the value will be asynchronously propagated.
911 pub fn end_atomic(self) -> Optional<T, L, B> {
912 Optional::new(
913 self.location.tick.l.clone(),
914 HydroNode::EndAtomic {
915 inner: Box::new(self.ir_node.into_inner()),
916 metadata: self
917 .location
918 .tick
919 .l
920 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
921 },
922 )
923 }
924}
925
926impl<'a, T, L, B: Boundedness> Optional<T, L, B>
927where
928 L: Location<'a>,
929{
930 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
931 /// will observe the same version of the value and will be executed synchronously before any
932 /// outputs are yielded (in [`Optional::end_atomic`]).
933 ///
934 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
935 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
936 /// a different version).
937 ///
938 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
939 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
940 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
941 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
942 let out_location = Atomic { tick: tick.clone() };
943 Optional::new(
944 out_location.clone(),
945 HydroNode::BeginAtomic {
946 inner: Box::new(self.ir_node.into_inner()),
947 metadata: out_location
948 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
949 },
950 )
951 }
952
953 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
954 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
955 /// relevant data that contributed to the snapshot at tick `t`.
956 ///
957 /// # Non-Determinism
958 /// Because this picks a snapshot of a optional whose value is continuously changing,
959 /// the output optional has a non-deterministic value since the snapshot can be at an
960 /// arbitrary point in time.
961 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
962 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
963 Optional::new(
964 tick.clone(),
965 HydroNode::Batch {
966 inner: Box::new(self.ir_node.into_inner()),
967 metadata: tick
968 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
969 },
970 )
971 }
972
973 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
974 /// with order corresponding to increasing prefixes of data contributing to the optional.
975 ///
976 /// # Non-Determinism
977 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
978 /// to non-deterministic batching and arrival of inputs, the output stream is
979 /// non-deterministic.
980 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
981 where
982 L: NoTick,
983 {
984 let tick = self.location.tick();
985 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
986 }
987
988 /// Given a time interval, returns a stream corresponding to snapshots of the optional
989 /// value taken at various points in time. Because the input optional may be
990 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
991 /// represent the value of the optional given some prefix of the streams leading up to
992 /// it.
993 ///
994 /// # Non-Determinism
995 /// The output stream is non-deterministic in which elements are sampled, since this
996 /// is controlled by a clock.
997 pub fn sample_every(
998 self,
999 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1000 nondet: NonDet,
1001 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1002 where
1003 L: NoTick + NoAtomic,
1004 {
1005 let samples = self.location.source_interval(interval, nondet);
1006 let tick = self.location.tick();
1007
1008 self.snapshot(&tick, nondet)
1009 .filter_if_some(samples.batch(&tick, nondet).first())
1010 .all_ticks()
1011 .weakest_retries()
1012 }
1013}
1014
1015impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1016where
1017 L: Location<'a>,
1018{
1019 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1020 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1021 /// null values).
1022 ///
1023 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1024 /// producing one element in the output for each (non-null) tick. This is useful for batched
1025 /// computations, where the results from each tick must be combined together.
1026 ///
1027 /// # Example
1028 /// ```rust
1029 /// # use hydro_lang::prelude::*;
1030 /// # use futures::StreamExt;
1031 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1032 /// # let tick = process.tick();
1033 /// # // ticks are lazy by default, forces the second tick to run
1034 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1035 /// # let batch_first_tick = process
1036 /// # .source_iter(q!(vec![]))
1037 /// # .batch(&tick, nondet!(/** test */));
1038 /// # let batch_second_tick = process
1039 /// # .source_iter(q!(vec![1, 2, 3]))
1040 /// # .batch(&tick, nondet!(/** test */))
1041 /// # .defer_tick(); // appears on the second tick
1042 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1043 /// input_batch // first tick: [], second tick: [1, 2, 3]
1044 /// .max()
1045 /// .all_ticks()
1046 /// # }, |mut stream| async move {
1047 /// // [3]
1048 /// # for w in vec![3] {
1049 /// # assert_eq!(stream.next().await.unwrap(), w);
1050 /// # }
1051 /// # }));
1052 /// ```
1053 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1054 self.into_stream().all_ticks()
1055 }
1056
1057 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1058 /// which will stream the value computed in _each_ tick as a separate stream element.
1059 ///
1060 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1061 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1062 /// optional's [`Tick`] context.
1063 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1064 self.into_stream().all_ticks_atomic()
1065 }
1066
1067 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1068 /// be asynchronously updated with the latest value of the optional inside the tick, including
1069 /// whether the optional is null or not.
1070 ///
1071 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1072 /// tick that tracks the inner value. This is useful for getting the value as of the
1073 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1074 ///
1075 /// # Example
1076 /// ```rust
1077 /// # use hydro_lang::prelude::*;
1078 /// # use futures::StreamExt;
1079 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080 /// # let tick = process.tick();
1081 /// # // ticks are lazy by default, forces the second tick to run
1082 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1083 /// # let batch_first_tick = process
1084 /// # .source_iter(q!(vec![]))
1085 /// # .batch(&tick, nondet!(/** test */));
1086 /// # let batch_second_tick = process
1087 /// # .source_iter(q!(vec![1, 2, 3]))
1088 /// # .batch(&tick, nondet!(/** test */))
1089 /// # .defer_tick(); // appears on the second tick
1090 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1091 /// input_batch // first tick: [], second tick: [1, 2, 3]
1092 /// .max()
1093 /// .latest()
1094 /// # .into_singleton()
1095 /// # .sample_eager(nondet!(/** test */))
1096 /// # }, |mut stream| async move {
1097 /// // asynchronously changes from None ~> 3
1098 /// # for w in vec![None, Some(3)] {
1099 /// # assert_eq!(stream.next().await.unwrap(), w);
1100 /// # }
1101 /// # }));
1102 /// ```
1103 pub fn latest(self) -> Optional<T, L, Unbounded> {
1104 Optional::new(
1105 self.location.outer().clone(),
1106 HydroNode::YieldConcat {
1107 inner: Box::new(self.ir_node.into_inner()),
1108 metadata: self
1109 .location
1110 .outer()
1111 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1112 },
1113 )
1114 }
1115
1116 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1117 /// be updated with the latest value of the optional inside the tick.
1118 ///
1119 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1120 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1121 /// optional's [`Tick`] context.
1122 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1123 let out_location = Atomic {
1124 tick: self.location.clone(),
1125 };
1126
1127 Optional::new(
1128 out_location.clone(),
1129 HydroNode::YieldConcat {
1130 inner: Box::new(self.ir_node.into_inner()),
1131 metadata: out_location
1132 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1133 },
1134 )
1135 }
1136
1137 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1138 /// always has the state of `self` at tick `T - 1`.
1139 ///
1140 /// At tick `0`, the output optional is null, since there is no previous tick.
1141 ///
1142 /// This operator enables stateful iterative processing with ticks, by sending data from one
1143 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1144 ///
1145 /// # Example
1146 /// ```rust
1147 /// # use hydro_lang::prelude::*;
1148 /// # use futures::StreamExt;
1149 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150 /// let tick = process.tick();
1151 /// // ticks are lazy by default, forces the second tick to run
1152 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1153 ///
1154 /// let batch_first_tick = process
1155 /// .source_iter(q!(vec![1, 2]))
1156 /// .batch(&tick, nondet!(/** test */));
1157 /// let batch_second_tick = process
1158 /// .source_iter(q!(vec![3, 4]))
1159 /// .batch(&tick, nondet!(/** test */))
1160 /// .defer_tick(); // appears on the second tick
1161 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1162 /// .reduce(q!(|state, v| *state += v));
1163 ///
1164 /// current_tick_sum.clone().into_singleton().zip(
1165 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1166 /// ).all_ticks()
1167 /// # }, |mut stream| async move {
1168 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1169 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1170 /// # assert_eq!(stream.next().await.unwrap(), w);
1171 /// # }
1172 /// # }));
1173 /// ```
1174 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1175 Optional::new(
1176 self.location.clone(),
1177 HydroNode::DeferTick {
1178 input: Box::new(self.ir_node.into_inner()),
1179 metadata: self.location.new_node_metadata(Self::collection_kind()),
1180 },
1181 )
1182 }
1183
1184 #[deprecated(note = "use .into_stream().persist()")]
1185 #[expect(missing_docs, reason = "deprecated")]
1186 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1187 where
1188 T: Clone,
1189 {
1190 self.into_stream().persist()
1191 }
1192
1193 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1194 /// non-null. Otherwise, the stream is empty.
1195 ///
1196 /// # Example
1197 /// ```rust
1198 /// # use hydro_lang::prelude::*;
1199 /// # use futures::StreamExt;
1200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201 /// # let tick = process.tick();
1202 /// # // ticks are lazy by default, forces the second tick to run
1203 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1204 /// # let batch_first_tick = process
1205 /// # .source_iter(q!(vec![]))
1206 /// # .batch(&tick, nondet!(/** test */));
1207 /// # let batch_second_tick = process
1208 /// # .source_iter(q!(vec![123, 456]))
1209 /// # .batch(&tick, nondet!(/** test */))
1210 /// # .defer_tick(); // appears on the second tick
1211 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1212 /// input_batch // first tick: [], second tick: [123, 456]
1213 /// .clone()
1214 /// .max()
1215 /// .into_stream()
1216 /// .chain(input_batch)
1217 /// .all_ticks()
1218 /// # }, |mut stream| async move {
1219 /// // [456, 123, 456]
1220 /// # for w in vec![456, 123, 456] {
1221 /// # assert_eq!(stream.next().await.unwrap(), w);
1222 /// # }
1223 /// # }));
1224 /// ```
1225 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1226 Stream::new(
1227 self.location.clone(),
1228 HydroNode::Cast {
1229 inner: Box::new(self.ir_node.into_inner()),
1230 metadata: self.location.new_node_metadata(Stream::<
1231 T,
1232 Tick<L>,
1233 Bounded,
1234 TotalOrder,
1235 ExactlyOnce,
1236 >::collection_kind()),
1237 },
1238 )
1239 }
1240}
1241
#[cfg(all(test, feature = "deploy"))]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// `or`-ing an inhabited optional with itself must still produce a single element,
    /// so counting the resulting stream yields 1.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        // Build the dataflow: one process plus an external endpoint to read results from.
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let singleton = tick.singleton(q!(123));
        let inhabited: Optional<_, _, _> = singleton.into();
        let count_port = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        // Deploy everything on localhost.
        let deployed = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut count_stream = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        let first_count = count_stream.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}