Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
eed14b5ada | |||
9bdcfb0f9b | |||
b7cba1fb84 | |||
5e1d0c27c6 | |||
bd00e26bde |
6
.gitignore
vendored
6
.gitignore
vendored
|
@ -1 +1,7 @@
|
||||||
/target
|
/target
|
||||||
|
result
|
||||||
|
\#*#
|
||||||
|
*~
|
||||||
|
.*
|
||||||
|
!.git*
|
||||||
|
.git
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "putex"
|
name = "putex"
|
||||||
version = "0.1.0"
|
version = "0.3.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
repository = "https://git.strudelline.net/james/putex.git"
|
repository = "https://git.strudelline.net/james/putex.git"
|
||||||
|
|
40
README.md
40
README.md
|
@ -4,6 +4,46 @@ Process Mutex
|
||||||
|
|
||||||
Used to manage a lock and timing components of an at-most-once execution system.
|
Used to manage a lock and timing components of an at-most-once execution system.
|
||||||
|
|
||||||
|
## Flake Usage in NixOS
|
||||||
|
|
||||||
|
Installing is accomplished with the included NixOS module. The included module
|
||||||
|
automatically installs the putex package system-wide, so including the module is sufficient.
|
||||||
|
|
||||||
|
Additionally, if you have the following setup:
|
||||||
|
* a NATS cluster reachable via 127.0.0.1
|
||||||
|
* a service named `routestar`
|
||||||
|
* unique hostnames for your hosts
|
||||||
|
* `systemctl is-failed` only shows failed when the host is truly ineligible to serve
|
||||||
|
|
||||||
|
then you have gained the following capability:
|
||||||
|
* the default setup for putex will control the service named `routestar`
|
||||||
|
|
||||||
|
```nix
|
||||||
|
{ inputs = {
|
||||||
|
nixpkgs.url = "nixpkgs";
|
||||||
|
putex.url = "git+https://git.strudelline.net/james/putex"; # <-- include the flake
|
||||||
|
};
|
||||||
|
outputs = { nixpkgs, putex, ... }: {
|
||||||
|
nixosConfigurations = {
|
||||||
|
container = nixpkgs.lib.nixosSystem {
|
||||||
|
system = "x86_64-linux";
|
||||||
|
modules = [
|
||||||
|
{ networking.useDHCP = false; boot.isContainer = true; system.stateVersion = "24.05"; }
|
||||||
|
|
||||||
|
putex.nixosModules.default # <-- then include the module.
|
||||||
|
{ services.putex.putexes."routestar" = { }; } # <-- and configure the putex with defaults.
|
||||||
|
];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
There are additional configuration options including the `healthcheck`, `start`, and `stop` scripts
|
||||||
|
which must simply be `bash` scripts that run as fast as possible. More explicit requirements are
|
||||||
|
detailed below.
|
||||||
|
|
||||||
|
|
||||||
### NATS
|
### NATS
|
||||||
|
|
||||||
This currently uses [NATS](https://nats.io) exclusively for locking but there is
|
This currently uses [NATS](https://nats.io) exclusively for locking but there is
|
||||||
|
|
61
flake.lock
Normal file
61
flake.lock
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
{
|
||||||
|
"nodes": {
|
||||||
|
"flake-utils": {
|
||||||
|
"inputs": {
|
||||||
|
"systems": "systems"
|
||||||
|
},
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1710146030,
|
||||||
|
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nixpkgs": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1721838734,
|
||||||
|
"narHash": "sha256-o87oh2nLDzZ1E9+j1I6GaEvd9865OWGYvxaPSiH9DEU=",
|
||||||
|
"owner": "NixOS",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"rev": "1855c9961e0bfa2e776fa4b58b7d43149eeed431",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "NixOS",
|
||||||
|
"ref": "nixos-unstable-small",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": {
|
||||||
|
"inputs": {
|
||||||
|
"flake-utils": "flake-utils",
|
||||||
|
"nixpkgs": "nixpkgs"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systems": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1681028828,
|
||||||
|
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": "root",
|
||||||
|
"version": 7
|
||||||
|
}
|
45
flake.nix
Normal file
45
flake.nix
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
{
|
||||||
|
description = "CoreDNS";
|
||||||
|
inputs = {
|
||||||
|
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
|
||||||
|
flake-utils.url = "github:numtide/flake-utils";
|
||||||
|
};
|
||||||
|
|
||||||
|
outputs = {self, flake-utils, nixpkgs }:
|
||||||
|
with nixpkgs.lib;
|
||||||
|
let
|
||||||
|
packageConfigBase = flake-utils.lib.eachDefaultSystem (system:
|
||||||
|
let
|
||||||
|
pkgs = nixpkgs.legacyPackages.${system};
|
||||||
|
pkg = pkgs.callPackage ./package.nix {};
|
||||||
|
in
|
||||||
|
{
|
||||||
|
packages = rec {
|
||||||
|
putex = pkg;
|
||||||
|
default = pkg;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
);
|
||||||
|
nixosModules = rec {
|
||||||
|
putex = import ./nixos-module.nix self nixpkgs.lib;
|
||||||
|
default = putex;
|
||||||
|
};
|
||||||
|
nixosConfigurations = {
|
||||||
|
container = nixpkgs.lib.nixosSystem {
|
||||||
|
system = "x86_64-linux";
|
||||||
|
modules = [
|
||||||
|
self.nixosModules.default
|
||||||
|
{
|
||||||
|
networking.useDHCP = false; boot.isContainer = true; system.stateVersion = "24.05";
|
||||||
|
|
||||||
|
services.putex.putexes."testputex" = {};
|
||||||
|
|
||||||
|
}
|
||||||
|
];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
in
|
||||||
|
packageConfigBase // {
|
||||||
|
inherit nixosModules nixosConfigurations;
|
||||||
|
};
|
||||||
|
}
|
103
nixos-module.nix
Normal file
103
nixos-module.nix
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
self: lib:
|
||||||
|
|
||||||
|
with lib;
|
||||||
|
|
||||||
|
{ config, ... }:
|
||||||
|
|
||||||
|
let cfg = config.services.putex;
|
||||||
|
prog = config.programs.putex;
|
||||||
|
shq = lib.escapeShellArg;
|
||||||
|
shqs = lib.escapeShellArgs;
|
||||||
|
in
|
||||||
|
|
||||||
|
let
|
||||||
|
#runCommand = cmd: derivation { name = "script"; builder = "/bin/sh"; args = [ "-c" cmd ]; };
|
||||||
|
#writeExecutable = cmd: runCommand ''
|
||||||
|
# echo "${cmd}" > $out
|
||||||
|
# chmod +x $out
|
||||||
|
#'';
|
||||||
|
generateSystemdUnits =
|
||||||
|
name: { nats, bucket, key, token, healthcheck, stop, start, enable, package }:
|
||||||
|
{
|
||||||
|
services."putex-${name}" = {
|
||||||
|
script = ''
|
||||||
|
${package}/bin/putex -vvv \
|
||||||
|
--nats ${shq nats} \
|
||||||
|
--bucket ${shq bucket} \
|
||||||
|
--key ${shq key} \
|
||||||
|
--token ${shq token} \
|
||||||
|
--healthcheck ${shq healthcheck} \
|
||||||
|
--start ${shq start} \
|
||||||
|
--stop ${shq stop}
|
||||||
|
'';
|
||||||
|
after = [ "network.target" ];
|
||||||
|
wantedBy = [ "network.target" ];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
in
|
||||||
|
|
||||||
|
{
|
||||||
|
options = {
|
||||||
|
programs.putex = {
|
||||||
|
enable = mkEnableOption "putex" // { default = true; };
|
||||||
|
package = mkPackageOption self.packages.${config.nixpkgs.system} "putex" { };
|
||||||
|
};
|
||||||
|
services.putex = {
|
||||||
|
putexes = mkOption {
|
||||||
|
type = types.attrsOf (types.submodule ({name, ...}: {
|
||||||
|
options = {
|
||||||
|
nats = mkOption {
|
||||||
|
description = "NATS address for cluster backing the lock";
|
||||||
|
type = types.str;
|
||||||
|
default = "nats://127.0.0.1";
|
||||||
|
};
|
||||||
|
bucket = mkOption {
|
||||||
|
description = "bucket name in which to store the lock within which in";
|
||||||
|
type = types.str;
|
||||||
|
default = "putexes";
|
||||||
|
};
|
||||||
|
key = mkOption {
|
||||||
|
description = "kv key name for lock (the lock's \"name\" in the bucket)";
|
||||||
|
type = types.str;
|
||||||
|
default = name;
|
||||||
|
};
|
||||||
|
token = mkOption {
|
||||||
|
description = "kv token value which is unique to this agent within the context of the key.";
|
||||||
|
type = types.str;
|
||||||
|
default = "${config.networking.hostName}";
|
||||||
|
};
|
||||||
|
healthcheck = mkOption {
|
||||||
|
description = "script which evaluates the ability of this agent to serve as the host";
|
||||||
|
type = types.lines;
|
||||||
|
default = ''
|
||||||
|
! systemctl is-failed
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
stop = mkOption {
|
||||||
|
description = "script to start the service";
|
||||||
|
type = types.lines;
|
||||||
|
default = ''
|
||||||
|
systemctl stop ${name}.service
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
start = mkOption {
|
||||||
|
description = "script to start the service";
|
||||||
|
type = types.lines;
|
||||||
|
default = ''
|
||||||
|
systemctl start ${name}.service
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
enable = mkEnableOption "putex for ${name}" // { default = true; };
|
||||||
|
package = mkPackageOption self.packages.${config.nixpkgs.system} "putex" { };
|
||||||
|
};
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
config = {
|
||||||
|
services.putex.putexes = {};
|
||||||
|
environment.systemPackages = mkIf prog.enable [ prog.package ];
|
||||||
|
systemd = mkMerge (attrValues (mapAttrs generateSystemdUnits cfg.putexes));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
18
package.nix
Normal file
18
package.nix
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
{ lib, rustPlatform }:
|
||||||
|
|
||||||
|
with builtins;
|
||||||
|
with lib;
|
||||||
|
|
||||||
|
let cargoToml = (fromTOML (readFile ./Cargo.toml));
|
||||||
|
in
|
||||||
|
|
||||||
|
rustPlatform.buildRustPackage rec {
|
||||||
|
pname = cargoToml.package.name;
|
||||||
|
version = cargoToml.package.version;
|
||||||
|
src = ./.;
|
||||||
|
|
||||||
|
cargoLock = {
|
||||||
|
lockFile = src + /Cargo.lock;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
122
src/main.rs
122
src/main.rs
|
@ -29,13 +29,40 @@ struct Cli {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
verbosity: Verbosity<WarnLevel>,
|
verbosity: Verbosity<WarnLevel>,
|
||||||
|
|
||||||
server: String,
|
#[arg(long)]
|
||||||
|
/// use NATS, via this server. may be any valid nats:// address
|
||||||
|
nats: String,
|
||||||
|
#[arg(long)]
|
||||||
|
/// NATS: kv store bucket
|
||||||
bucket: String,
|
bucket: String,
|
||||||
|
#[arg(long)]
|
||||||
|
/// NATS: kv store key
|
||||||
key: String,
|
key: String,
|
||||||
|
#[arg(long)]
|
||||||
|
/// unique identifier for this agent.
|
||||||
|
/// for host-based clusters, hostname is usually a good choice.
|
||||||
token: String,
|
token: String,
|
||||||
|
#[arg(long)]
|
||||||
|
/// script to run to check the health of the service;
|
||||||
|
/// if passed `active`, validate actual health;
|
||||||
|
/// if passed `standby`, validate candidacy for service
|
||||||
healthcheck: String,
|
healthcheck: String,
|
||||||
startup: String,
|
#[arg(long)]
|
||||||
shutdown: String,
|
/// script to assert the started state of the service
|
||||||
|
start: String,
|
||||||
|
#[arg(long)]
|
||||||
|
/// script to assert the stopped state of the service
|
||||||
|
stop: String,
|
||||||
|
|
||||||
|
#[arg(short, default_value_t = 3000)]
|
||||||
|
/// renewal interval, the length of time between health checks
|
||||||
|
r: u64,
|
||||||
|
#[arg(short, default_value_t = 3)]
|
||||||
|
/// failures, r*f is when another agent will attempt to take the lock
|
||||||
|
f: u64,
|
||||||
|
#[arg(short, default_value_t = 2)]
|
||||||
|
/// confirms, r*c is when a new agent will begin fencing and services
|
||||||
|
c: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Invocation {
|
struct Invocation {
|
||||||
|
@ -43,9 +70,6 @@ struct Invocation {
|
||||||
js: Option<jetstream::Context>,
|
js: Option<jetstream::Context>,
|
||||||
latest_revision: u64,
|
latest_revision: u64,
|
||||||
latest_value: Bytes,
|
latest_value: Bytes,
|
||||||
r: u64,
|
|
||||||
f: u64,
|
|
||||||
c: u64,
|
|
||||||
starts: u64,
|
starts: u64,
|
||||||
exiting: bool,
|
exiting: bool,
|
||||||
active: bool,
|
active: bool,
|
||||||
|
@ -58,9 +82,6 @@ impl Invocation {
|
||||||
js: None,
|
js: None,
|
||||||
latest_revision: 0,
|
latest_revision: 0,
|
||||||
latest_value: "".into(),
|
latest_value: "".into(),
|
||||||
r: 1000,
|
|
||||||
f: 3,
|
|
||||||
c: 2,
|
|
||||||
starts: 0,
|
starts: 0,
|
||||||
exiting: false,
|
exiting: false,
|
||||||
active: false,
|
active: false,
|
||||||
|
@ -70,7 +91,20 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
|
async fn get_store(&mut self) -> Result<jetstream::kv::Store> {
|
||||||
Ok(self.js.clone().ok_or(anyhow!("Jetstream not connected"))?.get_key_value(&self.args.bucket).await?)
|
let js = self.js.clone().ok_or(anyhow!("Jetstream not connected"))?;
|
||||||
|
let store_r = js.get_key_value(&self.args.bucket).await;
|
||||||
|
if let Ok(store) = store_r {
|
||||||
|
return Ok(store);
|
||||||
|
}
|
||||||
|
let bucket = self.args.bucket.clone();
|
||||||
|
log::warn!("bucket {bucket} does not exist. attempting to create it");
|
||||||
|
// it's either gonna work or it's not. yolooooooooooooooo
|
||||||
|
return Ok(
|
||||||
|
js.create_key_value(jetstream::kv::Config {
|
||||||
|
bucket: self.args.bucket.clone(),
|
||||||
|
..Default::default()
|
||||||
|
}).await?
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_lock_data(&mut self) -> Result<()> {
|
async fn update_lock_data(&mut self) -> Result<()> {
|
||||||
|
@ -104,7 +138,7 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn setup(&mut self) -> Result<()> {
|
async fn setup(&mut self) -> Result<()> {
|
||||||
let nc = async_nats::connect(&self.args.server).await?;
|
let nc = async_nats::connect(&self.args.nats).await?;
|
||||||
println!("info: {:?}", nc.server_info());
|
println!("info: {:?}", nc.server_info());
|
||||||
self.js = Some(async_nats::jetstream::new(nc));
|
self.js = Some(async_nats::jetstream::new(nc));
|
||||||
|
|
||||||
|
@ -129,7 +163,7 @@ impl Invocation {
|
||||||
|
|
||||||
if self.healthcheck().await? {
|
if self.healthcheck().await? {
|
||||||
log::trace!("health check succeeded");
|
log::trace!("health check succeeded");
|
||||||
let max_i = self.f;
|
let max_i = self.args.f;
|
||||||
for i in 1..=max_i {
|
for i in 1..=max_i {
|
||||||
if self.take_lock().await.is_ok() {
|
if self.take_lock().await.is_ok() {
|
||||||
self.unfence_and_enable_process().await?;
|
self.unfence_and_enable_process().await?;
|
||||||
|
@ -162,7 +196,7 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log::trace!("waiting F*R");
|
log::trace!("waiting F*R");
|
||||||
self.wait_r(self.f).await;
|
self.wait_r(self.args.f).await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
} else {
|
} else {
|
||||||
log::trace!("lk empty or mine? - no - Found a different token in the lock");
|
log::trace!("lk empty or mine? - no - Found a different token in the lock");
|
||||||
|
@ -174,7 +208,7 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
|
log::trace!("initial takeover health check succeeded. waiting F for takeover confirmation.");
|
||||||
self.wait_r(self.f).await;
|
self.wait_r(self.args.f).await;
|
||||||
if !self.healthcheck().await? {
|
if !self.healthcheck().await? {
|
||||||
log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
|
log::trace!("confirmation takeover health check failed. no longer eligible for takeover. ending pump cycle.");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -182,7 +216,16 @@ impl Invocation {
|
||||||
log::trace!("confirmation takeover health check succeeded.");
|
log::trace!("confirmation takeover health check succeeded.");
|
||||||
if self.take_lock().await.is_ok() {
|
if self.take_lock().await.is_ok() {
|
||||||
log::info!("lock taken. waiting C for other services to die.");
|
log::info!("lock taken. waiting C for other services to die.");
|
||||||
self.wait_r(self.c).await;
|
for _ in 1..=self.args.c {
|
||||||
|
self.wait_r(1).await;
|
||||||
|
self.take_lock().await?;
|
||||||
|
log::trace!("lock asserted while waiting for C");
|
||||||
|
// if this one causes us to exit early, it will simply cause an additional
|
||||||
|
// wait of C*R but without lock assertions so it could jeopardize the
|
||||||
|
// promotion of this agent. This agent which just failed to assert the
|
||||||
|
// lock for some reason...
|
||||||
|
// seems legit.
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// this happens on every cycle whether the lock _needs_ to be taken or not so
|
// this happens on every cycle whether the lock _needs_ to be taken or not so
|
||||||
// logging it every time is not helpful.
|
// logging it every time is not helpful.
|
||||||
|
@ -210,7 +253,7 @@ impl Invocation {
|
||||||
//
|
//
|
||||||
// For the same reason, this is also done whenever pump has
|
// For the same reason, this is also done whenever pump has
|
||||||
// failed. See below.
|
// failed. See below.
|
||||||
self.wait_r(self.c).await;
|
self.wait_r(self.args.c).await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
round = round + 1;
|
round = round + 1;
|
||||||
|
@ -221,7 +264,7 @@ impl Invocation {
|
||||||
|
|
||||||
// BEWARE vvv
|
// BEWARE vvv
|
||||||
// this must be awaited before the loop ends.
|
// this must be awaited before the loop ends.
|
||||||
let round_timer = sleep(Duration::from_millis(self.r));
|
let round_timer = sleep(Duration::from_millis(self.args.r));
|
||||||
// BEWARE ^^^
|
// BEWARE ^^^
|
||||||
|
|
||||||
match self.pump().await {
|
match self.pump().await {
|
||||||
|
@ -230,7 +273,7 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
e => {
|
e => {
|
||||||
// See longer comment above about C*R
|
// See longer comment above about C*R
|
||||||
self.wait_r(self.c).await;
|
self.wait_r(self.args.c).await;
|
||||||
log::error!("pump {this_start}.{round}: error: {e:?}");
|
log::error!("pump {this_start}.{round}: error: {e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,24 +286,53 @@ impl Invocation {
|
||||||
|
|
||||||
async fn kill_process(&mut self) -> Result<()> {
|
async fn kill_process(&mut self) -> Result<()> {
|
||||||
log::trace!("attempting to kill process");
|
log::trace!("attempting to kill process");
|
||||||
if self.active {
|
let was_active = self.active;
|
||||||
|
self.active = false;
|
||||||
|
|
||||||
|
if was_active {
|
||||||
log::warn!("killing process");
|
log::warn!("killing process");
|
||||||
}
|
}
|
||||||
self.active = false;
|
let started = Command::new("/bin/sh")
|
||||||
|
.arg("-c")
|
||||||
|
.arg(self.args.stop.clone())
|
||||||
|
.arg("kill")
|
||||||
|
.spawn()?.wait().await?.success();
|
||||||
|
if started {
|
||||||
|
log::trace!("process killed successfully");
|
||||||
|
if !self.active {
|
||||||
|
log::warn!("process killed successfully");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log::warn!("kill process failed");
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn unfence_and_enable_process(&mut self) -> Result<()> {
|
async fn unfence_and_enable_process(&mut self) -> Result<()> {
|
||||||
log::trace!("attempting to unfence and enable process");
|
log::trace!("attempting to unfence and enable process");
|
||||||
if !self.active {
|
let was_active = self.active;
|
||||||
|
self.active = true;
|
||||||
|
if !was_active {
|
||||||
log::warn!("starting process");
|
log::warn!("starting process");
|
||||||
}
|
}
|
||||||
self.active = true;
|
let started = Command::new("/bin/sh")
|
||||||
|
.arg("-c")
|
||||||
|
.arg(self.args.start.clone())
|
||||||
|
.arg("start")
|
||||||
|
.arg(if self.active {"active"} else {"standby"})
|
||||||
|
.spawn()?.wait().await?.success();
|
||||||
|
if started {
|
||||||
|
log::trace!("process started successfully");
|
||||||
|
if !self.active {
|
||||||
|
log::warn!("process started successfully");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log::warn!("unfence/enable process failed");
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn healthcheck(&mut self) -> Result<bool> {
|
async fn healthcheck(&mut self) -> Result<bool> {
|
||||||
|
|
||||||
let mut child = Command::new("/bin/sh")
|
let mut child = Command::new("/bin/sh")
|
||||||
.arg("-c")
|
.arg("-c")
|
||||||
.arg(self.args.healthcheck.clone())
|
.arg(self.args.healthcheck.clone())
|
||||||
|
@ -271,7 +343,7 @@ impl Invocation {
|
||||||
status = child.wait() => {
|
status = child.wait() => {
|
||||||
Ok(status?.success())
|
Ok(status?.success())
|
||||||
}
|
}
|
||||||
_ = sleep(Duration::from_millis(self.r * self.f)) => {
|
_ = sleep(Duration::from_millis(self.args.r * self.args.f)) => {
|
||||||
log::warn!("health check timed out.");
|
log::warn!("health check timed out.");
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
@ -279,7 +351,7 @@ impl Invocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_r(&mut self, count: u64) {
|
async fn wait_r(&mut self, count: u64) {
|
||||||
sleep(Duration::from_millis(self.r * count)).await;
|
sleep(Duration::from_millis(self.args.r * count)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user