Struct tor_hsservice::publish::reactor::Reactor
source · pub(super) struct Reactor<R: Runtime, M: Mockable> {
imm: Arc<Immutable<R, M>>,
dir_provider: Arc<dyn NetDirProvider>,
inner: Arc<Mutex<Inner>>,
ipt_watcher: IptsPublisherView,
config_rx: Receiver<Arc<OnionServiceConfig>>,
publish_status_rx: Receiver<PublishStatus>,
publish_status_tx: Sender<PublishStatus>,
upload_task_complete_rx: Receiver<TimePeriodUploadResult>,
upload_task_complete_tx: Sender<TimePeriodUploadResult>,
shutdown_tx: Sender<Void>,
}
Expand description
A reactor for the HsDir Publisher
The entrypoint is Reactor::run
.
Fields§
§imm: Arc<Immutable<R, M>>
The immutable, shared inner state.
dir_provider: Arc<dyn NetDirProvider>
A source for new network directories that we use to determine our HsDirs.
inner: Arc<Mutex<Inner>>
The mutable inner state,
ipt_watcher: IptsPublisherView
A channel for receiving IPT change notifications.
config_rx: Receiver<Arc<OnionServiceConfig>>
A channel for receiving onion service config change notifications.
publish_status_rx: Receiver<PublishStatus>
A channel for receiving updates regarding our PublishStatus
.
The main loop of the reactor watches for updates on this channel.
When the PublishStatus
changes to UploadScheduled
,
we can start publishing descriptors.
If the PublishStatus
is AwaitingIpts
, publishing is
paused until we receive a notification on ipt_watcher
telling us the IPT manager has
established some introduction points.
publish_status_tx: Sender<PublishStatus>
A sender for updating our PublishStatus
.
When our PublishStatus
changes to UploadScheduled
,
we can start publishing descriptors.
upload_task_complete_rx: Receiver<TimePeriodUploadResult>
A channel for sending upload completion notifications.
This channel is polled in the main loop of the reactor.
upload_task_complete_tx: Sender<TimePeriodUploadResult>
A channel for receiving upload completion notifications.
A copy of this sender is handed to each upload task.
shutdown_tx: Sender<Void>
A sender for notifying any pending upload tasks that the reactor is shutting down.
Receivers can use this channel to find out when reactor is dropped.
This is currently only used in upload_for_time_period
.
Any future background tasks can also use this channel to detect if the reactor is dropped.
Closing this channel will cause any pending upload tasks to be dropped.
Implementations§
source§impl<R: Runtime, M: Mockable> Reactor<R, M>
impl<R: Runtime, M: Mockable> Reactor<R, M>
sourcepub(super) fn new(
runtime: R,
nickname: HsNickname,
dir_provider: Arc<dyn NetDirProvider>,
mockable: M,
config: Arc<OnionServiceConfig>,
ipt_watcher: IptsPublisherView,
config_rx: Receiver<Arc<OnionServiceConfig>>,
status_tx: PublisherStatusSender,
keymgr: Arc<KeyMgr>
) -> Self
pub(super) fn new( runtime: R, nickname: HsNickname, dir_provider: Arc<dyn NetDirProvider>, mockable: M, config: Arc<OnionServiceConfig>, ipt_watcher: IptsPublisherView, config_rx: Receiver<Arc<OnionServiceConfig>>, status_tx: PublisherStatusSender, keymgr: Arc<KeyMgr> ) -> Self
Create a new Reactor
.
sourcepub(super) async fn run(self) -> Result<(), FatalError>
pub(super) async fn run(self) -> Result<(), FatalError>
Start the reactor.
Under normal circumstances, this function runs indefinitely.
Note: this also spawns the “reminder task” that we use to reschedule uploads whenever an upload fails or is rate-limited.
sourceasync fn run_once(&mut self) -> Result<ShutdownStatus, FatalError>
async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError>
Run one iteration of the reactor loop.
sourcefn status(&self) -> PublishStatus
fn status(&self) -> PublishStatus
Returns the current status of the publisher
sourcefn handle_upload_results(&self, results: TimePeriodUploadResult)
fn handle_upload_results(&self, results: TimePeriodUploadResult)
Handle a batch of upload outcomes, possibly updating the status of the descriptor for the corresponding HSDirs.
sourceasync fn handle_consensus_change(
&mut self,
netdir: Arc<NetDir>
) -> Result<(), FatalError>
async fn handle_consensus_change( &mut self, netdir: Arc<NetDir> ) -> Result<(), FatalError>
Maybe update our list of HsDirs.
sourcefn recompute_hs_dirs(&self) -> Result<(), FatalError>
fn recompute_hs_dirs(&self) -> Result<(), FatalError>
Recompute the HsDirs for all relevant time periods.
sourcefn compute_time_periods(
&self,
netdir: &Arc<NetDir>,
time_periods: &[TimePeriodContext]
) -> Result<Vec<TimePeriodContext>, FatalError>
fn compute_time_periods( &self, netdir: &Arc<NetDir>, time_periods: &[TimePeriodContext] ) -> Result<Vec<TimePeriodContext>, FatalError>
Compute the TimePeriodContext
s for the time periods from the specified NetDir
.
The specified time_periods
are used to preserve the DescriptorStatus
of the
HsDirs where possible.
sourcefn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>>
fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>>
Replace the old netdir with the new, returning the old.
sourcefn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfig>) -> bool
fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfig>) -> bool
Replace our view of the service config with new_config
if new_config
contains changes
that would cause us to generate a new descriptor.
sourcefn note_ipt_change(&self) -> PublishStatus
fn note_ipt_change(&self) -> PublishStatus
Read the intro points from ipt_watcher
, and decide whether we’re ready to start
uploading.
sourceasync fn handle_ipt_change(
&mut self,
update: Option<Result<(), FatalError>>
) -> Result<ShutdownStatus, FatalError>
async fn handle_ipt_change( &mut self, update: Option<Result<(), FatalError>> ) -> Result<ShutdownStatus, FatalError>
Update our list of introduction points.
sourceasync fn update_publish_status_unless_waiting(
&mut self,
new_state: PublishStatus
) -> Result<(), FatalError>
async fn update_publish_status_unless_waiting( &mut self, new_state: PublishStatus ) -> Result<(), FatalError>
Update the PublishStatus
of the reactor with new_state
,
unless the current state is AwaitingIpts
.
sourceasync fn update_publish_status_unless_rate_lim(
&mut self,
new_state: PublishStatus
) -> Result<(), FatalError>
async fn update_publish_status_unless_rate_lim( &mut self, new_state: PublishStatus ) -> Result<(), FatalError>
Update the PublishStatus
of the reactor with new_state
,
unless the current state is RateLimited
.
sourceasync fn update_publish_status(
&mut self,
new_state: PublishStatus
) -> Result<(), FatalError>
async fn update_publish_status( &mut self, new_state: PublishStatus ) -> Result<(), FatalError>
Unconditionally update the PublishStatus
of the reactor with new_state
.
sourceasync fn handle_new_keys(&self) -> Result<(), FatalError>
async fn handle_new_keys(&self) -> Result<(), FatalError>
Use the new keys.
sourceasync fn handle_svc_config_change(
&mut self,
config: Arc<OnionServiceConfig>
) -> Result<(), FatalError>
async fn handle_svc_config_change( &mut self, config: Arc<OnionServiceConfig> ) -> Result<(), FatalError>
Update the descriptors based on the config change.
sourcefn mark_all_dirty(&self)
fn mark_all_dirty(&self)
Mark the descriptor dirty for all time periods.
sourcefn mark_dirty(&self, period: &TimePeriod) -> bool
fn mark_dirty(&self, period: &TimePeriod) -> bool
Mark the descriptor dirty for the specified time period.
Returns true
if the specified period is still relevant, and false
otherwise.
sourceasync fn upload_all(&mut self) -> Result<(), FatalError>
async fn upload_all(&mut self) -> Result<(), FatalError>
Try to upload our descriptor to the HsDirs that need it.
If we’ve recently uploaded some descriptors, we return immediately and schedule the upload
to happen after UPLOAD_RATE_LIM_THRESHOLD
.
Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we implement it, as well as in what cases this will return an error).
sourceasync fn upload_for_time_period(
hs_dirs: Vec<RelayIds>,
netdir: &Arc<NetDir>,
config: Arc<OnionServiceConfig>,
params: HsDirParams,
imm: Arc<Immutable<R, M>>,
ipt_upload_view: IptsPublisherUploadView,
upload_task_complete_tx: Sender<TimePeriodUploadResult>,
shutdown_rx: Receiver<Void>
) -> Result<(), FatalError>
async fn upload_for_time_period( hs_dirs: Vec<RelayIds>, netdir: &Arc<NetDir>, config: Arc<OnionServiceConfig>, params: HsDirParams, imm: Arc<Immutable<R, M>>, ipt_upload_view: IptsPublisherUploadView, upload_task_complete_tx: Sender<TimePeriodUploadResult>, shutdown_rx: Receiver<Void> ) -> Result<(), FatalError>
Upload the descriptor for the specified time period.
Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we implement it, as well as in what cases this will return an error).
sourceasync fn upload_descriptor(
hsdesc: String,
netdir: &Arc<NetDir>,
hsdir: &Relay<'_>,
imm: Arc<Immutable<R, M>>
) -> Result<(), UploadError>
async fn upload_descriptor( hsdesc: String, netdir: &Arc<NetDir>, hsdir: &Relay<'_>, imm: Arc<Immutable<R, M>> ) -> Result<(), UploadError>
Upload a descriptor to the specified HSDir.
If an upload fails, this returns an Err
. This function does not handle retries. It is up
to the caller to retry on failure.
This function does not handle timeouts.
sourceasync fn upload_descriptor_with_retries(
hsdesc: String,
netdir: &Arc<NetDir>,
hsdir: &Relay<'_>,
ed_id: &str,
rsa_id: &str,
imm: Arc<Immutable<R, M>>
) -> UploadStatus
async fn upload_descriptor_with_retries( hsdesc: String, netdir: &Arc<NetDir>, hsdir: &Relay<'_>, ed_id: &str, rsa_id: &str, imm: Arc<Immutable<R, M>> ) -> UploadStatus
Upload a descriptor to the specified HSDir, retrying if appropriate.
TODO (#1216): document the retry logic when we implement it.
sourceasync fn start_rate_limit(&mut self, delay: Duration) -> Result<(), FatalError>
async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), FatalError>
Stop publishing descriptors until the specified delay elapses.
sourceasync fn expire_rate_limit(&mut self) -> Result<(), FatalError>
async fn expire_rate_limit(&mut self) -> Result<(), FatalError>
Handle the upload rate-limit being lifted.
Auto Trait Implementations§
impl<R, M> !Freeze for Reactor<R, M>
impl<R, M> !RefUnwindSafe for Reactor<R, M>
impl<R, M> Send for Reactor<R, M>
impl<R, M> Sync for Reactor<R, M>
impl<R, M> Unpin for Reactor<R, M>
impl<R, M> !UnwindSafe for Reactor<R, M>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
. Box<dyn Any>
can
then be further downcast
into Box<ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
. Rc<Any>
can then be
further downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.§impl<T> DowncastSync for T
impl<T> DowncastSync for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.