putex/src/main.rs

/*
Copyright 2024 James Andariese
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
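//! Holds a distributed lock in a NATS JetStream key-value bucket and runs the
//! configured start/stop/healthcheck scripts so that, at any time, only the
//! agent holding the lock keeps its local service active; the other agents
//! stay fenced in standby and take over once the holder stops renewing.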
use anyhow::{anyhow, Result};
use async_nats::jetstream;
use bytes::Bytes;
use clap::Parser;
use clap_verbosity_flag::{Verbosity, WarnLevel};
use tokio::process::Command;
use tokio::time::{sleep, Duration};
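// Example invocation (all values illustrative):
//
//   putex --nats nats://127.0.0.1:4222 \
//         --bucket putex --key my-service --token "$(hostname)" \
//         --healthcheck ./healthcheck.sh --start ./start.sh --stop ./stop.sh
//
// The start/stop/healthcheck command strings are executed with `/bin/sh -c`;
// the start and healthcheck scripts additionally receive `active` or
// `standby` as `$1`.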
#[derive(Parser)]
struct Cli {
#[command(flatten)]
verbosity: Verbosity<WarnLevel>,
#[arg(long)]
/// NATS server to connect to; may be any valid nats:// address
nats: String,
#[arg(long)]
/// NATS: kv store bucket
bucket: String,
#[arg(long)]
/// NATS: kv store key
key: String,
#[arg(long)]
/// unique identifier for this agent.
/// for host-based clusters, hostname is usually a good choice.
token: String,
#[arg(long)]
/// script to run to check the health of the service;
/// if passed `active`, validate actual health;
/// if passed `standby`, validate candidacy for service
healthcheck: String,
#[arg(long)]
/// script to assert the started state of the service
start: String,
#[arg(long)]
/// script to assert the stopped state of the service
stop: String,
#[arg(short, default_value_t = 3000)]
/// renewal interval in milliseconds; the time between health checks and lock renewals
r: u64,
#[arg(short, default_value_t = 3)]
/// failures; after r*f milliseconds without renewal, another agent will attempt to take the lock
f: u64,
#[arg(short, default_value_t = 2)]
/// confirms; r*c is how long a new lock holder waits for the previous holder to fence itself before starting services
c: u64,
}
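/// Runtime state for a single agent: the parsed CLI arguments, the JetStream
/// context, the most recently observed lock value and revision, a start
/// counter used to label log messages, and whether this agent currently
/// believes it is the active lock holder.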
struct Invocation {
args: Cli,
js: Option<jetstream::Context>,
latest_revision: u64,
latest_value: Bytes,
starts: u64,
exiting: bool,
active: bool,
}
impl Invocation {
async fn new() -> Result<Self> {
let mut r = Invocation {
args: Cli::parse(),
js: None,
latest_revision: 0,
latest_value: "".into(),
starts: 0,
exiting: false,
active: false,
};
r.setup().await?;
Ok(r)
}
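/// Returns the KV store backing the lock, creating the bucket with default
/// settings if it does not exist yet.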
async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
let js = self.js.clone().ok_or(anyhow!("Jetstream not connected"))?;
let store_r = js.get_key_value(&self.args.bucket).await;
if let Ok(store) = store_r {
return Ok(store);
}
let bucket = self.args.bucket.clone();
log::warn!("bucket {bucket} does not exist. attempting to create it");
// it's either gonna work or it's not. yolooooooooooooooo
return Ok(
js.create_key_value(jetstream::kv::Config {
bucket: self.args.bucket.clone(),
..Default::default()
}).await?
);
}
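/// Refreshes the cached lock value and revision from the KV store.
/// A missing key is treated as an empty lock at revision 0.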
async fn update_lock_data(&mut self) -> Result<()> {
let store = self.get_store().await?;
let e = store.entry(&self.args.key).await?;
match e {
Some(k) => {
self.latest_value = k.value.into();
self.latest_revision = k.revision;
}
None => {
self.latest_value = "".into();
self.latest_revision = 0;
}
};
Ok(())
}
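/// Releases the lock by writing an empty value, guarded by the last observed
/// revision (compare-and-swap).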
async fn clear_lock(&mut self) -> Result<()> {
let store = self.get_store().await?;
store.update(&self.args.key, "".into(), self.latest_revision).await?;
self.update_lock_data().await?;
Ok(())
}
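/// Claims (or renews) the lock by writing this agent's token, again guarded
/// by the last observed revision.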
async fn take_lock(&mut self) -> Result<()> {
let store = self.get_store().await?;
store.update(&self.args.key, self.args.token.clone().into(), self.latest_revision).await?;
self.update_lock_data().await?;
Ok(())
}
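/// Connects to NATS, builds the JetStream context, and initializes the logger
/// at the requested verbosity.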
async fn setup(&mut self) -> Result<()> {
let nc = async_nats::connect(&self.args.nats).await?;
println!("info: {:?}", nc.server_info());
self.js = Some(async_nats::jetstream::new(nc));
if self.args.verbosity.is_present() {
env_logger::Builder::new().filter_level(self.args.verbosity.log_level_filter()).init();
} else {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
}
log::error!("error level active");
log::warn!("warn level active");
log::info!("info level active");
log::debug!("debug level active");
log::trace!("trace level active");
Ok(())
}
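/// Runs one protocol cycle: refresh the cached lock, then either renew it and
/// keep the local service unfenced (lock empty or ours, health check passing),
/// release it and fence the service (health check or renewal failing), or, if
/// another agent holds it, fence the service and attempt a takeover once two
/// health checks spaced F*R apart succeed.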
async fn pump(&mut self) -> Result<()> {
self.update_lock_data().await?;
if self.latest_value == "" || self.latest_value == self.args.token { // lk empty or mine?
log::trace!("lk empty or mine? - yes - Found an empty lock or my own token present");
if self.healthcheck().await? {
log::trace!("health check succeeded");
let max_i = self.args.f;
for i in 1..=max_i {
if self.take_lock().await.is_ok() {
self.unfence_and_enable_process().await?;
log::trace!("successfully unfenced and enabled process. ending pump cycle.");
return Ok(());
} else {
log::trace!("failed to take lock. attempt {i}/{max_i}");
}
}
} else {
log::info!("sunny side health check failed");
}
// lock failed to write or health check failed at this point
log::trace!("attempting to clear lock and kill process");
match self.kill_process().await {
Ok(()) => {
log::info!("killed process");
}
Err(e) => {
log::info!("failed to kill process: {e:?}");
}
}
match self.clear_lock().await {
Ok(()) => {
log::info!("cleared lock");
}
Err(e) => {
log::info!("failed to clear lock (will not retry but will wait 1R extra): {e:?}");
self.wait_r(1).await;
}
}
log::trace!("waiting F*R");
self.wait_r(self.args.f).await;
return Ok(());
} else {
log::trace!("lk empty or mine? - no - Found a different token in the lock");
self.kill_process().await?;
log::trace!("kill_process asserted");
if !self.healthcheck().await? {
log::trace!("initial takeover health check failed. not eligible for takeover. ending pump cycle.");
return Ok(());
}
log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
self.wait_r(self.args.f).await;
if !self.healthcheck().await? {
log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
return Ok(());
}
log::trace!("confirmation takeover health check succeeded.");
if self.take_lock().await.is_ok() {
log::info!("lock taken. waiting C for other services to die.");
for _ in 1..=self.args.c {
self.wait_r(1).await;
self.take_lock().await?;
log::trace!("lock asserted while waiting for C");
// If asserting the lock fails here, pump exits early with an error; the error
// path then adds another C*R wait without lock assertions, which could cost
// this agent the promotion. Since this agent just failed to assert the lock
// anyway, losing the promotion seems reasonable.
}
} else {
// this happens on every cycle whether the lock _needs_ to be taken or not so
// logging it every time is not helpful.
log::trace!("failed to take lock. ending cycle");
return Ok(());
}
}
log::trace!("ending pump cycle normally.");
Ok(())
}
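/// Main loop: waits C*R before participating, then runs `pump` once per
/// renewal interval R until `exiting` is set.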
async fn run(&mut self) -> Result<()> {
let mut round = 0;
self.starts = self.starts + 1;
let this_start = self.starts;
// we must wait C*R before participating in case we had just
// taken the lock and the program crashed and then restarted
// within .00001s because this is the year 2050 and computers
// are really fast now. This is because we must guarantee
// that the previous owner agent has C*R to disable itself.
// we can guarantee at all other times that we've waited the
// requisite time except in circumstances where we have left
// the control of the round_timer loop.
//
// For the same reason, this is also done whenever pump has
// failed. See below.
self.wait_r(self.args.c).await;
loop {
round = round + 1;
if self.exiting {
log::info!("exiting before round {round}");
break;
}
// BEWARE vvv
// this must be awaited before the loop ends.
let round_timer = sleep(Duration::from_millis(self.args.r));
// BEWARE ^^^
match self.pump().await {
Ok(()) => {
log::info!("pump {this_start}.{round}: success");
}
Err(e) => {
// See longer comment above about C*R
self.wait_r(self.args.c).await;
log::error!("pump {this_start}.{round}: error: {e:?}");
}
}
// BEWARE PART DEUX: await the round timer started above.
round_timer.await;
}
Ok(())
}
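/// Asserts the stopped state by running the `--stop` command string via
/// `/bin/sh -c`.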
async fn kill_process(&mut self) -> Result<()> {
log::trace!("attempting to kill process");
let was_active = self.active;
self.active = false;
if was_active {
log::warn!("killing process");
}
let stopped = Command::new("/bin/sh")
.arg("-c")
.arg(self.args.stop.clone())
.arg("kill")
.spawn()?.wait().await?.success();
if stopped {
log::trace!("process killed successfully");
if was_active {
log::warn!("process killed successfully");
}
} else {
log::warn!("kill process failed");
}
Ok(())
}
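/// Asserts the started (unfenced) state by running the `--start` command
/// string via `/bin/sh -c`, passing `active` or `standby` as `$1`.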
async fn unfence_and_enable_process(&mut self) -> Result<()> {
log::trace!("attempting to unfence and enable process");
let was_active = self.active;
self.active = true;
if !was_active {
log::warn!("starting process");
}
let started = Command::new("/bin/sh")
.arg("-c")
.arg(self.args.start.clone())
.arg("start")
.arg(if self.active {"active"} else {"standby"})
.spawn()?.wait().await?.success();
if started {
log::trace!("process started successfully");
if !was_active {
log::warn!("process started successfully");
}
} else {
log::warn!("unfence/enable process failed");
}
Ok(())
}
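/// Runs the `--healthcheck` command string via `/bin/sh -c`, passing `active`
/// or `standby` as `$1`, and returns whether it exited successfully. A run
/// lasting longer than R*F milliseconds is treated as a failure.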
async fn healthcheck(&mut self) -> Result<bool> {
let mut child = Command::new("/bin/sh")
.arg("-c")
.arg(self.args.healthcheck.clone())
.arg("healthcheck")
.arg(if self.active {"active"} else {"standby"})
// ensure a timed-out health check script is killed when `child` is dropped
.kill_on_drop(true)
.spawn()?;
tokio::select! {
status = child.wait() => {
Ok(status?.success())
}
_ = sleep(Duration::from_millis(self.args.r * self.args.f)) => {
log::warn!("health check timed out.");
Ok(false)
}
}
}
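/// Sleeps for `count` renewal intervals (count * R milliseconds).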
async fn wait_r(&mut self, count: u64) {
sleep(Duration::from_millis(self.args.r * count)).await;
}
}
#[tokio::main]
async fn main() -> Result<()> {
log::info!("here we come");
let mut inv = Invocation::new().await?;
log::info!("here we are");
// inv.check().await?;
let rr = inv.run().await;
log::info!("{rr:?}");
Ok(())
}