initial commit

This commit is contained in:
Michael Francis 2024-05-25 10:51:02 -04:00
commit 31e2a32e01
15 changed files with 3339 additions and 0 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
target
Tiltfile
manifests
orig-compose

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
Tiltfile
manifests/

1999
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

19
Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "neon-operator"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.37.0", features = ["full"] }
kube = { version = "0.91.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["latest"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
futures = "0.3.30"
futures-executor = { version = "0.3.30" }
schemars = "0.8.21"
serde = { version = "1.0.202", features = ["derive"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
serde_json = "1.0.117"
anyhow = "1.0.86"
thiserror = "1.0.61"

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM rust:1.78
WORKDIR /build
COPY . .
RUN --mount=type=cache,target=/usr/local/cargo/registry --mount=type=cache,target=/build/target cargo build
WORKDIR /app
RUN --mount=type=cache,target=/build/target cp /build/target/debug/neon-operator /app/
CMD ["/app/neon-operator"]

69
crd.yaml Normal file
View File

@ -0,0 +1,69 @@
apiVersion: v1
items:
- apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
creationTimestamp: "2024-05-24T17:05:01Z"
generation: 1
name: neondatabases.melenion.com
resourceVersion: "32345"
uid: c72c3bea-c002-4a74-a91a-f6d1a36f683d
spec:
conversion:
strategy: None
group: melenion.com
names:
kind: NeonDatabase
listKind: NeonDatabaseList
plural: neondatabases
shortNames:
- nd
singular: neondatabase
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
description: Auto-generated derived type for NeondatabaseSpec via `CustomResource`
properties:
spec:
properties:
neon_image_ref:
type: string
postgres_version:
type: string
required:
- neon_image_ref
- postgres_version
type: object
required:
- spec
title: NeonDatabase
type: object
served: true
storage: true
subresources: {}
status:
acceptedNames:
kind: NeonDatabase
listKind: NeonDatabaseList
plural: neondatabases
shortNames:
- nd
singular: neondatabase
conditions:
- lastTransitionTime: "2024-05-24T17:05:01Z"
message: no conflicts found
reason: NoConflicts
status: "True"
type: NamesAccepted
- lastTransitionTime: "2024-05-24T17:05:01Z"
message: the initial names have been accepted
reason: InitialNamesAccepted
status: "True"
type: Established
storedVersions:
- v1
kind: List
metadata:
resourceVersion: ""

View File

@ -0,0 +1,16 @@
ARG REPOSITORY=neondatabase
ARG COMPUTE_IMAGE=compute-node-v14
ARG TAG=latest
FROM $REPOSITORY/${COMPUTE_IMAGE}:$TAG
USER root
RUN apt-get update && \
apt-get install -y curl \
jq \
netcat
ADD ./var/db/postgres/specs /var/db/postgres/specs
ADD ./shell/ /shell
USER postgres

View File

@ -0,0 +1,56 @@
#!/bin/bash
set -eux
# Generate a random tenant or timeline ID
#
# Takes a variable name as argument. The result is stored in that variable.
generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
PG_VERSION=${PG_VERSION:-14}
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
SPEC_FILE=/tmp/spec.json
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do
sleep 1;
done
echo "Page server is ready."
echo "Create a tenant and timeline"
generate_id tenant_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_tenant_id\": \"${tenant_id}\"}"
http://pageserver:9898/v1/tenant/
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
generate_id timeline_id
PARAMS=(
-sb
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
echo "Overwrite tenant id and timeline id in spec file"
sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
cat ${SPEC_FILE}
echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
-S ${SPEC_FILE}

View File

@ -0,0 +1,134 @@
{
"format_version": 1.0,
"timestamp": "2022-10-12T18:00:00.000Z",
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
"cluster": {
"cluster_id": "docker_compose",
"name": "docker_compose_test",
"state": "restarted",
"roles": [
{
"name": "cloud_admin",
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
"options": null
}
],
"databases": [],
"settings": [
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "wal_level",
"value": "logical",
"vartype": "enum"
},
{
"name": "wal_log_hints",
"value": "on",
"vartype": "bool"
},
{
"name": "log_connections",
"value": "on",
"vartype": "bool"
},
{
"name": "port",
"value": "55433",
"vartype": "integer"
},
{
"name": "shared_buffers",
"value": "1MB",
"vartype": "string"
},
{
"name": "max_connections",
"value": "100",
"vartype": "integer"
},
{
"name": "listen_addresses",
"value": "0.0.0.0",
"vartype": "string"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "wal_sender_timeout",
"value": "5s",
"vartype": "string"
},
{
"name": "wal_keep_size",
"value": "0",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "md5",
"vartype": "enum"
},
{
"name": "restart_after_crash",
"value": "off",
"vartype": "bool"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "neon.safekeepers",
"value": "safe-keeper-0.safe-keeper:5454,safe-keeper-1.safe-keeper:5454,safe-keeper-2.safe-keeper:5454",
"vartype": "string"
},
{
"name": "neon.timeline_id",
"value": "TIMELINE_ID",
"vartype": "string"
},
{
"name": "neon.tenant_id",
"value": "TENANT_ID",
"vartype": "string"
},
{
"name": "neon.pageserver_connstring",
"value": "host=pageserver port=6400",
"vartype": "string"
},
{
"name": "max_replication_write_lag",
"value": "500MB",
"vartype": "string"
},
{
"name": "max_replication_flush_lag",
"value": "10GB",
"vartype": "string"
}
]
},
"delta_operations": []
}

View File

@ -0,0 +1,196 @@
version: '3'
services:
minio:
restart: always
image: quay.io/minio/minio:RELEASE.2022-10-20T00-55-09Z
ports:
- 9000:9000
- 9001:9001
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=password
command: server /data --address :9000 --console-address ":9001"
minio_create_buckets:
image: minio/mc
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=password
entrypoint:
- "/bin/sh"
- "-c"
command:
- "until (/usr/bin/mc alias set minio http://minio:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD) do
echo 'Waiting to start minio...' && sleep 1;
done;
/usr/bin/mc mb minio/neon --region=eu-north-1;
exit 0;"
depends_on:
- minio
pageserver:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
environment:
- BROKER_ENDPOINT='http://storage_broker:50051'
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 6400:6400 # pg protocol handler
- 9898:9898 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "/usr/local/bin/pageserver -D /data/.neon/
-c \"broker_endpoint=$$BROKER_ENDPOINT\"
-c \"listen_pg_addr='0.0.0.0:6400'\"
-c \"listen_http_addr='0.0.0.0:9898'\"
-c \"remote_storage={endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/pageserver/'}\""
depends_on:
- storage_broker
- minio_create_buckets
safekeeper1:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper1:5454
- SAFEKEEPER_ID=1
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7676:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
safekeeper2:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper2:5454
- SAFEKEEPER_ID=2
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7677:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
safekeeper3:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper3:5454
- SAFEKEEPER_ID=3
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
ports:
#- 5454:5454 # pg protocol handler
- 7678:7676 # http endpoints
entrypoint:
- "/bin/sh"
- "-c"
command:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- storage_broker
- minio_create_buckets
storage_broker:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
ports:
- 50051:50051
command:
- "storage_broker"
- "--listen-addr=0.0.0.0:50051"
compute:
restart: always
build:
context: ./compute_wrapper/
args:
- REPOSITORY=${REPOSITORY:-neondatabase}
- COMPUTE_IMAGE=compute-node-v${PG_VERSION:-14}
- TAG=${TAG:-latest}
- http_proxy=$http_proxy
- https_proxy=$https_proxy
environment:
- PG_VERSION=${PG_VERSION:-14}
#- RUST_BACKTRACE=1
# Mount the test files directly, for faster editing cycle.
volumes:
- ./compute_wrapper/var/db/postgres/specs/:/var/db/postgres/specs/
- ./compute_wrapper/shell/:/shell/
ports:
- 55433:55433 # pg protocol handler
- 3080:3080 # http endpoints
entrypoint:
- "/shell/compute.sh"
depends_on:
- safekeeper1
- safekeeper2
- safekeeper3
- pageserver
compute_is_ready:
image: postgres:latest
entrypoint:
- "/bin/bash"
- "-c"
command:
- "until pg_isready -h compute -p 55433 -U cloud_admin ; do
echo 'Waiting to start compute...' && sleep 1;
done"
depends_on:
- compute

View File

@ -0,0 +1,58 @@
#!/bin/bash
# A basic test to ensure Docker images are built correctly.
# Build a wrapper around the compute, start all services and runs a simple SQL query.
# Repeats the process for all currenly supported Postgres versions.
# Implicitly accepts `REPOSITORY` and `TAG` env vars that are passed into the compose file
# Their defaults point at DockerHub `neondatabase/neon:latest` image.`,
# to verify custom image builds (e.g pre-published ones).
set -eux -o pipefail
SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
COMPOSE_FILE=$SCRIPT_DIR/docker-compose.yml
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
SQL="CREATE TABLE t(key int primary key, value text); insert into t values(1,1); select * from t;"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -c '$SQL' postgres"
cleanup() {
echo "show container information"
docker ps
docker compose -f $COMPOSE_FILE logs
echo "stop containers..."
docker compose -f $COMPOSE_FILE down
}
echo "clean up containers if exists"
cleanup
for pg_version in 14 15 16; do
echo "start containers (pg_version=$pg_version)."
PG_VERSION=$pg_version docker compose -f $COMPOSE_FILE up --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 1; do
# check timeout
cnt=`expr $cnt + 1`
if [ $cnt -gt 60 ]; then
echo "timeout before the compute is ready."
cleanup
exit 1
fi
# check if the compute is ready
set +o pipefail
result=`docker compose -f $COMPOSE_FILE logs "compute_is_ready" | grep "accepting connections" | wc -l`
set -o pipefail
if [ $result -eq 1 ]; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
cleanup
break
fi
done
done

20
src/crd.rs Normal file
View File

@ -0,0 +1,20 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, PartialEq, JsonSchema)]
#[kube(
group = "melenion.com",
version = "v1",
kind = "NeonDatabase",
namespaced,
shortname = "nd",
// status = "NeonDatabaseStatus"
)]
pub struct NeondatabaseSpec {
pub compute_image_ref: String,
pub neon_image_ref: String,
pub postgres_version: String,
}
pub struct NeonDatabaseStatus {}

119
src/main.rs Normal file
View File

@ -0,0 +1,119 @@
use std::sync::Arc;
use std::time::Duration;
use crd::NeonDatabase;
use futures::stream::StreamExt;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::Api;
use kube::api::Patch;
use kube::api::PatchParams;
use kube::runtime::conditions;
use kube::runtime::wait::await_condition;
use kube::runtime::watcher::Config;
use kube::Client;
use kube::CustomResourceExt;
use kube::ResourceExt;
use kube::{runtime::controller::Action, runtime::Controller};
use tracing::error;
use tracing::info;
use tracing::instrument;
pub mod crd;
pub mod minio;
pub mod neon;
struct ContextData {
pub kube_client: Client,
}
impl ContextData {
pub fn new(kube_client: Client) -> Self {
Self { kube_client }
}
}
/// All errors possible to occur during reconciliation
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Any error originating from the `kube-rs` crate
#[error("Kubernetes reported error: {source}")]
KubeError {
#[from]
source: kube::Error,
},
/// Error in user input or Echo resource definition, typically missing fields.
#[error("Invalid Neon CRD: {0}")]
UserInputError(String),
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let kube_client = Client::try_default()
.await
.expect("Failed to create client");
let crd_api: Api<NeonDatabase> = Api::all(kube_client.clone());
let context: Arc<ContextData> = Arc::new(ContextData::new(kube_client.clone()));
let ssapply = PatchParams::apply("crd_apply_example").force();
let crds: Api<CustomResourceDefinition> = Api::all(kube_client.clone());
info!(
"Creating crd: {}",
serde_json::to_string(&NeonDatabase::crd()).unwrap()
);
crds.patch(
"neondatabases.melenion.com",
&ssapply,
&Patch::Apply(NeonDatabase::crd()),
)
.await
.expect("Patching the CRD failed");
info!("Waiting for the api-server to accept the CRD");
let establish = await_condition(
crds,
"neondatabases.melenion.com",
conditions::is_crd_established(),
);
let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish)
.await
.expect("Waiting failed");
info!("CRD established, starting controller");
Controller::new(crd_api.clone(), Config::default())
.run(reconcile, on_error, context)
.for_each(|rec_result| async move {
match rec_result {
Ok(o) => {
println!("Went OK {o:?}")
}
Err(e) => {
eprintln!("Reconciliation error:\n{:?}", e);
}
}
})
.await;
}
#[instrument(err, fields(database_name = ?neon.metadata.name), skip(neon, context))]
async fn reconcile(neon: Arc<NeonDatabase>, context: Arc<ContextData>) -> Result<Action, Error> {
let client = context.kube_client.clone();
let namespace: String = match neon.namespace() {
Some(ns) => ns.to_string(),
None => return Err(Error::UserInputError("Missing namespace".to_string())),
};
minio::create_deployment(client.clone(), &namespace).await?;
minio::create_service(client.clone(), &namespace).await?;
neon::reconcile_storage_broker(client.clone(), &namespace).await?;
neon::reconcile_page_server(client.clone(), &namespace).await?;
neon::reconcile_safe_keepers(client.clone(), &namespace).await?;
neon::reconcile_compute(client.clone(), &namespace, &neon.spec.neon_image_ref).await?;
Ok(Action::requeue(Duration::from_secs(5)))
}
fn on_error(db: Arc<NeonDatabase>, error: &Error, _context: Arc<ContextData>) -> Action {
error!("Reconciliation error:\n{:?}.\n{:?}", error, db);
Action::requeue(Duration::from_secs(5))
}

183
src/minio.rs Normal file
View File

@ -0,0 +1,183 @@
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{ListParams, ObjectMeta, Patch, PatchParams};
use kube::{Api, Client, Error};
use std::collections::BTreeMap;
use tracing::{info, instrument};
const NAME: &str = "minio";
pub async fn create_deployment(client: Client, namespace: &str) -> Result<(), Error> {
let minio_container = Container {
name: NAME.to_string(),
env: Some(vec![
EnvVar {
name: "MINIO_ROOT_USER".to_string(),
value: Some("minio".to_string()),
..Default::default()
},
EnvVar {
name: "MINIO_ROOT_PASSWORD".to_string(),
value: Some("password".to_string()),
..Default::default()
},
]),
image: Some("quay.io/minio/minio:RELEASE.2022-10-20T00-55-09Z".to_string()),
ports: Some(vec![
ContainerPort {
container_port: 9000,
..Default::default()
},
ContainerPort {
container_port: 9001,
..Default::default()
},
]),
args: Some(
vec![
"server",
"/data",
"--address",
":9000",
"--console-address",
":9001",
]
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>(),
),
liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
path: Some("/minio/health/live".to_string()),
port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9000),
..Default::default()
}),
initial_delay_seconds: Some(10),
period_seconds: Some(30),
..Default::default()
}),
readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
path: Some("/minio/health/ready".to_string()),
port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9000),
..Default::default()
}),
initial_delay_seconds: Some(10),
period_seconds: Some(30),
..Default::default()
}),
..Default::default()
};
let mc_container = Container {
name: "mc".to_string(),
env: Some(vec![
EnvVar {
name: "MINIO_ROOT_USER".to_string(),
value: Some("minio".to_string()),
..Default::default()
},
EnvVar {
name: "MINIO_ROOT_PASSWORD".to_string(),
value: Some("password".to_string()),
..Default::default()
},
]),
image: Some("minio/mc".to_string()),
command: Some(vec!["bash".to_string(), "-c".to_string()]),
args: Some(vec!["until (/usr/bin/mc alias set minio http://minio:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD) do
echo 'Waiting to start minio...' && sleep 1;
done;
/usr/bin/mc mb --ignore-existing minio/neon --region=eu-north-1;
sleep inf;".to_string()]),
..Default::default()
};
let deployment = Deployment {
metadata: ObjectMeta {
name: Some(NAME.to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])),
..Default::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![minio_container, mc_container],
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
..Default::default()
};
let deployment_api = Api::<Deployment>::namespaced(client, namespace);
deployment_api
.patch(
NAME,
&PatchParams::apply("neon-operator"),
&Patch::Apply(deployment),
)
.await?;
Ok(())
}
pub async fn create_service(client: Client, namespace: &str) -> Result<(), Error> {
let service = Service {
metadata: ObjectMeta {
name: Some(NAME.to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])),
..Default::default()
},
spec: Some(ServiceSpec {
selector: Some(BTreeMap::from([("app".to_string(), NAME.to_string())])),
ports: Some(vec![ServicePort {
port: 9000,
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
info!("Reconciling service");
Api::<Service>::namespaced(client, namespace)
.patch(
NAME,
&PatchParams::apply("neon-operator"),
&Patch::Apply(service),
)
.await?;
Ok(())
}
#[instrument(err, skip(client))]
pub async fn deployment_exists(client: Client, namespace: &str) -> Result<bool, Error> {
let deployment_api: Api<Deployment> = Api::namespaced(client, namespace);
let lp = ListParams {
label_selector: Some(format!("app={}", NAME)),
..Default::default()
};
let list_result = deployment_api.list(&lp).await?;
Ok(list_result.items.len() > 0)
}

456
src/neon.rs Normal file
View File

@ -0,0 +1,456 @@
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{ObjectMeta, Patch, PatchParams};
use kube::{Api, Client, Error};
use std::collections::BTreeMap;
// Workflow:
// 1. Create minio deployment
// 2. Create minio service
// 3. Ensure bucket exists
pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result<(), Error> {
let storage_broker = Deployment {
metadata: ObjectMeta {
name: Some("storage-broker".to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([(
"app".to_string(),
"storage-broker".to_string(),
)])),
..Default::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
"storage-broker".to_string(),
)])),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![Container {
name: "storage-broker".to_string(),
image: Some("neondatabase/neon".to_string()),
ports: Some(vec![ContainerPort {
container_port: 50051,
..Default::default()
}]),
command: Some(vec![
"storage_broker".to_string(),
"--listen-addr=0.0.0.0:50051".to_string(),
]),
..Default::default()
}],
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
..Default::default()
};
let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
deployment_api
.patch(
"storage-broker",
&PatchParams::apply("neon-operator"),
&Patch::Apply(storage_broker),
)
.await?;
let service = Service {
metadata: ObjectMeta {
name: Some("storage-broker".to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([(
"app".to_string(),
"storage-broker".to_string(),
)])),
..Default::default()
},
spec: Some(ServiceSpec {
selector: Some(BTreeMap::from([(
"app".to_string(),
"storage-broker".to_string(),
)])),
ports: Some(vec![ServicePort {
port: 50051,
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
let service_api = Api::<Service>::namespaced(client.clone(), namespace);
service_api
.patch(
"storage-broker",
&PatchParams::apply("neon-operator"),
&Patch::Apply(service),
)
.await?;
Ok(())
}
pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<(), Error> {
let deployment = Deployment {
metadata: ObjectMeta {
name: Some("pageserver".to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([(
"app".to_string(),
"pageserver".to_string(),
)])),
..Default::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
"pageserver".to_string(),
)])),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![Container {
name: "pageserver".to_string(),
image: Some("neondatabase/neon".to_string()),
ports: Some(vec![ContainerPort {
container_port: 9898,
name: Some("http".to_string()),
..Default::default()
}, ContainerPort {
container_port: 6400,
name: Some("pg".to_string()),
..Default::default()
}]),
env: Some(vec![
EnvVar {
name: "BROKER_ENDPOINT".to_string(),
value: Some("storage-broker:50051".to_string()),
..Default::default()
},
EnvVar {
name: "AWS_ACCESS_KEY_ID".to_string(),
value: Some("minio".to_string()),
..Default::default()
},
EnvVar {
name: "AWS_SECRET_ACCESS_KEY".to_string(),
value: Some("password".to_string()),
..Default::default()
},
]),
command: Some(vec!["/usr/local/bin/pageserver".to_string()]),
args: Some(vec![
"-D".to_string(),
"/data/.neon/".to_string(),
"-c".to_string(),
r#"broker_endpoint='http://storage-broker:50051'"#.to_string(),
"-c".to_string(),
r#"listen_pg_addr='0.0.0.0:6400'"#.to_string(),
"-c".to_string(),
r#"listen_http_addr='0.0.0.0:9898'"#.to_string(),
"-c".to_string(),
r#"remote_storage={endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/pageserver/'}"#.to_string(),
]),
..Default::default()
}],
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
..Default::default()
};
let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
deployment_api
.patch(
"pageserver",
&PatchParams::apply("neon_operator"),
&Patch::Apply(deployment),
)
.await?;
let service = Service {
metadata: ObjectMeta {
name: Some("pageserver".to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([(
"app".to_string(),
"pageserver".to_string(),
)])),
..Default::default()
},
spec: Some(ServiceSpec {
selector: Some(BTreeMap::from([(
"app".to_string(),
"pageserver".to_string(),
)])),
ports: Some(vec![
ServicePort {
port: 9898,
name: Some("http".to_string()),
..Default::default()
},
ServicePort {
port: 6400,
name: Some("pg".to_string()),
..Default::default()
},
]),
..Default::default()
}),
..Default::default()
};
let service_api = Api::<Service>::namespaced(client.clone(), namespace);
service_api
.patch(
"pageserver",
&PatchParams::apply("neon_operator"),
&Patch::Apply(service),
)
.await?;
Ok(())
}
pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<(), Error> {
let stateful_set = StatefulSet {
metadata: ObjectMeta {
name: Some("safe-keeper".to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
service_name: "safe-keeper".to_string(),
replicas: Some(3),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([(
"app".to_string(),
"safe-keeper".to_string(),
)])),
..Default::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
"safe-keeper".to_string(),
)])),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![Container {
name: "safe-keeper".to_string(),
image: Some("neondatabase/neon".to_string()),
ports: Some(vec![ContainerPort {
container_port: 7676,
..Default::default()
}]),
command: Some(vec![
"bash".to_string(),
"-c".to_string(),
]),
args: Some(vec![
r#"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
export ordinal=${BASH_REMATCH[1]}
safekeeper --listen-pg=$SAFEKEEPER_ADVERTISE_URL \
--listen-http='0.0.0.0:7676' \
--id=$ordinal \
--broker-endpoint=$BROKER_ENDPOINT \
-D /data \
--remote-storage="{endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/safekeeper/'}"
"#.to_string(),
]),
env: Some(vec![
EnvVar {
name: "POD_NAME".to_string(),
value_from: Some(k8s_openapi::api::core::v1::EnvVarSource {
field_ref: Some(k8s_openapi::api::core::v1::ObjectFieldSelector {
field_path: "metadata.name".to_string(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
},
EnvVar {
name: "SAFEKEEPER_ADVERTISE_URL".to_string(),
value: Some("$(POD_NAME).safe-keeper.default.svc.cluster.local:5454".to_string()),
..Default::default()
},
EnvVar {
name: "BROKER_ENDPOINT".to_string(),
value: Some("http://storage-broker:50051".to_string()),
..Default::default()
},
EnvVar {
name: "AWS_ACCESS_KEY_ID".to_string(),
value: Some("minio".to_string()),
..Default::default()
},
EnvVar {
name: "AWS_SECRET_ACCESS_KEY".to_string(),
value: Some("password".to_string()),
..Default::default()
},
]),
..Default::default()
}],
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
..Default::default()
};
let headless_service = Service {
metadata: ObjectMeta {
name: Some("safe-keeper".to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([(
"app".to_string(),
"safe-keeper".to_string(),
)])),
..Default::default()
},
spec: Some(ServiceSpec {
cluster_ip: Some("None".to_string()),
selector: Some(BTreeMap::from([(
"app".to_string(),
"safe-keeper".to_string(),
)])),
ports: Some(vec![ServicePort {
port: 5454,
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
let stateful_set_api: Api<StatefulSet> = Api::namespaced(client.clone(), &namespace);
stateful_set_api
.patch(
"safe-keeper",
&PatchParams::apply("neon_operator"),
&Patch::Apply(stateful_set),
)
.await?;
let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
service_api
.patch(
"safe-keeper",
&PatchParams::apply("neon_operator"),
&Patch::Apply(headless_service),
)
.await?;
Ok(())
}
pub async fn reconcile_compute(client: Client, namespace: &str, image: &str) -> Result<(), Error> {
let deployment = Deployment {
metadata: ObjectMeta {
name: Some("compute".to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
..Default::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![Container {
name: "compute".to_string(),
image: Some(image.to_string()),
ports: Some(vec![ContainerPort {
container_port: 9898,
..Default::default()
}]),
env: Some(vec![EnvVar {
name: "PG_VERSION".to_string(),
value: Some("15".to_string()),
..Default::default()
}]),
command: Some(vec!["/shell/compute.sh".to_string()]),
..Default::default()
}],
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
..Default::default()
};
let service = Service {
metadata: ObjectMeta {
name: Some("compute".to_string()),
namespace: Some(namespace.to_string()),
labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
..Default::default()
},
spec: Some(ServiceSpec {
selector: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
ports: Some(vec![ServicePort {
port: 9898,
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
let deployment_api = Api::<Deployment>::namespaced(client.clone(), &namespace);
deployment_api
.patch(
"compute",
&PatchParams::apply("neon_operator"),
&Patch::Apply(deployment),
)
.await?;
let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
service_api
.patch(
"compute",
&PatchParams::apply("neon_operator"),
&Patch::Apply(service),
)
.await?;
Ok(())
}