Using Benthos aka Redpanda Connect To Receive Webhooks From Github And More
- kubernetes benthos
Benthos, now renamed Redpanda Connect (with a community fork called Bento), is a data stream processor: it reads from almost any input (Kafka, NATS, SQS, S3, a socket, HTTP …), modifies the content, then sends it to almost any output.
I love this tool so much that I was convinced I had already blogged about it, which it turns out I had not.
It can be used for almost anything stream related. Sure, we could do the same thing with real code, using our usual programming language, but anything resembling a basic stream processor always ends up as the same boring code.
Doing it right is also harder than you might think; implementing the common rich patterns takes time: reconnections, buffering, caching, windowing, aggregating, exposing metrics …
Benthos (I’m still having a hard time with the new name, Redpanda Connect) specializes in getting the boring parts done right, to the point that it feels weird to me when I can’t use it for something stream related (and more, see below).
It’s dead simple, yet it can also handle complex scenarios, since it can run data-mapping code written in Bloblang.
Here is a real example: using it to receive webhooks from Github as input and publish them to NATS as output.
Webhooks & Github
Github can be configured per repository or per organization to deliver events (builds, PRs …) via webhooks.
It sends an X-Hub-Signature-256 header
containing the HMAC-SHA256 hex digest of the request body, so you can verify the payload was genuinely sent by Github.
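To make the verification concrete, here is how that digest can be reproduced from a shell; the secret and body below are placeholder values, Github uses the secret you configure on the webhook:

```shell
# Reproduce Github's HMAC-SHA256 signature for a given body and secret
# (placeholder values for illustration)
SECRET='my-webhook-secret'
BODY='{"action":"released"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $NF}')
echo "X-Hub-Signature-256: sha256=$SIG"
```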
Configuring and programming Benthos is as simple as declaring your inputs and outputs, adding a processor to alter the content, and you are done.
Let’s start with an HTTP server listener, required for a webhook, so we can receive the call as an “input”, and expose metrics for Prometheus.
http:
  enabled: true
  address: 0.0.0.0:8095
  root_path: /api
  debug_endpoints: false # turn on to debug
metrics:
  prometheus: {}
We want the URL Github will call us on to be /api/github/webhook
. We’re also adding flood protection on that input, just in case.
input:
  label: webhook_server
  http_server:
    path: "/github/webhook"
    rate_limit: flood_protect

rate_limit_resources:
  - label: flood_protect
    local:
      count: 100
      interval: 1s
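The rate limit can be sanity-checked by firing more requests than `count` allows within one `interval` against a locally running instance (a sketch; rejected requests should come back as 429s, and without a running Benthos every request simply fails):

```shell
# Quick check of the flood protection: fire 120 requests and count
# how many were accepted vs limited/failed (assumes a local instance)
OK=0; LIMITED=0
for i in $(seq 1 120); do
  CODE=$(curl -s -o /dev/null -w '%{http_code}' --max-time 2 \
    -X POST 'http://localhost:8095/api/github/webhook' -d '{}')
  case "$CODE" in
    2*) OK=$((OK+1)) ;;
    *)  LIMITED=$((LIMITED+1)) ;;
  esac
done
echo "accepted=$OK limited_or_failed=$LIMITED"
```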
Now we validate the signature from the header, and reject the request if it does not match.
pipeline:
  processors:
    - bloblang: |
        root = this
        root.signature = meta("X-Hub-Signature-256").split("=").index(1)
        root.signature_valid = if root.signature == content().string().hash("hmac_sha256", "${SECRET_KEY}").encode("hex") {
          true
        } else {
          false
        }

output:
  switch:
    cases:
      - check: 'this.signature_valid == true'
        output:
          stdout: {}
      - output:
          reject: "Invalid signature"
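With Benthos running locally on the config above, the whole pipeline can be exercised with a hand-signed request; the secret and payload here are placeholders, and the secret must match the pipeline’s SECRET_KEY:

```shell
# Send a hand-signed test event to the local webhook endpoint
# (placeholder secret/payload; SECRET must match the pipeline's SECRET_KEY)
SECRET='my-webhook-secret'
BODY='{"action":"released","repository":{"full_name":"org/repo"}}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $NF}')
curl -sS --max-time 5 -X POST 'http://localhost:8095/api/github/webhook' \
  -H 'Content-Type: application/json' \
  -H "X-Hub-Signature-256: sha256=$SIG" \
  -d "$BODY" || echo 'request failed (is Benthos running?)'
```

Tampering with the body or the secret should make the switch output reject the request.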
With that in place we can now register our webhook on a Github repo, or globally on an org. Let’s subscribe to the release
event.
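The registration can also be scripted against Github’s REST API for repository webhooks; here is a sketch where the owner/repo, URL, and secret are all placeholders:

```shell
# Webhook definition for Github's "create a repository webhook" REST endpoint
# (URL, secret, and repository are placeholders)
cat > hook.json <<'EOF'
{
  "name": "web",
  "active": true,
  "events": ["release"],
  "config": {
    "url": "https://example.com/api/github/webhook",
    "content_type": "json",
    "secret": "my-webhook-secret"
  }
}
EOF
# Registering it requires a token with the admin:repo_hook scope:
# curl -X POST -H "Authorization: Bearer $GITHUB_TOKEN" \
#      https://api.github.com/repos/OWNER/REPO/hooks --data @hook.json
```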
Instead of displaying the events on stdout, we can finally output them to NATS JetStream:
output:
  switch:
    cases:
      - check: 'this.signature_valid == true'
        output:
          nats_jetstream:
            urls:
              - nats://nats.nats.svc.cluster.local:4222
            subject: ci.actions.release
      - output:
          reject: "Invalid signature"
Live Testing Bloblang
There is a built-in debugging tool to test Bloblang code:
Save the Bloblang code in a file, save your input data in another and run:
benthos blobl server --no-open --input-file mydata.json -m mapping.txt
Point your browser to http://localhost:4195
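For quick one-shot checks without the web UI, the blobl subcommand can also apply a mapping to documents read from stdin (a sketch; it assumes the benthos binary is on the PATH):

```shell
# Apply a mapping to JSON documents read from stdin, one per line;
# falls back to a message when the benthos binary is not installed
echo '{"id":42,"lat":48.86,"lng":2.34}' \
  | benthos blobl 'root.id = this.id' \
  || echo 'benthos binary not found'
```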
Kubernetes
Benthos can run in streams mode, where one Benthos instance runs multiple pipelines at the same time.
We can use this feature by providing the config files through a ConfigMap
:
apiVersion: v1
kind: ConfigMap
metadata:
  name: benthos-streams-cm
data:
  github-webhook.yaml: |
    ...
  drone-webhook.yaml: |
    ...
Then mount the configmap as a volume in the Benthos container:
...
apiVersion: apps/v1
kind: Deployment
metadata:
  name: benthos-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: benthos
  template:
    metadata:
      labels:
        app: benthos
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '4040'
    spec:
      containers:
        - name: benthos
          image: jeffail/benthos:4.27
          args: ["-r", "/etc/config/resources.yaml", "-c", "/etc/config/config.yaml", "streams", "/config"] # add this line to start Benthos in streams mode
          ports:
            - containerPort: 8080
              name: http-port
            - containerPort: 4040
              name: metrics-port
          env:
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: benthos-secret
                  key: secret-key
          volumeMounts:
            - name: config-streams-volume
              mountPath: /config
            - name: config-volume
              mountPath: /etc/config
      volumes:
        - name: config-streams-volume
          configMap:
            name: benthos-streams-cm
        - name: config-volume
          configMap:
            name: benthos-config-cm
Note that in streams mode the HTTP endpoints are prefixed with the stream identifier (the file name) by default.
Also note that you still need a default config.yaml
and a resources.yaml
for the global configuration and global resources (in our case the http:
and metrics:
sections would move into config.yaml).
Here is a complete working Kubernetes example.
Extending With Plugins
It’s possible to go even further and extend Benthos with its plugin system.
You would use it to connect new inputs/outputs, or to define new methods for the processors.
I’ve created geo-benthos to provide some extensions for Geo/GIS stuff:
An example position.json input file:
{"id":42, "lat": 48.86, "lng": 2.34}
And the following Bloblang processor using country
:
input:
  file:
    paths: ["position.json"]
    codec: all-bytes

pipeline:
  threads: 1
  processors:
    - mapping: |
        #!blobl
        root = this
        root.country = country(this.lat, this.lng)

output:
  label: "out"
  stdout:
    codec: lines
It transforms the input, setting the country from the lat/lng.
{"country":["France"],"id":42,"lat":48.86,"lng":2.34}
Based on the same logic:
- h3: root.h3 = h3(this.lat, this.lng, 5) will add the Uber H3 cell at level 5 for the position:
  {"h3":"851fb467fffffff","id":42,"lat":48.86,"lng":2.34}
- s2: root.s2 = s2_object(this.lat, this.lng, 15) will add the S2 cell at level 15 for the position:
  {"id":42,"lat":48.86,"lng":2.34,"s2":"2/033303031301002"}
- tz: root.tz = tz(this.lat, this.lng) will add the timezone for the position:
  {"tz":"Europe/Paris","id":42,"lat":48.86,"lng":2.34}
Conclusion
Benthos/Bento/Redpanda Connect sits right in the sweet spot: the useful tool you want to invest in, because it helps simplify tons of boring code.