use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet}; use k8s_openapi::api::core::v1::{ Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec, }; use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, OwnerReference}; use kube::api::{ObjectMeta, Patch, PatchParams}; use kube::{Api, Client, Error}; use std::collections::BTreeMap; pub async fn reconcile_storage_broker( client: Client, owner_ref: &OwnerReference, namespace: &str, neon_image_ref: &str, ) -> Result<(), Error> { let storage_broker = Deployment { metadata: ObjectMeta { name: Some("storage-broker".to_string()), namespace: Some(namespace.to_string()), owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec { replicas: Some(1), selector: LabelSelector { match_labels: Some(BTreeMap::from([( "app".to_string(), "storage-broker".to_string(), )])), ..Default::default() }, template: PodTemplateSpec { metadata: Some(ObjectMeta { labels: Some(BTreeMap::from([( "app".to_string(), "storage-broker".to_string(), )])), ..Default::default() }), spec: Some(PodSpec { containers: vec![Container { name: "storage-broker".to_string(), image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 50051, ..Default::default() }]), command: Some(vec![ "storage_broker".to_string(), "--listen-addr=0.0.0.0:50051".to_string(), ]), ..Default::default() }], ..Default::default() }), ..Default::default() }, ..Default::default() }), ..Default::default() }; let deployment_api = Api::::namespaced(client.clone(), namespace); deployment_api .patch( "storage-broker", &PatchParams::apply("neon-operator"), &Patch::Apply(storage_broker), ) .await?; let service = Service { metadata: ObjectMeta { name: Some("storage-broker".to_string()), namespace: Some(namespace.to_string()), labels: Some(BTreeMap::from([( "app".to_string(), "storage-broker".to_string(), )])), ..Default::default() }, spec: Some(ServiceSpec { selector: Some(BTreeMap::from([( "app".to_string(), "storage-broker".to_string(), )])), ports: Some(vec![ServicePort { port: 50051, ..Default::default() }]), ..Default::default() }), ..Default::default() }; let service_api = Api::::namespaced(client.clone(), namespace); service_api .patch( "storage-broker", &PatchParams::apply("neon-operator"), &Patch::Apply(service), ) .await?; Ok(()) } pub async fn reconcile_page_server( client: Client, owner_ref: &OwnerReference, namespace: &str, neon_image_ref: &str, ) -> Result<(), Error> { let deployment = Deployment { metadata: ObjectMeta { name: Some("pageserver".to_string()), namespace: Some(namespace.to_string()), owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec { replicas: Some(1), selector: LabelSelector { match_labels: Some(BTreeMap::from([( "app".to_string(), "pageserver".to_string(), )])), ..Default::default() }, template: PodTemplateSpec { metadata: Some(ObjectMeta { labels: Some(BTreeMap::from([( "app".to_string(), "pageserver".to_string(), )])), ..Default::default() }), spec: Some(PodSpec { containers: vec![Container { name: "pageserver".to_string(), image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 9898, name: Some("http".to_string()), ..Default::default() }, ContainerPort { container_port: 6400, name: Some("pg".to_string()), ..Default::default() }]), env: Some(vec![ EnvVar { name: "BROKER_ENDPOINT".to_string(), value: Some("storage-broker:50051".to_string()), ..Default::default() }, EnvVar { name: "AWS_ACCESS_KEY_ID".to_string(), value: Some("minio".to_string()), ..Default::default() }, EnvVar { name: "AWS_SECRET_ACCESS_KEY".to_string(), value: Some("password".to_string()), ..Default::default() }, ]), command: Some(vec!["/usr/local/bin/pageserver".to_string()]), args: Some(vec![ "-D".to_string(), "/data/.neon/".to_string(), "-c".to_string(), r#"broker_endpoint='http://storage-broker:50051'"#.to_string(), "-c".to_string(), r#"listen_pg_addr='0.0.0.0:6400'"#.to_string(), "-c".to_string(), r#"listen_http_addr='0.0.0.0:9898'"#.to_string(), "-c".to_string(), r#"remote_storage={endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/pageserver/'}"#.to_string(), ]), ..Default::default() }], ..Default::default() }), ..Default::default() }, ..Default::default() }), ..Default::default() }; let deployment_api = Api::::namespaced(client.clone(), namespace); deployment_api .patch( "pageserver", &PatchParams::apply("neon_operator"), &Patch::Apply(deployment), ) .await?; let service = Service { metadata: ObjectMeta { name: Some("pageserver".to_string()), namespace: Some(namespace.to_string()), labels: Some(BTreeMap::from([( "app".to_string(), "pageserver".to_string(), )])), ..Default::default() }, spec: Some(ServiceSpec { selector: Some(BTreeMap::from([( "app".to_string(), "pageserver".to_string(), )])), ports: Some(vec![ ServicePort { port: 9898, name: Some("http".to_string()), ..Default::default() }, ServicePort { port: 6400, name: Some("pg".to_string()), ..Default::default() }, ]), ..Default::default() }), ..Default::default() }; let service_api = Api::::namespaced(client.clone(), namespace); service_api .patch( "pageserver", &PatchParams::apply("neon_operator"), &Patch::Apply(service), ) .await?; Ok(()) } pub async fn reconcile_safe_keepers( client: Client, owner_ref: &OwnerReference, namespace: &str, neon_image_ref: &str, ) -> Result<(), Error> { let stateful_set = StatefulSet { metadata: ObjectMeta { name: Some("safe-keeper".to_string()), namespace: Some(namespace.to_string()), owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec { service_name: "safe-keeper".to_string(), replicas: Some(3), selector: LabelSelector { match_labels: Some(BTreeMap::from([( "app".to_string(), "safe-keeper".to_string(), )])), ..Default::default() }, template: PodTemplateSpec { metadata: Some(ObjectMeta { labels: Some(BTreeMap::from([( "app".to_string(), "safe-keeper".to_string(), )])), ..Default::default() }), spec: Some(PodSpec { containers: vec![Container { name: "safe-keeper".to_string(), image: Some(neon_image_ref.to_string()), ports: Some(vec![ContainerPort { container_port: 7676, ..Default::default() }]), command: Some(vec![ "bash".to_string(), "-c".to_string(), ]), args: Some(vec![ r#"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1 export ordinal=${BASH_REMATCH[1]} safekeeper --listen-pg=$SAFEKEEPER_ADVERTISE_URL \ --listen-http='0.0.0.0:7676' \ --id=$ordinal \ --broker-endpoint=$BROKER_ENDPOINT \ -D /data \ --remote-storage="{endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/safekeeper/'}" "#.to_string(), ]), env: Some(vec![ EnvVar { name: "POD_NAME".to_string(), value_from: Some(k8s_openapi::api::core::v1::EnvVarSource { field_ref: Some(k8s_openapi::api::core::v1::ObjectFieldSelector { field_path: "metadata.name".to_string(), ..Default::default() }), ..Default::default() }), ..Default::default() }, EnvVar { name: "SAFEKEEPER_ADVERTISE_URL".to_string(), value: Some("$(POD_NAME).safe-keeper.default.svc.cluster.local:5454".to_string()), ..Default::default() }, EnvVar { name: "BROKER_ENDPOINT".to_string(), value: Some("http://storage-broker:50051".to_string()), ..Default::default() }, EnvVar { name: "AWS_ACCESS_KEY_ID".to_string(), value: Some("minio".to_string()), ..Default::default() }, EnvVar { name: "AWS_SECRET_ACCESS_KEY".to_string(), value: Some("password".to_string()), ..Default::default() }, ]), ..Default::default() }], ..Default::default() }), ..Default::default() }, ..Default::default() }), ..Default::default() }; let headless_service = Service { metadata: ObjectMeta { name: Some("safe-keeper".to_string()), namespace: Some(namespace.to_string()), labels: Some(BTreeMap::from([( "app".to_string(), "safe-keeper".to_string(), )])), ..Default::default() }, spec: Some(ServiceSpec { cluster_ip: Some("None".to_string()), selector: Some(BTreeMap::from([( "app".to_string(), "safe-keeper".to_string(), )])), ports: Some(vec![ServicePort { port: 5454, ..Default::default() }]), ..Default::default() }), ..Default::default() }; let stateful_set_api: Api = Api::namespaced(client.clone(), &namespace); stateful_set_api .patch( "safe-keeper", &PatchParams::apply("neon_operator"), &Patch::Apply(stateful_set), ) .await?; let service_api = Api::::namespaced(client.clone(), &namespace); service_api .patch( "safe-keeper", &PatchParams::apply("neon_operator"), &Patch::Apply(headless_service), ) .await?; Ok(()) } pub async fn reconcile_compute( client: Client, owner_ref: &OwnerReference, namespace: &str, compute_image: &str, ) -> Result<(), Error> { let deployment = Deployment { metadata: ObjectMeta { name: Some("compute".to_string()), namespace: Some(namespace.to_string()), owner_references: Some(vec![owner_ref.clone()]), ..Default::default() }, spec: Some(DeploymentSpec { replicas: Some(1), selector: LabelSelector { match_labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])), ..Default::default() }, template: PodTemplateSpec { metadata: Some(ObjectMeta { labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])), ..Default::default() }), spec: Some(PodSpec { containers: vec![Container { name: "compute".to_string(), image: Some(compute_image.to_string()), ports: Some(vec![ContainerPort { container_port: 9898, ..Default::default() }]), env: Some(vec![EnvVar { name: "PG_VERSION".to_string(), value: Some("15".to_string()), ..Default::default() }]), command: Some(vec!["/shell/compute.sh".to_string()]), ..Default::default() }], ..Default::default() }), ..Default::default() }, ..Default::default() }), ..Default::default() }; let service = Service { metadata: ObjectMeta { name: Some("compute".to_string()), namespace: Some(namespace.to_string()), labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])), ..Default::default() }, spec: Some(ServiceSpec { selector: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])), ports: Some(vec![ServicePort { port: 9898, ..Default::default() }]), ..Default::default() }), ..Default::default() }; let deployment_api = Api::::namespaced(client.clone(), &namespace); deployment_api .patch( "compute", &PatchParams::apply("neon_operator"), &Patch::Apply(deployment), ) .await?; let service_api = Api::::namespaced(client.clone(), &namespace); service_api .patch( "compute", &PatchParams::apply("neon_operator"), &Patch::Apply(service), ) .await?; Ok(()) }