Using Benthos aka Redpanda Connect To Receive Webhooks From Github And More

- kubernetes benthos

Benthos, now renamed Redpanda Connect (and forked as Bento), is a data stream processor: take anything as input (Kafka, NATS, SQS, S3, a socket, HTTP …), modify the content, then send it to anything as output.

I love this tool so much that I was convinced I had already blogged about it, which I had not.

It can be used for mostly anything stream related. Sure, we could do the same thing with real code, using our usual programming language, but anything resembling a basic stream processor always ends up being the same boring code.
Doing it right is also harder than you think; implementing the common patterns properly takes time: reconnections, buffering, caching, windowing, aggregating, exposing metrics …

Benthos (I’m having a hard time with the new name), so Redpanda Connect, specializes in getting the boring parts done right, to the point that it feels weird to me when I cannot use it for something stream related (and more, see below).
It’s dead simple, but it can also handle complex scenarios since it can run mapping code written in Bloblang.

Here is a real example: using it to receive webhooks coming from Github as input and deliver them to NATS as output.

Webhooks & Github

Github can be configured per repository or per org to deliver information (builds, PRs… ) via webhooks.

It sends a header X-Hub-Signature-256 containing the HMAC hex digest of the request body, computed with the webhook secret, so we can verify the payload was really sent by Github.
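
For reference, that digest is just an HMAC-SHA256 of the raw body keyed with the webhook secret; you can reproduce it by hand with openssl (a quick sketch, assuming the payload is saved in payload.json and the secret exported as SECRET_KEY):

# hex HMAC-SHA256 of the request body, keyed with the webhook secret
printf 'sha256=%s\n' "$(openssl dgst -sha256 -hmac "$SECRET_KEY" < payload.json | awk '{print $NF}')"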

Configuring and programming Benthos is as simple as declaring your inputs and outputs and adding a processor to alter the content, and you are done.

Let’s start with an HTTP server listener, required for a webhook, so we can receive the call as an “input”, and expose metrics in the Prometheus format.

http:
  enabled: true
  address: 0.0.0.0:8095
  root_path: /api
  debug_endpoints: false # turn on to debug
metrics:
  prometheus: {}
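
With that in place the service-wide HTTP server also serves the Prometheus metrics; a quick curl is enough to check it is up (assuming the default /metrics path of the prometheus exporter):

curl -s http://localhost:8095/metrics | head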

We want the URL Github will call us on to be /api/github/webhook. We’re also adding flood protection on that input, just in case.

input:
  label: webhook_server
  http_server:
    path: "/github/webhook"
    rate_limit: flood_protect

rate_limit_resources:
  - label: flood_protect
    local:
      count: 100
      interval: 1s

Now we validate the signature from the header and, if it does not match, we reject the request.

pipeline:
  processors:
    - bloblang: |
        root = this
        root.signature = meta("X-Hub-Signature-256").split("=").index(1)
        root.signature_valid = root.signature == content().string().hash("hmac_sha256", "${SECRET_KEY}").encode("hex")
       
output:
  switch:
    cases:
      - check: 'this.signature_valid == true'
        output:
          stdout: {}
      - output:
          reject: "Invalid signature"
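
Before pointing Github at it, the whole chain can be tested locally by computing the signature ourselves (a sketch, assuming the config above is running and SECRET_KEY is exported):

# craft a dummy payload, sign it and post it like Github would
BODY='{"action":"published"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET_KEY" | awk '{print $NF}')
curl -s -X POST "http://localhost:8095/api/github/webhook" \
  -H "Content-Type: application/json" \
  -H "X-Hub-Signature-256: sha256=$SIG" \
  -d "$BODY"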

With that, we can now register our webhook from a Github repo or globally from an Org. Let’s subscribe to the release event.
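
The registration can be done from the repository settings UI, or through the Github API; something along these lines should work (a sketch, assuming a token with admin rights on the repo in GITHUB_TOKEN, and OWNER/REPO plus the public URL adjusted to your setup):

curl -s -X POST "https://api.github.com/repos/OWNER/REPO/hooks" \
  -H "Accept: application/vnd.github+json" \
  -H "Authorization: Bearer $GITHUB_TOKEN" \
  -d '{
    "name": "web",
    "active": true,
    "events": ["release"],
    "config": {
      "url": "https://my.public.host/api/github/webhook",
      "content_type": "json",
      "secret": "'"$SECRET_KEY"'"
    }
  }'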

Instead of displaying the events on stdout, we can finally output them to NATS JetStream:

output:
  switch:
    cases:
      - check: 'this.signature_valid == true'
        output:
          nats_jetstream:
            urls:
              - nats://nats.nats.svc.cluster.local:4222
            subject: ci.actions.release
      - output:
          reject: "Invalid signature"
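
On the NATS side the subject needs to be covered by a JetStream stream; once that is the case, the nats CLI is handy to verify the events actually land (assuming a stream already captures ci.actions.>):

# list the streams and make sure one covers ci.actions.>
nats stream ls
# watch the release events arrive
nats sub ci.actions.release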

Live Testing Bloblang

There is a built-in debugging tool to test Bloblang code:

Save the Bloblang code in a file, save your input data in another and run:

benthos blobl server --no-open --input-file mydata.json -m mapping.txt

Point your browser to http://localhost:4195
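
For quick one-off checks there is also a non-server variant that applies a mapping to each line read from stdin; for example, with a small fragment of a release payload:

echo '{"action":"published","release":{"tag_name":"v1.2.3"}}' | benthos blobl 'root.tag = this.release.tag_name'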

Kubernetes

Benthos can also run in streams mode, where one Benthos instance runs multiple pipelines at the same time.
We can use this feature and ship the config files through a ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: benthos-streams-cm
data:
  github-webhook.yaml: |
    ...    
  drone-webhook.yaml: |
    ...    

Then mount the configmap as a volume in the Benthos container:

...
apiVersion: apps/v1
kind: Deployment
metadata:
  name: benthos-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: benthos
  template:
    metadata:
      labels:
        app: benthos
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '4040'
    spec:
      containers:
      - name: benthos
        image: jeffail/benthos:4.27
        args: ["-r", "/etc/config/resources.yaml", "-c", "/etc/config/config.yaml","streams", "/config"]  # add this line to start Benthos in streams mode
        ports:
          - containerPort: 8080
            name: http-port
          - containerPort: 4040
            name: metrics-port
        env:
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: benthos-secret
              key: secret-key
        volumeMounts:
        - name: config-streams-volume
          mountPath: /config
        - name: config-volume
          mountPath: /etc/config
      volumes:
        - name: config-streams-volume
          configMap:
            name: benthos-streams-cm
        - name: config-volume
          configMap:
            name: benthos-config-cm

Note that in streams mode the HTTP endpoints are prefixed with the stream identifier (the file name) by default.
Also note that you still need a default config.yaml and a resources.yaml for the global configuration and global resources (in our case the http: and metrics: sections would move into config.yaml).
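
Here is a sketch of what those two files could contain, reusing the sections from above (putting the rate limit in resources.yaml is my assumption, matching the -r flag of the Deployment):

# config.yaml: global configuration
http:
  enabled: true
  address: 0.0.0.0:8095
  root_path: /api
  debug_endpoints: false
metrics:
  prometheus: {}

# resources.yaml: global resources shared by all streams
rate_limit_resources:
  - label: flood_protect
    local:
      count: 100
      interval: 1s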

Here is a complete working Kubernetes example.

Extending With Plugins

It’s possible to go even further and extend Benthos with the plugin system.

You would want to use it to connect to new inputs or outputs, or to define new Bloblang functions and methods usable from the processors.

I’ve created geo-benthos to provide some extensions related to Geo/GIS stuff.

An example position.json input file:

{"id":42, "lat": 48.86, "lng": 2.34}

And the following Bloblang processor using the country function:

input:
  file:
    paths: ["position.json"]
    codec: all-bytes

pipeline:
  threads: 1
  processors:
  - mapping: |
      #!blobl
      root = this
      root.country = country(this.lat, this.lng)      

output:
  label: "out"
  stdout:
    codec: lines

It will transform the input, setting the country from the lat/lng:

{"country":["France"],"id":42,"lat":48.86,"lng":2.34}

Based on the same logic:

Conclusion

Benthos/Bento/Redpanda Connect hits the sweet spot: the kind of useful tool you want to invest in because it simplifies tons of boring code.