feat: test channel for pausing persistence receiver (#10699)

This commit is contained in:
Dan Cline
2024-09-06 13:39:05 -04:00
committed by GitHub
parent ed778e10b9
commit 27d4e8c363

View File

@ -2529,6 +2529,61 @@ mod tests {
};
use tokio::sync::mpsc::unbounded_channel;
/// This is a test channel that allows you to `release` any value that is in the channel.
///
/// If nothing has been sent, then the next value will be immediately sent.
#[allow(dead_code)]
struct TestChannel<T> {
/// If an item is sent to this channel, an item will be released in the wrapped channel
release: Receiver<()>,
/// The sender channel
tx: Sender<T>,
/// The receiver channel
rx: Receiver<T>,
}
impl<T: Send + 'static> TestChannel<T> {
/// Creates a new test channel
#[allow(dead_code)]
fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
let (original_tx, original_rx) = channel();
let (wrapped_tx, wrapped_rx) = channel();
let (release_tx, release_rx) = channel();
let handle = TestChannelHandle::new(release_tx);
let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
// spawn the task that listens and releases stuff
std::thread::spawn(move || test_channel.intercept_loop());
(original_tx, wrapped_rx, handle)
}
/// Runs the intercept loop, waiting for the handle to release a value
fn intercept_loop(&self) {
while self.release.recv() == Ok(()) {
let Ok(value) = self.rx.recv() else { return };
let _ = self.tx.send(value);
}
}
}
struct TestChannelHandle {
/// The sender to use for releasing values
release: Sender<()>,
}
impl TestChannelHandle {
/// Returns a [`TestChannelHandle`]
const fn new(release: Sender<()>) -> Self {
Self { release }
}
/// Signals to the channel task that a value should be released
#[allow(dead_code)]
fn release(&self) {
let _ = self.release.send(());
}
}
struct TestHarness {
tree: EngineApiTreeHandler<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes>>>,
@ -2543,6 +2598,20 @@ mod tests {
impl TestHarness {
fn new(chain_spec: Arc<ChainSpec>) -> Self {
let (action_tx, action_rx) = channel();
Self::with_persistence_channel(chain_spec, action_tx, action_rx)
}
#[allow(dead_code)]
fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
(Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
}
fn with_persistence_channel(
chain_spec: Arc<ChainSpec>,
action_tx: Sender<PersistenceAction>,
action_rx: Receiver<PersistenceAction>,
) -> Self {
let persistence_handle = PersistenceHandle::new(action_tx);
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));