fix(engine): always push back hooks, add tests (#4924)

This commit is contained in:
Alexey Shekhirin
2023-10-05 20:47:15 +01:00
committed by GitHub
parent ec543ab2b6
commit 50b5215f4b
2 changed files with 270 additions and 11 deletions

View File

@ -10,6 +10,8 @@ use tracing::debug;
#[derive(Debug)]
pub(crate) struct PolledHook {
#[allow(unused)]
pub(crate) name: &'static str,
pub(crate) event: EngineHookEvent,
pub(crate) action: Option<EngineHookAction>,
pub(crate) db_access_level: EngineHookDBAccessLevel,
@ -56,7 +58,12 @@ impl EngineHooksController {
match hook.poll(cx, args)? {
Poll::Ready((event, action)) => {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };
let result = PolledHook {
name: hook.name(),
event,
action,
db_access_level: hook.db_access_level(),
};
debug!(
target: "consensus::engine::hooks",
@ -101,6 +108,33 @@ impl EngineHooksController {
) -> Poll<Result<PolledHook, EngineHookError>> {
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };
let result = self.poll_next_hook_inner(cx, &mut hook, args, db_write_active);
if matches!(
result,
Poll::Ready(Ok(PolledHook {
event: EngineHookEvent::Started,
db_access_level: EngineHookDBAccessLevel::ReadWrite,
..
}))
) {
// If a read-write hook started, set `running_hook_with_db_write` to it
self.running_hook_with_db_write = Some(hook);
} else {
// Otherwise, push it back to the collection of hooks to poll it next time
self.hooks.push_back(hook);
}
result
}
fn poll_next_hook_inner(
&mut self,
cx: &mut Context<'_>,
hook: &mut Box<dyn EngineHook>,
args: EngineContext,
db_write_active: bool,
) -> Poll<Result<PolledHook, EngineHookError>> {
// Hook with DB write access level is not allowed to run due to already running hook with DB
// write access level or active DB write according to passed argument
if hook.db_access_level().is_read_write() &&
@ -110,7 +144,12 @@ impl EngineHooksController {
}
if let Poll::Ready((event, action)) = hook.poll(cx, args)? {
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };
let result = PolledHook {
name: hook.name(),
event,
action,
db_access_level: hook.db_access_level(),
};
debug!(
target: "consensus::engine::hooks",
@ -119,15 +158,7 @@ impl EngineHooksController {
"Polled next hook"
);
if result.event.is_started() && result.db_access_level.is_read_write() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}
return Poll::Ready(Ok(result))
} else {
self.hooks.push_back(hook);
}
Poll::Pending
@ -138,3 +169,231 @@ impl EngineHooksController {
self.running_hook_with_db_write.is_some()
}
}
#[cfg(test)]
mod tests {
use crate::hooks::{
EngineContext, EngineHook, EngineHookAction, EngineHookDBAccessLevel, EngineHookEvent,
EngineHooks, EngineHooksController,
};
use futures::poll;
use reth_interfaces::{RethError, RethResult};
use std::{
collections::VecDeque,
future::poll_fn,
task::{Context, Poll},
};
struct TestHook {
results: VecDeque<RethResult<(EngineHookEvent, Option<EngineHookAction>)>>,
name: &'static str,
access_level: EngineHookDBAccessLevel,
}
impl TestHook {
fn new_ro(name: &'static str) -> Self {
Self {
results: Default::default(),
name,
access_level: EngineHookDBAccessLevel::ReadOnly,
}
}
fn new_rw(name: &'static str) -> Self {
Self {
results: Default::default(),
name,
access_level: EngineHookDBAccessLevel::ReadWrite,
}
}
fn add_result(&mut self, result: RethResult<(EngineHookEvent, Option<EngineHookAction>)>) {
self.results.push_back(result);
}
}
impl EngineHook for TestHook {
fn name(&self) -> &'static str {
self.name
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
_ctx: EngineContext,
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>> {
self.results.pop_front().map_or(Poll::Pending, Poll::Ready)
}
fn db_access_level(&self) -> EngineHookDBAccessLevel {
self.access_level
}
}
#[tokio::test]
async fn poll_running_hook_with_db_write() {
let mut controller = EngineHooksController::new(EngineHooks::new());
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
// No currently running hook with DB write access is set
let result = poll!(poll_fn(|cx| controller.poll_running_hook_with_db_write(cx, context)));
assert!(result.is_pending());
// Currently running hook with DB write access returned `Pending` on polling
controller.running_hook_with_db_write = Some(Box::new(TestHook::new_rw("read-write")));
let result = poll!(poll_fn(|cx| controller.poll_running_hook_with_db_write(cx, context)));
assert!(result.is_pending());
// Currently running hook with DB write access returned `Ready` on polling, but didn't
// return `EngineHookEvent::Finished` yet.
// Currently running hooks with DB write should still be set.
let mut hook = TestHook::new_rw("read-write");
hook.add_result(Ok((EngineHookEvent::Started, None)));
controller.running_hook_with_db_write = Some(Box::new(hook));
let result = poll!(poll_fn(|cx| controller.poll_running_hook_with_db_write(cx, context)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.event.is_started() &&
polled_hook.action.is_none() &&
polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert!(controller.running_hook_with_db_write.is_some());
assert!(controller.hooks.is_empty());
// Currently running hook with DB write access returned `Ready` on polling and
// `EngineHookEvent::Finished` inside.
// Currently running hooks with DB write should be moved to collection of hooks.
let mut hook = TestHook::new_rw("read-write");
hook.add_result(Ok((EngineHookEvent::Finished(Ok(())), None)));
controller.running_hook_with_db_write = Some(Box::new(hook));
let result = poll!(poll_fn(|cx| controller.poll_running_hook_with_db_write(cx, context)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.event.is_finished() &&
polled_hook.action.is_none() &&
polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert!(controller.running_hook_with_db_write.is_none());
assert!(controller.hooks.pop_front().is_some());
}
#[tokio::test]
async fn poll_next_hook_db_write_active() {
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
let mut hook_rw = TestHook::new_rw("read-write");
hook_rw.add_result(Ok((EngineHookEvent::Started, None)));
let hook_ro_name = "read-only";
let mut hook_ro = TestHook::new_ro(hook_ro_name);
hook_ro.add_result(Ok((EngineHookEvent::Started, None)));
let mut hooks = EngineHooks::new();
hooks.add(hook_rw);
hooks.add(hook_ro);
let mut controller = EngineHooksController::new(hooks);
// Read-write hook can't be polled when external DB write is active
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, true)));
assert!(result.is_pending());
assert!(controller.running_hook_with_db_write.is_none());
// Read-only hook can be polled when external DB write is active
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, true)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_ro_name &&
polled_hook.event.is_started() &&
polled_hook.action.is_none() &&
polled_hook.db_access_level.is_read_only()
}),
Poll::Ready(true)
);
}
#[tokio::test]
async fn poll_next_hook_db_write_inactive() {
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
let hook_rw_1_name = "read-write-1";
let mut hook_rw_1 = TestHook::new_rw(hook_rw_1_name);
hook_rw_1.add_result(Ok((EngineHookEvent::Started, None)));
let hook_rw_2_name = "read-write-2";
let mut hook_rw_2 = TestHook::new_rw(hook_rw_2_name);
hook_rw_2.add_result(Ok((EngineHookEvent::Started, None)));
let hook_ro_name = "read-only";
let mut hook_ro = TestHook::new_ro(hook_ro_name);
hook_ro.add_result(Ok((EngineHookEvent::Started, None)));
hook_ro.add_result(Err(RethError::Custom("something went wrong".to_string())));
let mut hooks = EngineHooks::new();
hooks.add(hook_rw_1);
hooks.add(hook_rw_2);
hooks.add(hook_ro);
let mut controller = EngineHooksController::new(hooks);
let hooks_len = controller.hooks.len();
// Read-write hook can be polled because external DB write is not active
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_1_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_rw_1_name &&
polled_hook.event.is_started() &&
polled_hook.action.is_none() &&
polled_hook.db_access_level.is_read_write()
}),
Poll::Ready(true)
);
assert_eq!(
controller.running_hook_with_db_write.as_ref().map(|hook| hook.name()),
Some(hook_rw_1_name)
);
// Read-write hook cannot be polled because another read-write hook is running
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_2_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert!(result.is_pending());
// Read-only hook can be polled in parallel with already running read-write hook
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_ro_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(
result.map(|result| {
let polled_hook = result.unwrap();
polled_hook.name == hook_ro_name &&
polled_hook.event.is_started() &&
polled_hook.action.is_none() &&
polled_hook.db_access_level.is_read_only()
}),
Poll::Ready(true)
);
// Read-write hook still cannot be polled because another read-write hook is running
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_rw_2_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert!(result.is_pending());
// Read-only hook has finished with error
assert_eq!(controller.hooks.front().map(|hook| hook.name()), Some(hook_ro_name));
let result = poll!(poll_fn(|cx| controller.poll_next_hook(cx, context, false)));
assert_eq!(result.map(|result| { result.is_err() }), Poll::Ready(true));
assert!(controller.running_hook_with_db_write.is_some());
assert_eq!(controller.hooks.len(), hooks_len - 1)
}
}

View File

@ -113,7 +113,7 @@ pub enum EngineHookError {
}
/// Level of database access the hook needs for execution.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum EngineHookDBAccessLevel {
/// Read-only database access.
ReadOnly,