formatting

James Andariese 2023-10-31 05:35:38 -05:00
parent bb9d6ae5ac
commit 97fadf6c92
2 changed files with 296 additions and 152 deletions


@@ -9,36 +9,35 @@
 // * config mode (maybe)
 // * set the pvc storageclass instead
-use std::process::ExitCode;
-use std::time::Duration;
-use std::{collections::BTreeMap};
-use std::sync::Arc;
-use futures::{StreamExt,TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use k8s::apimachinery::pkg::api::resource::Quantity;
-use kube_quantity::{ParsedQuantity, ParseQuantityError};
+use kube_quantity::{ParseQuantityError, ParsedQuantity};
 use once_cell::sync::Lazy;
+use std::collections::BTreeMap;
+use std::process::ExitCode;
+use std::sync::Arc;
+use std::time::Duration;
-use anyhow::{bail, anyhow, Result};
+use anyhow::{anyhow, bail, Result};
 use thiserror::Error as ThisError;
-use clap::{Parser as ClapParser, ArgAction};
+use clap::{ArgAction, Parser as ClapParser};
-use tracing::{info, error, debug, Level, Instrument, error_span};
+use tracing::{debug, error, error_span, info, Instrument, Level};
 use tracing_subscriber::FmtSubscriber;
 use k8s::api::core::v1::*;
 use k8s_openapi as k8s;
 use kube::{api::*, config::KubeConfigOptions};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::task::JoinHandle;
-use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncWrite, AsyncRead};
 
 const SCHEME: &str = "k8s";
 const PAT_SCHEME: &str = r"[a-zA-Z][a-zA-Z0-9+.-]*";
 const PAT_PATH: &str = r"[0-9a-zA-Z](?:[0-9a-zA-Z.-]*[0-9a-zA-Z])?";
-static REMOTE_PATTERN: Lazy<regex::Regex> = Lazy::new(|| {regex::Regex::new(&format!("^(?P<scheme_prefix>(?P<scheme>{PAT_SCHEME}):)?(?:/*)(?P<context>{PAT_PATH})?/(?P<namespace>{PAT_PATH})?/(?P<pvc>{PAT_PATH})?(?P<trailing>/)?")).expect("regex failed to compile")});
+static REMOTE_PATTERN: Lazy<regex::Regex> = Lazy::new(|| {
+regex::Regex::new(&format!("^(?P<scheme_prefix>(?P<scheme>{PAT_SCHEME}):)?(?:/*)(?P<context>{PAT_PATH})?/(?P<namespace>{PAT_PATH})?/(?P<pvc>{PAT_PATH})?(?P<trailing>/)?")).expect("regex failed to compile")
+});
 
 #[cfg(test)]
 mod test;
@@ -60,15 +59,11 @@ struct Config {
 /// Docker image used for git Jobs
 image: String,
 
-#[arg(
-index=1
-)]
+#[arg(index = 1)]
 /// remote name
 remote_name: String,
 
-#[arg(
-index=2
-)]
+#[arg(index = 2)]
 /// remote URL
 remote_url: String,
@@ -81,18 +76,26 @@ struct Config {
 /// verbosity, may be specified more than once
 verbose: u8,
 
-#[arg(long("storage-class-name"),env="GIT_REMOTE_K8S_PVC_STORAGECLASSNAME")]
+#[arg(
+long("storage-class-name"),
+env = "GIT_REMOTE_K8S_PVC_STORAGECLASSNAME"
+)]
 /// storageClassName to use for the backing PVC _if it is created_.
 ///
 /// this will be used to create a new PVC but if a PVC already
 /// exists by the requested name, it will be used.
 storageclass: Option<String>,
 
-#[arg(short('s'),long("initial-volume-size"), default_value("1Gi"), env="GIT_REMOTE_K8S_INITIAL_VOLUME_SIZE")]
+#[arg(
+short('s'),
+long("initial-volume-size"),
+default_value("1Gi"),
+env = "GIT_REMOTE_K8S_INITIAL_VOLUME_SIZE"
+)]
 initial_size: String,
 }
 
-#[derive(ThisError,Debug)]
+#[derive(ThisError, Debug)]
 pub enum ApplicationError {
 /// cluster state problems
 #[error("cluster is in an inconsistent state")]
@@ -115,7 +118,7 @@ pub enum ApplicationError {
 PodDidNotWait,
 }
 
-#[derive(ThisError,Debug)]
+#[derive(ThisError, Debug)]
 pub enum ConfigError {
 #[error("no namespace present in remote URL")]
 RemoteNoNamespace,
@@ -136,18 +139,28 @@ impl Config {
 ///
 /// this utilizes a regex instead of url::Url to ensure that it returns sensible errors
 // TODO: find a way to memoize this cleanly. probably give it access to a memoizing context from AppContext.
-fn parse_and_validate(&self) -> Result<(String,String,String)> {
-let caps = REMOTE_PATTERN.captures(&self.remote_url).ok_or(ConfigError::RemoteInvalid)?;
+fn parse_and_validate(&self) -> Result<(String, String, String)> {
+let caps = REMOTE_PATTERN
+.captures(&self.remote_url)
+.ok_or(ConfigError::RemoteInvalid)?;
 let scheme = if caps.name("scheme_prefix").is_none() {
 SCHEME
 } else {
-caps.name("scheme").ok_or(ConfigError::RemoteInvalidScheme)?.as_str()
+caps.name("scheme")
+.ok_or(ConfigError::RemoteInvalidScheme)?
+.as_str()
 };
 if scheme != SCHEME {
 bail!(ConfigError::RemoteInvalidScheme);
 }
-let kctx = caps.name("context").ok_or(ConfigError::RemoteNoContext)?.as_str();
-let ns = caps.name("namespace").ok_or(ConfigError::RemoteNoNamespace)?.as_str();
+let kctx = caps
+.name("context")
+.ok_or(ConfigError::RemoteNoContext)?
+.as_str();
+let ns = caps
+.name("namespace")
+.ok_or(ConfigError::RemoteNoNamespace)?
+.as_str();
 let pvc = caps.name("pvc").ok_or(ConfigError::RemoteNoPVC)?.as_str();
 // regex::Regex::find(REMOTE_PATTERN);
 if kctx == "" {
@@ -168,17 +181,17 @@ impl Config {
 }
 
 fn get_remote_context(&self) -> Result<String> {
-let (r,_,_) = self.parse_and_validate()?;
+let (r, _, _) = self.parse_and_validate()?;
 Ok(r)
 }
 
 fn get_remote_namespace(&self) -> Result<String> {
-let (_,r,_) = self.parse_and_validate()?;
+let (_, r, _) = self.parse_and_validate()?;
 Ok(r)
 }
 
 fn get_remote_pvc(&self) -> Result<String> {
-let (_,_,r) = self.parse_and_validate()?;
+let (_, _, r) = self.parse_and_validate()?;
 Ok(r)
 }
 }
@@ -202,7 +215,9 @@ impl AppContext {
 async fn ktx(&self, context_name: Option<String>) -> Result<kube::Client> {
 let mut kco = KubeConfigOptions::default();
 kco.context = context_name;
-Ok(kube::Client::try_from(kube::Config::from_kubeconfig(&kco).await?)?)
+Ok(kube::Client::try_from(
+kube::Config::from_kubeconfig(&kco).await?,
+)?)
 }
 }
@@ -211,13 +226,15 @@ impl Drop for AppContext {
 let handle = tokio::runtime::Handle::current();
 futures::executor::block_on(async {
 for ensure in self.ensures.drain(..) {
-if let Err(e) = handle.spawn(
-async move {
+if let Err(e) = handle
+.spawn(async move {
 let _ = ensure.await.unwrap_or_default();
-}).await {
+})
+.await
+{
 eprintln!("failed to ensure in Context: {e}");
 }
-};
+}
 });
 }
 }
@@ -231,19 +248,30 @@ impl PodExt for Pod {
 fn label_selectors(&self) -> Vec<String> {
 let l = self.labels();
 let selectors = Vec::with_capacity(l.len());
-for (k,v) in l.iter() {
+for (k, v) in l.iter() {
 format!("{}={}", k, v);
-};
+}
 selectors
 }
 
 fn field_selectors(&self) -> Result<Vec<String>> {
 Ok(vec![
-format!("metadata.name={}", self.meta().name.as_ref().ok_or(ApplicationError::PodNoName)?),
-format!("metadata.namespace={}", self.meta().namespace.as_ref().ok_or(ApplicationError::PodNoNamespace)?),
+format!(
+"metadata.name={}",
+self.meta()
+.name
+.as_ref()
+.ok_or(ApplicationError::PodNoName)?
+),
+format!(
+"metadata.namespace={}",
+self.meta()
+.namespace
+.as_ref()
+.ok_or(ApplicationError::PodNoNamespace)?
+),
 ])
 }
 }
 
 async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
@@ -271,15 +299,23 @@ async fn wait_for_pod_running_watch(pods: &Api<Pod>, pod: Pod) -> Result<()> {
 }
 _ => {}
 }
-};
+}
 Ok(())
 }
 
 async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
-let got_pod = pods.get(&pod.metadata.name.ok_or(anyhow!("pod metadata must have a name"))?).await?;
+let got_pod = pods
+.get(
+&pod.metadata
+.name
+.ok_or(anyhow!("pod metadata must have a name"))?,
+)
+.await?;
 let phase = got_pod
-.status.ok_or(anyhow!("pod has no status"))?
-.phase.ok_or(anyhow!("pod has no status.phase"))?;
+.status
+.ok_or(anyhow!("pod has no status"))?
+.phase
+.ok_or(anyhow!("pod has no status.phase"))?;
 if phase == "Running" {
 Ok(true)
 } else {
@@ -288,10 +324,19 @@ async fn is_pod_running(pods: &Api<Pod>, pod: Pod) -> Result<bool> {
 }
 
 async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
-let got_pod = pods.get(&pod.metadata.name.as_ref().ok_or(anyhow!("pod metadata must have a name"))?).await?;
+let got_pod = pods
+.get(
+&pod.metadata
+.name
+.as_ref()
+.ok_or(anyhow!("pod metadata must have a name"))?,
+)
+.await?;
 let phase = got_pod
-.status.ok_or(anyhow!("pod has no status"))?
-.phase.ok_or(anyhow!("pod has no status.phase"))?;
+.status
+.ok_or(anyhow!("pod has no status"))?
+.phase
+.ok_or(anyhow!("pod has no status.phase"))?;
 if phase == "Failed" {
 Ok(true)
 } else if phase == "Succeeded" {
@@ -301,7 +346,6 @@ async fn is_pod_finished(pods: &Api<Pod>, pod: &Pod) -> Result<bool> {
 }
 }
 
-
 async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
 let xtx = tx.clone();
@@ -321,7 +365,9 @@ async fn wait_for_pod_running(pods: &Api<Pod>, pod: &Pod) -> Result<()> {
 let xpods = pods.clone();
 let xpod = pod.clone();
 let _w = tokio::spawn(async move {
-xtx.send(wait_for_pod_running_watch(&xpods, xpod).await).await.expect("couldn't send on channel");
+xtx.send(wait_for_pod_running_watch(&xpods, xpod).await)
+.await
+.expect("couldn't send on channel");
 });
 let r = rx.recv().await;
 if r.is_none() {
@@ -346,8 +392,7 @@ async fn set_log_level(level: Level) {
 .with_max_level(level)
 .with_writer(std::io::stderr)
 .finish();
-tracing::subscriber::set_global_default(subscriber)
-.expect("setting default subscriber failed");
+tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
 }
 
 async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
@@ -361,11 +406,11 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 1 => Level::INFO,
 2 => Level::DEBUG,
 _ => Level::TRACE,
-}).await;
+})
+.await;
 
 debug!("{:?}", &cfg);
 
 // these should all be &str to ensure we don't accidentally fail to copy any
 // and make weird errors later. instead of making these Strings here, we need
 // to to_owned() them everywhere they're referenced to make a copy to go in the
@@ -403,15 +448,14 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 "#;
 let kube_repo_mount_path = "/repo";
 
-let parsed_size: Result<ParsedQuantity, ParseQuantityError> = cfg.initial_size.clone().try_into();
+let parsed_size: Result<ParsedQuantity, ParseQuantityError> =
+cfg.initial_size.clone().try_into();
 let quantity_size: Quantity = match parsed_size {
 Err(e) => {
 error!("could not parse initial PVC size: {e}");
 return Err(e.into());
-},
-Ok(s) => {
-s.into()
-}
+}
+Ok(s) => s.into(),
 };
 
 debug!("parsed size is {quantity_size:#?}");
@@ -435,22 +479,24 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 let pvcs_api = kube::Api::<PersistentVolumeClaim>::namespaced(client.clone(), &kube_ns);
 let pods_api = kube::Api::<Pod>::namespaced(client.clone(), &kube_ns);
 
 let existing_pvc = pvcs_api.get_opt(kube_pvc).await?;
 if let None = existing_pvc {
 info!("pvc doesn't exist yet. creating now.");
 let mut repo_pvc = PersistentVolumeClaim {
 metadata: ObjectMeta::default(),
-spec: Some(PersistentVolumeClaimSpec{
-access_modes:Some(vec!["ReadWriteOnce".to_owned()]),
-resources: Some(ResourceRequirements{
+spec: Some(PersistentVolumeClaimSpec {
+access_modes: Some(vec!["ReadWriteOnce".to_owned()]),
+resources: Some(ResourceRequirements {
 claims: None,
 limits: None,
-requests: Some(BTreeMap::from([("storage".to_owned(),quantity_size)]))
+requests: Some(BTreeMap::from([("storage".to_owned(), quantity_size)])),
 }),
 storage_class_name: cfg.storageclass.clone(),
 volume_mode: Some("Filesystem".to_owned()),
-volume_name: None, data_source: None, data_source_ref: None, selector: None,
+volume_name: None,
+data_source: None,
+data_source_ref: None,
+selector: None,
 }),
 status: None,
 };
@@ -462,7 +508,7 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 let created_pvc = pvcs_api.create(&pp, &repo_pvc).await?;
 debug!("created pvc: {created_pvc:#?}");
 }
-debug!("{:#?}",existing_pvc);
+debug!("{:#?}", existing_pvc);
 
 // create the worker pod
 let mut worker_pod = Pod::default();
@@ -470,7 +516,7 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 worker_pod.metadata.namespace = Some(kube_ns.to_owned());
 {
 let mut labels = BTreeMap::new();
-for (k,v) in kube_pod_labels.iter() {
+for (k, v) in kube_pod_labels.iter() {
 let kk = k.to_owned().to_owned();
 let vv = v.to_owned().to_owned();
 labels.insert(kk, vv);
@@ -513,15 +559,14 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 }
 // debug!("Pod: {:?}", worker_pod);
 
-let mut lp =
-ListParams::default();
-let mut ls: String = String::with_capacity(kube_pod_labels.len()*100);
-for (k,v) in kube_pod_labels {
+let mut lp = ListParams::default();
+let mut ls: String = String::with_capacity(kube_pod_labels.len() * 100);
+for (k, v) in kube_pod_labels {
 if ls.len() > 0 {
 ls.push(',');
 }
 ls.push_str(format!("{}={}", k.to_owned(), v).as_ref());
-};
+}
 lp = lp.labels(&ls);
 debug!("list params: {lp:#?}");
@@ -550,13 +595,21 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 bail!(ApplicationError::RemoteClusterInconsistent);
 } else if worker_pods.len() == 1 {
 // 3
-let p = worker_pods.iter().next()
+let p = worker_pods
+.iter()
+.next()
 .expect("failed to take an item from an iter which is known to have enough items")
 .to_owned();
 if is_pod_finished(&pods_api, &p).await? {
 info!("existing worker is finished. deleting.");
-let pn = p.metadata.name.expect("trying to delete an ended pod which has no name");
-pods_api.delete(&pn, &DeleteParams::default()).await.expect("failed to delete existing ended pod");
+let pn = p
+.metadata
+.name
+.expect("trying to delete an ended pod which has no name");
+pods_api
+.delete(&pn, &DeleteParams::default())
+.await
+.expect("failed to delete existing ended pod");
 let mut sleeptime = 0.2;
 loop {
 tokio::time::sleep(Duration::from_secs_f64(sleeptime)).await;
@@ -588,7 +641,10 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 if let Some(p) = opod {
 pod = p;
 } else {
-error!("could not find or start a worker pod (pod name: {})", kube_worker_name);
+error!(
+"could not find or start a worker pod (pod name: {})",
+kube_worker_name
+);
 bail!("could not find or start a worker pod");
 }
@@ -607,29 +663,42 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 // };
 // }.instrument(error_span!("pinger")));
 
-let connect_cmd = negotiate_git_protocol(&mut ttyout, &mut ttyin).await?
-.ok_or(anyhow!("no connect command specified and we don't know how to do anything else"))?;
+let connect_cmd = negotiate_git_protocol(&mut ttyout, &mut ttyin)
+.await?
+.ok_or(anyhow!(
+"no connect command specified and we don't know how to do anything else"
+))?;
-gitcommand.push_str(&format!(";echo;{connect_cmd} .;RC=$?;1>&2 echo remote worker exiting;exit $RC"));
-let ap =
-AttachParams::default()
+gitcommand.push_str(&format!(
+";echo;{connect_cmd} .;RC=$?;1>&2 echo remote worker exiting;exit $RC"
+));
+let ap = AttachParams::default()
 .stdin(true)
 .stdout(true)
 .stderr(true)
 .container(kube_container_name.to_owned());
 
 // let (ready_tx, ready_rx) = oneshot::channel::<()>();
-let mut stuff =pods_api.exec(kube_worker_name, vec!["sh", "-c", &gitcommand], &ap).await?;
-let mut podout = stuff.stdout().ok_or(ApplicationError::PodCouldNotOpenStdout)?;
-let mut podin = stuff.stdin().ok_or(ApplicationError::PodCouldNotOpenStdin)?;
+let mut stuff = pods_api
+.exec(kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
+.await?;
+let mut podout = stuff
+.stdout()
+.ok_or(ApplicationError::PodCouldNotOpenStdout)?;
+let mut podin = stuff
+.stdin()
+.ok_or(ApplicationError::PodCouldNotOpenStdin)?;
 // pod stderr is handled specially
-let poderr = stuff.stderr().ok_or(ApplicationError::PodCouldNotOpenStderr)?;
+let poderr = stuff
+.stderr()
+.ok_or(ApplicationError::PodCouldNotOpenStderr)?;
 let mut poderr = tokio_util::io::ReaderStream::new(poderr);
 
 // ready_tx.send(()).expect("failed to send ready check");
 let barrier = Arc::new(tokio::sync::Barrier::new(4));
 let xbarrier: Arc<tokio::sync::Barrier> = barrier.clone();
-let _jhe = tokio::spawn(async move {
+let _jhe = tokio::spawn(
+async move {
 debug!("entering");
 while let Some(l) = poderr.next().await {
 if let Err(e) = l {
@@ -644,41 +713,63 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 debug!("waiting for group to exit");
 xbarrier.wait().await;
 debug!("exiting");
-}.instrument(error_span!("pod->tty", "test" = "fred")));
+}
+.instrument(error_span!("pod->tty", "test" = "fred")),
+);
 let xbarrier: Arc<tokio::sync::Barrier> = barrier.clone();
-let _jhi = tokio::spawn(async move{
+let _jhi = tokio::spawn(
+async move {
 debug!("entering");
-tokio::io::copy(&mut ttyin, &mut podin).await.expect("error copying tty input to pod input");
+tokio::io::copy(&mut ttyin, &mut podin)
+.await
+.expect("error copying tty input to pod input");
 podin.flush().await.expect("final flush to pod failed");
 debug!("waiting for group to exit");
 xbarrier.wait().await;
 debug!("exiting");
-}.instrument(error_span!("git->pod")));
+}
+.instrument(error_span!("git->pod")),
+);
 let xbarrier: Arc<tokio::sync::Barrier> = barrier.clone();
-let _jho = tokio::spawn(async move {
+let _jho = tokio::spawn(
+async move {
 debug!("entering");
-tokio::io::copy(&mut podout, &mut ttyout).await.expect("error copying pod output to tty output");
+tokio::io::copy(&mut podout, &mut ttyout)
+.await
+.expect("error copying pod output to tty output");
 ttyout.flush().await.expect("final flush to git failed");
 debug!("waiting for group to exit");
 xbarrier.wait().await;
 debug!("exiting");
-}.instrument(error_span!("git<-pod")));
+}
+.instrument(error_span!("git<-pod")),
+);
 
-let status = stuff.take_status()
-.expect("failed to take status").await
+let status = stuff
+.take_status()
+.expect("failed to take status")
+.await
 .ok_or(anyhow!("could not take status of remote git worker"))?;
 // this is an exit code which is always nonzero.
 // we'll _default_ to 1 instead of 0 because we only return _anything_ other than 0
 // when NonZeroExitCode is also given as the exit reason.
 debug!("exit code of job: {status:#?}");
-let exitcode = (|| -> Option<u8>{
-let exitcode = status.details.as_ref()?.causes.as_ref()?.first()?.message.to_owned()?;
+let exitcode = (|| -> Option<u8> {
+let exitcode = status
+.details
+.as_ref()?
+.causes
+.as_ref()?
+.first()?
+.message
+.to_owned()?;
 if let Ok(rc) = exitcode.parse::<u8>() {
 return Some(rc);
 }
 return Some(1);
-})().unwrap_or(1);
+})()
+.unwrap_or(1);
 debug!("exit status code of remote job discovered was {exitcode:?}");
 // finally, we'll set the exit code of the entire application
 // to the exit code of the pod, if possible. if we know it's
@@ -702,7 +793,7 @@ async fn main_wrapped(rc: &mut ExitCode) -> crate::Result<()> {
 async fn get_tokio_line_by_bytes(podout: &mut (impl AsyncRead + Unpin)) -> Result<String> {
 let mut r = String::with_capacity(512);
-loop{
+loop {
 let c = podout.read_u8().await?;
 if c == b'\n' {
 return Ok(r);
} }
} }
#[derive(ThisError,Debug)] #[derive(ThisError, Debug)]
enum ProtocolError { enum ProtocolError {
#[error("no command given via git protocol")] #[error("no command given via git protocol")]
NoCommandGiven, NoCommandGiven,
@@ -721,7 +812,10 @@ enum ProtocolError {
 UnknownCommand(String),
 }
 
-async fn negotiate_git_protocol(ttytx: &mut (impl AsyncWrite + Unpin), ttyrx: &mut (impl AsyncRead + Unpin)) -> Result<Option<String>> {
+async fn negotiate_git_protocol(
+ttytx: &mut (impl AsyncWrite + Unpin),
+ttyrx: &mut (impl AsyncRead + Unpin),
+) -> Result<Option<String>> {
 loop {
 let cmd = get_tokio_line_by_bytes(ttyrx).await?;
 let mut argv = cmd.split_whitespace();
@@ -730,14 +824,14 @@ async fn negotiate_git_protocol(ttytx: &mut (impl AsyncWrite + Unpin), ttyrx: &mut (impl AsyncRead + Unpin)) -> Result<Option<String>> {
 match cmd {
 "capabilities" => {
 ttytx.write_all(b"connect\n\n").await?;
-},
+}
 "connect" => {
 let service = argv.next().ok_or(ProtocolError::NoServiceGiven)?;
 return Ok(Some(service.to_owned()));
-},
+}
 unknown => {
 return Err(anyhow!(ProtocolError::UnknownCommand(unknown.to_owned())));
-},
+}
 }
 }
 }


@@ -38,7 +38,11 @@ fn test_config_extractors_omitted_schema_absolute_path() -> Result<()> {
 #[test]
 fn test_config_extractors_trailing_slash() {
-let newcfg = Config::parse_from(vec!["x", "x", "k8s://test-context/test-namespace/test-pvc/"]);
+let newcfg = Config::parse_from(vec![
+"x",
+"x",
+"k8s://test-context/test-namespace/test-pvc/",
+]);
 assert_eq!(
 newcfg.get_remote_pvc().unwrap_err().to_string(),
 ConfigError::RemoteTrailingElements.to_string(),
@@ -47,7 +51,11 @@
 #[test]
 fn test_config_extractors_too_many_elements() {
-let newcfg = Config::parse_from(vec!["x", "x", "k8s://test-context/test-namespace/test-pvc/blah"]);
+let newcfg = Config::parse_from(vec![
+"x",
+"x",
+"k8s://test-context/test-namespace/test-pvc/blah",
+]);
 assert_eq!(
 newcfg.get_remote_pvc().unwrap_err().to_string(),
 ConfigError::RemoteTrailingElements.to_string(),
@@ -57,57 +65,97 @@
 #[test]
 fn test_config_extractors_blank_namespace() {
 let newcfg = Config::parse_from(vec!["x", "x", "k8s://test-context//test-pvc"]);
-let expected_err = newcfg.get_remote_namespace().expect_err("Expected RemoteNoNamespace error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteNoNamespace.to_string());
+let expected_err = newcfg
+.get_remote_namespace()
+.expect_err("Expected RemoteNoNamespace error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteNoNamespace.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_blank_context() {
 let newcfg = Config::parse_from(vec!["x", "x", "k8s:///test-namespace/test-pvc"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected RemoteNoContext error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteNoContext.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected RemoteNoContext error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteNoContext.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_only_scheme() {
 let newcfg = Config::parse_from(vec!["x", "x", "k8s:"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalid.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalid.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_nothing() {
 let newcfg = Config::parse_from(vec!["x", "x", ""]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalid.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalid.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_single_colon() {
 let newcfg = Config::parse_from(vec!["x", "x", ":"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalid.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalid.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_single_name() {
 let newcfg = Config::parse_from(vec!["x", "x", "ted"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalid.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalid.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_single_slash() {
 let newcfg = Config::parse_from(vec!["x", "x", "/"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalid.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalid.to_string()
+);
 }
 
 #[test]
 fn test_config_extractors_crazy_scheme() {
 let newcfg = Config::parse_from(vec!["x", "x", "crazyscheme://ctx/ns/pvc"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
-assert_eq!(expected_err.to_string(), ConfigError::RemoteInvalidScheme.to_string());
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
+assert_eq!(
+expected_err.to_string(),
+ConfigError::RemoteInvalidScheme.to_string()
+);
 }
 
 #[test]
@@ -118,7 +166,9 @@ fn test_config_extractors_crazy_scheme() {
 /// that's truly present.
 fn test_config_extractors_crazy_scheme_and_other_problems() {
 let newcfg = Config::parse_from(vec!["x", "x", "crazyscheme:///ns"]);
-let expected_err = newcfg.get_remote_context().expect_err("Expected generic RemoteInvalid error");
+let expected_err = newcfg
+.get_remote_context()
+.expect_err("Expected generic RemoteInvalid error");
 let eestr = expected_err.to_string();
 assert_eq!(eestr, ConfigError::RemoteInvalidScheme.to_string());
 }