Thursday, April 9, 2020

Processing Kafka topic with OpenShift Serverless service in 10 steps

In this post I'll describe how you can easily process Kafka topic messages with a Serverless service in OpenShift 4.

1. First you'll need to install the Red Hat Integration - AMQ Streams, OpenShift Serverless, Knative Eventing and Knative Apache Kafka operators.
Please refer to the OpenShift documentation on how to install operators using either the OpenShift Web Console or the oc command line interface.
In essence you'll need to log in to the OpenShift cluster with cluster-admin credentials and install all 4 operators cluster-wide.
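
If you prefer the CLI, operators can also be installed by creating OLM Subscription objects. Below is a minimal sketch for the OpenShift Serverless operator; the channel and source values here are assumptions that vary by cluster version, so look up the real ones from the package manifest first:
$ oc get packagemanifests -n openshift-marketplace | grep -i -e serverless -e amq-streams -e knative
$ cat <<EOF | oc apply -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: serverless-operator
  namespace: openshift-operators
spec:
  # channel and source are assumptions - verify with
  # 'oc describe packagemanifest serverless-operator -n openshift-marketplace'
  channel: "4.4"
  name: serverless-operator
  source: redhat-operators
  sourceNamespace: openshift-marketplace
EOF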


If you want to install this demo quickly you can use this helm3 chart, otherwise follow the step-by-step instructions below.

2. Let's create a project for our demo
$ oc new-project streams-serverless-demo
3. Create Kafka cluster using AMQ Streams Operator

Go to the list of installed operators available in the project created above and
click on Red Hat Integration - AMQ Streams


Click on the Create Instance link in the Kafka box



You can keep the default settings and click on the Create button. Wait until the Kafka cluster is up and running.
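
If you'd rather script this step, the console form boils down to a Kafka custom resource. The sketch below mirrors roughly the defaults the console generates; the apiVersion and field layout depend on your operator version, so treat it as a starting point:
$ cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
You can then wait for the cluster to become ready:
$ oc wait kafka/my-cluster --for=condition=Ready --timeout=300s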


4. Create Knative Eventing Kafka from the Knative Apache Kafka Operator

Come back to the list of installed operators and click on Knative Apache Kafka Operator.




Click on the Create Instance link in the Knative components for Apache Kafka box


Make sure to set the bootstrapServers value to the name of your Kafka cluster bootstrap service. For the default configuration this will be 'my-cluster-kafka-bootstrap:9092'. Click the Create button.
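
If you're not sure what the bootstrap service is called, AMQ Streams labels everything it creates with the cluster name, so you can look it up:
$ oc get svc -l strimzi.io/cluster=my-cluster | grep bootstrap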

5. Create Kafka topic from AMQ Streams Operator

Come back to the list of installed operators and click on Red Hat Integration - AMQ Streams. Click on the Create Instance link in the Kafka Topic box


You can keep the default settings and click on the Create button
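
The equivalent KafkaTopic resource looks roughly like this (the values mirror the console defaults; note the strimzi.io/cluster label, which ties the topic to your cluster):
$ cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 3
EOF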

6. For testing purposes create a Kafka Bridge from the AMQ Streams Operator

Come back to the list of installed operators and click again on Red Hat Integration - AMQ Streams. Click on the Create Instance link in the Kafka Bridge box


Make sure to set the bootstrapServers value to the name of your Kafka cluster bootstrap service. For the default configuration this will be 'my-cluster-kafka-bootstrap:9092'. Click the Create button.
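
For reference, a minimal KafkaBridge resource equivalent to the console form might look like this (the bridge name my-bridge is what produces the my-bridge-bridge-service used below):
$ cat <<EOF | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
EOF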

Before moving forward, this is a good time to send a test message to the Kafka topic:
$ oc expose svc my-bridge-bridge-service
$ ROUTE=$(oc get route | grep my-bridge | awk '{print $2}')  && echo $ROUTE

$ curl -X POST $ROUTE/topics/my-topic -H 'content-type: application/vnd.kafka.json.v2+json' -d '{"records": [{"value": "hello from shadowman"}]}'
You should get the following response:
{"offsets":[{"partition":0,"offset":0}]}
7. Create Knative Serving from the OpenShift Serverless operator in the knative-serving project

Please refer to the OpenShift documentation on how to create the Knative Serving configuration.
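
A minimal sketch of that configuration is below. Note that the apiVersion of KnativeServing has changed across Serverless releases, so copy the exact skeleton from the documentation for your version rather than trusting the one here:
$ cat <<EOF | oc apply -f -
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
EOF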

8. Create a Knative Service which will consume the Kafka topic data

Come back to your project and create the Serverless service:
$ oc project streams-serverless-demo
$ cat <<EOF | oc apply -f -
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1" # target ~1 in-flight request per pod, so scaling is easy to observe
    spec:
      containers:
      - image: quay.io/jstakun/myknativesink:0.1
        env:
          # simulated per-event processing time used by the demo image
          - name: EVENT_PROCESSING_TIME
            value: "random:10000"
        resources:
          requests:
            memory: "50Mi"
            cpu: "100m"
          limits:
            memory: "50Mi"
            cpu: "100m"
        livenessProbe:
          httpGet:
            path: /healthz
        readinessProbe:
          httpGet:
            path: /healthz
EOF
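Once applied, you can check that the service reports Ready (the Serving CRDs register a ksvc short name):
$ oc get ksvc myknativesink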
9. Create the KafkaSource that connects the Kafka topic to the Serverless service

Make sure to set the bootstrapServers value to the name of your Kafka cluster bootstrap service. For the default configuration this will be 'my-cluster-kafka-bootstrap:9092'.
$ cat <<EOF | oc apply -f -
apiVersion: sources.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  topics: my-topic
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: myknativesink
EOF
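It can take a moment for the source to connect to the cluster; you can check its status with:
$ oc get kafkasource mykafka-source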
10. Finally, generate a stream of events published to the Kafka topic and consumed by the Serverless service
$ ROUTE=$(oc get route | grep my-bridge | awk '{print $2}') && echo $ROUTE
$ while true; do
    curl -X POST $ROUTE/topics/my-topic -H 'content-type: application/vnd.kafka.json.v2+json' -d '{"records": [{"value": "hello from shadowman"}]}'
    echo
    sleep 0.5
  done
You should see the Serverless service scale up and down depending on the traffic.
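
A convenient way to watch the scaling from a second terminal is to filter pods by the label Knative puts on every revision pod:
$ oc get pods -l serving.knative.dev/service=myknativesink -w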


Many thanks to Burr Sutter for the inspiring demo!