add timeouts

James Andariese 2023-11-01 00:06:38 -05:00
parent 11fd970400
commit 86d5cd6d0a
5 changed files with 253 additions and 172 deletions

Cargo.lock (generated)

@@ -587,6 +587,7 @@ dependencies = [
  "clap",
  "config",
  "futures",
+ "humantime",
  "k8s-openapi",
  "kube",
  "kube_quantity",
@@ -679,6 +680,12 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
+[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
 [[package]]
 name = "hyper"
 version = "0.14.27"

Cargo.toml

@@ -8,6 +8,7 @@ anyhow = "1.0.75"
 clap = { version = "4.4.7", features = ["derive", "env"] }
 config = "0.13.3"
 futures = "0.3.29"
+humantime = "2.1.0"
 k8s-openapi = { version = "0.20.0", features = ["v1_27"] }
 kube = { version = "0.86.0", features = ["ws"] }
 kube_quantity = "0.7.0"

src/cfg.rs

@@ -17,6 +17,15 @@ pub struct Config {
     /// Docker image used for git Jobs
     pub image: String,
 
+    #[arg(short,long,env="GIT_REMOTE_K8S_TIMEOUT")]
+    pub timeout: Option<humantime::Duration>,
+
+    #[arg(long("setup-timeout"),env = "GIT_REMOTE_K8S_SETUP_TIMEOUT")]
+    pub setup_timeout: Option<humantime::Duration>,
+
+    #[arg(long("transfer-timeout"), env = "GIT_REMOTE_K8S_TRANSFER_TIMEOUT")]
+    pub transfer_timeout: Option<humantime::Duration>,
+
     #[arg(index = 1)]
     /// remote name
     pub remote_name: String,
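
The new flags (and their GIT_REMOTE_K8S_* environment variables) take human-readable durations. A minimal sketch of how such values are expected to parse, assuming humantime 2.1.0's FromStr implementation, which clap's derive falls back to for Option<humantime::Duration>; the names below are illustrative only:

    // "30s", "5m", "2h" and similar strings parse through humantime.
    fn parse_timeout_example() {
        let timeout: humantime::Duration = "30s".parse().expect("humantime accepts '30s'");
        // humantime::Duration converts into std::time::Duration, which is what
        // tokio::time::timeout ultimately receives in real_main below.
        let std_timeout: std::time::Duration = timeout.into();
        assert_eq!(std_timeout.as_secs(), 30);
    }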

src/main.rs

@@ -16,6 +16,7 @@ use once_cell::sync::Lazy;
 use std::collections::BTreeMap;
 use std::process::ExitCode;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU8, Ordering};
 use std::time::Duration;
 
 use anyhow::{anyhow, bail, Result};
@@ -32,6 +33,8 @@ use kube::{api::*, config::KubeConfigOptions};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::task::JoinHandle;
 
+const KUBE_JOB_LABEL: &str = "sleeper";
+const KUBE_CONTAINER_NAME: &str = KUBE_JOB_LABEL;
 const SCHEME: &str = "k8s";
 const PAT_SCHEME: &str = r"[a-zA-Z][a-zA-Z0-9+.-]*";
 const PAT_PATH: &str = r"[0-9a-zA-Z](?:[0-9a-zA-Z.-]*[0-9a-zA-Z])?";
@@ -44,12 +47,20 @@ mod test;
 mod cfg;
 mod errors;
+mod pod_utils;
 
-use crate::{cfg::*,errors::*};
+use pod_utils::is_pod_finished;
+use crate::{cfg::*,errors::*,pod_utils::*};
 
 struct AppContext {
     config: Arc<Config>,
     ensures: Vec<JoinHandle<()>>,
+    rc: Arc<AtomicU8>,
+}
+
+impl Into<ExitCode> for AppContext {
+    fn into(self) -> ExitCode {
+        self.rc.clone().fetch_add(0, Ordering::SeqCst).into()
+    }
 }
 
 impl AppContext {
@@ -57,6 +68,7 @@ impl AppContext {
         Ok(Self {
             config: Arc::new(Config::parse()),
             ensures: vec![],
+            rc: Arc::new(AtomicU8::new(1)),
             // ensurance: ensurance,
         })
     }
@@ -70,6 +82,9 @@
             kube::Config::from_kubeconfig(&kco).await?,
         )?)
     }
+
+    async fn set_exit_code(&self, rc: u8) {
+        self.rc.store(rc, Ordering::SeqCst);
+    }
 }
 
 impl Drop for AppContext {
@@ -90,152 +105,49 @@
     }
 }
 
-trait PodExt {
-    fn label_selectors(&self) -> Vec<String>;
-    fn field_selectors(&self) -> Result<Vec<String>>;
-}
-
-impl PodExt for Pod {
-    fn label_selectors(&self) -> Vec<String> {
-        let l = self.labels();
-        let selectors = Vec::with_capacity(l.len());
-        for (k, v) in l.iter() {
-            format!("{}={}", k, v);
-        }
-        selectors
-    }
-
-    fn field_selectors(&self) -> Result<Vec<String>> {
-        Ok(vec![
-            format!(
-                "metadata.name={}",
-                self.meta()
-                    .name
-                    .as_ref()
-                    .ok_or(ApplicationError::PodNoName)?
-            ),
-            format!(
-                "metadata.namespace={}",
-                self.meta()
-                    .namespace
-                    .as_ref()
-                    .ok_or(ApplicationError::PodNoNamespace)?
-            ),
-        ])
-    }
-}
-
-async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
-    let mut wp = WatchParams::default();
-    for fs in pod.field_selectors()? {
-        wp = wp.fields(&fs);
-    }
-    let mut stream = pods.watch(&wp, "0").await?.boxed();
-    while let Some(status) = stream.try_next().await? {
-        match status {
-            WatchEvent::Modified(o) => {
-                let s = o.status.as_ref().expect("status exists on pod");
-                if s.phase.clone().unwrap_or_default() == "Running" {
-                    info!("Ready to attach to {}", o.name_any());
-                    break;
-                }
-                if s.phase.clone().unwrap_or_default() == "Failed" {
-                    error!("Pod which we are awaiting has entered failed state instead of running");
-                    return Err(ApplicationError::PodCouldNotStart.into());
-                }
-                if s.phase.clone().unwrap_or_default() == "Succeeded" {
-                    error!("Pod which we are awaiting has completed instead");
-                    return Err(ApplicationError::PodDidNotWait.into());
-                }
-            }
-            _ => {}
-        }
-    }
-    Ok(())
-}
-
-async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
-    let got_pod = pods
-        .get(
-            &pod.metadata
-                .name
-                .ok_or(anyhow!("pod metadata must have a name"))?,
-        )
-        .await?;
-    let phase = got_pod
-        .status
-        .ok_or(anyhow!("pod has no status"))?
-        .phase
-        .ok_or(anyhow!("pod has no status.phase"))?;
-    if phase == "Running" {
-        Ok(true)
-    } else {
-        Ok(false)
-    }
-}
-
-async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
-    let got_pod = pods
-        .get(
-            &pod.metadata
-                .name
-                .as_ref()
-                .ok_or(anyhow!("pod metadata must have a name"))?,
-        )
-        .await?;
-    let phase = got_pod
-        .status
-        .ok_or(anyhow!("pod has no status"))?
-        .phase
-        .ok_or(anyhow!("pod has no status.phase"))?;
-    if phase == "Failed" {
-        Ok(true)
-    } else if phase == "Succeeded" {
-        Ok(true)
-    } else {
-        Ok(false)
-    }
-}
-
-async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
-    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
-    let xtx = tx.clone();
-    let xpods = pods.clone();
-    let xpod = pod.clone();
-    let _p: JoinHandle<Result<()>> = tokio::spawn(async move {
-        let r = is_pod_running(&xpods, xpod).await;
-        if let Ok(true) = r {
-            xtx.send(Ok(())).await.expect("couldn't send to channel");
-        }
-        if let Err(e) = r {
-            xtx.send(Err(e)).await.expect("couldn't send to channel");
-        }
-        Ok(())
-    });
-    let xtx = tx.clone();
-    let xpods = pods.clone();
-    let xpod = pod.clone();
-    let _w = tokio::spawn(async move {
-        xtx.send(wait_for_pod_running_watch(&xpods, xpod).await)
-            .await
-            .expect("couldn't send on channel");
-    });
-    let r = rx.recv().await;
-    if r.is_none() {
-        bail!("failed to read API while waiting for pod to start");
-    }
-    let r = r.expect("failed to extract value after checking for None");
-    r
-}
 
 #[tokio::main]
 async fn main() -> ExitCode {
-    let mut rc = ExitCode::from(0);
-    if let Err(e) = main_wrapped(&mut rc).await {
+    let mut ctx = {
+        let ctx = AppContext::new();
+        match ctx {
+            Err(e) => {
+                panic!("{}", e)
+            },
+            Ok(c) => c,
+        }
+    };
+    // let cfg = ctx.cfg();
+    if let Err(e) = real_main(&mut ctx).await {
         error!("{}", e);
         return ExitCode::from(127);
     }
-    rc
+    ctx.into()
 }
+
+async fn real_main(ctx: &mut AppContext) -> Result<()> {
+    let sto = ctx.cfg().timeout;
+    let sto = ctx.cfg().setup_timeout.or(sto);
+    let tto = ctx.cfg().timeout;
+    let tto = ctx.cfg().transfer_timeout.or(tto);
+
+    let (pods_api, kube_worker_name) = {
+        if let Some(d) = sto {
+            tokio::time::timeout(d.into(), setup_pod(ctx)).await??
+        } else {
+            setup_pod(ctx).await?
+        }
+    };
+    let _r = {
+        if let Some(d) = tto {
+            tokio::time::timeout(d.into(), do_git(ctx, pods_api, kube_worker_name)).await??
+        } else {
+            do_git(ctx, pods_api, kube_worker_name).await?
+        }
+    };
+    Ok(())
+    //Ok(())
+}
@@ -246,10 +158,7 @@ async fn set_log_level(level: Level) {
     tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
 }
 
-async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
-    *rc = 1.into();
-    let ctx = AppContext::new()?;
+async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
     let cfg = ctx.cfg();
 
     set_log_level(match cfg.verbose {
@@ -275,28 +184,26 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
     let __kube_worker_name_val = format!("git-remote-worker-{kube_pvc}");
     let kube_worker_name = __kube_worker_name_val.as_str();
     let kube_image = cfg.image.as_str();
-    let kube_job_label = "sleeper";
-    let kube_container_name = kube_job_label;
     let kube_shell_executable = "/bin/sh";
     let kube_shell_parameters = "-c";
     let kube_shell_sleeper_command = r#"
         set -e
        SD="$(date +%s)"
        CD=$SD
        LD=$SD
        echo $SD > /repo.timeout
        while [ $(( LD + 900 )) -ge $CD ];do
            sleep 5
            CD="$(date +%s)"
            NLD=$(cat /repo.timeout)
            if [ $NLD -ne $LD ];then
                echo "updated last run time to $NLD (from $LD)"
            fi
            LD=$NLD
        done
        echo "worker ran for $(( CD - SD )) seconds"
     "#;
     let kube_repo_mount_path = "/repo";
 
     let parsed_size: Result<ParsedQuantity, ParseQuantityError> =
@@ -312,7 +219,7 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
     let kube_pod_labels = vec![
         ("com.kn8v.git-remote-k8s/repo-name", kube_pvc),
-        ("com.kn8v.git-remote-k8s/job", kube_job_label),
+        ("com.kn8v.git-remote-k8s/job", KUBE_JOB_LABEL),
     ];
 
     info!("Remote Context: {}", kube_context);
@@ -379,7 +286,7 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
         spec.restart_policy = Some("Never".to_owned());
         {
             let mut container = Container::default();
-            container.name = kube_container_name.to_owned();
+            container.name = KUBE_CONTAINER_NAME.to_owned();
             container.command = Some(vec![
                 kube_shell_executable.to_owned(),
                 kube_shell_parameters.to_owned(),
@@ -387,13 +294,13 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
             ]);
             container.image = Some(kube_image.to_owned());
             container.working_dir = Some(kube_repo_mount_path.to_owned());
+            container.image_pull_policy = Some("IfNotPresent".to_owned());
             {
                 let mut volume_mount = VolumeMount::default();
                 volume_mount.mount_path = kube_repo_mount_path.to_owned();
                 volume_mount.name = "repo".to_owned();
                 container.volume_mounts = Some(vec![volume_mount]);
             }
-            container.image_pull_policy = Some("IfNotPresent".to_owned());
             spec.containers = vec![container];
         }
         {
@@ -502,7 +409,10 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
     info!("waiting for worker pod to come online");
     wait_for_pod_running(&pods_api, &pod).await?;
     info!("work pod is online. continuing.");
+    Ok((pods_api, kube_worker_name.to_owned()))
+}
 
+async fn do_git(ctx: &AppContext, pods_api: Api<Pod>, kube_worker_name: String) -> Result<()> {
     let mut gitcommand = "date +%s > /repo.timeout; 1>&2 echo welcome from the git-remote-k8s worker pod ; [ -f HEAD ] || git init --bare 1>&2".to_owned();
     let mut ttyout = tokio::io::stdout();
     let mut ttyin = tokio::io::stdin();
@@ -527,10 +437,10 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
         .stdin(true)
         .stdout(true)
         .stderr(true)
-        .container(kube_container_name.to_owned());
+        .container(KUBE_CONTAINER_NAME.to_owned());
 
     // let (ready_tx, ready_rx) = oneshot::channel::<()>();
     let mut stuff = pods_api
-        .exec(kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
+        .exec(&kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
         .await?;
     let mut podout = stuff
         .stdout()
@@ -630,9 +540,9 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
         // integer exit code.
         if status.reason == Some("NonZeroExitCode".to_owned()) {
             info!("exit status of remote job: {}", exitcode);
-            *rc = exitcode.into();
+            ctx.set_exit_code(exitcode).await;
         } else {
-            *rc = 0.into();
+            ctx.set_exit_code(0).await;
         }
         debug!("waiting for group to exit");
         barrier.wait().await;
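
The remote job's exit status now reaches the process exit code through the AtomicU8 kept in AppContext: do_git stores it via set_exit_code, and main converts the context into an ExitCode on return. The fetch_add(0, Ordering::SeqCst) in the Into<ExitCode> impl is just an atomic read; a sketch of the equivalent plain load (the helper name is illustrative, not part of the commit):

    use std::process::ExitCode;
    use std::sync::atomic::{AtomicU8, Ordering};

    // fetch_add(0, SeqCst) and load(SeqCst) observe the same stored value.
    fn exit_code_from(rc: &AtomicU8) -> ExitCode {
        ExitCode::from(rc.load(Ordering::SeqCst))
    }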

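real_main derives each phase's deadline by letting the phase-specific option override the shared --timeout, then wraps the phase in tokio::time::timeout only when a deadline exists. A minimal sketch of that pattern, assuming as above that each phase returns anyhow::Result (the helper name is illustrative, not part of the commit):

    // Bound a fallible phase with tokio::time::timeout when a limit is configured.
    async fn with_optional_timeout<T>(
        limit: Option<std::time::Duration>,
        phase: impl std::future::Future<Output = anyhow::Result<T>>,
    ) -> anyhow::Result<T> {
        match limit {
            // The first `?` converts an elapsed deadline into an anyhow::Error,
            // mirroring the `.await??` used above for setup_pod and do_git.
            Some(d) => Ok(tokio::time::timeout(d, phase).await??),
            None => phase.await,
        }
    }
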
src/pod_utils.rs (new file)

@@ -0,0 +1,154 @@
// TODO: there is a stupid panic here that will be annoying to fix.
// it happens when there is a timeout that cancels the tasks
// listening to these sends while this one is sending resulting
// in a totally ignorable error that I don't know what to do
// with right now.
// 2023-11-01T05:02:05.009858Z INFO git_remote_k8s: waiting for worker pod to come online
// 2023-11-01T05:02:05.018697Z ERROR git_remote_k8s: deadline has elapsed
// 2023-11-01T05:02:05.019458Z ERROR kube_client::client::builder: failed with error channel closed
// thread 'tokio-runtime-worker' panicked at src/pod_utils.rs:118:36:
// couldn't send to channel: SendError { .. }
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
// thread 'tokio-runtime-worker' panicked at src/pod_utils.rs:131:14:
// couldn't send on channel: SendError { .. }
use crate::*;

pub trait PodExt {
    fn label_selectors(&self) -> Vec<String>;
    fn field_selectors(&self) -> Result<Vec<String>>;
}

impl PodExt for Pod {
    fn label_selectors(&self) -> Vec<String> {
        let l = self.labels();
        let selectors = Vec::with_capacity(l.len());
        for (k, v) in l.iter() {
            format!("{}={}", k, v);
        }
        selectors
    }

    fn field_selectors(&self) -> Result<Vec<String>> {
        Ok(vec![
            format!(
                "metadata.name={}",
                self.meta()
                    .name
                    .as_ref()
                    .ok_or(ApplicationError::PodNoName)?
            ),
            format!(
                "metadata.namespace={}",
                self.meta()
                    .namespace
                    .as_ref()
                    .ok_or(ApplicationError::PodNoNamespace)?
            ),
        ])
    }
}

pub async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
    let mut wp = WatchParams::default();
    for fs in pod.field_selectors()? {
        wp = wp.fields(&fs);
    }
    let mut stream = pods.watch(&wp, "0").await?.boxed();
    while let Some(status) = stream.try_next().await? {
        match status {
            WatchEvent::Modified(o) => {
                let s = o.status.as_ref().expect("status exists on pod");
                if s.phase.clone().unwrap_or_default() == "Running" {
                    info!("Ready to attach to {}", o.name_any());
                    break;
                }
                if s.phase.clone().unwrap_or_default() == "Failed" {
                    error!("Pod which we are awaiting has entered failed state instead of running");
                    return Err(ApplicationError::PodCouldNotStart.into());
                }
                if s.phase.clone().unwrap_or_default() == "Succeeded" {
                    error!("Pod which we are awaiting has completed instead");
                    return Err(ApplicationError::PodDidNotWait.into());
                }
            }
            _ => {}
        }
    }
    Ok(())
}

pub async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
    let got_pod = pods
        .get(
            &pod.metadata
                .name
                .ok_or(anyhow!("pod metadata must have a name"))?,
        )
        .await?;
    let phase = got_pod
        .status
        .ok_or(anyhow!("pod has no status"))?
        .phase
        .ok_or(anyhow!("pod has no status.phase"))?;
    if phase == "Running" {
        Ok(true)
    } else {
        Ok(false)
    }
}

pub async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
    let got_pod = pods
        .get(
            &pod.metadata
                .name
                .as_ref()
                .ok_or(anyhow!("pod metadata must have a name"))?,
        )
        .await?;
    let phase = got_pod
        .status
        .ok_or(anyhow!("pod has no status"))?
        .phase
        .ok_or(anyhow!("pod has no status.phase"))?;
    if phase == "Failed" {
        Ok(true)
    } else if phase == "Succeeded" {
        Ok(true)
    } else {
        Ok(false)
    }
}

pub async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    let xtx = tx.clone();
    let xpods = pods.clone();
    let xpod = pod.clone();
    let _p: JoinHandle<Result<()>> = tokio::spawn(async move {
        let r = is_pod_running(&xpods, xpod).await;
        if let Ok(true) = r {
            xtx.send(Ok(())).await.expect("couldn't send to channel");
        }
        if let Err(e) = r {
            xtx.send(Err(e)).await.expect("couldn't send to channel");
        }
        Ok(())
    });
    let xtx = tx.clone();
    let xpods = pods.clone();
    let xpod = pod.clone();
    let _w = tokio::spawn(async move {
        xtx.send(wait_for_pod_running_watch(&xpods, xpod).await)
            .await
            .expect("couldn't send on channel");
    });
    let r = rx.recv().await;
    if r.is_none() {
        bail!("failed to read API while waiting for pod to start");
    }
    let r = r.expect("failed to extract value after checking for None");
    r
}
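
The TODO at the top of this file describes sends that panic once a timeout has cancelled the task that was listening. One way that could be smoothed over, sketched under the assumption that a closed channel only ever means the waiter has already given up (the helper below is illustrative, not part of the commit):

    use tokio::sync::mpsc::Sender;
    use tracing::debug;

    // Deliver a result if anyone is still listening; a closed channel is expected
    // once the setup timeout has cancelled wait_for_pod_running's caller.
    async fn send_if_listening<T>(tx: &Sender<T>, value: T) {
        if tx.send(value).await.is_err() {
            debug!("pod-wait result dropped: receiver already gone");
        }
    }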