hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::Debug;
6use std::marker::PhantomData;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use colored::Colorize;
14use dfir_rs::scheduled::graph::Dfir;
15use futures::{FutureExt, Stream, StreamExt};
16use libloading::Library;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use tempfile::TempPath;
20use tokio::sync::mpsc::UnboundedSender;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use super::runtime::SimHook;
24use crate::compile::deploy::ConnectableAsync;
25use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
26use crate::location::dynamic::LocationId;
27use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
28
/// A handle to a compiled Hydro simulation, which can be instantiated and run.
///
/// Instances are created through [`CompiledSim::with_instance`] or
/// [`CompiledSim::with_instantiator`], which look up the `__hydro_runtime` entry point in
/// the loaded library.
pub struct CompiledSim {
    // NOTE(review): presumably keeps the temporary file backing `lib` alive while the
    // library is loaded; it is never read in this file — confirm against the code that
    // constructs `CompiledSim`.
    pub(super) _path: TempPath,
    // Dynamically loaded library from which the `__hydro_runtime` symbol is resolved.
    pub(super) lib: Library,
    // Port keys that every new instance starts with in its set of not-yet-connected ports.
    pub(super) external_ports: Vec<usize>,
    // Maps a port handle's `port_id` to the port key used in `external_ports`; cloned into
    // every instance.
    pub(super) external_registered: HashMap<usize, usize>,
}
36
37#[sealed::sealed]
38/// A trait implemented by closures that can instantiate a compiled simulation.
39///
40/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
41pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
42#[sealed::sealed]
43impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
44
/// Log handler that silently discards the formatted message.
fn null_handler(_: fmt::Arguments<'_>) {}
46
/// Log handler that writes the formatted message, followed by a newline, to standard output.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
50
/// Log handler that writes the formatted message, followed by a newline, to standard error.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
54
/// Signature of the entry point that is looked up in the compiled library under the
/// `__hydro_runtime` symbol.
///
/// As invoked when an instance is scheduled, the arguments are, in order: whether output
/// should be colorized, the senders registered as output ports (keyed by port key), the
/// receivers registered as input ports (keyed by port key), and two message handlers. The
/// handlers are `println_handler` and `eprintln_handler` when logging is enabled, and
/// `null_handler` for both otherwise.
///
/// The returned tuple is consumed by the caller as `(async_dfirs, tick_dfirs, hooks)`; the
/// `&'static str` components are parsed as JSON into `LocationId`s and the `Option<u32>`
/// components are carried along unchanged as cluster ids.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        bool,
        HashMap<usize, UnboundedSender<Bytes>>,
        HashMap<usize, UnboundedReceiverStream<Bytes>>,
        fn(fmt::Arguments<'_>),
        fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
    ),
>;
69
70impl CompiledSim {
71    /// Executes the given closure with a single instance of the compiled simulation.
72    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
73        self.with_instantiator(|instantiator| thunk(instantiator()), true)
74    }
75
76    /// Executes the given closure with an [`Instantiator`], which can be called to create
77    /// independent instances of the simulation. This is useful for fuzzing, where we need to
78    /// re-execute the simulation several times with different decisions.
79    ///
80    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
81    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
82    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
83    pub fn with_instantiator<T>(
84        &self,
85        thunk: impl FnOnce(&dyn Instantiator) -> T,
86        always_log: bool,
87    ) -> T {
88        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
89        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
90        thunk(
91            &(|| CompiledSimInstance {
92                func: func.clone(),
93                remaining_ports: self.external_ports.iter().cloned().collect(),
94                external_registered: self.external_registered.clone(),
95                input_ports: HashMap::new(),
96                output_ports: HashMap::new(),
97                log,
98            }),
99        )
100    }
101
102    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
103    /// closure will be repeatedly executed with instances of the Hydro program where the
104    /// batching boundaries, order of messages, and retries are varied.
105    ///
106    /// During development, you should run the test that invokes this function with the `cargo sim`
107    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
108    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
109    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
110    /// be executed, and if no reproducer is found a small number of random executions will be
111    /// performed.
112    pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
113        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
114            .elements()
115            .into_iter()
116            .find(|e| {
117                !e.fn_name.starts_with("hydro_lang::sim::compiled")
118                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
119                    && !e.fn_name.starts_with("fuzz<")
120            })
121            .unwrap();
122
123        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
124        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
125
126        let caller_fuzz_repro_path = repro_folder
127            .join(caller_fn.fn_name.replace("::", "__"))
128            .with_extension("bin");
129
130        if std::env::var("BOLERO_FUZZER").is_ok() {
131            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
132            std::fs::create_dir_all(&corpus_dir).unwrap();
133            let libfuzzer_args = format!(
134                "{} {} -artifact_prefix={}/ -handle_abrt=0",
135                corpus_dir.to_str().unwrap(),
136                corpus_dir.to_str().unwrap(),
137                corpus_dir.to_str().unwrap(),
138            );
139
140            std::fs::create_dir_all(&repro_folder).unwrap();
141
142            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
143                unsafe {
144                    std::env::set_var(
145                        "BOLERO_FAILURE_OUTPUT",
146                        caller_fuzz_repro_path.to_str().unwrap(),
147                    );
148                }
149            }
150
151            unsafe {
152                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
153            }
154
155            self.with_instantiator(
156                |instantiator| {
157                    bolero::test(bolero::TargetLocation {
158                        package_name: "",
159                        manifest_dir: "",
160                        module_path: "",
161                        file: "",
162                        line: 0,
163                        item_path: "<unknown>::__bolero_item_path__",
164                        test_name: None,
165                    })
166                    .run_with_replay(move |is_replay| {
167                        let mut instance = instantiator();
168
169                        if instance.log {
170                            eprintln!(
171                                "{}",
172                                "\n==== New Simulation Instance ===="
173                                    .color(colored::Color::Cyan)
174                                    .bold()
175                            );
176                        }
177
178                        if is_replay {
179                            instance.log = true;
180                        }
181
182                        tokio::runtime::Builder::new_current_thread()
183                            .build()
184                            .unwrap()
185                            .block_on(async {
186                                let local_set = tokio::task::LocalSet::new();
187                                local_set.run_until(thunk(instance)).await
188                            })
189                    })
190                },
191                false,
192            );
193        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
194            self.fuzz_repro(existing_bytes, thunk);
195        } else {
196            eprintln!(
197                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
198                caller_fuzz_repro_path.display()
199            );
200            self.with_instantiator(
201                |instantiator| {
202                    bolero::test(bolero::TargetLocation {
203                        package_name: "",
204                        manifest_dir: "",
205                        module_path: "",
206                        file: ".",
207                        line: 0,
208                        item_path: "<unknown>::__bolero_item_path__",
209                        test_name: None,
210                    })
211                    .with_iterations(8192)
212                    .run(move || {
213                        let instance = instantiator();
214                        tokio::runtime::Builder::new_current_thread()
215                            .build()
216                            .unwrap()
217                            .block_on(async {
218                                let local_set = tokio::task::LocalSet::new();
219                                local_set.run_until(thunk(instance)).await
220                            })
221                    })
222                },
223                false,
224            );
225        }
226    }
227
228    /// Executes the given closure with a single instance of the compiled simulation, using the
229    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
230    /// failure found during fuzzing.
231    pub fn fuzz_repro<'a>(
232        &'a self,
233        bytes: Vec<u8>,
234        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
235    ) {
236        self.with_instance(|instance| {
237            bolero::bolero_engine::any::scope::with(
238                Box::new(bolero::bolero_engine::driver::object::Object(
239                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
240                )),
241                || {
242                    tokio::runtime::Builder::new_current_thread()
243                        .build()
244                        .unwrap()
245                        .block_on(async {
246                            let local_set = tokio::task::LocalSet::new();
247                            local_set.run_until(thunk(instance)).await
248                        })
249                },
250            )
251        });
252    }
253
254    /// Exhaustively searches all possible executions of the simulation. The provided
255    /// closure will be repeatedly executed with instances of the Hydro program where the
256    /// batching boundaries, order of messages, and retries are varied.
257    ///
258    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
259    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
260    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
261    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
262    ///
263    /// Returns the number of distinct executions explored.
264    pub fn exhaustive<'a>(
265        &'a self,
266        mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
267    ) -> usize {
268        if std::env::var("BOLERO_FUZZER").is_ok() {
269            eprintln!(
270                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
271            );
272            std::process::abort();
273        }
274
275        let mut count = 0;
276        let count_mut = &mut count;
277
278        self.with_instantiator(
279            |instantiator| {
280                bolero::test(bolero::TargetLocation {
281                    package_name: "",
282                    manifest_dir: "",
283                    module_path: "",
284                    file: "",
285                    line: 0,
286                    item_path: "<unknown>::__bolero_item_path__",
287                    test_name: None,
288                })
289                .exhaustive()
290                .run_with_replay(move |is_replay| {
291                    *count_mut += 1;
292
293                    let mut instance = instantiator();
294                    if instance.log {
295                        eprintln!(
296                            "{}",
297                            "\n==== New Simulation Instance ===="
298                                .color(colored::Color::Cyan)
299                                .bold()
300                        );
301                    }
302
303                    if is_replay {
304                        instance.log = true;
305                    }
306
307                    tokio::runtime::Builder::new_current_thread()
308                        .build()
309                        .unwrap()
310                        .block_on(async {
311                            let local_set = tokio::task::LocalSet::new();
312                            local_set.run_until(thunk(instance)).await;
313                        })
314                })
315            },
316            false,
317        );
318
319        count
320    }
321}
322
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    // Entry point of the compiled simulation; called once when the instance is scheduled.
    func: SimLoaded<'a>,
    // Port keys that have not been connected yet. Each `connect` removes its port; any port
    // still present when the simulation is scheduled gets a placeholder channel.
    remaining_ports: HashSet<usize>,
    // Maps a port handle's `port_id` to the port key used in the maps below.
    external_registered: HashMap<usize, usize>,
    // Senders handed to the simulation for its output ports, keyed by port key.
    output_ports: HashMap<usize, UnboundedSender<Bytes>>,
    // Receivers handed to the simulation for its input ports, keyed by port key.
    input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
    // Whether the simulation trace (tick executions, decisions) is logged.
    log: bool,
}
333
334impl<'a> CompiledSimInstance<'a> {
335    #[deprecated(note = "Use `connect` instead")]
336    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
337    /// given input port, and returns a closure that can be used to send messages to it.
338    pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
339        &mut self,
340        port: &ExternalBincodeSink<T, M, O, R>,
341    ) -> SimSender<T, O, R> {
342        self.connect(port)
343    }
344
345    #[deprecated(note = "Use `connect` instead")]
346    /// Like the corresponding method on [`crate::compile::deploy::DeployResult`], connects to the
347    /// given output port, and returns a stream that can be used to receive messages from it.
348    pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
349        &mut self,
350        port: &ExternalBincodeStream<T, O, R>,
351    ) -> SimReceiver<'a, T, O, R> {
352        self.connect(port)
353    }
354
355    /// Establishes a connection to the given input or output port, returning either a
356    /// [`SimSender`] (for input ports) or a stream (for output ports). This should be invoked
357    /// before calling [`Self::launch`], and should only be invoked once per port.
358    pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
359        &'b mut self,
360        port: P,
361    ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
362        let mut pinned = std::pin::pin!(port.connect(self));
363        if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
364            v
365        } else {
366            panic!("Connect impl should not have used any async operations");
367        }
368    }
369
370    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
371    /// be invoked after connecting all inputs and outputs, but before receiving any messages.
372    pub fn launch(self) {
373        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
374    }
375
376    /// Returns a future that schedules simulation with the given logger for reporting the
377    /// simulation trace.
378    ///
379    /// See [`Self::launch`] for more details.
380    pub fn schedule_with_logger<W: std::io::Write>(
381        self,
382        log_writer: W,
383    ) -> impl use<W> + Future<Output = ()> {
384        self.schedule_with_maybe_logger(Some(log_writer))
385    }
386
387    fn schedule_with_maybe_logger<W: std::io::Write>(
388        mut self,
389        log_override: Option<W>,
390    ) -> impl use<W> + Future<Output = ()> {
391        for remaining in self.remaining_ports {
392            let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
393            self.output_ports.insert(remaining, sender);
394            self.input_ports.insert(remaining, receiver);
395        }
396
397        let (async_dfirs, tick_dfirs, hooks) = unsafe {
398            (self.func)(
399                colored::control::SHOULD_COLORIZE.should_colorize(),
400                self.output_ports,
401                self.input_ports,
402                if self.log {
403                    println_handler
404                } else {
405                    null_handler
406                },
407                if self.log {
408                    eprintln_handler
409                } else {
410                    null_handler
411                },
412            )
413        };
414        let mut launched = LaunchedSim {
415            async_dfirs: async_dfirs
416                .into_iter()
417                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
418                .collect(),
419            possibly_ready_ticks: vec![],
420            not_ready_ticks: tick_dfirs
421                .into_iter()
422                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
423                .collect(),
424            hooks: hooks
425                .into_iter()
426                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
427                .collect(),
428            log: if self.log {
429                if let Some(w) = log_override {
430                    LogKind::Custom(w)
431                } else {
432                    LogKind::Stderr
433                }
434            } else {
435                LogKind::Null
436            },
437        };
438
439        async move { launched.scheduler().await }
440    }
441}
442
/// A receiver for an external bincode stream in a simulation.
///
/// The `O` and `R` markers determine which receive and assertion methods are available:
/// ordered helpers exist for `TotalOrder`/`ExactlyOnce`, unordered ones for
/// `NoOrder`/`ExactlyOnce`.
pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
    // Stream of messages of type `T` delivered to this receiver.
    Pin<Box<dyn Stream<Item = T> + 'a>>,
    // Zero-sized marker carrying the ordering and retry guarantees.
    PhantomData<(O, R)>,
);
448
449impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
450    /// Asserts that the stream has ended and no more messages can possibly arrive.
451    pub async fn assert_no_more(mut self)
452    where
453        T: Debug,
454    {
455        if let Some(next) = self.0.next().await {
456            panic!("Stream yielded unexpected message: {:?}", next);
457        }
458    }
459}
460
461impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
462    /// Receives the next message from the external bincode stream. This will wait until a message
463    /// is available, or return `None` if no more messages can possibly arrive.
464    pub async fn next(&mut self) -> Option<T> {
465        self.0.next().await
466    }
467
468    /// Collects all remaining messages from the external bincode stream into a collection. This
469    /// will wait until no more messages can possibly arrive.
470    pub async fn collect<C: Default + Extend<T>>(self) -> C {
471        self.0.collect().await
472    }
473
474    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
475    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
476    pub async fn assert_yields<T2: Debug>(&mut self, expected: impl IntoIterator<Item = T2>)
477    where
478        T: Debug + PartialEq<T2>,
479    {
480        let mut expected: VecDeque<T2> = expected.into_iter().collect();
481
482        while !expected.is_empty() {
483            if let Some(next) = self.next().await {
484                assert_eq!(next, expected.pop_front().unwrap());
485            } else {
486                panic!("Stream ended early, still expected: {:?}", expected);
487            }
488        }
489    }
490
491    /// Asserts that the stream yields only the expected sequence of messages, in order,
492    /// and then ends.
493    pub async fn assert_yields_only<T2: Debug>(mut self, expected: impl IntoIterator<Item = T2>)
494    where
495        T: Debug + PartialEq<T2>,
496    {
497        self.assert_yields(expected).await;
498        self.assert_no_more().await;
499    }
500}
501
502impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
503    /// Collects all remaining messages from the external bincode stream into a collection,
504    /// sorting them. This will wait until no more messages can possibly arrive.
505    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
506    where
507        T: Ord,
508    {
509        let mut collected: C = self.0.collect().await;
510        collected.as_mut().sort();
511        collected
512    }
513
514    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
515    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
516    pub async fn assert_yields_unordered<T2: Debug>(
517        &mut self,
518        expected: impl IntoIterator<Item = T2>,
519    ) where
520        T: Debug + PartialEq<T2>,
521    {
522        let mut expected: Vec<T2> = expected.into_iter().collect();
523
524        while !expected.is_empty() {
525            if let Some(next) = self.0.next().await {
526                let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
527                if let Some((i, _)) = idx {
528                    expected.swap_remove(i);
529                } else {
530                    panic!("Stream yielded unexpected message: {:?}", next);
531                }
532            } else {
533                panic!("Stream ended early, still expected: {:?}", expected);
534            }
535        }
536    }
537
538    /// Asserts that the stream yields only the expected sequence of messages, in some order,
539    /// and then ends.
540    pub async fn assert_yields_only_unordered<T2: Debug>(
541        mut self,
542        expected: impl IntoIterator<Item = T2>,
543    ) where
544        T: Debug + PartialEq<T2>,
545    {
546        self.assert_yields_unordered(expected).await;
547        self.assert_no_more().await;
548    }
549}
550
551impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
552    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
553{
554    type Output = SimReceiver<'a, T, O, R>;
555
556    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
557        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
558
559        assert!(ctx.remaining_ports.remove(looked_up));
560        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
561        ctx.output_ports.insert(*looked_up, sink);
562
563        SimReceiver(
564            Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
565            PhantomData,
566        )
567    }
568}
569
/// A sender to an external bincode sink in a simulation.
///
/// The `O` and `R` markers determine which send methods are available: `send`/`send_many`
/// for `TotalOrder`/`ExactlyOnce`, `send_many_unordered` for `NoOrder`/`ExactlyOnce`.
pub struct SimSender<T, O: Ordering, R: Retries>(
    // Callback that accepts one message and reports whether handing it to the underlying
    // channel succeeded.
    Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
    // Zero-sized marker carrying the ordering and retry guarantees.
    PhantomData<(O, R)>,
);
575impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
576    /// Sends a message to the external bincode sink. The message will be asynchronously processed
577    /// as part of the simulation.
578    pub fn send(&self, t: T) {
579        (self.0)(t).unwrap()
580    }
581
582    /// Sends several messages to the external bincode sink. The messages will be asynchronously
583    /// processed as part of the simulation.
584    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
585        for t in iter {
586            (self.0)(t).unwrap();
587        }
588    }
589}
590
591impl<T> SimSender<T, NoOrder, ExactlyOnce> {
592    /// Sends several messages to the external bincode sink. The messages will be asynchronously
593    /// processed as part of the simulation, in non-determinstic order.
594    pub fn send_many_unordered<I: IntoIterator<Item = T>>(
595        &self,
596        iter: I,
597    ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
598        for t in iter {
599            (self.0)(t)?;
600        }
601        Ok(())
602    }
603}
604
605impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
606    ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
607{
608    type Output = SimSender<T, O, R>;
609
610    async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
611        let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
612
613        assert!(ctx.remaining_ports.remove(looked_up));
614        let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
615        ctx.input_ports.insert(*looked_up, source);
616        SimSender(
617            Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
618            PhantomData,
619        )
620    }
621}
622
/// Destination of the simulation trace.
enum LogKind<W: std::io::Write> {
    // Discard everything.
    Null,
    // Print to standard error.
    Stderr,
    // Write to a caller-provided writer.
    Custom(W),
}
628
629// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
630impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
631    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
632        match self {
633            LogKind::Null => Ok(()),
634            LogKind::Stderr => {
635                eprint!("{}", s);
636                Ok(())
637            }
638            LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
639        }
640    }
641}
642
/// Simulation hooks grouped by the tick they belong to, keyed by location and optional
/// cluster id.
type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;

/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
/// about scheduling ticks and choices for non-deterministic operators like batch.
struct LaunchedSim<W: std::io::Write> {
    // Dataflows that are run on every scheduler iteration.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Ticks that are candidates for being run next; re-checked against their hooks before
    // one is picked.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Ticks that are not candidates until the async dataflow of their outer location (and
    // matching cluster id) makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    // Hooks consulted to decide whether a tick can run and to make its decisions.
    hooks: Hooks,
    // Where the simulation trace is written.
    log: LogKind<W>,
}
654
impl<W: std::io::Write> LaunchedSim<W> {
    /// Drives the simulation until no tick can run any more.
    ///
    /// Each iteration first runs every async dataflow. Only when none of them made progress
    /// is a tick chosen (through the fuzzer-controlled generator), its hooks asked for their
    /// decisions, and the tick executed.
    ///
    /// # Panics
    ///
    /// Panics if a candidate tick has no hooks entry, or if a tick that was selected to run
    /// does not make progress.
    async fn scheduler(&mut self) {
        loop {
            // Yield once per iteration before doing any work.
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Progress at this location may have produced input for its ticks: move
                    // every tick nested directly under this location (with the same cluster
                    // id) into the candidate list.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);
                }
            }

            if any_made_progress {
                // Keep running the async dataflows until none of them makes progress before
                // considering any tick.
                continue;
            } else {
                use bolero::generator::*;

                // Keep only ticks for which at least one hook either already holds a
                // non-trivial decision or could make one; the rest go back to "not ready".
                let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready;
                self.not_ready_ticks.append(&mut not_ready);

                if self.possibly_ready_ticks.is_empty() {
                    // Nothing made progress and no tick can run: the simulation is done.
                    break;
                } else {
                    // Which candidate tick runs next is a generated (fuzzer-controlled) choice.
                    let next_tick = (0..self.possibly_ready_ticks.len()).any();
                    let mut removed = self.possibly_ready_ticks.remove(next_tick);

                    // NOTE(review): only the stderr variant mentions the cluster member; the
                    // custom writer always gets the plain "Running Tick" header.
                    match &mut self.log {
                        LogKind::Null => {}
                        LogKind::Stderr => {
                            if let Some(cid) = &removed.1 {
                                eprintln!(
                                    "\n{}",
                                    format!("Running Tick (Cluster Member {})", cid)
                                        .color(colored::Color::Magenta)
                                        .bold()
                                )
                            } else {
                                eprintln!(
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                            }
                        }
                        LogKind::Custom(writer) => {
                            writeln!(
                                writer,
                                "\n{}",
                                "Running Tick".color(colored::Color::Magenta).bold()
                            )
                            .unwrap();
                        }
                    }

                    // Output written through the decision writer is indented with a bold
                    // magenta `*` followed by a space.
                    let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                        write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                        write.write_str(" ")
                    };

                    let mut tick_decision_writer =
                        indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
                            inserter: &mut asterisk_indenter,
                        });

                    let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                    // Number of hooks that have not been settled yet.
                    let mut remaining_decision_count = hooks.len();
                    let mut made_nontrivial_decision = false;

                    bolero_generator::any::scope::borrow_with(|driver| {
                        // first, scan manual decisions
                        hooks.iter_mut().for_each(|hook| {
                            if let Some(is_nontrivial) = hook.current_decision() {
                                made_nontrivial_decision |= is_nontrivial;
                                remaining_decision_count -= 1;
                            } else if !hook.can_make_nontrivial_decision() {
                                // if no nontrivial decision is possible, make a trivial one
                                // (we need to do this in the first pass to force nontrivial decisions
                                // on the remaining hooks)
                                hook.autonomous_decision(driver, false);
                                remaining_decision_count -= 1;
                            }
                        });

                        // Second pass: let the driver decide for every hook that is still
                        // undecided. The flag passed to the last such hook is `true` when no
                        // non-trivial decision has been made so far. Every hook then releases
                        // its decision to the decision writer.
                        hooks.iter_mut().for_each(|hook| {
                            if hook.current_decision().is_none() {
                                made_nontrivial_decision |= hook.autonomous_decision(
                                    driver,
                                    !made_nontrivial_decision && remaining_decision_count == 1,
                                );
                                remaining_decision_count -= 1;
                            }

                            hook.release_decision(&mut tick_decision_writer);
                        });
                    });

                    // A tick selected to run is required to make progress; afterwards it
                    // stays a candidate for the next round.
                    assert!(removed.2.run_tick().await);
                    self.possibly_ready_ticks.push(removed);
                }
            }
        }
    }
}