use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec, StatefulSet};
use k8s_openapi::api::core::v1::{
    Container, ContainerPort, EnvVar, PodSpec, PodTemplateSpec, Service, ServicePort, ServiceSpec,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{ObjectMeta, Patch, PatchParams};
use kube::{Api, Client, Error};
use std::collections::BTreeMap;

// Workflow:
// 1. Create minio deployment
// 2. Create minio service
// 3. Ensure bucket exists

pub async fn reconcile_storage_broker(client: Client, namespace: &str) -> Result<(), Error> {
    let storage_broker = Deployment {
        metadata: ObjectMeta {
            name: Some("storage-broker".to_string()),
            namespace: Some(namespace.to_string()),
            ..Default::default()
        },
        spec: Some(DeploymentSpec {
            replicas: Some(1),
            selector: LabelSelector {
                match_labels: Some(BTreeMap::from([(
                    "app".to_string(),
                    "storage-broker".to_string(),
                )])),
                ..Default::default()
            },
            template: PodTemplateSpec {
                metadata: Some(ObjectMeta {
                    labels: Some(BTreeMap::from([(
                        "app".to_string(),
                        "storage-broker".to_string(),
                    )])),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: "storage-broker".to_string(),
                        image: Some("neondatabase/neon".to_string()),
                        ports: Some(vec![ContainerPort {
                            container_port: 50051,
                            ..Default::default()
                        }]),
                        command: Some(vec![
                            "storage_broker".to_string(),
                            "--listen-addr=0.0.0.0:50051".to_string(),
                        ]),
                        ..Default::default()
                    }],
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        }),
        ..Default::default()
    };

    let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
    deployment_api
        .patch(
            "storage-broker",
            &PatchParams::apply("neon-operator"),
            &Patch::Apply(storage_broker),
        )
        .await?;

    let service = Service {
        metadata: ObjectMeta {
            name: Some("storage-broker".to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(BTreeMap::from([(
                "app".to_string(),
                "storage-broker".to_string(),
            )])),
            ..Default::default()
        },
        spec: Some(ServiceSpec {
            selector: Some(BTreeMap::from([(
                "app".to_string(),
                "storage-broker".to_string(),
            )])),
            ports: Some(vec![ServicePort {
                port: 50051,
                ..Default::default()
            }]),
            ..Default::default()
        }),
        ..Default::default()
    };

    let service_api = Api::<Service>::namespaced(client.clone(), namespace);
    service_api
        .patch(
            "storage-broker",
            &PatchParams::apply("neon-operator"),
            &Patch::Apply(service),
        )
        .await?;

    Ok(())
}

pub async fn reconcile_page_server(client: Client, namespace: &str) -> Result<(), Error> {
    let deployment = Deployment {
        metadata: ObjectMeta {
            name: Some("pageserver".to_string()),
            namespace: Some(namespace.to_string()),
            ..Default::default()
        },
        spec: Some(DeploymentSpec {
            replicas: Some(1),
            selector: LabelSelector {
                match_labels: Some(BTreeMap::from([(
                    "app".to_string(),
                    "pageserver".to_string(),
                )])),
                ..Default::default()
            },
            template: PodTemplateSpec {
                metadata: Some(ObjectMeta {
                    labels: Some(BTreeMap::from([(
                        "app".to_string(),
                        "pageserver".to_string(),
                    )])),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: "pageserver".to_string(),
                        image: Some("neondatabase/neon".to_string()),
                        ports: Some(vec![ContainerPort {
                            container_port: 9898,
                            name: Some("http".to_string()),
                            ..Default::default()
                        }, ContainerPort {
                            container_port: 6400,
                            name: Some("pg".to_string()),
                            ..Default::default()
                        }]),
                        env: Some(vec![
                            EnvVar {
                                name: "BROKER_ENDPOINT".to_string(),
                                value: Some("storage-broker:50051".to_string()),
                                ..Default::default()
                            },
                            EnvVar {
                                name: "AWS_ACCESS_KEY_ID".to_string(),
                                value: Some("minio".to_string()),
                                ..Default::default()
                            },
                            EnvVar {
                                name: "AWS_SECRET_ACCESS_KEY".to_string(),
                                value: Some("password".to_string()),
                                ..Default::default()
                            },
                        ]),
                        command: Some(vec!["/usr/local/bin/pageserver".to_string()]),
                        args: Some(vec![
                            "-D".to_string(),
                            "/data/.neon/".to_string(),
                            "-c".to_string(),
                            r#"broker_endpoint='http://storage-broker:50051'"#.to_string(),
                            "-c".to_string(),
                            r#"listen_pg_addr='0.0.0.0:6400'"#.to_string(),
                            "-c".to_string(),
                            r#"listen_http_addr='0.0.0.0:9898'"#.to_string(),
                            "-c".to_string(),
                            r#"remote_storage={endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/pageserver/'}"#.to_string(),
                        ]),
                        ..Default::default()
                    }],
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        }),
        ..Default::default()
    };

    let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
    deployment_api
        .patch(
            "pageserver",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(deployment),
        )
        .await?;

    let service = Service {
        metadata: ObjectMeta {
            name: Some("pageserver".to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(BTreeMap::from([(
                "app".to_string(),
                "pageserver".to_string(),
            )])),
            ..Default::default()
        },
        spec: Some(ServiceSpec {
            selector: Some(BTreeMap::from([(
                "app".to_string(),
                "pageserver".to_string(),
            )])),
            ports: Some(vec![
                ServicePort {
                    port: 9898,
                    name: Some("http".to_string()),
                    ..Default::default()
                },
                ServicePort {
                    port: 6400,
                    name: Some("pg".to_string()),
                    ..Default::default()
                },
            ]),
            ..Default::default()
        }),
        ..Default::default()
    };

    let service_api = Api::<Service>::namespaced(client.clone(), namespace);
    service_api
        .patch(
            "pageserver",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(service),
        )
        .await?;

    Ok(())
}

pub async fn reconcile_safe_keepers(client: Client, namespace: &str) -> Result<(), Error> {
    let stateful_set = StatefulSet {
        metadata: ObjectMeta {
            name: Some("safe-keeper".to_string()),
            namespace: Some(namespace.to_string()),
            ..Default::default()
        },
        spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
            service_name: "safe-keeper".to_string(),
            replicas: Some(3),
            selector: LabelSelector {
                match_labels: Some(BTreeMap::from([(
                    "app".to_string(),
                    "safe-keeper".to_string(),
                )])),
                ..Default::default()
            },
            template: PodTemplateSpec {
                metadata: Some(ObjectMeta {
                    labels: Some(BTreeMap::from([(
                        "app".to_string(),
                        "safe-keeper".to_string(),
                    )])),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: "safe-keeper".to_string(),
                        image: Some("neondatabase/neon".to_string()),
                        ports: Some(vec![ContainerPort {
                            container_port: 7676,
                            ..Default::default()
                        }]),
                        command: Some(vec![
                            "bash".to_string(),
                            "-c".to_string(),
                        ]),
                        args: Some(vec![
                            r#"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
                            export ordinal=${BASH_REMATCH[1]}
                            safekeeper --listen-pg=$SAFEKEEPER_ADVERTISE_URL \
                                --listen-http='0.0.0.0:7676' \
                                --id=$ordinal \
                                --broker-endpoint=$BROKER_ENDPOINT \
                                -D /data \
                                --remote-storage="{endpoint='http://minio:9000',bucket_name='neon',bucket_region='eu-north-1',prefix_in_bucket='/safekeeper/'}"
                            "#.to_string(),
                        ]),
                        env: Some(vec![
                                EnvVar {
                                    name: "POD_NAME".to_string(),
                                    value_from: Some(k8s_openapi::api::core::v1::EnvVarSource {
                                        field_ref: Some(k8s_openapi::api::core::v1::ObjectFieldSelector {
                                            field_path: "metadata.name".to_string(),
                                            ..Default::default()
                                        }),
                                        ..Default::default()
                                    }),
                                    ..Default::default()
                                },
                                EnvVar {
                                    name: "SAFEKEEPER_ADVERTISE_URL".to_string(),
                                    value: Some("$(POD_NAME).safe-keeper.default.svc.cluster.local:5454".to_string()),
                                    ..Default::default()
                                },
                                EnvVar {
                                    name: "BROKER_ENDPOINT".to_string(),
                                    value: Some("http://storage-broker:50051".to_string()),
                                    ..Default::default()
                                },
                                EnvVar {
                                    name: "AWS_ACCESS_KEY_ID".to_string(),
                                    value: Some("minio".to_string()),
                                    ..Default::default()
                                },
                                EnvVar {
                                    name: "AWS_SECRET_ACCESS_KEY".to_string(),
                                    value: Some("password".to_string()),
                                    ..Default::default()
                                },
                            ]),
                        ..Default::default()
                    }],
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        }),
        ..Default::default()
    };

    let headless_service = Service {
        metadata: ObjectMeta {
            name: Some("safe-keeper".to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(BTreeMap::from([(
                "app".to_string(),
                "safe-keeper".to_string(),
            )])),
            ..Default::default()
        },
        spec: Some(ServiceSpec {
            cluster_ip: Some("None".to_string()),
            selector: Some(BTreeMap::from([(
                "app".to_string(),
                "safe-keeper".to_string(),
            )])),
            ports: Some(vec![ServicePort {
                port: 5454,
                ..Default::default()
            }]),
            ..Default::default()
        }),
        ..Default::default()
    };

    let stateful_set_api: Api<StatefulSet> = Api::namespaced(client.clone(), &namespace);
    stateful_set_api
        .patch(
            "safe-keeper",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(stateful_set),
        )
        .await?;

    let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
    service_api
        .patch(
            "safe-keeper",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(headless_service),
        )
        .await?;

    Ok(())
}

pub async fn reconcile_compute(
    client: Client,
    namespace: &str,
    compute_image: &str,
) -> Result<(), Error> {
    let deployment = Deployment {
        metadata: ObjectMeta {
            name: Some("compute".to_string()),
            namespace: Some(namespace.to_string()),
            ..Default::default()
        },
        spec: Some(DeploymentSpec {
            replicas: Some(1),
            selector: LabelSelector {
                match_labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
                ..Default::default()
            },
            template: PodTemplateSpec {
                metadata: Some(ObjectMeta {
                    labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: "compute".to_string(),
                        image: Some(compute_image.to_string()),
                        ports: Some(vec![ContainerPort {
                            container_port: 9898,
                            ..Default::default()
                        }]),
                        env: Some(vec![EnvVar {
                            name: "PG_VERSION".to_string(),
                            value: Some("15".to_string()),
                            ..Default::default()
                        }]),
                        command: Some(vec!["/shell/compute.sh".to_string()]),
                        ..Default::default()
                    }],
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        }),
        ..Default::default()
    };

    let service = Service {
        metadata: ObjectMeta {
            name: Some("compute".to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
            ..Default::default()
        },
        spec: Some(ServiceSpec {
            selector: Some(BTreeMap::from([("app".to_string(), "compute".to_string())])),
            ports: Some(vec![ServicePort {
                port: 9898,
                ..Default::default()
            }]),
            ..Default::default()
        }),
        ..Default::default()
    };

    let deployment_api = Api::<Deployment>::namespaced(client.clone(), &namespace);
    deployment_api
        .patch(
            "compute",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(deployment),
        )
        .await?;

    let service_api = Api::<Service>::namespaced(client.clone(), &namespace);
    service_api
        .patch(
            "compute",
            &PatchParams::apply("neon_operator"),
            &Patch::Apply(service),
        )
        .await?;

    Ok(())
}