457 lines
17 KiB
Rust
457 lines
17 KiB
Rust
|
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet};
|
||
|
use k8s_openapi::api::core::v1::{
|
||
|
Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
|
||
|
};
|
||
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||
|
use kube::api::{ObjectMeta, Patch, PatchParams};
|
||
|
use kube::{Api, Client, Error};
|
||
|
use std::collections::BTreeMap;
|
||
|
|
||
|
// Workflow:
|
||
|
// 1. Create minio deployment
|
||
|
// 2. Create minio service
|
||
|
// 3. Ensure bucket exists
|
||
|
|
||
|
pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result<(), Error> {
|
||
|
let storage_broker = Deployment {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("storage-broker".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(DeploymentSpec {
|
||
|
replicas: Some(1),
|
||
|
selector: LabelSelector {
|
||
|
match_labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"storage-broker".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
template: PodTemplateSpec {
|
||
|
metadata: Some(ObjectMeta {
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"storage-broker".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
spec: Some(PodSpec {
|
||
|
containers: vec![Container {
|
||
|
name: "storage-broker".to_string(),
|
||
|
image: Some("neondatabase/neon".to_string()),
|
||
|
ports: Some(vec![ContainerPort {
|
||
|
container_port: 50051,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
command: Some(vec![
|
||
|
"storage_broker".to_string(),
|
||
|
"--listen-addr=0.0.0.0:50051".to_string(),
|
||
|
]),
|
||
|
..Default::default()
|
||
|
}],
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
},
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
|
||
|
deployment_api
|
||
|
.patch(
|
||
|
"storage-broker",
|
||
|
&PatchParams::apply("neon-operator"),
|
||
|
&Patch::Apply(storage_broker),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
let service = Service {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("storage-broker".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"storage-broker".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(ServiceSpec {
|
||
|
selector: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"storage-broker".to_string(),
|
||
|
)])),
|
||
|
ports: Some(vec![ServicePort {
|
||
|
port: 50051,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let service_api = Api::<Service>::namespaced(client.clone(), namespace);
|
||
|
service_api
|
||
|
.patch(
|
||
|
"storage-broker",
|
||
|
&PatchParams::apply("neon-operator"),
|
||
|
&Patch::Apply(service),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<(), Error> {
|
||
|
let deployment = Deployment {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("pageserver".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(DeploymentSpec {
|
||
|
replicas: Some(1),
|
||
|
selector: LabelSelector {
|
||
|
match_labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"pageserver".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
template: PodTemplateSpec {
|
||
|
metadata: Some(ObjectMeta {
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"pageserver".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
spec: Some(PodSpec {
|
||
|
containers: vec![Container {
|
||
|
name: "pageserver".to_string(),
|
||
|
image: Some("neondatabase/neon".to_string()),
|
||
|
ports: Some(vec![ContainerPort {
|
||
|
container_port: 9898,
|
||
|
name: Some("http".to_string()),
|
||
|
..Default::default()
|
||
|
}, ContainerPort {
|
||
|
container_port: 6400,
|
||
|
name: Some("pg".to_string()),
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
env: Some(vec![
|
||
|
EnvVar {
|
||
|
name: "BROKER_ENDPOINT".to_string(),
|
||
|
value: Some("storage-broker:50051".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "AWS_ACCESS_KEY_ID".to_string(),
|
||
|
value: Some("minio".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "AWS_SECRET_ACCESS_KEY".to_string(),
|
||
|
value: Some("password".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
]),
|
||
|
command: Some(vec!["/usr/local/bin/pageserver".to_string()]),
|
||
|
args: Some(vec![
|
||
|
"-D".to_string(),
|
||
|
"/data/.neon/".to_string(),
|
||
|
"-c".to_string(),
|
||
|
r#"broker_endpoint='http://storage-broker:50051'"#.to_string(),
|
||
|
"-c".to_string(),
|
||
|
r#"listen_pg_addr='0.0.0.0:6400'"#.to_string(),
|
||
|
"-c".to_string(),
|
||
|
r#"listen_http_addr='0.0.0.0:9898'"#.to_string(),
|
||
|
"-c".to_string(),
|
||
|
r#"remote_storage={endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/pageserver/'}"#.to_string(),
|
||
|
]),
|
||
|
..Default::default()
|
||
|
}],
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
},
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
|
||
|
deployment_api
|
||
|
.patch(
|
||
|
"pageserver",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(deployment),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
let service = Service {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("pageserver".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"pageserver".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(ServiceSpec {
|
||
|
selector: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"pageserver".to_string(),
|
||
|
)])),
|
||
|
ports: Some(vec![
|
||
|
ServicePort {
|
||
|
port: 9898,
|
||
|
name: Some("http".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
ServicePort {
|
||
|
port: 6400,
|
||
|
name: Some("pg".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
]),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let service_api = Api::<Service>::namespaced(client.clone(), namespace);
|
||
|
service_api
|
||
|
.patch(
|
||
|
"pageserver",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(service),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<(), Error> {
|
||
|
let stateful_set = StatefulSet {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("safe-keeper".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
|
||
|
service_name: "safe-keeper".to_string(),
|
||
|
replicas: Some(3),
|
||
|
selector: LabelSelector {
|
||
|
match_labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"safe-keeper".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
template: PodTemplateSpec {
|
||
|
metadata: Some(ObjectMeta {
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"safe-keeper".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
spec: Some(PodSpec {
|
||
|
containers: vec![Container {
|
||
|
name: "safe-keeper".to_string(),
|
||
|
image: Some("neondatabase/neon".to_string()),
|
||
|
ports: Some(vec![ContainerPort {
|
||
|
container_port: 7676,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
command: Some(vec![
|
||
|
"bash".to_string(),
|
||
|
"-c".to_string(),
|
||
|
]),
|
||
|
args: Some(vec![
|
||
|
r#"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
|
||
|
export ordinal=${BASH_REMATCH[1]}
|
||
|
safekeeper --listen-pg=$SAFEKEEPER_ADVERTISE_URL \
|
||
|
--listen-http='0.0.0.0:7676' \
|
||
|
--id=$ordinal \
|
||
|
--broker-endpoint=$BROKER_ENDPOINT \
|
||
|
-D /data \
|
||
|
--remote-storage="{endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/safekeeper/'}"
|
||
|
"#.to_string(),
|
||
|
]),
|
||
|
env: Some(vec![
|
||
|
EnvVar {
|
||
|
name: "POD_NAME".to_string(),
|
||
|
value_from: Some(k8s_openapi::api::core::v1::EnvVarSource {
|
||
|
field_ref: Some(k8s_openapi::api::core::v1::ObjectFieldSelector {
|
||
|
field_path: "metadata.name".to_string(),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "SAFEKEEPER_ADVERTISE_URL".to_string(),
|
||
|
value: Some("$(POD_NAME).safe-keeper.default.svc.cluster.local:5454".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "BROKER_ENDPOINT".to_string(),
|
||
|
value: Some("http://storage-broker:50051".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "AWS_ACCESS_KEY_ID".to_string(),
|
||
|
value: Some("minio".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
EnvVar {
|
||
|
name: "AWS_SECRET_ACCESS_KEY".to_string(),
|
||
|
value: Some("password".to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
]),
|
||
|
..Default::default()
|
||
|
}],
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
},
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let headless_service = Service {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("safe-keeper".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
labels: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"safe-keeper".to_string(),
|
||
|
)])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(ServiceSpec {
|
||
|
cluster_ip: Some("None".to_string()),
|
||
|
selector: Some(BTreeMap::from([(
|
||
|
"app".to_string(),
|
||
|
"safe-keeper".to_string(),
|
||
|
)])),
|
||
|
ports: Some(vec![ServicePort {
|
||
|
port: 5454,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let stateful_set_api: Api<StatefulSet> = Api::namespaced(client.clone(), &namespace);
|
||
|
stateful_set_api
|
||
|
.patch(
|
||
|
"safe-keeper",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(stateful_set),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
|
||
|
service_api
|
||
|
.patch(
|
||
|
"safe-keeper",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(headless_service),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
pub async fn reconcile_compute(client: Client, namespace: &str, image: &str) -> Result<(), Error> {
|
||
|
let deployment = Deployment {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("compute".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(DeploymentSpec {
|
||
|
replicas: Some(1),
|
||
|
selector: LabelSelector {
|
||
|
match_labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
template: PodTemplateSpec {
|
||
|
metadata: Some(ObjectMeta {
|
||
|
labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
spec: Some(PodSpec {
|
||
|
containers: vec![Container {
|
||
|
name: "compute".to_string(),
|
||
|
image: Some(image.to_string()),
|
||
|
ports: Some(vec![ContainerPort {
|
||
|
container_port: 9898,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
env: Some(vec![EnvVar {
|
||
|
name: "PG_VERSION".to_string(),
|
||
|
value: Some("15".to_string()),
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
command: Some(vec!["/shell/compute.sh".to_string()]),
|
||
|
..Default::default()
|
||
|
}],
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
},
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let service = Service {
|
||
|
metadata: ObjectMeta {
|
||
|
name: Some("compute".to_string()),
|
||
|
namespace: Some(namespace.to_string()),
|
||
|
labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
|
||
|
..Default::default()
|
||
|
},
|
||
|
spec: Some(ServiceSpec {
|
||
|
selector: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
|
||
|
ports: Some(vec![ServicePort {
|
||
|
port: 9898,
|
||
|
..Default::default()
|
||
|
}]),
|
||
|
..Default::default()
|
||
|
}),
|
||
|
..Default::default()
|
||
|
};
|
||
|
|
||
|
let deployment_api = Api::<Deployment>::namespaced(client.clone(), &namespace);
|
||
|
deployment_api
|
||
|
.patch(
|
||
|
"compute",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(deployment),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
|
||
|
service_api
|
||
|
.patch(
|
||
|
"compute",
|
||
|
&PatchParams::apply("neon_operator"),
|
||
|
&Patch::Apply(service),
|
||
|
)
|
||
|
.await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|