How to set up Ververica Platform with Google Cloud Storage (GCS)

Intro

Flink versions before 1.15 do not bundle libraries for working with GCS. There is, however, a connector from Google based on Hadoop. To use the gs:// scheme for Universal Blob Storage, this connector has to be bundled along with the shaded Hadoop JAR.

The following steps describe how to set up a GCS bucket as Ververica Platform's blob storage location.

Build a custom Flink image

Note: This step only applies to Ververica Platform 2.2-2.7. The Flink images that come with Ververica Platform 2.8 or later support GCS out of the box.

Flink 1.15 in Ververica Platform 2.7

1) Create a Dockerfile with the following content (and any other extensions you need):

FROM registry.ververica.com/v2.7/flink:1.15.2-stream2-scala_2.12-java8

RUN set -e \
  && mkdir /flink/plugins/flink-gs-fs-hadoop \
  && ln -s /flink/opt/flink-gs-fs-hadoop-*.jar /flink/plugins/flink-gs-fs-hadoop/

Note: See Ververica Platform Docker Images for a full list of all available Flink images to extend, but make sure to choose the appropriate version of the docs (bottom left of the page).

2) Build and publish to your docker registry:

docker build -t <YOUR_DOCKER_REGISTRY>/flink-gcs:1.15.2-stream2-scala_2.12-java8 .
docker push <YOUR_DOCKER_REGISTRY>/flink-gcs:1.15.2-stream2-scala_2.12-java8
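Optionally, you can sanity-check the image you just built by listing the plugin directory inside it (overriding the entrypoint so that a plain ls runs):

docker run --rm --entrypoint ls \
  <YOUR_DOCKER_REGISTRY>/flink-gcs:1.15.2-stream2-scala_2.12-java8 \
  -l /flink/plugins/flink-gs-fs-hadoop/

The output should show the flink-gs-fs-hadoop JAR (or a symlink pointing to it in /flink/opt).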

Flink 1.10-1.14

1) Download the GCS connector for Hadoop, gcs-connector-hadoop2-latest.jar, into your Docker build directory (it is referenced by the Dockerfile below).
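Google publishes the Hadoop 2 build of the connector in a public bucket; at the time of writing it can be fetched, for example, with curl (verify the URL against Google's Cloud Storage connector documentation):

# Hadoop 2 build of Google's GCS connector (check Google's docs for the current link)
curl -LO https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar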

2) Create a Dockerfile with the following content (and any other extensions you need):

FROM registry.ververica.com/v2.6/flink:1.14.3-stream1-scala_2.12-java8

COPY gcs-connector-hadoop2-latest.jar /flink/lib/
RUN set -e \
  && ln -s /flink/opt/flink-shaded-hadoop/flink-shaded-hadoop-2-uber-*.jar \
     /flink/lib/

Note: See Ververica Platform Docker Images for a full list of all available Flink images to extend, but make sure to choose the appropriate version of the docs (bottom left of the page).

3) Build and publish to your docker registry:

docker build -t <YOUR_DOCKER_REGISTRY>/flink-gcs:1.14.3-stream1-scala_2.12-java8 .
docker push <YOUR_DOCKER_REGISTRY>/flink-gcs:1.14.3-stream1-scala_2.12-java8
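As with the Flink 1.15 image, you can optionally double-check the result by listing /flink/lib inside the image you just built:

docker run --rm --entrypoint ls \
  <YOUR_DOCKER_REGISTRY>/flink-gcs:1.14.3-stream1-scala_2.12-java8 \
  -l /flink/lib/

You should see gcs-connector-hadoop2-latest.jar as well as a flink-shaded-hadoop-2-uber-*.jar symlink.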

Flink 1.9 and below

1) Download the following GCS dependencies into your Docker build directory: the GCS connector for Hadoop (gcs-connector-hadoop2-latest.jar, as above) and the shaded Hadoop uber JAR flink-shaded-hadoop-2-uber-2.8.3-10.0.jar (available on Maven Central).

2) Create a Dockerfile with the following content (and any other extensions you need):

FROM registry.ververica.com/v2.3/flink:1.9.3-stream2-scala_2.11

COPY gcs-connector-hadoop2-latest.jar /flink/lib/
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /flink/lib/

Note: See Ververica Platform Docker Images for a full list of all available Flink images to extend, but make sure to choose the appropriate version of the docs (bottom left of the page).

3) Build and publish to your docker registry:

docker build -t <YOUR_DOCKER_REGISTRY>/flink-gcs:1.9.3-stream2-scala_2.11 .
docker push <YOUR_DOCKER_REGISTRY>/flink-gcs:1.9.3-stream2-scala_2.11

Prepare VVP

Amend your values.yaml with the following parameters:

vvp:
  blobStorage:
    baseUri: gs://YOUR_GCS_BUCKET

  flinkDeploymentDefaults:
    registry: YOUR_DOCKER_REGISTRY
    repository: flink-gcs

  globalDeploymentDefaults: |
    spec:
      template:
        spec:
          kubernetes:
            jobManagerPodTemplate:
              spec:
                initContainers:
                  - name: artifact-fetcher
                    env:
                      - name: GOOGLE_APPLICATION_CREDENTIALS
                        value: /flink/gcs-creds/gcs-key.json
                    volumeMounts:
                      - name: gcs-key
                        mountPath: /flink/gcs-creds
                containers:
                  - name: flink-jobmanager
                    env:
                      - name: GOOGLE_APPLICATION_CREDENTIALS
                        value: /flink/gcs-creds/gcs-key.json
                    volumeMounts:
                      - name: gcs-key
                        mountPath: /flink/gcs-creds
                volumes:
                  - name: gcs-key
                    secret:
                      secretName: gcs-key
            taskManagerPodTemplate:
              spec:
                initContainers:
                  - name: artifact-fetcher
                    env:
                      - name: GOOGLE_APPLICATION_CREDENTIALS
                        value: /flink/gcs-creds/gcs-key.json
                    volumeMounts:
                      - name: gcs-key
                        mountPath: /flink/gcs-creds
                containers:
                  - name: flink-taskmanager
                    env:
                      - name: GOOGLE_APPLICATION_CREDENTIALS
                        value: /flink/gcs-creds/gcs-key.json
                    volumeMounts:
                      - name: gcs-key
                        mountPath: /flink/gcs-creds
                volumes:
                  - name: gcs-key
                    secret:
                      secretName: gcs-key


  globalSessionClusterDefaults: |
    spec:
      kubernetes:
        jobManagerPodTemplate:
          spec:
            containers:
              - name: flink-jobmanager
                env:
                  - name: GOOGLE_APPLICATION_CREDENTIALS
                    value: /flink/gcs-creds/gcs-key.json
                volumeMounts:
                  - name: gcs-key
                    mountPath: /flink/gcs-creds
            volumes:
              - name: gcs-key
                secret:
                  secretName: gcs-key
        taskManagerPodTemplate:
          spec:
            containers:
              - name: flink-taskmanager
                env:
                  - name: GOOGLE_APPLICATION_CREDENTIALS
                    value: /flink/gcs-creds/gcs-key.json
                volumeMounts:
                  - name: gcs-key
                    mountPath: /flink/gcs-creds
            volumes:
              - name: gcs-key
                secret:
                  secretName: gcs-key

volumes:
  - name: "gcs-key"
    secret:
      secretName: "gcs-key"

volumeMounts:
  - name: "gcs-key"
    mountPath: "/vvp/gcs-creds"

env:
  - name:  "GOOGLE_APPLICATION_CREDENTIALS"
    value: "/vvp/gcs-creds/gcs-key.json"

Important: This example uses the new (and recommended) Flink Pod Templates for specifying pod options in Ververica Platform. They are mutually exclusive with the old Kubernetes Pod Options. If you are still using the old Pod Options, you will have to adapt the snippets above accordingly.

Note: If you are not publishing all available Flink images for your version of Ververica Platform, you may want to configure the Apache Flink Version Metadata to define which Flink versions are available in the UI and used by the SQL service. This can be done via the vvp.flinkVersionMetadata property of your values.yaml file, as documented.

Permissions for GCS

Create an appropriate service account (if not already available)

Note: This step is optional if you already have service account credentials with the Storage Admin role for your designated GCS bucket.

1) If you do not yet have a service account, create one:

gcloud iam service-accounts create gcs-vvp-service-acc \
  --description="Service account for VVP GCS" \
  --display-name="gcs-vvp-service-acc"

2) Assuming that your bucket uses "uniform" bucket-level access control, navigate to the Storage Browser, click the three dots next to your bucket -> Edit access -> + Add principal, and paste your full service account ID, e.g. gcs-vvp-service-acc@<YOUR_PROJECT>.iam.gserviceaccount.com.

3) In Select a role, pick "Storage Admin". Save.
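If you prefer the command line over the Cloud Console, the same role binding can be added with gsutil (a sketch; substitute your project and bucket names):

gsutil iam ch \
  serviceAccount:gcs-vvp-service-acc@<YOUR_PROJECT>.iam.gserviceaccount.com:roles/storage.admin \
  gs://<YOUR_GCS_BUCKET>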

Retrieve service account credentials

1) Navigate to Service Accounts, click the three dots next to the service account you created or want to use (e.g. gcs-vvp-service-acc) -> "Manage Keys". Then create a new JSON key and store it as gcs-key.json.
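Alternatively, the key can be created on the command line for the same service account:

gcloud iam service-accounts keys create gcs-key.json \
  --iam-account=gcs-vvp-service-acc@<YOUR_PROJECT>.iam.gserviceaccount.com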

2) Create a Kubernetes secret in the namespace where Ververica Platform is installed (vvp in this example):

kubectl create secret generic gcs-key --from-file=./gcs-key.json -n vvp

3) Repeat for each namespace that is used as a Deployment Target (vvp-jobs in this example):

kubectl create secret generic gcs-key --from-file=./gcs-key.json -n vvp-jobs
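To confirm that both secrets exist, for example:

kubectl get secret gcs-key -n vvp
kubectl get secret gcs-key -n vvp-jobs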

Wrap-up

Install or upgrade Ververica Platform, e.g. by following our Getting Started Guide and using the values.yaml file created above.
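For example, assuming you installed the platform from Ververica's Helm chart repository (adjust the release name, namespace, and chart reference to match your installation):

# chart repo, release name, and namespace are illustrative
helm repo add ververica https://charts.ververica.com
helm repo update
helm upgrade --install vvp ververica/ververica-platform \
  --namespace vvp \
  --values values.yaml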

New deployments will now create checkpoints and savepoints in gs://<YOUR_GCS_BUCKET>.
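Once a Deployment has taken its first checkpoint or savepoint, you can verify that objects are being written, e.g. with gsutil:

gsutil ls gs://<YOUR_GCS_BUCKET>/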

If you want to adapt existing deployments, you need to modify their YAML specification (Deployment -> Configure -> YAML) and add the parameters from the globalDeploymentDefaults section of the values.yaml shown above.