This commit is contained in:
James Andariese 2023-11-01 18:54:29 -05:00
parent 060039d78d
commit 03a7cc5605

View File

@ -256,8 +256,12 @@ async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
let existing_pvc = pvcs_api.get_opt(kube_pvc).await?; let existing_pvc = pvcs_api.get_opt(kube_pvc).await?;
if let None = existing_pvc { if let None = existing_pvc {
info!("pvc doesn't exist yet. creating now."); info!("pvc doesn't exist yet. creating now.");
let mut repo_pvc = PersistentVolumeClaim { let repo_pvc = PersistentVolumeClaim {
metadata: ObjectMeta::default(), metadata: ObjectMeta{
name: Some(kube_pvc.to_owned()),
namespace: Some(kube_ns.to_owned()),
..ObjectMeta::default()
},
spec: Some(PersistentVolumeClaimSpec { spec: Some(PersistentVolumeClaimSpec {
access_modes: Some(vec!["ReadWriteOnce".to_owned()]), access_modes: Some(vec!["ReadWriteOnce".to_owned()]),
resources: Some(ResourceRequirements { resources: Some(ResourceRequirements {
@ -274,10 +278,6 @@ async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
}), }),
status: None, status: None,
}; };
let mut meta = ObjectMeta::default();
meta.name = Some(kube_pvc.to_owned());
meta.namespace = Some(kube_ns.to_owned());
repo_pvc.metadata = meta;
let pp = PostParams::default(); let pp = PostParams::default();
let created_pvc = pvcs_api.create(&pp, &repo_pvc).await?; let created_pvc = pvcs_api.create(&pp, &repo_pvc).await?;
debug!("created pvc: {created_pvc:#?}"); debug!("created pvc: {created_pvc:#?}");
@ -285,54 +285,60 @@ async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
debug!("{:#?}", existing_pvc); debug!("{:#?}", existing_pvc);
// create the worker pod // create the worker pod
let mut worker_pod = Pod::default();
worker_pod.metadata.name = Some(kube_worker_name.to_owned()); let worker_pod = Pod{
worker_pod.metadata.namespace = Some(kube_ns.to_owned()); metadata: ObjectMeta{
{ name: Some(kube_worker_name.to_owned()),
namespace: Some(kube_ns.to_owned()),
labels: Some({
let mut labels = BTreeMap::new(); let mut labels = BTreeMap::new();
for (k, v) in kube_pod_labels.iter() { for (k, v) in kube_pod_labels.iter() {
let kk = k.to_owned().to_owned(); let kk = k.to_owned().to_owned();
let vv = v.to_owned().to_owned(); let vv = v.to_owned().to_owned();
labels.insert(kk, vv); labels.insert(kk, vv);
} }
worker_pod.metadata.labels = Some(labels); labels
} }),
{ ..ObjectMeta::default()
let mut spec = PodSpec::default(); },
spec.restart_policy = Some("Never".to_owned()); spec: Some(PodSpec {
{ restart_policy: Some("Never".to_owned()),
let mut container = Container::default(); containers: vec![Container{
container.name = KUBE_CONTAINER_NAME.to_owned(); name: KUBE_CONTAINER_NAME.to_owned(),
container.command = Some(vec![ command: Some(vec![
kube_shell_executable.to_owned(), kube_shell_executable.to_owned(),
kube_shell_parameters.to_owned(), kube_shell_parameters.to_owned(),
kube_shell_sleeper_command.to_owned(), kube_shell_sleeper_command.to_owned(),
]); ]),
container.image = Some(kube_image.to_owned()); image: Some(kube_image.to_owned()),
container.working_dir = Some(kube_repo_mount_path.to_owned()); working_dir: Some(kube_repo_mount_path.to_owned()),
container.image_pull_policy = Some("IfNotPresent".to_owned()); image_pull_policy: Some("IfNotPresent".to_owned()),
{ volume_mounts: Some(vec![
let mut volume_mount = VolumeMount::default(); VolumeMount{
volume_mount.mount_path = kube_repo_mount_path.to_owned(); mount_path: kube_repo_mount_path.to_owned(),
volume_mount.name = "repo".to_owned(); name: "repo".to_owned(),
container.volume_mounts = Some(vec![volume_mount]); ..VolumeMount::default()
} }
spec.containers = vec![container]; ]),
} ..Container::default()
{ }],
let mut volume = Volume::default(); volumes: Some(vec![
volume.name = "repo".to_owned(); Volume{
{ persistent_volume_claim: Some(
let mut pvcs = PersistentVolumeClaimVolumeSource::default(); PersistentVolumeClaimVolumeSource {
pvcs.claim_name = kube_pvc.to_owned(); claim_name: kube_pvc.to_owned(),
volume.persistent_volume_claim = Some(pvcs); read_only: Some(false),
} ..PersistentVolumeClaimVolumeSource::default()
spec.volumes = Some(vec![volume]);
}
worker_pod.spec = Some(spec);
} }
),
..Volume::default()
},
]),
..PodSpec::default()
}),
..Pod::default()
};
// debug!("Pod: {:?}", worker_pod);
let mut lp = ListParams::default(); let mut lp = ListParams::default();
let mut ls: String = String::with_capacity(kube_pod_labels.len() * 100); let mut ls: String = String::with_capacity(kube_pod_labels.len() * 100);
for (k, v) in kube_pod_labels { for (k, v) in kube_pod_labels {
@ -345,7 +351,6 @@ async fn setup_pod(ctx: &mut AppContext) -> Result<(Api<Pod>, String)> {
debug!("list params: {lp:#?}"); debug!("list params: {lp:#?}");
let worker_pods = pods_api.list(&lp).await?; let worker_pods = pods_api.list(&lp).await?;
// debug!("worker_pods: {worker_pods:#?}");
// 1: if there is >1 pod, bail // 1: if there is >1 pod, bail
// 2: if there is 1 pod and that pod is running or pending, use it // 2: if there is 1 pod and that pod is running or pending, use it
@ -433,13 +438,6 @@ async fn do_git(ctx: &AppContext, pods_api: Api<Pod>, kube_worker_name: String)
let mut ttyout = tokio::io::stdout(); let mut ttyout = tokio::io::stdout();
let mut ttyin = tokio::io::stdin(); let mut ttyin = tokio::io::stdin();
// tokio::spawn(async {
// loop {
// sleep(Duration::from_secs(1)).await;
// debug!("ping");
// };
// }.instrument(error_span!("pinger")));
let connect_cmd = negotiate_git_protocol(&mut ttyout, &mut ttyin) let connect_cmd = negotiate_git_protocol(&mut ttyout, &mut ttyin)
.await? .await?
.ok_or(anyhow!( .ok_or(anyhow!(
@ -454,7 +452,6 @@ async fn do_git(ctx: &AppContext, pods_api: Api<Pod>, kube_worker_name: String)
.stdout(true) .stdout(true)
.stderr(true) .stderr(true)
.container(KUBE_CONTAINER_NAME.to_owned()); .container(KUBE_CONTAINER_NAME.to_owned());
// let (ready_tx, ready_rx) = oneshot::channel::<()>();
let mut stuff = pods_api let mut stuff = pods_api
.exec(&kube_worker_name, vec!["sh", "-c", &gitcommand], &ap) .exec(&kube_worker_name, vec!["sh", "-c", &gitcommand], &ap)
.await?; .await?;
@ -469,7 +466,6 @@ async fn do_git(ctx: &AppContext, pods_api: Api<Pod>, kube_worker_name: String)
.stderr() .stderr()
.ok_or(ApplicationError::PodCouldNotOpenStderr)?; .ok_or(ApplicationError::PodCouldNotOpenStderr)?;
let mut poderr = tokio_util::io::ReaderStream::new(poderr); let mut poderr = tokio_util::io::ReaderStream::new(poderr);
// ready_tx.send(()).expect("failed to send ready check");
let barrier = Arc::new(tokio::sync::Barrier::new(4)); let barrier = Arc::new(tokio::sync::Barrier::new(4));