From bd00e26bde0238f04cbbe4fd754c3ac4cf52572f Mon Sep 17 00:00:00 2001
From: James Andariese
Date: Tue, 23 Jul 2024 23:50:38 -0500
Subject: [PATCH] prep for 0.2.0

fixes:
should attempt to create bucket if it doesn't exist #4
must _run_ services, not just say it does. #3
improve cli ergonomics #2
WaitCR does not renew lock in between R intervals (and should) #1
---
 Cargo.toml  |   2 +-
 src/main.rs | 122 ++++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 98 insertions(+), 26 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7fdeb6f..0d905e4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "putex"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 license = "Apache-2.0"
 repository = "https://git.strudelline.net/james/putex.git"

diff --git a/src/main.rs b/src/main.rs
index 8a2ac04..446d6fb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -29,13 +29,40 @@ struct Cli {
     #[command(flatten)]
     verbosity: Verbosity,
 
-    server: String,
+    #[arg(long)]
+    /// use NATS via this server; may be any valid nats:// address
+    nats: String,
+    #[arg(long)]
+    /// NATS: kv store bucket
     bucket: String,
+    #[arg(long)]
+    /// NATS: kv store key
     key: String,
+    #[arg(long)]
+    /// unique identifier for this agent.
+    /// for host-based clusters, hostname is usually a good choice.
     token: String,
+    #[arg(long)]
+    /// script to run to check the health of the service;
+    /// if passed `active`, validate actual health;
+    /// if passed `standby`, validate candidacy for service
     healthcheck: String,
-    startup: String,
-    shutdown: String,
+    #[arg(long)]
+    /// script to assert the started state of the service
+    start: String,
+    #[arg(long)]
+    /// script to assert the stopped state of the service
+    stop: String,
+
+    #[arg(short, default_value_t = 3000)]
+    /// renewal interval in milliseconds, the time between health checks
+    r: u64,
+    #[arg(short, default_value_t = 3)]
+    /// failures, r*f is when another agent will attempt to take the lock
+    f: u64,
+    #[arg(short, default_value_t = 2)]
+    /// confirms, r*c is when a new agent will begin fencing and starting services
+    c: u64,
 }
 
 struct Invocation {
@@ -43,9 +70,6 @@
     js: Option<jetstream::Context>,
     latest_revision: u64,
     latest_value: Bytes,
-    r: u64,
-    f: u64,
-    c: u64,
     starts: u64,
     exiting: bool,
     active: bool,
@@ -58,9 +82,6 @@ impl Invocation {
             js: None,
             latest_revision: 0,
             latest_value: "".into(),
-            r: 1000,
-            f: 3,
-            c: 2,
             starts: 0,
             exiting: false,
             active: false,
@@ -70,7 +91,20 @@
         }
     }
     async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
-        Ok(self.js.clone().ok_or(anyhow!("Jetstream not connected"))?.get_key_value(&self.args.bucket).await?)
+        let js = self.js.clone().ok_or(anyhow!("Jetstream not connected"))?;
+        let store_r = js.get_key_value(&self.args.bucket).await;
+        if let Ok(store) = store_r {
+            return Ok(store);
+        }
+        let bucket = self.args.bucket.clone();
+        log::warn!("bucket {bucket} does not exist. attempting to create it");
+        // if creating the bucket fails as well, just propagate that error
+        return Ok(
+            js.create_key_value(jetstream::kv::Config {
+                bucket: self.args.bucket.clone(),
+                ..Default::default()
+            }).await?
+        );
     }
 
     async fn update_lock_data(&mut self) -> Result<()> {
@@ -104,7 +138,7 @@
     }
 
     async fn setup(&mut self) -> Result<()> {
-        let nc = async_nats::connect(&self.args.server).await?;
+        let nc = async_nats::connect(&self.args.nats).await?;
         println!("info: {:?}", nc.server_info());
 
         self.js = Some(async_nats::jetstream::new(nc));
@@ -129,7 +163,7 @@ impl Invocation {
 
         if self.healthcheck().await? {
            log::trace!("health check succeeded");
-            let max_i = self.f;
+            let max_i = self.args.f;
             for i in 1..=max_i {
                 if self.take_lock().await.is_ok() {
                     self.unfence_and_enable_process().await?;
@@ -162,7 +196,7 @@
                 }
             }
             log::trace!("waiting F*R");
-            self.wait_r(self.f).await;
+            self.wait_r(self.args.f).await;
             return Ok(());
         } else {
             log::trace!("lk empty or mine? - no - Found a different token in the lock");
@@ -174,7 +208,7 @@
         }
         log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
-        self.wait_r(self.f).await;
+        self.wait_r(self.args.f).await;
         if !self.healthcheck().await? {
             log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
             return Ok(());
         }
@@ -182,7 +216,16 @@
         log::trace!("confirmation takeover health check succeeded.");
         if self.take_lock().await.is_ok() {
             log::info!("lock taken. waiting C for other services to die.");
-            self.wait_r(self.c).await;
+            for _ in 1..=self.args.c {
+                self.wait_r(1).await;
+                self.take_lock().await?;
+                log::trace!("lock asserted while waiting for C");
+                // if this assertion fails, pump exits early and the error path
+                // below waits another C*R without asserting the lock, which
+                // could jeopardize this agent's promotion, but an agent that
+                // just failed to assert the lock probably should not be
+                // promoted anyway.
+            }
         } else {
             // this happens on every cycle whether the lock _needs_ to be taken or not so
             // logging it every time is not helpful.
@@ -210,7 +253,7 @@
         //
         // For the same reason, this is also done whenever pump has
         // failed. See below.
-        self.wait_r(self.c).await;
+        self.wait_r(self.args.c).await;
 
         loop {
             round = round + 1;
@@ -221,7 +264,7 @@
 
             // BEWARE   vvv
            // this must be awaited before the loop ends.
-            let round_timer = sleep(Duration::from_millis(self.r));
+            let round_timer = sleep(Duration::from_millis(self.args.r));
             // BEWARE   ^^^
 
             match self.pump().await {
@@ -230,7 +273,7 @@
                 }
                 e => {
                     // See longer comment above about C*R
-                    self.wait_r(self.c).await;
+                    self.wait_r(self.args.c).await;
                     log::error!("pump {this_start}.{round}: error: {e:?}");
                 }
             }
@@ -243,24 +286,53 @@
 
     async fn kill_process(&mut self) -> Result<()> {
         log::trace!("attempting to kill process");
-        if self.active {
+        let was_active = self.active;
+        self.active = false;
+
+        if was_active {
             log::warn!("killing process");
         }
-        self.active = false;
+        let stopped = Command::new("/bin/sh")
+            .arg("-c")
+            .arg(self.args.stop.clone())
+            .arg("kill")
+            .spawn()?.wait().await?.success();
+        if stopped {
+            log::trace!("process killed successfully");
+            if was_active {
+                log::warn!("process killed successfully");
+            }
+        } else {
+            log::warn!("kill process failed");
+        }
         Ok(())
     }
 
     async fn unfence_and_enable_process(&mut self) -> Result<()> {
         log::trace!("attempting to unfence and enable process");
-        if !self.active {
+        let was_active = self.active;
+        self.active = true;
+        if !was_active {
             log::warn!("starting process");
         }
-        self.active = true;
+        let started = Command::new("/bin/sh")
+            .arg("-c")
+            .arg(self.args.start.clone())
+            .arg("start")
+            .arg(if self.active {"active"} else {"standby"})
+            .spawn()?.wait().await?.success();
+        if started {
+            log::trace!("process started successfully");
+            if !was_active {
+                log::warn!("process started successfully");
+            }
+        } else {
+            log::warn!("unfence/enable process failed");
+        }
         Ok(())
     }
 
     async fn healthcheck(&mut self) -> Result<bool> {
-
         let mut child = Command::new("/bin/sh")
             .arg("-c")
             .arg(self.args.healthcheck.clone())
@@ -271,7 +343,7 @@
             status = child.wait() => {
                 Ok(status?.success())
             }
-            _ = sleep(Duration::from_millis(self.r * self.f)) => {
+            _ = sleep(Duration::from_millis(self.args.r * self.args.f)) => {
                 log::warn!("health check timed out.");
                 Ok(false)
             }
@@ -279,7 +351,7 @@
         }
     }
 
     async fn wait_r(&mut self, count: u64) {
-        sleep(Duration::from_millis(self.r * count)).await;
+        sleep(Duration::from_millis(self.args.r * count)).await;
     }
 }
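
Usage sketch (not part of the patch itself): with the clap flags added above, an agent invocation would look roughly like the following. The NATS URL, bucket/key names, and the three scripts are illustrative placeholders, and -r/-f/-c can be omitted to use the defaults (3000 ms, 3, 2).

    putex --nats nats://127.0.0.1:4222 \
          --bucket putex-locks --key my-service --token "$(hostname)" \
          --healthcheck '/usr/local/bin/my-service-health.sh' \
          --start 'systemctl start my-service' \
          --stop 'systemctl stop my-service' \
          -r 3000 -f 3 -c 2

With those defaults, a standby agent waits roughly r*f (9 s) of missed renewals before attempting a takeover, then holds and re-asserts the lock each interval for c*r (6 s), per #1 above, before unfencing and starting the service.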