From 86d5cd6d0a8b696697b691c51fb14d71516c9eac Mon Sep 17 00:00:00 2001
From: James Andariese
Date: Wed, 1 Nov 2023 00:06:38 -0500
Subject: [PATCH] add timeouts

---
 Cargo.lock       |   7 ++
 Cargo.toml       |   1 +
 src/cfg.rs       |   9 ++
 src/main.rs      | 254 +++++++++++++++--------------------------------
 src/pod_utils.rs | 154 ++++++++++++++++++++++++++++
 5 files changed, 253 insertions(+), 172 deletions(-)
 create mode 100644 src/pod_utils.rs

diff --git a/Cargo.lock b/Cargo.lock
index 468f704..ea80b6b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -587,6 +587,7 @@ dependencies = [
  "clap",
  "config",
  "futures",
+ "humantime",
  "k8s-openapi",
  "kube",
  "kube_quantity",
@@ -679,6 +680,12 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
+[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
 [[package]]
 name = "hyper"
 version = "0.14.27"
diff --git a/Cargo.toml b/Cargo.toml
index a300741..e961628 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ anyhow = "1.0.75"
 clap = { version = "4.4.7", features = ["derive", "env"] }
 config = "0.13.3"
 futures = "0.3.29"
+humantime = "2.1.0"
 k8s-openapi = { version = "0.20.0", features = ["v1_27"] }
 kube = { version = "0.86.0", features = ["ws"] }
 kube_quantity = "0.7.0"
diff --git a/src/cfg.rs b/src/cfg.rs
index 0aa69fc..3c55a42 100644
--- a/src/cfg.rs
+++ b/src/cfg.rs
@@ -17,6 +17,15 @@ pub struct Config {
     /// Docker image used for git Jobs
     pub image: String,
 
+    #[arg(short,long,env="GIT_REMOTE_K8S_TIMEOUT")]
+    pub timeout: Option<humantime::Duration>,
+
+    #[arg(long("setup-timeout"),env = "GIT_REMOTE_K8S_SETUP_TIMEOUT")]
+    pub setup_timeout: Option<humantime::Duration>,
+
+    #[arg(long("transfer-timeout"), env = "GIT_REMOTE_K8S_TRANSFER_TIMEOUT")]
+    pub transfer_timeout: Option<humantime::Duration>,
+
     #[arg(index = 1)]
     /// remote name
     pub remote_name: String,
diff --git a/src/main.rs b/src/main.rs
index 8c5a4ea..8ae353c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,6 +16,7 @@ use once_cell::sync::Lazy;
 use std::collections::BTreeMap;
 use std::process::ExitCode;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU8, Ordering};
 use std::time::Duration;
 
 use anyhow::{anyhow, bail, Result};
@@ -32,6 +33,8 @@ use kube::{api::*, config::KubeConfigOptions};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::task::JoinHandle;
 
+const KUBE_JOB_LABEL: &str = "sleeper";
+const KUBE_CONTAINER_NAME: &str = KUBE_JOB_LABEL;
 const SCHEME: &str = "k8s";
 const PAT_SCHEME: &str = r"[a-zA-Z][a-zA-Z0-9+.-]*";
 const PAT_PATH: &str = r"[0-9a-zA-Z](?:[0-9a-zA-Z.-]*[0-9a-zA-Z])?";
@@ -44,12 +47,20 @@ mod test;
 
 mod cfg;
 mod errors;
-
-use crate::{cfg::*,errors::*};
+mod pod_utils;
+use pod_utils::is_pod_finished;
+use crate::{cfg::*,errors::*,pod_utils::*};
 
 struct AppContext {
     config: Arc<Config>,
     ensures: Vec>,
+    rc: Arc<AtomicU8>,
+}
+
+impl Into<ExitCode> for AppContext {
+    fn into(self) -> ExitCode {
+        self.rc.clone().fetch_add(0, Ordering::SeqCst).into()
+    }
 }
 
 impl AppContext {
@@ -57,6 +68,7 @@
         Ok(Self {
             config: Arc::new(Config::parse()),
             ensures: vec![],
+            rc: Arc::new(AtomicU8::new(1)),
            // ensurance: ensurance,
        })
    }
@@ -70,6 +82,9 @@ impl AppContext {
             kube::Config::from_kubeconfig(&kco).await?,
         )?)
     }
+    async fn set_exit_code(&self, rc: u8) {
+        self.rc.store(rc, Ordering::SeqCst);
+    }
 }
 
 impl Drop for AppContext {
@@ -90,152 +105,49 @@
     }
 }
 
-trait PodExt {
-    fn label_selectors(&self) -> Vec<String>;
-    fn field_selectors(&self) -> Result<Vec<String>>;
-}
-
-impl PodExt for Pod {
-    fn label_selectors(&self) -> Vec<String> {
-        let l = self.labels();
-        let selectors = Vec::with_capacity(l.len());
-        for (k, v) in l.iter() {
-            format!("{}={}", k, v);
-        }
-        selectors
-    }
-
-    fn field_selectors(&self) -> Result<Vec<String>> {
-        Ok(vec![
-            format!(
-                "metadata.name={}",
-                self.meta()
-                    .name
-                    .as_ref()
-                    .ok_or(ApplicationError::PodNoName)?
-            ),
-            format!(
-                "metadata.namespace={}",
-                self.meta()
-                    .namespace
-                    .as_ref()
-                    .ok_or(ApplicationError::PodNoNamespace)?
-            ),
-        ])
-    }
-}
-
-async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
-    let mut wp = WatchParams::default();
-    for fs in pod.field_selectors()? {
-        wp = wp.fields(&fs);
-    }
-    let mut stream = pods.watch(&wp, "0").await?.boxed();
-    while let Some(status) = stream.try_next().await? {
-        match status {
-            WatchEvent::Modified(o) => {
-                let s = o.status.as_ref().expect("status exists on pod");
-                if s.phase.clone().unwrap_or_default() == "Running" {
-                    info!("Ready to attach to {}", o.name_any());
-                    break;
-                }
-                if s.phase.clone().unwrap_or_default() == "Failed" {
-                    error!("Pod which we are awaiting has entered failed state instead of running");
-                    return Err(ApplicationError::PodCouldNotStart.into());
-                }
-                if s.phase.clone().unwrap_or_default() == "Succeeded" {
-                    error!("Pod which we are awaiting has completed instead");
-                    return Err(ApplicationError::PodDidNotWait.into());
-                }
-            }
-            _ => {}
-        }
-    }
-    Ok(())
-}
-
-async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
-    let got_pod = pods
-        .get(
-            &pod.metadata
-                .name
-                .ok_or(anyhow!("pod metadata must have a name"))?,
-        )
-        .await?;
-    let phase = got_pod
-        .status
-        .ok_or(anyhow!("pod has no status"))?
-        .phase
-        .ok_or(anyhow!("pod has no status.phase"))?;
-    if phase == "Running" {
-        Ok(true)
-    } else {
-        Ok(false)
-    }
-}
-
-async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
-    let got_pod = pods
-        .get(
-            &pod.metadata
-                .name
-                .as_ref()
-                .ok_or(anyhow!("pod metadata must have a name"))?,
-        )
-        .await?;
-    let phase = got_pod
-        .status
-        .ok_or(anyhow!("pod has no status"))?
-        .phase
-        .ok_or(anyhow!("pod has no status.phase"))?;
-    if phase == "Failed" {
-        Ok(true)
-    } else if phase == "Succeeded" {
-        Ok(true)
-    } else {
-        Ok(false)
-    }
-}
-
-async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
-    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
-    let xtx = tx.clone();
-    let xpods = pods.clone();
-    let xpod = pod.clone();
-    let _p: JoinHandle<Result<()>> = tokio::spawn(async move {
-        let r = is_pod_running(&xpods, xpod).await;
-        if let Ok(true) = r {
-            xtx.send(Ok(())).await.expect("couldn't send to channel");
-        }
-        if let Err(e) = r {
-            xtx.send(Err(e)).await.expect("couldn't send to channel");
-        }
-        Ok(())
-    });
-    let xtx = tx.clone();
-    let xpods = pods.clone();
-    let xpod = pod.clone();
-    let _w = tokio::spawn(async move {
-        xtx.send(wait_for_pod_running_watch(&xpods, xpod).await)
-            .await
-            .expect("couldn't send on channel");
-    });
-    let r = rx.recv().await;
-    if r.is_none() {
-        bail!("failed to read API while waiting for pod to start");
-    }
-    let r = r.expect("failed to extract value after checking for None");
-    r
-}
-
 #[tokio::main]
 async fn main() -> ExitCode {
-    let mut rc = ExitCode::from(0);
-    if let Err(e) = main_wrapped(&mut rc).await {
+    let mut ctx = {
+        let ctx = AppContext::new();
+        match ctx {
+            Err(e) => {
+                panic!("{}", e)
+            },
+            Ok(c) => c,
+        }
+    };
+
+    // let cfg = ctx.cfg();
+
+    if let Err(e) = real_main(&mut ctx).await {
         error!("{}", e);
         return ExitCode::from(127);
     }
-    rc
+    ctx.into()
+}
+
+async fn real_main(ctx: &mut AppContext) -> Result<()> {
+    let sto = ctx.cfg().timeout;
+    let sto = ctx.cfg().setup_timeout.or(sto);
+    let tto = ctx.cfg().timeout;
+    let tto = ctx.cfg().transfer_timeout.or(tto);
+
+    let (pods_api, kube_worker_name) = {
+        if let Some(d) = sto {
+            tokio::time::timeout(d.into(), setup_pod(ctx)).await??
+        } else {
+            setup_pod(ctx).await?
+        }
+    };
+    let _r = {
+        if let Some(d) = tto {
+            tokio::time::timeout(d.into(), do_git(ctx, pods_api, kube_worker_name)).await??
+        } else {
+            do_git(ctx, pods_api, kube_worker_name).await?
+        }
+    };
+    Ok(())
+    //Ok(())
 }
 
 async fn set_log_level(level: Level) {
@@ -246,10 +158,7 @@
     tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
 }
 
-async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
-    *rc = 1.into();
-
-    let ctx = AppContext::new()?;
+async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
     let cfg = ctx.cfg();
 
     set_log_level(match cfg.verbose {
@@ -275,28 +184,26 @@
     let __kube_worker_name_val = format!("git-remote-worker-{kube_pvc}");
     let kube_worker_name = __kube_worker_name_val.as_str();
     let kube_image = cfg.image.as_str();
-    let kube_job_label = "sleeper";
-    let kube_container_name = kube_job_label;
     let kube_shell_executable = "/bin/sh";
     let kube_shell_parameters = "-c";
     let kube_shell_sleeper_command = r#"
-    set -e
+        set -e
 
-    SD="$(date +%s)"
-    CD=$SD
-    LD=$SD
-    echo $SD > /repo.timeout
-    while [ $(( LD + 900 )) -ge $CD ];do
-        sleep 5
-        CD="$(date +%s)"
-        NLD=$(cat /repo.timeout)
-        if [ $NLD -ne $LD ];then
-            echo "updated last run time to $NLD (from $LD)"
-        fi
-        LD=$NLD
-    done
-    echo "worker ran for $(( CD - SD )) seconds"
-    "#;
+        SD="$(date +%s)"
+        CD=$SD
+        LD=$SD
+        echo $SD > /repo.timeout
+        while [ $(( LD + 900 )) -ge $CD ];do
+            sleep 5
+            CD="$(date +%s)"
+            NLD=$(cat /repo.timeout)
+            if [ $NLD -ne $LD ];then
+                echo "updated last run time to $NLD (from $LD)"
+            fi
+            LD=$NLD
+        done
+        echo "worker ran for $(( CD - SD )) seconds"
+        "#;
     let kube_repo_mount_path = "/repo";
 
     let parsed_size: Result =
@@ -312,7 +219,7 @@
 
     let kube_pod_labels = vec![
         ("com.kn8v.git-remote-k8s/repo-name", kube_pvc),
-        ("com.kn8v.git-remote-k8s/job", kube_job_label),
+        ("com.kn8v.git-remote-k8s/job", KUBE_JOB_LABEL),
     ];
 
     info!("Remote Context: {}", kube_context);
@@ -379,7 +286,7 @@
         spec.restart_policy = Some("Never".to_owned());
         {
             let mut container = Container::default();
-            container.name = kube_container_name.to_owned();
+            container.name = KUBE_CONTAINER_NAME.to_owned();
             container.command = Some(vec![
                 kube_shell_executable.to_owned(),
                 kube_shell_parameters.to_owned(),
@@ -387,13 +294,13 @@
             ]);
             container.image = Some(kube_image.to_owned());
             container.working_dir = Some(kube_repo_mount_path.to_owned());
+            container.image_pull_policy = Some("IfNotPresent".to_owned());
             {
                 let mut volume_mount = VolumeMount::default();
                 volume_mount.mount_path = kube_repo_mount_path.to_owned();
                 volume_mount.name = "repo".to_owned();
                 container.volume_mounts = Some(vec![volume_mount]);
             }
-            container.image_pull_policy = Some("IfNotPresent".to_owned());
             spec.containers = vec![container];
         }
         {
@@ -502,7 +409,10 @@
     info!("waiting for worker pod to come online");
     wait_for_pod_running(&pods_api, &pod).await?;
     info!("work pod is online. continuing.");
+    Ok((pods_api, kube_worker_name.to_owned()))
+}
+async fn do_git(ctx: &AppContext, pods_api: Api<Pod>, kube_worker_name: String) -> Result<()> {
 
     let mut gitcommand = "date +%s > /repo.timeout; 1>&2 echo welcome from the git-remote-k8s worker pod ; [ -f HEAD ] || git init --bare 1>&2".to_owned();
     let mut ttyout = tokio::io::stdout();
     let mut ttyin = tokio::io::stdin();
@@ -527,10 +437,10 @@
         .stdin(true)
         .stdout(true)
         .stderr(true)
-        .container(kube_container_name.to_owned());
+        .container(KUBE_CONTAINER_NAME.to_owned());
     // let (ready_tx, ready_rx) = oneshot::channel::<()>();
     let mut stuff = pods_api
-        .exec(kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
+        .exec(&kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
         .await?;
     let mut podout = stuff
        .stdout()
@@ -630,9 +540,9 @@
        // integer exit code.
        if status.reason == Some("NonZeroExitCode".to_owned()) {
            info!("exit status of remote job: {}", exitcode);
-            *rc = exitcode.into();
+            ctx.set_exit_code(exitcode).await;
        } else {
-            *rc = 0.into();
+            ctx.set_exit_code(0).await;
        }
        debug!("waiting for group to exit");
        barrier.wait().await;
diff --git a/src/pod_utils.rs b/src/pod_utils.rs
new file mode 100644
index 0000000..70d93f6
--- /dev/null
+++ b/src/pod_utils.rs
@@ -0,0 +1,154 @@
+// TODO: there is a stupid panic here that will be annoying to fix.
+//       it happens when there is a timeout that cancels the tasks
+//       listening to these sends while this one is sending resulting
+//       in a totally ignorable error that I don't know what to do
+//       with right now.
+// 2023-11-01T05:02:05.009858Z INFO git_remote_k8s: waiting for worker pod to come online
+// 2023-11-01T05:02:05.018697Z ERROR git_remote_k8s: deadline has elapsed
+// 2023-11-01T05:02:05.019458Z ERROR kube_client::client::builder: failed with error channel closed
+// thread 'tokio-runtime-worker' panicked at src/pod_utils.rs:118:36:
+// couldn't send to channel: SendError { .. }
+// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
+// thread 'tokio-runtime-worker' panicked at src/pod_utils.rs:131:14:
+// couldn't send on channel: SendError { .. }
+
+
+use crate::*;
+
+pub trait PodExt {
+    fn label_selectors(&self) -> Vec<String>;
+    fn field_selectors(&self) -> Result<Vec<String>>;
+}
+
+impl PodExt for Pod {
+    fn label_selectors(&self) -> Vec<String> {
+        let l = self.labels();
+        let selectors = Vec::with_capacity(l.len());
+        for (k, v) in l.iter() {
+            format!("{}={}", k, v);
+        }
+        selectors
+    }
+
+    fn field_selectors(&self) -> Result<Vec<String>> {
+        Ok(vec![
+            format!(
+                "metadata.name={}",
+                self.meta()
+                    .name
+                    .as_ref()
+                    .ok_or(ApplicationError::PodNoName)?
+            ),
+            format!(
+                "metadata.namespace={}",
+                self.meta()
+                    .namespace
+                    .as_ref()
+                    .ok_or(ApplicationError::PodNoNamespace)?
+            ),
+        ])
+    }
+}
+
+pub async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
+    let mut wp = WatchParams::default();
+    for fs in pod.field_selectors()? {
+        wp = wp.fields(&fs);
+    }
+    let mut stream = pods.watch(&wp, "0").await?.boxed();
+    while let Some(status) = stream.try_next().await? {
+        match status {
+            WatchEvent::Modified(o) => {
+                let s = o.status.as_ref().expect("status exists on pod");
+                if s.phase.clone().unwrap_or_default() == "Running" {
+                    info!("Ready to attach to {}", o.name_any());
+                    break;
+                }
+                if s.phase.clone().unwrap_or_default() == "Failed" {
+                    error!("Pod which we are awaiting has entered failed state instead of running");
+                    return Err(ApplicationError::PodCouldNotStart.into());
+                }
+                if s.phase.clone().unwrap_or_default() == "Succeeded" {
+                    error!("Pod which we are awaiting has completed instead");
+                    return Err(ApplicationError::PodDidNotWait.into());
+                }
+            }
+            _ => {}
+        }
+    }
+    Ok(())
+}
+
+pub async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
+    let got_pod = pods
+        .get(
+            &pod.metadata
+                .name
+                .ok_or(anyhow!("pod metadata must have a name"))?,
+        )
+        .await?;
+    let phase = got_pod
+        .status
+        .ok_or(anyhow!("pod has no status"))?
+        .phase
+        .ok_or(anyhow!("pod has no status.phase"))?;
+    if phase == "Running" {
+        Ok(true)
+    } else {
+        Ok(false)
+    }
+}
+
+pub async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
+    let got_pod = pods
+        .get(
+            &pod.metadata
+                .name
+                .as_ref()
+                .ok_or(anyhow!("pod metadata must have a name"))?,
+        )
+        .await?;
+    let phase = got_pod
+        .status
+        .ok_or(anyhow!("pod has no status"))?
+        .phase
+        .ok_or(anyhow!("pod has no status.phase"))?;
+    if phase == "Failed" {
+        Ok(true)
+    } else if phase == "Succeeded" {
+        Ok(true)
+    } else {
+        Ok(false)
+    }
+}
+
+pub async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
+    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+    let xtx = tx.clone();
+    let xpods = pods.clone();
+    let xpod = pod.clone();
+    let _p: JoinHandle<Result<()>> = tokio::spawn(async move {
+        let r = is_pod_running(&xpods, xpod).await;
+        if let Ok(true) = r {
+            xtx.send(Ok(())).await.expect("couldn't send to channel");
+        }
+        if let Err(e) = r {
+            xtx.send(Err(e)).await.expect("couldn't send to channel");
+        }
+        Ok(())
+    });
+    let xtx = tx.clone();
+    let xpods = pods.clone();
+    let xpod = pod.clone();
+    let _w = tokio::spawn(async move {
+        xtx.send(wait_for_pod_running_watch(&xpods, xpod).await)
+            .await
+            .expect("couldn't send on channel");
+    });
+    let r = rx.recv().await;
+    if r.is_none() {
+        bail!("failed to read API while waiting for pod to start");
+    }
+    let r = r.expect("failed to extract value after checking for None");
+    r
+}
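
Reviewer note (not part of the patch): the three new options layer as a general --timeout that the phase-specific --setup-timeout / --transfer-timeout (or the matching GIT_REMOTE_K8S_* environment variables) override, and real_main only wraps a phase in tokio::time::timeout when a duration is actually configured. Below is a minimal, self-contained sketch of that pattern for anyone trying the options out; the do_work future, the with_optional_timeout helper name, and the dependency set (tokio with full features, anyhow, humantime) are illustrative assumptions, not code from this repository.

// Sketch only: an optional deadline around an async phase, mirroring real_main().
use std::time::Duration;

use anyhow::Result;

// Stand-in for setup_pod()/do_git(); any future returning anyhow::Result works.
async fn do_work() -> Result<()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(())
}

// Some(d): enforce a hard deadline; None: run unbounded, exactly like the
// if-let-Some branches in real_main().
async fn with_optional_timeout(limit: Option<Duration>) -> Result<()> {
    match limit {
        // outer ? converts tokio's Elapsed into anyhow::Error, inner ? unwraps do_work's Result
        Some(d) => tokio::time::timeout(d, do_work()).await??,
        None => do_work().await?,
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    // "90s" stands in for a --timeout / GIT_REMOTE_K8S_TIMEOUT value; humantime,
    // the dependency this patch adds, supplies the human-friendly duration parsing.
    let limit = Some(humantime::parse_duration("90s")?);
    with_optional_timeout(limit).await
}

In the patch itself, an elapsed timeout surfaces as an error from real_main, which main() logs and maps to exit code 127, so a hung transfer can no longer wedge the git process indefinitely. Setting only GIT_REMOTE_K8S_SETUP_TIMEOUT, for example, bounds pod creation and startup while leaving the data transfer unbounded.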