putex/src/main.rs

298 lines
9.6 KiB
Rust
Raw Normal View History

/*
Copyright 2024 James Andariese
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use anyhow::{anyhow, Result};
use bytes::Bytes;
use clap::{Parser};
use clap_verbosity_flag::{Verbosity, WarnLevel};
use tokio::process::{Command};
use tokio::time::{sleep, Duration};
use async_nats::{jetstream};
// Command-line interface for the agent.
//
// Apart from the flattened verbosity flags, none of these fields carry an
// `#[arg]` attribute, so clap treats them as positional arguments: the field
// order below IS the argument order. Plain `//` comments are used on purpose,
// because `///` doc comments would be picked up by clap as `--help` text.
#[derive(Parser)]
struct Cli {
    // Adds the clap-verbosity-flag `-v`/`-q` flags (default level: warn).
    #[command(flatten)]
    verbosity: Verbosity<WarnLevel>,
    // NATS server address, passed to `async_nats::connect`.
    server: String,
    // Name of the JetStream key/value bucket holding the lock.
    bucket: String,
    // Key inside `bucket` whose value is the current lock holder's token.
    key: String,
    // This agent's token; written to `key` to claim the lock and compared
    // against the stored value to recognise our own lock.
    token: String,
    // Shell command run via `/bin/sh -c` as the health check; it receives
    // `healthcheck` as `$0` and `active`/`standby` as `$1`.
    healthcheck: String,
    // NOTE(review): not referenced in the visible code; presumably the
    // command that starts the managed process — TODO confirm.
    startup: String,
    // NOTE(review): not referenced in the visible code; presumably the
    // command that stops the managed process — TODO confirm.
    shutdown: String,
}
// All state owned by one running agent: its configuration, its NATS handle,
// the last lock contents it observed, its timing knobs and its bookkeeping.
struct Invocation {
    // --- configuration and connectivity ---
    // Parsed command-line arguments.
    args: Cli,
    // JetStream context; `None` until `setup` has connected.
    js: Option<jetstream::Context>,

    // --- timing ---
    // Base round length in milliseconds ("R").
    r: u64,
    // Multiplier of R used for lock-write attempts, health-check timeout and
    // back-off waits ("F").
    f: u64,
    // Multiplier of R waited before participating and after taking over the
    // lock ("C").
    c: u64,

    // --- cached lock state (refreshed by `update_lock_data`) ---
    // Last value read from the lock key (empty when the key is missing).
    latest_value: Bytes,
    // Revision of that value (0 when the key is missing).
    latest_revision: u64,

    // --- bookkeeping ---
    // Number of times `run` has been entered.
    starts: u64,
    // Whether the managed process is currently considered enabled.
    active: bool,
    // Checked at the top of every round; the loop stops once it is true.
    exiting: bool,
}
impl Invocation {
    /// Build an agent from the command line and connect it (see `setup`).
    ///
    /// Timing parameters start at R = 1000 ms, F = 3, C = 2. The lock cache
    /// starts empty (value `""`, revision 0) and the process inactive.
    ///
    /// # Errors
    /// Propagates any error from `setup`, e.g. a failed NATS connection.
    async fn new() -> Result<Self> {
        let mut r = Invocation {
            args: Cli::parse(),
            js: None,
            latest_revision: 0,
            latest_value: "".into(),
            r: 1000,
            f: 3,
            c: 2,
            starts: 0,
            exiting: false,
            active: false,
        };
        r.setup().await?;
        Ok(r)
    }

    /// Open the JetStream key/value bucket named by `args.bucket`.
    ///
    /// # Errors
    /// Fails if `setup` has not stored a JetStream context yet, or if the
    /// bucket cannot be opened.
    async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
        // Borrow the context rather than cloning it on every call, and only
        // build the error when it is actually needed.
        let js = self
            .js
            .as_ref()
            .ok_or_else(|| anyhow!("Jetstream not connected"))?;
        Ok(js.get_key_value(&self.args.bucket).await?)
    }

    /// Re-read the lock key and cache its value and revision.
    ///
    /// A missing key is cached as an empty value at revision 0.
    async fn update_lock_data(&mut self) -> Result<()> {
        let store = self.get_store().await?;
        let entry = store.entry(&self.args.key).await?;
        match entry {
            Some(k) => {
                self.latest_value = k.value.into();
                self.latest_revision = k.revision;
            }
            None => {
                self.latest_value = "".into();
                self.latest_revision = 0;
            }
        }
        Ok(())
    }

    /// Write an empty value to the lock, but only if the key is still at the
    /// cached `latest_revision`. Refresh the cache on success.
    async fn clear_lock(&mut self) -> Result<()> {
        let store = self.get_store().await?;
        store
            .update(&self.args.key, "".into(), self.latest_revision)
            .await?;
        self.update_lock_data().await?;
        Ok(())
    }

    /// Write our token to the lock, but only if the key is still at the
    /// cached `latest_revision`. Refresh the cache on success.
    ///
    /// If the write fails, the cache is left as it was.
    async fn take_lock(&mut self) -> Result<()> {
        let store = self.get_store().await?;
        store
            .update(
                &self.args.key,
                self.args.token.clone().into(),
                self.latest_revision,
            )
            .await?;
        self.update_lock_data().await?;
        Ok(())
    }

    /// Configure logging, then connect to NATS and create the JetStream
    /// context.
    ///
    /// Explicit `-v`/`-q` flags set the log level. Otherwise the environment
    /// filter (`RUST_LOG`) is used, with a default of `warn`.
    ///
    /// # Errors
    /// Propagates connection failures from `async_nats::connect`.
    async fn setup(&mut self) -> Result<()> {
        // Configure logging before touching the network, so the logger is in
        // place no matter how the connection attempt goes.
        if self.args.verbosity.is_present() {
            env_logger::Builder::new()
                .filter_level(self.args.verbosity.log_level_filter())
                .init();
        } else {
            env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
                .init();
        }
        // One line per level makes the effective log level visible at startup.
        log::error!("error level active");
        log::warn!("warn level active");
        log::info!("info level active");
        log::debug!("debug level active");
        log::trace!("trace level active");

        let nc = async_nats::connect(&self.args.server).await?;
        println!("info: {:?}", nc.server_info());
        self.js = Some(async_nats::jetstream::new(nc));
        Ok(())
    }

    /// Run one arbitration cycle.
    ///
    /// * Lock empty or holding our token: if the health check passes, try up
    ///   to F times to (re)write our token and enable the process. If the
    ///   check fails, or every write fails, disable the process, try to clear
    ///   the lock, and wait F*R (plus 1R if the clear failed).
    /// * Lock holding another token: disable the process. If two health
    ///   checks F*R apart both pass, try to take the lock. On success, wait
    ///   C*R so the previous holder has time to stand down.
    ///
    /// # Errors
    /// Propagates failures reading the lock, running the health check, or
    /// disabling the process on the takeover path.
    async fn pump(&mut self) -> Result<()> {
        self.update_lock_data().await?;
        if self.latest_value.is_empty() || self.latest_value == self.args.token {
            // lk empty or mine?
            log::trace!("lk empty or mine? - yes - Found an empty lock or my own token present");
            if self.healthcheck().await? {
                log::trace!("health check succeeded");
                let max_i = self.f;
                // NOTE(review): `take_lock` only refreshes the cached revision
                // on success, so retries reuse the same revision. They can
                // recover from transient errors, not from a revision mismatch.
                for i in 1..=max_i {
                    if self.take_lock().await.is_ok() {
                        self.unfence_and_enable_process().await?;
                        log::trace!("successfully unfenced and enabled process. ending pump cycle.");
                        return Ok(());
                    } else {
                        log::trace!("failed to take lock. attempt {i}/{max_i}");
                    }
                }
            } else {
                log::info!("sunny side health check failed");
            }
            // lock failed to write or health check failed at this point
            log::trace!("attempting to clear lock and kill process");
            match self.kill_process().await {
                Ok(()) => {
                    log::info!("killed process");
                }
                Err(e) => {
                    log::info!("failed to kill process: {e:?}");
                }
            }
            match self.clear_lock().await {
                Ok(()) => {
                    log::info!("cleared lock");
                }
                Err(e) => {
                    log::info!("failed to clear lock (will not retry but will wait 1R extra): {e:?}");
                    self.wait_r(1).await;
                }
            }
            log::trace!("waiting F*R");
            self.wait_r(self.f).await;
            // Returning here (rather than falling through) keeps the
            // "ending pump cycle normally." message for the takeover path only.
            return Ok(());
        } else {
            log::trace!("lk empty or mine? - no - Found a different token in the lock");
            self.kill_process().await?;
            log::trace!("kill_process asserted");
            if !self.healthcheck().await? {
                log::trace!("initial takeover health check failed. not eligible for takeover. ending pump cycle.");
                return Ok(());
            }
            log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
            self.wait_r(self.f).await;
            if !self.healthcheck().await? {
                log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
                return Ok(());
            }
            log::trace!("confirmation takeover health check succeeded.");
            if self.take_lock().await.is_ok() {
                log::info!("lock taken. waiting C for other services to die.");
                self.wait_r(self.c).await;
            } else {
                // this happens on every cycle whether the lock _needs_ to be taken or not so
                // logging it every time is not helpful.
                log::trace!("failed to take lock. ending cycle");
                return Ok(());
            }
        }
        log::trace!("ending pump cycle normally.");
        Ok(())
    }

    /// Main loop. After an initial C*R wait, run `pump` once per round of at
    /// least R ms until `exiting` is set.
    ///
    /// A failed `pump` is logged and followed by an extra C*R wait (see the
    /// comment below). In the visible code this always returns `Ok(())`.
    async fn run(&mut self) -> Result<()> {
        let mut round = 0;
        self.starts += 1;
        let this_start = self.starts;
        // We must wait C*R before participating, in case we had just taken
        // the lock and the program crashed and restarted almost immediately.
        // The previous owner agent must be guaranteed C*R to disable itself.
        // At all other times we can guarantee that the requisite time has
        // passed, except when we have left the control of the round_timer
        // loop.
        //
        // For the same reason, this is also done whenever pump has
        // failed. See below.
        self.wait_r(self.c).await;
        loop {
            round += 1;
            if self.exiting {
                log::info!("exiting before round {round}");
                break;
            }
            // `sleep` fixes its deadline when it is called, so creating the
            // timer here and awaiting it at the end of the body makes every
            // round last at least R ms, however long `pump` takes.
            // It MUST be awaited before the loop iteration ends.
            let round_timer = sleep(Duration::from_millis(self.r));
            match self.pump().await {
                Ok(()) => {
                    log::info!("pump {this_start}.{round}: success");
                }
                Err(e) => {
                    // See longer comment above about C*R
                    self.wait_r(self.c).await;
                    log::error!("pump {this_start}.{round}: error: {e:?}");
                }
            }
            // Second half of the round-timer contract described above.
            round_timer.await;
        }
        Ok(())
    }

    /// Mark the managed process as disabled, warning if it was active.
    ///
    /// This only flips `active`; no process is signalled here and it never
    /// returns an error.
    async fn kill_process(&mut self) -> Result<()> {
        log::trace!("attempting to kill process");
        if self.active {
            log::warn!("killing process");
        }
        self.active = false;
        Ok(())
    }

    /// Mark the managed process as enabled, warning if it was inactive.
    ///
    /// This only flips `active`; no process is started here and it never
    /// returns an error.
    async fn unfence_and_enable_process(&mut self) -> Result<()> {
        log::trace!("attempting to unfence and enable process");
        if !self.active {
            log::warn!("starting process");
        }
        self.active = true;
        Ok(())
    }

    /// Run the configured health check.
    ///
    /// The command is executed as `/bin/sh -c <healthcheck> healthcheck <state>`,
    /// so inside the script `$0` is `healthcheck` and `$1` is `active` or
    /// `standby`. Returns `Ok(true)` only if it exits successfully within
    /// F*R ms. A check that exceeds that limit is killed and reported as
    /// unhealthy (`Ok(false)`).
    ///
    /// # Errors
    /// Fails if the shell cannot be spawned or waited on.
    async fn healthcheck(&mut self) -> Result<bool> {
        let mut child = Command::new("/bin/sh")
            .arg("-c")
            .arg(&self.args.healthcheck)
            .arg("healthcheck")
            .arg(if self.active { "active" } else { "standby" })
            // Safety net: if this future is dropped or `wait` errors, do not
            // leave the shell running.
            .kill_on_drop(true)
            .spawn()?;
        let limit = Duration::from_millis(self.r * self.f);
        match tokio::time::timeout(limit, child.wait()).await {
            Ok(status) => Ok(status?.success()),
            Err(_elapsed) => {
                log::warn!("health check timed out.");
                // Previously a timed-out check was simply abandoned and kept
                // running. Kill and reap it so hung checks cannot pile up.
                // Only the `sh` process itself is killed; anything it spawned
                // may survive.
                if let Err(e) = child.kill().await {
                    log::warn!("failed to kill timed-out health check: {e:?}");
                }
                Ok(false)
            }
        }
    }

    /// Sleep for `count` rounds, i.e. `count * R` milliseconds.
    async fn wait_r(&mut self, count: u64) {
        sleep(Duration::from_millis(self.r * count)).await;
    }
}
/// Entry point. Build the agent (parse the CLI, configure logging, connect to
/// NATS) and run the lock-arbitration loop.
///
/// The result of `run` is returned, so a failure yields a non-zero exit
/// status. Previously it was only logged at `info`, which the default `warn`
/// filter hides, and then discarded.
#[tokio::main]
async fn main() -> Result<()> {
    // NOTE: the logger is only installed inside `Invocation::new` (via
    // `setup`), so this first message is emitted before any logger exists.
    log::info!("here we come");
    let mut inv = Invocation::new().await?;
    log::info!("here we are");
    let rr = inv.run().await;
    log::info!("{rr:?}");
    rr
}