dapr 103 - Pub/Sub and Observability Metrics

Posts in this series

  1. dapr 101 - Concepts and Setup
  2. dapr 102 - State Management and Tracing
  3. dapr 103 - Pub/Sub and Observability Metrics (This Post)

In a Galaxy far far away…

In the last post, we looked at how to write services in Python and Go that leverage dapr and its state management component to store data in redis and retrieve it via an API. We also looked at how to create distributed tracing visualizations with jaeger while performing service-to-service invocation through dapr.

Goals

With the basic code and infrastructure from Post #2 in place, this post extends those same services to demonstrate how to build pub/sub based APIs using the dapr building blocks, and how to gather and visualize observability metrics for the services using grafana and the default dashboards provided by the dapr core team.

Pre-Requisites

Kubernetes Cluster and dapr Setup

Please make sure your cluster is in the following state before proceeding, if you want to follow along with the examples and try them out yourself.

~ via 🐍 dapr-series at ☸️  v1.18.4+k3s1 k3s-default
✦ ➜ kubectl get nodes -o wide
NAME                       STATUS   ROLES    AGE     VERSION        INTERNAL-IP   EXTERNAL-IP   OS-IMAGE   KERNEL-VERSION     CONTAINER-RUNTIME
k3d-k3s-default-worker-0   Ready    <none>   3d21h   v1.18.4+k3s1   172.21.0.3    <none>        Unknown    4.19.76-linuxkit   containerd://1.3.3-k3s2
k3d-k3s-default-worker-1   Ready    <none>   3d21h   v1.18.4+k3s1   172.21.0.4    <none>        Unknown    4.19.76-linuxkit   containerd://1.3.3-k3s2
k3d-k3s-default-worker-2   Ready    <none>   3d21h   v1.18.4+k3s1   172.21.0.5    <none>        Unknown    4.19.76-linuxkit   containerd://1.3.3-k3s2
k3d-k3s-default-server     Ready    master   3d21h   v1.18.4+k3s1   172.21.0.2    <none>        Unknown    4.19.76-linuxkit   containerd://1.3.3-k3s2
(dapr-series)
~ via 🐍 dapr-series at ☸️  v1.18.4+k3s1 k3s-default
✦ ➜ kubectl get pods -o wide
NAME                                    READY   STATUS    RESTARTS   AGE     IP           NODE                       NOMINATED NODE   READINESS GATES
redis-leader-7d557b94bb-rdtqx           1/1     Running   0          3d21h   10.42.1.3    k3d-k3s-default-worker-0   <none>           <none>
svclb-service-1-4mcrn                   1/1     Running   0          2d21h   10.42.1.10   k3d-k3s-default-worker-0   <none>           <none>
svclb-service-1-btpxq                   1/1     Running   0          2d21h   10.42.2.7    k3d-k3s-default-worker-1   <none>           <none>
svclb-service-1-kclct                   1/1     Running   0          2d21h   10.42.0.11   k3d-k3s-default-server     <none>           <none>
svclb-service-1-6jh9t                   1/1     Running   0          2d21h   10.42.3.8    k3d-k3s-default-worker-2   <none>           <none>
dapr-sidecar-injector-c898fb49b-9scq7   1/1     Running   0          2d20h   10.42.2.11   k3d-k3s-default-worker-1   <none>           <none>
dapr-sentry-58c576ff98-s5mwx            1/1     Running   0          2d20h   10.42.3.14   k3d-k3s-default-worker-2   <none>           <none>
dapr-operator-75b4f7986b-kwdvv          1/1     Running   0          2d20h   10.42.3.12   k3d-k3s-default-worker-2   <none>           <none>
dapr-placement-84f9cd87b7-x2tcq         1/1     Running   1          2d20h   10.42.3.13   k3d-k3s-default-worker-2   <none>           <none>
svclb-service-2-nrm4k                   1/1     Running   0          2d1h    10.42.2.13   k3d-k3s-default-worker-1   <none>           <none>
svclb-service-2-z985z                   1/1     Running   0          2d1h    10.42.3.15   k3d-k3s-default-worker-2   <none>           <none>
svclb-service-2-nsg5t                   1/1     Running   0          2d1h    10.42.0.22   k3d-k3s-default-server     <none>           <none>
svclb-service-2-hw2nq                   1/1     Running   0          2d1h    10.42.1.21   k3d-k3s-default-worker-0   <none>           <none>
jaeger-5dc87df5df-zlq6x                 1/1     Running   0          44h     10.42.0.23   k3d-k3s-default-server     <none>           <none>
service-2-867984df96-mp6fv              2/2     Running   0          21h     10.42.2.19   k3d-k3s-default-worker-1   <none>           <none>
service-1-5989969f84-6st8c              2/2     Running   0          21h     10.42.0.26   k3d-k3s-default-server     <none>           <none>
(dapr-series)
~ via 🐍 dapr-series at ☸️  v1.18.4+k3s1 k3s-default
✦ ➜ kubectl get svc -o wide
NAME                    TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                               AGE     SELECTOR
kubernetes              ClusterIP      10.43.0.1       <none>        443/TCP                               3d21h   <none>
dapr-api                ClusterIP      10.43.39.14     <none>        80/TCP                                3d21h   app=dapr-operator
dapr-placement          ClusterIP      10.43.245.162   <none>        80/TCP                                3d21h   app=dapr-placement
dapr-sentry             ClusterIP      10.43.84.125    <none>        80/TCP                                3d21h   app=dapr-sentry
dapr-sidecar-injector   ClusterIP      10.43.252.177   <none>        443/TCP                               3d21h   app=dapr-sidecar-injector
redis-leader            ClusterIP      10.43.205.118   <none>        6379/TCP                              3d21h   app=redis,role=master,tier=backend
service-1-dapr          ClusterIP      None            <none>        80/TCP,50001/TCP,50002/TCP,9090/TCP   2d21h   app=service-1
service-2-dapr          ClusterIP      None            <none>        80/TCP,50001/TCP,50002/TCP,9090/TCP   2d2h    app=service-2
jaeger                  ClusterIP      10.43.160.63    <none>        16686/TCP,9411/TCP                    44h     app=jaeger
service-2               LoadBalancer   10.43.87.66     172.21.0.4    81:31213/TCP                          2d2h    app=service-2
service-1               LoadBalancer   10.43.111.170   172.21.0.5    80:30427/TCP                          2d21h   app=service-1

If your cluster is not yet in this state, please refer to Post #1 and Post #2 for details on how to set up the cluster and write the first two services, which we are calling service-1 and service-2.

Code Repo

Please clone the harshanarayana/dapr-series repo and check out the dapr-103 branch to make it easier to follow along with this post.

git clone git@github.com:harshanarayana/dapr-series.git
cd dapr-series
git checkout dapr-103

Once you have checked out the branch, you are ready to follow along with the code below.

Python Service

service-1 will listen for events on two topics, t1 and t2. If the event received is on topic t1, it will invoke service-2 with the same payload and take the necessary action based on the response it gets back. If the event received is on topic t2, it will simply log the event body to the console after an async delay of 10 seconds. The delay is there so that I can pretend this service is doing something useful.

Design

sequenceDiagram
    Service 1 ->> Dapr: Subscribe To Events
    Dapr -->> Service 1: Events
    alt is t1
        Service 1 -->> Service 2: Service to Service Request via `dapr`
        Service 2 -->> Service 1: Response
        alt is OK
            Service 1 ->> Logger: Display OK
        else is NOTOK
            Service 1 ->> Logger: Display ERROR
        end
    else is t2
        Service 1 ->> Logger: Display Event
    end

Create Subscription

In order for an API to subscribe to events in dapr, it needs to expose a GET /dapr/subscribe endpoint whose response follows this spec.

{
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "array",
    "title": "The root schema",
    "description": "The root schema comprises the entire JSON document.",
    "default": [],
    "additionalItems": true,
    "items": {
        "anyOf": [
            {
                "$id": "#/items/anyOf/0",
                "type": "object",
                "title": "Subscription Items",
                "description": "Each entity defines a specific topic to bind to and where to route the requests when there is an event of that topic.",
                "default": {},
                "required": [
                    "topic",
                    "route"
                ],
                "additionalProperties": true,
                "properties": {
                    "topic": {
                        "$id": "#/items/anyOf/0/properties/topic",
                        "type": "string",
                        "title": "Topic Name to bind to",
                        "description": "Name of the topic that will be subscribed to.",
                        "default": ""
                    },
                    "route": {
                        "$id": "#/items/anyOf/0/properties/route",
                        "type": "string",
                        "title": "The API Endpoint/route to be invoked when there is a message on the Topic",
                        "description": "The API Endpoint/route to be invoked when there is a message on the Topic",
                        "default": ""
                    }
                }
            }
        ],
        "$id": "#/items"
    }
}
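To make the required shape of this response concrete, here is a minimal standalone sketch (plain Python, no framework) that checks a candidate subscription payload against the two required string fields from the schema above:

```python
# Minimal sketch: validate a subscription response against the schema
# above. Each entry must carry the two required fields: "topic" and "route".
subscriptions = [
    {"topic": "t1", "route": "t1"},
    {"topic": "t2", "route": "t2"},
]


def is_valid_subscription(entry):
    # Both required fields must be present, be strings, and be non-empty.
    return all(
        isinstance(entry.get(field), str) and entry[field]
        for field in ("topic", "route")
    )


print(all(is_valid_subscription(s) for s in subscriptions))  # True
```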

Now, let us include this route into the service-1 code.

@app.get("/dapr/subscribe")
async def subscribe(request: Request):
    return json([
        {
            "topic": "t1",
            "route": "t1"
        },
        {
            "topic": "t2",
            "route": "t2"
        }
    ])

In the above example, we are telling dapr that we want to subscribe to two topics, t1 and t2, and that whenever there is an event on one of those topics, it should invoke the respective route specified along with it. In this case an event on topic t1 will invoke POST /t1 and an event on topic t2 will invoke POST /t2 on the python service.

Take extra care to make sure that the routes you bind the topics to are POST endpoints, so that when dapr invokes them for you, it can send a request payload containing the message body of the event.

from asyncio import sleep as async_sleep

@app.post("/t1")
async def handle_t1(request: Request):
    d = requests.post(f"{DAPR_FORWARDER}/service-2/method/t1", json=request.json)
    _store_state(d.text)
    return json({"message": d.json()})


@app.post("/t2")
async def handle_t2(request: Request):
    await async_sleep(10)
    logger.info(request.json)
    return json({"success": True})
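One thing worth noting about the t1 handler above: requests.post is a blocking call inside an async handler, so it stalls the event loop for the duration of the HTTP round trip. A hedged sketch of one way around this, using only the standard library and a stand-in function for the blocking HTTP call:

```python
import asyncio
import time


def blocking_post(url, payload):
    # Stand-in for a blocking HTTP call such as requests.post; the
    # time.sleep simulates the network round trip.
    time.sleep(0.1)
    return {"url": url, "payload": payload}


async def forward_event(url, payload):
    # Run the blocking call in the default thread pool so the event
    # loop stays free to serve other requests in the meantime.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_post, url, payload)


# "http://sidecar/t1" is a hypothetical URL for illustration only.
result = asyncio.run(forward_event("http://sidecar/t1", {"message": "test"}))
print(result["payload"])  # {'message': 'test'}
```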

Let us also add a POST /t1 endpoint on service-2 that service-1 can invoke when it gets a message on topic t1.

func handleT1() gin.HandlerFunc {
    return func(context *gin.Context) {
        context.JSON(200, gin.H{
            "message": "received",
        })
    }
}

r.POST("/t1", handleT1())

That is all you need to do to set up a subscriber. Now, every time there is an event on one of the topics specified in the GET /dapr/subscribe response, dapr will invoke the respective endpoint and you can do whatever you want in your API.

Message Publisher API

Now, let us add an API that we can invoke with a payload to publish a message to one of the specified topics. For the sake of convenience I am going to add this API to the python service. However, you are free to follow any other approach you see fit for publishing events.

The event schema should look like the following.

{
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Message Publish Schema",
    "description": "This schema is what the Service that needs to publish a message should abide by.",
    "default": {},
    "required": [
        "messageType",
        "message"
    ],
    "additionalProperties": false,
    "properties": {
        "messageType": {
            "$id": "#/properties/messageType",
            "type": "string",
            "title": "Name of the Topic",
            "description": "Topic Name to which the message needs to be published",
            "default": ""
        },
        "message": {
            "$id": "#/properties/message",
            "type": [
                "object",
                "array",
                "boolean",
                "string",
                "number",
                "integer"
            ],
            "title": "Message Body",
            "description": "Actual Message payload that needs to be sent to the topic",
            "default": ""
        }
    }
}
With the schema in place, let us add the publish API to the python service.

@app.post("/publish/<topic:string>")
async def publish_message(request: Request, topic: str):
    data = request.json
    requests.post(f"{DAPR_PUBLISHER}/{topic}", json={"messageType": topic, "message": data})
    return json({"message": "published"})
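Note that DAPR_PUBLISHER is defined elsewhere in the repo; my assumption in the sketch below is that it points at the sidecar's publish endpoint on the default dapr HTTP port, so the handler above effectively just builds a URL of this shape (adjust to match your actual setup):

```python
# Assumption: DAPR_PUBLISHER points at the dapr sidecar's publish API on
# the default dapr HTTP port (3500); this is illustrative, not the exact
# value from the repo.
DAPR_PORT = 3500
DAPR_PUBLISHER = f"http://localhost:{DAPR_PORT}/v1.0/publish"


def publish_url(topic):
    # The topic name from the route parameter becomes the last path segment.
    return f"{DAPR_PUBLISHER}/{topic}"


print(publish_url("t2"))  # http://localhost:3500/v1.0/publish/t2
```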

Configure Message Component

In order for dapr to be able to exchange messages between the publisher and the subscriber, we need to create a Component like we did for the state store. The difference is that this new Component is of type pubsub.redis.

# redis-pub-sub.yaml
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.redis
  metadata:
    - name: redisHost
      value: redis-leader.default:6379
    - name: redisPassword
      value: ""
Apply the component to the cluster.

kubectl apply -f deployments/redis-pub-sub.yaml

Testing

Topic t2

curl --location --request POST 'localhost:6060/publish/t2' --header 'Content-Type: application/json' --data-raw '{
        "message": "test"
}'
# {"message":"published"}

# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:47:06 +0000] - (sanic.access)[INFO][127.0.0.1:34192]: POST http://localhost:6060/publish/t2  200 23
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:47:16 +0000] [12] [INFO] {
#  'id': '0da6a252-9289-4852-9b06-4a2ffeeaed01', 'source': 'service-1', 'type': 'com.dapr.event.sent',
#  'specversion': '0.3', 'datacontenttype': 'application/json',
#  'data': {'messageType': 't2', 'message': {'message': 'test'}}, 
#  'subject': '00-d6a5e7e8bc1739fe1e93ea86a4e282fd-29a341243e7e54ef-01'}
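Notice in the log above that dapr delivers the message wrapped in a CloudEvents (specversion 0.3) envelope, with the payload our publisher sent nested under the data key. A small sketch of unwrapping it, using the exact envelope from the log:

```python
# Envelope copied from the service-1 log output above; dapr wraps
# published messages in a CloudEvents (v0.3) envelope.
event = {
    "id": "0da6a252-9289-4852-9b06-4a2ffeeaed01",
    "source": "service-1",
    "type": "com.dapr.event.sent",
    "specversion": "0.3",
    "datacontenttype": "application/json",
    "data": {"messageType": "t2", "message": {"message": "test"}},
    "subject": "00-d6a5e7e8bc1739fe1e93ea86a4e282fd-29a341243e7e54ef-01",
}

# The body our publisher sent sits under data -> message.
payload = event["data"]["message"]
print(payload)  # {'message': 'test'}
```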

Topic t1

curl --location --request POST 'localhost:6060/publish/t1' --header 'Content-Type: application/json' --data-raw '{
        "message": "test"
}'

# {"message":"published"}

# service-2-867984df96-hng99 service-2 [GIN] 2020/07/11 - 19:48:12 | 200 |       1.668ms |      10.42.1.26 | POST     "/t1"
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:48:12 +0000] [12] [INFO] CLIENT: <dapr.proto.runtime.v1.dapr_pb2_grpc.DaprStub object at 0x7fc7e6c0f8e0>, STORE_NAME: statestore, STATE_KEY: dapr-series
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:48:12 +0000] - (sanic.access)[INFO][127.0.0.1:34804]: POST http://127.0.0.1:6060/t1  200 34

Traces

The traces for these pub/sub driven calls can be explored in the jaeger UI we set up in the previous post.

Observability Metrics

For this demo, we are going to use the default setup that dapr suggests, with prometheus to collect the observability metrics and grafana to visualize them.

Installation

For the purpose of this post, we are going to follow the setup recommended by the dapr team, using the stable helm charts for Prometheus and Grafana along with the dashboards provided by the community.

# Create a custom namespace for easy access and management via RBAC
kubectl create namespace dapr-monitoring

# Configure Helm and install Prometheus collectors
helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install dapr-prom stable/prometheus -n dapr-monitoring --set alertmanager.persistentVolume.enabled=false --set pushgateway.persistentVolume.enabled=false --set server.persistentVolume.enabled=false

# Install Helm chart for Grafana visualization
helm install grafana stable/grafana -n dapr-monitoring --set persistence.enabled=false
