1use core::fmt;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::fmt::Debug;
6use std::marker::PhantomData;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11
12use bytes::Bytes;
13use colored::Colorize;
14use dfir_rs::scheduled::graph::Dfir;
15use futures::{FutureExt, Stream, StreamExt};
16use libloading::Library;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use tempfile::TempPath;
20use tokio::sync::mpsc::UnboundedSender;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22
23use super::runtime::SimHook;
24use crate::compile::deploy::ConnectableAsync;
25use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
26use crate::location::dynamic::LocationId;
27use crate::location::external_process::{ExternalBincodeSink, ExternalBincodeStream};
28
29pub struct CompiledSim {
31 pub(super) _path: TempPath,
32 pub(super) lib: Library,
33 pub(super) external_ports: Vec<usize>,
34 pub(super) external_registered: HashMap<usize, usize>,
35}
36
37#[sealed::sealed]
38pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
42#[sealed::sealed]
43impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
44
/// Log handler that silently discards everything it is given.
fn null_handler(_: fmt::Arguments<'_>) {}
46
/// Log handler that writes the formatted message, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
50
/// Log handler that writes the formatted message, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
54
55type SimLoaded<'a> = libloading::Symbol<
56 'a,
57 unsafe extern "Rust" fn(
58 bool,
59 HashMap<usize, UnboundedSender<Bytes>>,
60 HashMap<usize, UnboundedReceiverStream<Bytes>>,
61 fn(fmt::Arguments<'_>),
62 fn(fmt::Arguments<'_>),
63 ) -> (
64 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
65 Vec<(&'static str, Option<u32>, Dfir<'static>)>,
66 HashMap<(&'static str, Option<u32>), Vec<Box<dyn SimHook>>>,
67 ),
68>;
69
70impl CompiledSim {
71 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
73 self.with_instantiator(|instantiator| thunk(instantiator()), true)
74 }
75
76 pub fn with_instantiator<T>(
84 &self,
85 thunk: impl FnOnce(&dyn Instantiator) -> T,
86 always_log: bool,
87 ) -> T {
88 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
89 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
90 thunk(
91 &(|| CompiledSimInstance {
92 func: func.clone(),
93 remaining_ports: self.external_ports.iter().cloned().collect(),
94 external_registered: self.external_registered.clone(),
95 input_ports: HashMap::new(),
96 output_ports: HashMap::new(),
97 log,
98 }),
99 )
100 }
101
102 pub fn fuzz<'a>(&'a self, thunk: impl AsyncFn(CompiledSimInstance) + RefUnwindSafe) {
113 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
114 .elements()
115 .into_iter()
116 .find(|e| {
117 !e.fn_name.starts_with("hydro_lang::sim::compiled")
118 && !e.fn_name.starts_with("hydro_lang::sim::flow")
119 && !e.fn_name.starts_with("fuzz<")
120 })
121 .unwrap();
122
123 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
124 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
125
126 let caller_fuzz_repro_path = repro_folder
127 .join(caller_fn.fn_name.replace("::", "__"))
128 .with_extension("bin");
129
130 if std::env::var("BOLERO_FUZZER").is_ok() {
131 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
132 std::fs::create_dir_all(&corpus_dir).unwrap();
133 let libfuzzer_args = format!(
134 "{} {} -artifact_prefix={}/ -handle_abrt=0",
135 corpus_dir.to_str().unwrap(),
136 corpus_dir.to_str().unwrap(),
137 corpus_dir.to_str().unwrap(),
138 );
139
140 std::fs::create_dir_all(&repro_folder).unwrap();
141
142 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
143 unsafe {
144 std::env::set_var(
145 "BOLERO_FAILURE_OUTPUT",
146 caller_fuzz_repro_path.to_str().unwrap(),
147 );
148 }
149 }
150
151 unsafe {
152 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
153 }
154
155 self.with_instantiator(
156 |instantiator| {
157 bolero::test(bolero::TargetLocation {
158 package_name: "",
159 manifest_dir: "",
160 module_path: "",
161 file: "",
162 line: 0,
163 item_path: "<unknown>::__bolero_item_path__",
164 test_name: None,
165 })
166 .run_with_replay(move |is_replay| {
167 let mut instance = instantiator();
168
169 if instance.log {
170 eprintln!(
171 "{}",
172 "\n==== New Simulation Instance ===="
173 .color(colored::Color::Cyan)
174 .bold()
175 );
176 }
177
178 if is_replay {
179 instance.log = true;
180 }
181
182 tokio::runtime::Builder::new_current_thread()
183 .build()
184 .unwrap()
185 .block_on(async {
186 let local_set = tokio::task::LocalSet::new();
187 local_set.run_until(thunk(instance)).await
188 })
189 })
190 },
191 false,
192 );
193 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
194 self.fuzz_repro(existing_bytes, thunk);
195 } else {
196 eprintln!(
197 "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
198 caller_fuzz_repro_path.display()
199 );
200 self.with_instantiator(
201 |instantiator| {
202 bolero::test(bolero::TargetLocation {
203 package_name: "",
204 manifest_dir: "",
205 module_path: "",
206 file: ".",
207 line: 0,
208 item_path: "<unknown>::__bolero_item_path__",
209 test_name: None,
210 })
211 .with_iterations(8192)
212 .run(move || {
213 let instance = instantiator();
214 tokio::runtime::Builder::new_current_thread()
215 .build()
216 .unwrap()
217 .block_on(async {
218 let local_set = tokio::task::LocalSet::new();
219 local_set.run_until(thunk(instance)).await
220 })
221 })
222 },
223 false,
224 );
225 }
226 }
227
228 pub fn fuzz_repro<'a>(
232 &'a self,
233 bytes: Vec<u8>,
234 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
235 ) {
236 self.with_instance(|instance| {
237 bolero::bolero_engine::any::scope::with(
238 Box::new(bolero::bolero_engine::driver::object::Object(
239 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
240 )),
241 || {
242 tokio::runtime::Builder::new_current_thread()
243 .build()
244 .unwrap()
245 .block_on(async {
246 let local_set = tokio::task::LocalSet::new();
247 local_set.run_until(thunk(instance)).await
248 })
249 },
250 )
251 });
252 }
253
254 pub fn exhaustive<'a>(
265 &'a self,
266 mut thunk: impl AsyncFnMut(CompiledSimInstance) + RefUnwindSafe,
267 ) -> usize {
268 if std::env::var("BOLERO_FUZZER").is_ok() {
269 eprintln!(
270 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
271 );
272 std::process::abort();
273 }
274
275 let mut count = 0;
276 let count_mut = &mut count;
277
278 self.with_instantiator(
279 |instantiator| {
280 bolero::test(bolero::TargetLocation {
281 package_name: "",
282 manifest_dir: "",
283 module_path: "",
284 file: "",
285 line: 0,
286 item_path: "<unknown>::__bolero_item_path__",
287 test_name: None,
288 })
289 .exhaustive()
290 .run_with_replay(move |is_replay| {
291 *count_mut += 1;
292
293 let mut instance = instantiator();
294 if instance.log {
295 eprintln!(
296 "{}",
297 "\n==== New Simulation Instance ===="
298 .color(colored::Color::Cyan)
299 .bold()
300 );
301 }
302
303 if is_replay {
304 instance.log = true;
305 }
306
307 tokio::runtime::Builder::new_current_thread()
308 .build()
309 .unwrap()
310 .block_on(async {
311 let local_set = tokio::task::LocalSet::new();
312 local_set.run_until(thunk(instance)).await;
313 })
314 })
315 },
316 false,
317 );
318
319 count
320 }
321}
322
323pub struct CompiledSimInstance<'a> {
326 func: SimLoaded<'a>,
327 remaining_ports: HashSet<usize>,
328 external_registered: HashMap<usize, usize>,
329 output_ports: HashMap<usize, UnboundedSender<Bytes>>,
330 input_ports: HashMap<usize, UnboundedReceiverStream<Bytes>>,
331 log: bool,
332}
333
334impl<'a> CompiledSimInstance<'a> {
335 #[deprecated(note = "Use `connect` instead")]
336 pub fn connect_sink_bincode<T: Serialize + 'static, M, O: Ordering, R: Retries>(
339 &mut self,
340 port: &ExternalBincodeSink<T, M, O, R>,
341 ) -> SimSender<T, O, R> {
342 self.connect(port)
343 }
344
345 #[deprecated(note = "Use `connect` instead")]
346 pub fn connect_source_bincode<T: DeserializeOwned + 'static, O: Ordering, R: Retries>(
349 &mut self,
350 port: &ExternalBincodeStream<T, O, R>,
351 ) -> SimReceiver<'a, T, O, R> {
352 self.connect(port)
353 }
354
355 pub fn connect<'b, P: ConnectableAsync<&'b mut Self>>(
359 &'b mut self,
360 port: P,
361 ) -> <P as ConnectableAsync<&'b mut Self>>::Output {
362 let mut pinned = std::pin::pin!(port.connect(self));
363 if let Poll::Ready(v) = pinned.poll_unpin(&mut Context::from_waker(Waker::noop())) {
364 v
365 } else {
366 panic!("Connect impl should not have used any async operations");
367 }
368 }
369
370 pub fn launch(self) {
373 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
374 }
375
376 pub fn schedule_with_logger<W: std::io::Write>(
381 self,
382 log_writer: W,
383 ) -> impl use<W> + Future<Output = ()> {
384 self.schedule_with_maybe_logger(Some(log_writer))
385 }
386
387 fn schedule_with_maybe_logger<W: std::io::Write>(
388 mut self,
389 log_override: Option<W>,
390 ) -> impl use<W> + Future<Output = ()> {
391 for remaining in self.remaining_ports {
392 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
393 self.output_ports.insert(remaining, sender);
394 self.input_ports.insert(remaining, receiver);
395 }
396
397 let (async_dfirs, tick_dfirs, hooks) = unsafe {
398 (self.func)(
399 colored::control::SHOULD_COLORIZE.should_colorize(),
400 self.output_ports,
401 self.input_ports,
402 if self.log {
403 println_handler
404 } else {
405 null_handler
406 },
407 if self.log {
408 eprintln_handler
409 } else {
410 null_handler
411 },
412 )
413 };
414 let mut launched = LaunchedSim {
415 async_dfirs: async_dfirs
416 .into_iter()
417 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
418 .collect(),
419 possibly_ready_ticks: vec![],
420 not_ready_ticks: tick_dfirs
421 .into_iter()
422 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
423 .collect(),
424 hooks: hooks
425 .into_iter()
426 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
427 .collect(),
428 log: if self.log {
429 if let Some(w) = log_override {
430 LogKind::Custom(w)
431 } else {
432 LogKind::Stderr
433 }
434 } else {
435 LogKind::Null
436 },
437 };
438
439 async move { launched.scheduler().await }
440 }
441}
442
443pub struct SimReceiver<'a, T, O: Ordering, R: Retries>(
445 Pin<Box<dyn Stream<Item = T> + 'a>>,
446 PhantomData<(O, R)>,
447);
448
449impl<'a, T, O: Ordering, R: Retries> SimReceiver<'a, T, O, R> {
450 pub async fn assert_no_more(mut self)
452 where
453 T: Debug,
454 {
455 if let Some(next) = self.0.next().await {
456 panic!("Stream yielded unexpected message: {:?}", next);
457 }
458 }
459}
460
461impl<'a, T> SimReceiver<'a, T, TotalOrder, ExactlyOnce> {
462 pub async fn next(&mut self) -> Option<T> {
465 self.0.next().await
466 }
467
468 pub async fn collect<C: Default + Extend<T>>(self) -> C {
471 self.0.collect().await
472 }
473
474 pub async fn assert_yields<T2: Debug>(&mut self, expected: impl IntoIterator<Item = T2>)
477 where
478 T: Debug + PartialEq<T2>,
479 {
480 let mut expected: VecDeque<T2> = expected.into_iter().collect();
481
482 while !expected.is_empty() {
483 if let Some(next) = self.next().await {
484 assert_eq!(next, expected.pop_front().unwrap());
485 } else {
486 panic!("Stream ended early, still expected: {:?}", expected);
487 }
488 }
489 }
490
491 pub async fn assert_yields_only<T2: Debug>(mut self, expected: impl IntoIterator<Item = T2>)
494 where
495 T: Debug + PartialEq<T2>,
496 {
497 self.assert_yields(expected).await;
498 self.assert_no_more().await;
499 }
500}
501
502impl<'a, T> SimReceiver<'a, T, NoOrder, ExactlyOnce> {
503 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
506 where
507 T: Ord,
508 {
509 let mut collected: C = self.0.collect().await;
510 collected.as_mut().sort();
511 collected
512 }
513
514 pub async fn assert_yields_unordered<T2: Debug>(
517 &mut self,
518 expected: impl IntoIterator<Item = T2>,
519 ) where
520 T: Debug + PartialEq<T2>,
521 {
522 let mut expected: Vec<T2> = expected.into_iter().collect();
523
524 while !expected.is_empty() {
525 if let Some(next) = self.0.next().await {
526 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
527 if let Some((i, _)) = idx {
528 expected.swap_remove(i);
529 } else {
530 panic!("Stream yielded unexpected message: {:?}", next);
531 }
532 } else {
533 panic!("Stream ended early, still expected: {:?}", expected);
534 }
535 }
536 }
537
538 pub async fn assert_yields_only_unordered<T2: Debug>(
541 mut self,
542 expected: impl IntoIterator<Item = T2>,
543 ) where
544 T: Debug + PartialEq<T2>,
545 {
546 self.assert_yields_unordered(expected).await;
547 self.assert_no_more().await;
548 }
549}
550
551impl<'a, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
552 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeStream<T, O, R>
553{
554 type Output = SimReceiver<'a, T, O, R>;
555
556 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
557 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
558
559 assert!(ctx.remaining_ports.remove(looked_up));
560 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
561 ctx.output_ports.insert(*looked_up, sink);
562
563 SimReceiver(
564 Box::pin(source.map(|b| bincode::deserialize(&b).unwrap())),
565 PhantomData,
566 )
567 }
568}
569
570pub struct SimSender<T, O: Ordering, R: Retries>(
572 Box<dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>>,
573 PhantomData<(O, R)>,
574);
575impl<T> SimSender<T, TotalOrder, ExactlyOnce> {
576 pub fn send(&self, t: T) {
579 (self.0)(t).unwrap()
580 }
581
582 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
585 for t in iter {
586 (self.0)(t).unwrap();
587 }
588 }
589}
590
591impl<T> SimSender<T, NoOrder, ExactlyOnce> {
592 pub fn send_many_unordered<I: IntoIterator<Item = T>>(
595 &self,
596 iter: I,
597 ) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>> {
598 for t in iter {
599 (self.0)(t)?;
600 }
601 Ok(())
602 }
603}
604
605impl<'a, T: Serialize + 'static, M, O: Ordering, R: Retries>
606 ConnectableAsync<&mut CompiledSimInstance<'a>> for &ExternalBincodeSink<T, M, O, R>
607{
608 type Output = SimSender<T, O, R>;
609
610 async fn connect(self, ctx: &mut CompiledSimInstance<'a>) -> Self::Output {
611 let looked_up = ctx.external_registered.get(&self.port_id).unwrap();
612
613 assert!(ctx.remaining_ports.remove(looked_up));
614 let (sink, source) = dfir_rs::util::unbounded_channel::<Bytes>();
615 ctx.input_ports.insert(*looked_up, source);
616 SimSender(
617 Box::new(move |t| sink.send(bincode::serialize(&t).unwrap().into())),
618 PhantomData,
619 )
620 }
621}
622
/// Destination of the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to stderr.
    Stderr,
    /// Forward to a caller-provided writer.
    Custom(W),
}

/// Lets a `LogKind` be used as a `fmt::Write` sink.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Writes `s` to the selected destination. An I/O error from a custom writer is
    /// reported as `fmt::Error`, which cannot carry the underlying cause.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            Self::Custom(sink) => sink
                .write_all(s.as_bytes())
                .map_err(|_io_err| std::fmt::Error),
            Self::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
642
643type Hooks = HashMap<(LocationId, Option<u32>), Vec<Box<dyn SimHook>>>;
644
645struct LaunchedSim<W: std::io::Write> {
648 async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
649 possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
650 not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
651 hooks: Hooks,
652 log: LogKind<W>,
653}
654
655impl<W: std::io::Write> LaunchedSim<W> {
656 async fn scheduler(&mut self) {
657 loop {
658 tokio::task::yield_now().await;
659 let mut any_made_progress = false;
660 for (loc, c_id, dfir) in &mut self.async_dfirs {
661 if dfir.run_tick().await {
662 any_made_progress = true;
663 let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
664 .not_ready_ticks
665 .drain(..)
666 .partition(|(tick_loc, tick_c_id, _)| {
667 let LocationId::Tick(_, outer) = tick_loc else {
668 unreachable!()
669 };
670 outer.as_ref() == loc && tick_c_id == c_id
671 });
672
673 self.possibly_ready_ticks.extend(now_ready);
674 self.not_ready_ticks.extend(still_not_ready);
675 }
676 }
677
678 if any_made_progress {
679 continue;
680 } else {
681 use bolero::generator::*;
682
683 let (ready, mut not_ready): (Vec<_>, Vec<_>) = self
684 .possibly_ready_ticks
685 .drain(..)
686 .partition(|(name, cid, _)| {
687 self.hooks
688 .get(&(name.clone(), *cid))
689 .unwrap()
690 .iter()
691 .any(|hook| {
692 hook.current_decision().unwrap_or(false)
693 || hook.can_make_nontrivial_decision()
694 })
695 });
696
697 self.possibly_ready_ticks = ready;
698 self.not_ready_ticks.append(&mut not_ready);
699
700 if self.possibly_ready_ticks.is_empty() {
701 break;
702 } else {
703 let next_tick = (0..self.possibly_ready_ticks.len()).any();
704 let mut removed = self.possibly_ready_ticks.remove(next_tick);
705
706 match &mut self.log {
707 LogKind::Null => {}
708 LogKind::Stderr => {
709 if let Some(cid) = &removed.1 {
710 eprintln!(
711 "\n{}",
712 format!("Running Tick (Cluster Member {})", cid)
713 .color(colored::Color::Magenta)
714 .bold()
715 )
716 } else {
717 eprintln!(
718 "\n{}",
719 "Running Tick".color(colored::Color::Magenta).bold()
720 )
721 }
722 }
723 LogKind::Custom(writer) => {
724 writeln!(
725 writer,
726 "\n{}",
727 "Running Tick".color(colored::Color::Magenta).bold()
728 )
729 .unwrap();
730 }
731 }
732
733 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
734 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
735 write.write_str(" ")
736 };
737
738 let mut tick_decision_writer =
739 indenter::indented(&mut self.log).with_format(indenter::Format::Custom {
740 inserter: &mut asterisk_indenter,
741 });
742
743 let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
744 let mut remaining_decision_count = hooks.len();
745 let mut made_nontrivial_decision = false;
746
747 bolero_generator::any::scope::borrow_with(|driver| {
748 hooks.iter_mut().for_each(|hook| {
750 if let Some(is_nontrivial) = hook.current_decision() {
751 made_nontrivial_decision |= is_nontrivial;
752 remaining_decision_count -= 1;
753 } else if !hook.can_make_nontrivial_decision() {
754 hook.autonomous_decision(driver, false);
758 remaining_decision_count -= 1;
759 }
760 });
761
762 hooks.iter_mut().for_each(|hook| {
763 if hook.current_decision().is_none() {
764 made_nontrivial_decision |= hook.autonomous_decision(
765 driver,
766 !made_nontrivial_decision && remaining_decision_count == 1,
767 );
768 remaining_decision_count -= 1;
769 }
770
771 hook.release_decision(&mut tick_decision_writer);
772 });
773 });
774
775 assert!(removed.2.run_tick().await);
776 self.possibly_ready_ticks.push(removed);
777 }
778 }
779 }
780 }
781}