From 1a7dfa7acdbb58b068fda20be37333eb959d0304 Mon Sep 17 00:00:00 2001 From: Michael Francis Date: Sun, 26 May 2024 15:02:43 -0400 Subject: [PATCH] Added ownerRefs --- src/main.rs | 60 ++++++++++++++++++++++++++++++++++++++++------------ src/minio.rs | 49 ++++++++++++++++++++++++++++++------------ src/neon.rs | 39 +++++++++++++++++++++++----------- 3 files changed, 109 insertions(+), 39 deletions(-) diff --git a/src/main.rs b/src/main.rs index 96200d5..2e65571 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,14 +12,16 @@ use kube::runtime::wait::await_condition; use kube::runtime::watcher::Config; use kube::Client; use kube::CustomResourceExt; +use kube::Resource; use kube::ResourceExt; use kube::{runtime::controller::Action, runtime::Controller}; use tracing::error; use tracing::info; use tracing::instrument; -pub mod crd; -pub mod minio; -pub mod neon; + +mod crd; +mod minio; +mod neon; struct ContextData { pub kube_client: Client, @@ -40,9 +42,11 @@ pub enum Error { #[from] source: kube::Error, }, - /// Error in user input or Echo resource definition, typically missing fields. - #[error("Invalid Neon CRD: {0}")] + /// Error in user input or NeonDatabase resource definition, typically missing fields. + #[error("Invalid NeonDatabase CRD: {0}")] UserInputError(String), + #[error("General error: {0}")] + GeneralError(String), } #[tokio::main] @@ -55,7 +59,7 @@ async fn main() { let crd_api: Api = Api::all(kube_client.clone()); let context: Arc = Arc::new(ContextData::new(kube_client.clone())); - let ssapply = PatchParams::apply("crd_apply_example").force(); + let ssapply = PatchParams::apply("neon_operator").force(); let crds: Api = Api::all(kube_client.clone()); info!( "Creating crd: {}", @@ -77,7 +81,7 @@ async fn main() { ); let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish) .await - .expect("Waiting failed"); + .expect("Timeout waiting for CRD to establish"); info!("CRD established, starting controller"); Controller::new(crd_api.clone(), Config::default()) @@ -100,16 +104,44 @@ async fn reconcile(neon: Arc, context: Arc) -> Result let client = context.kube_client.clone(); let namespace: String = match neon.namespace() { - Some(ns) => ns.to_string(), + Some(ns) => ns, None => return Err(Error::UserInputError("Missing namespace".to_string())), }; - minio::create_deployment(client.clone(), &namespace).await?; - minio::create_service(client.clone(), &namespace).await?; - neon::reconcile_storage_broker(client.clone(), &namespace).await?; - neon::reconcile_page_server(client.clone(), &namespace).await?; - neon::reconcile_safe_keepers(client.clone(), &namespace).await?; - neon::reconcile_compute(client.clone(), &namespace, &neon.spec.compute_image_ref).await?; + let owner_ref = neon + .controller_owner_ref(&()) + .ok_or(Error::GeneralError("Missing owner ref".to_string()))?; + + minio::create_deployment(client.clone(), &owner_ref, &namespace).await?; + minio::create_service(client.clone(), &owner_ref, &namespace).await?; + neon::reconcile_storage_broker( + client.clone(), + &owner_ref, + &namespace, + &neon.spec.neon_image_ref, + ) + .await?; + neon::reconcile_page_server( + client.clone(), + &owner_ref, + &namespace, + &neon.spec.neon_image_ref, + ) + .await?; + neon::reconcile_safe_keepers( + client.clone(), + &owner_ref, + &namespace, + &neon.spec.neon_image_ref, + ) + .await?; + neon::reconcile_compute( + client.clone(), + &owner_ref, + &namespace, + &neon.spec.compute_image_ref, + ) + .await?; Ok(Action::requeue(Duration::from_secs(5))) } diff --git a/src/minio.rs b/src/minio.rs index 656a886..30085bb 100644 --- a/src/minio.rs +++ b/src/minio.rs @@ -1,16 +1,33 @@ use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec}; use k8s_openapi::api::core::v1::{ - Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, + Container, ContainerPort, EnvVar, HTTPGetAction, PodSpec, PodTemplateSpec, Probe, Service, + ServicePort, ServiceSpec, }; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, OwnerReference}; +use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; use kube::api::{ListParams, ObjectMeta, Patch, PatchParams}; use kube::{Api, Client, Error}; use std::collections::BTreeMap; use tracing::{info, instrument}; -const NAME: &str = "minio"; +// Minio settings to lift out +const NAME: &str = "minio"; // In a real world scenario, we should support multiple minio instances +const MINIO_ROOT_USER: &str = "minio"; +const MINIO_ROOT_PASSWORD: &str = "password"; +const MINIO_IMAGE: &str = "quay.io/minio/minio:RELEASE.2022-10-20T00-55-09Z"; +const MINIO_PORT: i32 = 9000; +const MINIO_CONSOLE_PORT: i32 = 9001; -pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Error> { +// Workflow: +// 1. Create minio deployment +// 2. Create minio service +// 3. Ensure bucket exists + +pub async fn create_deployment( + client: Client, + owner_ref: &OwnerReference, + namespace: &str, +) -> Result<(), Error> { let minio_container = Container { name: NAME.to_string(), env: Some(vec![ @@ -49,20 +66,20 @@ pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Er .map(|s| s.to_string()) .collect::>(), ), - liveness_probe: Some(k8s_openapi::api::core::v1::Probe { - http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction { + liveness_probe: Some(Probe { + http_get: Some(HTTPGetAction { path: Some("/minio/health/live".to_string()), - port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9000), + port: IntOrString::Int(9000), ..Default::default() }), initial_delay_seconds: Some(10), period_seconds: Some(30), ..Default::default() }), - readiness_probe: Some(k8s_openapi::api::core::v1::Probe { - http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction { + readiness_probe: Some(Probe { + http_get: Some(HTTPGetAction { path: Some("/minio/health/ready".to_string()), - port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9000), + port: IntOrString::Int(9000), ..Default::default() }), initial_delay_seconds: Some(10), @@ -100,6 +117,7 @@ pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Er metadata: ObjectMeta { name: Some(NAME.to_string()), namespace: Some(namespace.to_string()), + owner_references: Some(vec![owner_ref.clone()]), labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])), ..Default::default() }, @@ -131,7 +149,7 @@ pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Er deployment_api .patch( NAME, - &PatchParams::apply("neon-operator"), + &PatchParams::apply("neon_operator"), &Patch::Apply(deployment), ) .await?; @@ -139,12 +157,17 @@ pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Er Ok(()) } -pub async fn create_service(client: Client, namespace: &str) -> Result<(), Error> { +pub async fn create_service( + client: Client, + owner_ref: &OwnerReference, + namespace: &str, +) -> Result<(), Error> { let service = Service { metadata: ObjectMeta { name: Some(NAME.to_string()), namespace: Some(namespace.to_string()), labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])), + owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(ServiceSpec { @@ -162,7 +185,7 @@ pub async fn create_service(client: Client, namespace: &str) -> Result<(), Error Api::::namespaced(client, namespace) .patch( NAME, - &PatchParams::apply("neon-operator"), + &PatchParams::apply("neon_operator"), &Patch::Apply(service), ) .await?; diff --git a/src/neon.rs b/src/neon.rs index 65facd8..d253244 100644 --- a/src/neon.rs +++ b/src/neon.rs @@ -2,21 +2,22 @@ use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet}; use k8s_openapi::api::core::v1::{ Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, }; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, OwnerReference}; use kube::api::{ObjectMeta, Patch, PatchParams}; use kube::{Api, Client, Error}; use std::collections::BTreeMap; -// Workflow: -// 1. Create minio deployment -// 2. Create minio service -// 3. Ensure bucket exists - -pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result<(), Error> { +pub async fn reconcile_storage_broker( + client: Client, + owner_ref: &OwnerReference, + namespace: &str, + neon_image_ref: &str, +) -> Result<(), Error> { let storage_broker = Deployment { metadata: ObjectMeta { name: Some("storage-broker".to_string()), namespace: Some(namespace.to_string()), + owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec { @@ -39,7 +40,7 @@ pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result spec: Some(PodSpec { containers: vec![Container { name: "storage-broker".to_string(), - image: Some("neondatabase/neon".to_string()), + image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 50051, ..Default::default() @@ -104,11 +105,17 @@ pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result Ok(()) } -pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<(), Error> { +pub async fn reconcile_page_server( + client: Client, + owner_ref: &OwnerReference, + namespace: &str, + neon_image_ref: &str, +) -> Result<(), Error> { let deployment = Deployment { metadata: ObjectMeta { name: Some("pageserver".to_string()), namespace: Some(namespace.to_string()), + owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec { @@ -131,7 +138,7 @@ pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<() spec: Some(PodSpec { containers: vec![Container { name: "pageserver".to_string(), - image: Some("neondatabase/neon".to_string()), + image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 9898, name: Some("http".to_string()), @@ -235,11 +242,17 @@ pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<() Ok(()) } -pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<(), Error> { +pub async fn reconcile_safe_keepers( + client: Client, + owner_ref: &OwnerReference, + namespace: &str, + neon_image_ref: &str, +) -> Result<(), Error> { let stateful_set = StatefulSet { metadata: ObjectMeta { name: Some("safe-keeper".to_string()), namespace: Some(namespace.to_string()), + owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec { @@ -263,7 +276,7 @@ pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<( spec: Some(PodSpec { containers: vec![Container { name: "safe-keeper".to_string(), - image: Some("neondatabase/neon".to_string()), + image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 7676, ..Default::default() @@ -375,6 +388,7 @@ pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<( pub async fn reconcile_compute( client: Client, + owner_ref: &OwnerReference, namespace: &str, compute_image: &str, ) -> Result<(), Error> { @@ -382,6 +396,7 @@ pub async fn reconcile_compute( metadata: ObjectMeta { name: Some("compute".to_string()), namespace: Some(namespace.to_string()), + owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec {