Posts in this series
- dapr 101 - Concepts and Setup
- dapr 102 - State Management and Tracing
- dapr 103 - Pub/Sub and Observability Metrics (This Post)
In a Galaxy far far away…
In the last post, we looked at how to write services in python and golang that leverage dapr and its state management component to store data in redis and retrieve it via an API. We also looked at how to create distributed tracing visualizations with jaeger and dapr while performing service-to-service invocation.
Goals
Building on the code and infrastructure we put in place in Post #2, this post extends those same services to demonstrate how to create pub/sub based APIs using the dapr building blocks, and how to gather and visualize the observability metrics of the services using grafana with the default dashboard provided by the dapr core team.
Pre-Requisites
Kubernetes Cluster and dapr
Setup
If you want to follow along with the examples and try them out yourself, please make sure your cluster is in the following state before proceeding with the rest of this post.
$ kubectl get nodes -o wide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
k3d-k3s-default-worker-0 Ready <none> 3d21h v1.18.4+k3s1 172.21.0.3 <none> Unknown 4.19.76-linuxkit containerd://1.3.3-k3s2
k3d-k3s-default-worker-1 Ready <none> 3d21h v1.18.4+k3s1 172.21.0.4 <none> Unknown 4.19.76-linuxkit containerd://1.3.3-k3s2
k3d-k3s-default-worker-2 Ready <none> 3d21h v1.18.4+k3s1 172.21.0.5 <none> Unknown 4.19.76-linuxkit containerd://1.3.3-k3s2
k3d-k3s-default-server Ready master 3d21h v1.18.4+k3s1 172.21.0.2 <none> Unknown 4.19.76-linuxkit containerd://1.3.3-k3s2
$ kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
redis-leader-7d557b94bb-rdtqx 1/1 Running 0 3d21h 10.42.1.3 k3d-k3s-default-worker-0 <none> <none>
svclb-service-1-4mcrn 1/1 Running 0 2d21h 10.42.1.10 k3d-k3s-default-worker-0 <none> <none>
svclb-service-1-btpxq 1/1 Running 0 2d21h 10.42.2.7 k3d-k3s-default-worker-1 <none> <none>
svclb-service-1-kclct 1/1 Running 0 2d21h 10.42.0.11 k3d-k3s-default-server <none> <none>
svclb-service-1-6jh9t 1/1 Running 0 2d21h 10.42.3.8 k3d-k3s-default-worker-2 <none> <none>
dapr-sidecar-injector-c898fb49b-9scq7 1/1 Running 0 2d20h 10.42.2.11 k3d-k3s-default-worker-1 <none> <none>
dapr-sentry-58c576ff98-s5mwx 1/1 Running 0 2d20h 10.42.3.14 k3d-k3s-default-worker-2 <none> <none>
dapr-operator-75b4f7986b-kwdvv 1/1 Running 0 2d20h 10.42.3.12 k3d-k3s-default-worker-2 <none> <none>
dapr-placement-84f9cd87b7-x2tcq 1/1 Running 1 2d20h 10.42.3.13 k3d-k3s-default-worker-2 <none> <none>
svclb-service-2-nrm4k 1/1 Running 0 2d1h 10.42.2.13 k3d-k3s-default-worker-1 <none> <none>
svclb-service-2-z985z 1/1 Running 0 2d1h 10.42.3.15 k3d-k3s-default-worker-2 <none> <none>
svclb-service-2-nsg5t 1/1 Running 0 2d1h 10.42.0.22 k3d-k3s-default-server <none> <none>
svclb-service-2-hw2nq 1/1 Running 0 2d1h 10.42.1.21 k3d-k3s-default-worker-0 <none> <none>
jaeger-5dc87df5df-zlq6x 1/1 Running 0 44h 10.42.0.23 k3d-k3s-default-server <none> <none>
service-2-867984df96-mp6fv 2/2 Running 0 21h 10.42.2.19 k3d-k3s-default-worker-1 <none> <none>
service-1-5989969f84-6st8c 2/2 Running 0 21h 10.42.0.26 k3d-k3s-default-server <none> <none>
$ kubectl get svc -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes ClusterIP 10.43.0.1 <none> 443/TCP 3d21h <none>
dapr-api ClusterIP 10.43.39.14 <none> 80/TCP 3d21h app=dapr-operator
dapr-placement ClusterIP 10.43.245.162 <none> 80/TCP 3d21h app=dapr-placement
dapr-sentry ClusterIP 10.43.84.125 <none> 80/TCP 3d21h app=dapr-sentry
dapr-sidecar-injector ClusterIP 10.43.252.177 <none> 443/TCP 3d21h app=dapr-sidecar-injector
redis-leader ClusterIP 10.43.205.118 <none> 6379/TCP 3d21h app=redis,role=master,tier=backend
service-1-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 2d21h app=service-1
service-2-dapr ClusterIP None <none> 80/TCP,50001/TCP,50002/TCP,9090/TCP 2d2h app=service-2
jaeger ClusterIP 10.43.160.63 <none> 16686/TCP,9411/TCP 44h app=jaeger
service-2 LoadBalancer 10.43.87.66 172.21.0.4 81:31213/TCP 2d2h app=service-2
service-1 LoadBalancer 10.43.111.170 172.21.0.5 80:30427/TCP 2d21h app=service-1
If your cluster is not in this state and you want to get it there, please refer to Post #1 and Post #2 for details on how to set up the cluster and write the first two services, which we are calling service-1 and service-2.
Code Repo
Please clone harshanarayana/dapr-series and check out the dapr-103 branch to make it easier to follow along with this post.
git clone git@github.com:harshanarayana/dapr-series.git
cd dapr-series
git checkout dapr-103
Once you check out the repo, the structure will look like this.
Python Service
service-1 will listen to events of two types (i.e. topics): t1 and t2. If the event received is on topic t1, it will invoke service-2 with the same payload and, based on the data returned, take the necessary actions. If the event received is on topic t2, it will simply log the event body to the console after an async delay of 10 seconds. This is done so that I can pretend to make this service do something useful.
Design
Create Subscription
In order for a service to subscribe to events in dapr, it needs to expose a GET /dapr/subscribe endpoint whose response follows this spec.
{
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "array",
    "title": "The root schema",
    "description": "The root schema comprises the entire JSON document.",
    "default": [],
    "additionalItems": true,
    "items": {
        "$id": "#/items",
        "anyOf": [
            {
                "$id": "#/items/anyOf/0",
                "type": "object",
                "title": "Subscription Items",
                "description": "Each entity defines a specific topic to bind to and where to route the requests when there is an event of that topic.",
                "default": {},
                "required": [
                    "topic",
                    "route"
                ],
                "additionalProperties": true,
                "properties": {
                    "topic": {
                        "$id": "#/items/anyOf/0/properties/topic",
                        "type": "string",
                        "title": "Topic Name to bind to",
                        "description": "Name of the topic that will be subscribed to.",
                        "default": ""
                    },
                    "route": {
                        "$id": "#/items/anyOf/0/properties/route",
                        "type": "string",
                        "title": "The API Endpoint/route to be invoked when there is a message on the Topic",
                        "description": "The API Endpoint/route to be invoked when there is a message on the Topic",
                        "default": ""
                    }
                }
            }
        ]
    }
}
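The spec above boils down to: a JSON array of objects, each carrying a string `topic` and a string `route`. As a quick sanity check, a minimal pure-Python validator (a sketch for illustration, not code from the repo) could look like this:

```python
def validate_subscriptions(subs):
    """Check that a /dapr/subscribe response matches the spec above:
    a JSON array of objects, each with string 'topic' and 'route' keys."""
    if not isinstance(subs, list):
        raise ValueError("subscription payload must be a JSON array")
    for i, item in enumerate(subs):
        if not isinstance(item, dict):
            raise ValueError(f"item {i} must be an object")
        for key in ("topic", "route"):
            if not isinstance(item.get(key), str):
                raise ValueError(f"item {i} is missing a string '{key}'")
    return True


# The subscription list our python service will return
validate_subscriptions([
    {"topic": "t1", "route": "t1"},
    {"topic": "t2", "route": "t2"},
])
```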
Now, let us include this route in the service-1 code.
@app.get("/dapr/subscribe")
async def subscribe(request: Request):
    return json([
        {
            "topic": "t1",
            "route": "t1"
        },
        {
            "topic": "t2",
            "route": "t2"
        }
    ])
In the above example, we are telling dapr that we want to subscribe to two topics, t1 and t2, and that whenever there is an event on one of those topics, it should invoke the route specified alongside it. In this case, an event on topic t1 will invoke the POST /t1 API and an event on topic t2 will invoke the POST /t2 API on the python service.
Take extra care to make sure that the routes you bind the topics to are POST endpoints, so that when dapr invokes them for you, it can pass a request payload containing the message body of the event.
from asyncio import sleep as async_sleep


@app.post("/t1")
async def handle_t1(request: Request):
    d = requests.post(f"{DAPR_FORWARDER}/service-2/method/t1", json=request.json)
    _store_state(d.text)
    return json({"message": d.json()})


@app.post("/t2")
async def handle_t2(request: Request):
    await async_sleep(10)
    logger.info(request.json)
    return json({"success": True})
Let us also add a POST /t1 endpoint on service-2 that service-1 can invoke when it gets a message on topic t1.
func handleT1() gin.HandlerFunc {
	return func(context *gin.Context) {
		context.JSON(200, gin.H{
			"message": "received",
		})
	}
}

r.POST("/t1", handleT1())
That is all you need to do to set up a subscriber. Now, every time there is an event on one of the topics specified in the GET /dapr/subscribe response, dapr will invoke the respective endpoint and you can do whatever you want in your API.
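It helps to know what the request body dapr delivers to these endpoints actually looks like: the message is wrapped in a CloudEvents envelope, as you can see in the log output later in this post. Here is a small sketch that builds such an envelope by hand for reference (the id is freshly generated, and the subject field carrying the trace context is omitted):

```python
import json
import uuid


def build_cloudevent(source: str, topic: str, message) -> dict:
    """Build a CloudEvents v0.3 envelope of the shape dapr delivers to
    subscriber endpoints; the field values mirror the log output shown
    in the Testing section below."""
    return {
        "id": str(uuid.uuid4()),
        "source": source,
        "type": "com.dapr.event.sent",
        "specversion": "0.3",
        "datacontenttype": "application/json",
        # dapr places the publisher's payload under 'data'
        "data": {"messageType": topic, "message": message},
    }


envelope = build_cloudevent("service-1", "t2", {"message": "test"})
print(json.dumps(envelope, indent=2))
```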
Message Publisher API
Now, let us add an API that we can invoke with a payload to publish a message to one of the specified topics. For the sake of convenience, I am going to add this API to the python service. However, you are free to follow any other approach you see fit for publishing the events.
The event schema should look like the following.
{
    "$schema": "http://json-schema.org/draft-07/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "title": "Message Publish Schema",
    "description": "This schema is what the Service that needs to publish a message should abide by.",
    "default": {},
    "required": [
        "messageType",
        "message"
    ],
    "additionalProperties": false,
    "properties": {
        "messageType": {
            "$id": "#/properties/messageType",
            "type": "string",
            "title": "Name of the Topic",
            "description": "Topic Name to which the message needs to be published",
            "default": ""
        },
        "message": {
            "$id": "#/properties/message",
            "type": [
                "object",
                "array",
                "boolean",
                "string",
                "number",
                "integer"
            ],
            "title": "Message Body",
            "description": "Actual Message payload that needs to be sent to the topic",
            "default": ""
        }
    }
}
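Note that the schema sets additionalProperties to false, so any extra key in the payload is a violation. A minimal pure-Python check mirroring the schema above (again a sketch for illustration, not code from the repo):

```python
# JSON value types permitted for 'message' by the schema above
ALLOWED_MESSAGE_TYPES = (dict, list, bool, str, int, float)


def validate_publish_payload(payload: dict) -> bool:
    """Check a payload against the publish schema: exactly the keys
    'messageType' (a string topic name) and 'message' (any JSON value),
    with no additional properties allowed."""
    extra = set(payload) - {"messageType", "message"}
    if extra:
        raise ValueError(f"unexpected properties: {sorted(extra)}")
    if not isinstance(payload.get("messageType"), str):
        raise ValueError("'messageType' must be a string topic name")
    if not isinstance(payload.get("message"), ALLOWED_MESSAGE_TYPES):
        raise ValueError("'message' must be a JSON value")
    return True


validate_publish_payload({"messageType": "t2", "message": {"message": "test"}})
```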
@app.post("/publish/<topic:string>")
async def publish_message(request: Request, topic: str):
    data = request.json
    requests.post(f"{DAPR_PUBLISHER}/{topic}", json={"messageType": topic, "message": data})
    return json({"message": "published"})
Configure Message Component
In order for dapr to be able to exchange messages between the publisher and subscriber, we need to create a Component like we did while creating the state store. The difference is that this new Component is of type pubsub.redis.
# redis-pub-sub.yaml
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.redis
  metadata:
    - name: redisHost
      value: redis-leader.default:6379
    - name: redisPassword
      value: ""
kubectl apply -f deployments/redis-pub-sub.yaml
Testing
Topic t2
curl --location --request POST 'localhost:6060/publish/t2' \
  --header 'Content-Type: application/json' \
  --data-raw '{"message": "test"}'
# {"message":"published"}
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:47:06 +0000] - (sanic.access)[INFO][127.0.0.1:34192]: POST http://localhost:6060/publish/t2 200 23
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:47:16 +0000] [12] [INFO] {
# 'id': '0da6a252-9289-4852-9b06-4a2ffeeaed01', 'source': 'service-1', 'type': 'com.dapr.event.sent',
# 'specversion': '0.3', 'datacontenttype': 'application/json',
# 'data': {'messageType': 't2', 'message': {'message': 'test'}},
# 'subject': '00-d6a5e7e8bc1739fe1e93ea86a4e282fd-29a341243e7e54ef-01'}
Topic t1
curl --location --request POST 'localhost:6060/publish/t1' \
  --header 'Content-Type: application/json' \
  --data-raw '{"message": "test"}'
# {"message":"published"}
# service-2-867984df96-hng99 service-2 [GIN] 2020/07/11 - 19:48:12 | 200 | 1.668ms | 10.42.1.26 | POST "/t1"
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:48:12 +0000] [12] [INFO] CLIENT: <dapr.proto.runtime.v1.dapr_pb2_grpc.DaprStub object at 0x7fc7e6c0f8e0>, STORE_NAME: statestore, STATE_KEY: dapr-series
# service-1-5989969f84-vspq2 service-1 [2020-07-11 19:48:12 +0000] - (sanic.access)[INFO][127.0.0.1:34804]: POST http://127.0.0.1:6060/t1 200 34
Traces
Observability Metrics
For the purpose of this demo, we are going to use the default setup that dapr suggests, with prometheus collecting the observability metrics and grafana visualizing them.
Installation
For this post, we are going to leverage the helm charts recommended by the dapr team to set up Grafana along with the dashboard provided by the community.
# Create a custom namespace for easy access and management via RBAC
kubectl create namespace dapr-monitoring
# Configure Helm and install Prometheus collectors
helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install dapr-prom stable/prometheus -n dapr-monitoring \
  --set alertmanager.persistentVolume.enabled=false \
  --set pushgateway.persistentVolume.enabled=false \
  --set server.persistentVolume.enabled=false
# Install Helm chart for Grafana visualization
helm install grafana stable/grafana -n dapr-monitoring --set persistence.enabled=false