At Redpanda, we like to make things simple. Redpanda is an Apache Kafka®-compatible event streaming platform that eliminates ZooKeeper™ and the JVM, autotunes itself for modern hardware, and ships in a single binary.
There are many high-quality Kafka clients for lots of languages, but wouldn't it be nice if you could just fire up your favorite HTTP CLI or library to produce and consume a stream of events?
Well, we're pleased to announce Pandaproxy, a new subsystem of Redpanda that allows access to your data through a REST API!
It's already available in Redpanda, so if you have Redpanda installed, make sure it's up to date. If not, follow the instructions in the Linux, macOS, Kubernetes, or Docker quick start guides.
If you want to leave the infrastructure issues to us, sign up for Redpanda Cloud for the simplest way to run Redpanda.
Example: produce and consume
Let's jump right in and start Redpanda using Docker on Linux:
docker network create redpanda
docker volume create redpanda
docker run \
--pull=always \
--name=redpanda \
--net=redpanda \
-v "redpanda:/var/lib/redpanda/data" \
-p 8082:8082 \
-p 9092:9092 \
--detach \
vectorized/redpanda start \
--overprovisioned \
--smp 1 \
--memory 1G \
--reserve-memory 0M \
--node-id 0 \
--check=false \
--pandaproxy-addr 0.0.0.0:8082 \
--advertise-pandaproxy-addr 127.0.0.1:8082 \
--kafka-addr 0.0.0.0:9092 \
--advertise-kafka-addr redpanda:9092
Create the topic my_topic:
docker run \
--net=redpanda \
vectorized/redpanda \
--brokers=redpanda:9092 \
topic create my_topic \
--partitions=3 \
--replicas=1
Now we're ready to start using Pandaproxy!
Endpoints are documented with Swagger at http://localhost:8082/v1.
In the curl examples, I'm using jq to prettify and process the JSON responses.
In the Python examples, we'll use the popular requests module (pip install requests).
For the rest of the guide, we'll assume the following for an interactive Python session:
import requests
import json
def pretty(text):
    print(json.dumps(text, indent=2))
base_uri = "http://localhost:8082"
List topics
curl -s "localhost:8082/topics" | jq .
res = requests.get(f"{base_uri}/topics").json()
pretty(res)
[
"my_topic"
]
Produce to a topic
We need to POST a list of records to the /topics/{topic} endpoint.
JSON and base64 encoded payloads are currently supported, specified with a Content-Type of application/vnd.kafka.json.v2+json or application/vnd.kafka.binary.v2+json, respectively. We'll use JSON:
curl -s \
-X POST \
"http://localhost:8082/topics/my_topic" \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-d '{
"records":[
{
"value":"Vectorized",
"partition":0
},
{
"value":"Pandaproxy",
"partition":1
},
{
"value":"JSON Demo",
"partition":2
}
]
}' | jq .
res = requests.post(
url=f"{base_uri}/topics/my_topic",
data=json.dumps(
dict(records=[
dict(value="Vectorized", partition=0),
dict(value="Pandaproxy", partition=1),
dict(value="JSON Demo", partition=2)
])),
headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
{
"offsets": [
{
"partition": 0,
"offset": 0
},
{
"partition": 1,
"offset": 0
},
{
"partition": 2,
"offset": 0
}
]
}
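For the base64 encoding mentioned above, the record values would be base64 strings instead of JSON values. Here is a minimal sketch of building such a payload; build_binary_records is a hypothetical helper, and we're assuming the binary format uses the same "records" shape as the JSON example.

```python
import base64
import json


def build_binary_records(values):
    """Wrap raw byte strings as base64-encoded record values.

    Assumption: the binary format uses the same {"records": [...]} shape
    as the JSON example, with each value sent as a base64 string.
    """
    return {
        "records": [
            {"value": base64.b64encode(v).decode("ascii")} for v in values
        ]
    }


payload = build_binary_records([b"Vectorized", b"Pandaproxy"])
print(json.dumps(payload, indent=2))
```

This payload would be sent with a Content-Type of application/vnd.kafka.binary.v2+json in place of the JSON one.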
If a partition is not specified, one is chosen based on a murmur2 hash of the key. If there is no key, partitions are chosen using a round-robin strategy.
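To make the key-based choice concrete, here is a rough sketch of murmur2 partitioning. It is modeled on the murmur2 variant and the "hash, clear the sign bit, modulo partition count" scheme used by the Kafka Java client; whether Pandaproxy matches it bit for bit is an assumption, and partition_for_key is a hypothetical name.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2, following the Kafka Java client's variant (assumption)."""
    m = 0x5BD1E995
    r = 24
    mask = 0xFFFFFFFF
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    # Mix the input four bytes at a time, read as little-endian integers.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Fold in the remaining one to three bytes, if any.
    tail = data[length - length % 4:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * m) & mask
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition: clear the sign bit, then take the modulo."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


print(partition_for_key(b"my_key", 3))
```

The useful property is that the same key always lands on the same partition, so records that share a key keep their relative order.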
Create a consumer
Consumers belong to a consumer group. If you have many consumers in a group, messages are distributed between all consumers.
The standard protocol for interacting with the REST API is specified with: Content-Type: application/vnd.kafka.v2+json.
We need to POST the consumer configuration to the /consumers/{consumer_group} endpoint. Let's call the consumer my_consumer and the consumer group my_group, with format = json:
curl -s \
-X POST \
"http://localhost:8082/consumers/my_group" \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{
"format":"json",
"name":"my_consumer",
"auto.offset.reset":"earliest",
"auto.commit.enable":"false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
}' | jq .
res = requests.post(
url=f"{base_uri}/consumers/my_group",
data=json.dumps({
"name": "my_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
}),
headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
consumer_base_uri = res["base_uri"]
pretty(res)
{
"instance_id": "my_consumer",
"base_uri": "http://127.0.0.1:8082/consumers/my_group/instances/my_consumer"
}
We'll need the base_uri for further interaction with the consumer, as it identifies the consumer, but also the particular Pandaproxy instance if we're connecting to a Redpanda cluster.
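As a small illustration of what the base_uri carries, we can split it into the Pandaproxy address, the group, and the consumer name. describe_consumer is a hypothetical helper, and it assumes the path layout shown in the response above.

```python
from urllib.parse import urlsplit


def describe_consumer(base_uri):
    """Split a consumer base_uri into proxy address, group, and consumer name."""
    parts = urlsplit(base_uri)
    # Assumed path layout: /consumers/{consumer_group}/instances/{consumer}
    _, _consumers, group, _instances, consumer = parts.path.split("/")
    return {"proxy": parts.netloc, "group": group, "consumer": consumer}


info = describe_consumer(
    "http://127.0.0.1:8082/consumers/my_group/instances/my_consumer")
print(info)
```

In practice there is rarely a need to take the URI apart; sending follow-up requests to base_uri as returned keeps them on the same Pandaproxy instance.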
Subscribe the consumer
Consumer groups listen to topics. To subscribe a consumer group to a list of topics, subscribe any of the consumers in the group to the topics.
We need to POST a list of topics to the /consumers/{consumer_group}/instances/{consumer}/subscription endpoint.
Subscribe my_consumer to topic my_topic:
curl -s -o /dev/null -w "%{http_code}" \
-X POST \
"http://localhost:8082/consumers/my_group/instances/my_consumer/subscription" \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{
"topics": [
"my_topic"
]
}'
res = requests.post(
url=f"{consumer_base_uri}/subscription",
data=json.dumps({"topics": ["my_topic"]}),
headers={"Content-Type": "application/vnd.kafka.v2+json"})
Consume messages
To retrieve messages, send a GET to the /consumers/{consumer_group}/instances/{consumer}/records endpoint.
We'll consume JSON encoded messages, so we have to specify the Accept header: Accept: application/vnd.kafka.json.v2+json.
curl -s \
"http://localhost:8082/consumers/my_group/instances/my_consumer/records?timeout=1000&max_bytes=100000" \
-H "Accept: application/vnd.kafka.json.v2+json" | jq .
res = requests.get(
url=f"{consumer_base_uri}/records",
params={"timeout":1000,"max_bytes":100000},
headers={"Accept": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
[
{
"topic": "my_topic",
"key": null,
"value": "Vectorized",
"partition": 0,
"offset": 0
},
{
"topic": "my_topic",
"key": null,
"value": "Pandaproxy",
"partition": 1,
"offset": 0
},
{
"topic": "my_topic",
"key": null,
"value": "JSON Demo",
"partition": 2,
"offset": 0
}
]
Get consumer offsets
To get the offsets of consumers in the group, we need to send a GET to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint with the topics and partitions:
curl -s \
-X 'GET' \
'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \
-H 'accept: application/vnd.kafka.v2+json' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-d '{
"partitions": [
{
"topic": "my_topic",
"partition": 0
},
{
"topic": "my_topic",
"partition": 1
},
{
"topic": "my_topic",
"partition": 2
}
]
}' | jq .
res = requests.get(
url=f"{consumer_base_uri}/offsets",
data=json.dumps(
dict(partitions=[
dict(topic="my_topic", partition=p) for p in [0, 1, 2]
])),
headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
pretty(res)
{
"offsets": [
{
"topic": "my_topic",
"partition": 0,
"offset": -1,
"metadata": ""
},
{
"topic": "my_topic",
"partition": 1,
"offset": -1,
"metadata": ""
},
{
"topic": "my_topic",
"partition": 2,
"offset": -1,
"metadata": ""
}
]
}
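An offset of -1 here most likely means that nothing has been committed for that partition yet; that is the convention in the Kafka protocol, and we're assuming Pandaproxy follows it. A minimal sketch for picking out such partitions (uncommitted_partitions is a hypothetical helper):

```python
def uncommitted_partitions(offsets_response):
    """Return (topic, partition) pairs with no committed offset.

    Assumption: a negative offset marks "nothing committed yet".
    """
    return [
        (entry["topic"], entry["partition"])
        for entry in offsets_response["offsets"]
        if entry["offset"] < 0
    ]


# The same shape as the response shown above.
sample = {
    "offsets": [
        {"topic": "my_topic", "partition": p, "offset": -1, "metadata": ""}
        for p in [0, 1, 2]
    ]
}
print(uncommitted_partitions(sample))
```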
Commit offsets
Once a consumer has handled messages, the offsets can be committed so the consumer group won't retrieve them again.
We need to POST offsets to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint:
curl -s -o /dev/null -w "%{http_code}" \
-X 'POST' \
'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \
-H 'accept: application/vnd.kafka.v2+json' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-d '{
"partitions": [
{
"topic": "my_topic",
"partition": 0,
"offset": 0
},
{
"topic": "my_topic",
"partition": 1,
"offset": 0
},
{
"topic": "my_topic",
"partition": 2,
"offset": 0
}
]
}'
res = requests.post(
url=f"{consumer_base_uri}/offsets",
data=json.dumps(
dict(partitions=[
dict(topic="my_topic", partition=p, offset=0) for p in [0, 1, 2]
])),
headers={"Content-Type": "application/vnd.kafka.v2+json"})
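Rather than hard-coding the offsets, the commit payload could be derived from the records returned by the consume step. The sketch below uses a hypothetical helper, build_commit_payload, and follows this post's example in committing the offset of the last record handled per partition.

```python
def build_commit_payload(records):
    """Build an offsets payload from a list of consumed records.

    Keeps the highest offset seen for each (topic, partition) pair,
    mirroring the hand-written payload in the example above.
    """
    highest = {}
    for rec in records:
        key = (rec["topic"], rec["partition"])
        highest[key] = max(rec["offset"], highest.get(key, -1))
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(highest.items())
        ]
    }


# Records shaped like the response from the consume step.
records = [
    {"topic": "my_topic", "key": None, "value": "Vectorized", "partition": 0, "offset": 0},
    {"topic": "my_topic", "key": None, "value": "Pandaproxy", "partition": 1, "offset": 0},
    {"topic": "my_topic", "key": None, "value": "JSON Demo", "partition": 2, "offset": 0},
]
print(build_commit_payload(records))
```

The result can be passed through json.dumps and posted to the offsets endpoint in place of the literal payload.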
Remove consumer
To remove a consumer from a group, send DELETE to its base_uri:
curl -s -o /dev/null -w "%{http_code}" \
-X 'DELETE' \
'http://localhost:8082/consumers/my_group/instances/my_consumer' \
-H 'Content-Type: application/vnd.kafka.v2+json'
res = requests.delete(
url=f"{consumer_base_uri}",
headers={"Content-Type": "application/vnd.kafka.v2+json"})
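Since every consumer we create should eventually be removed, the create and delete calls pair up naturally in a context manager. This is only a sketch: rest_consumer is a hypothetical helper, the session argument is assumed to be something like requests.Session(), and only the name and format settings from the earlier example are sent.

```python
import json
from contextlib import contextmanager

HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}


@contextmanager
def rest_consumer(session, base_uri, group, name):
    """Create a consumer, yield its base_uri, and always delete it afterwards."""
    res = session.post(
        url=f"{base_uri}/consumers/{group}",
        data=json.dumps({"name": name, "format": "json"}),
        headers=HEADERS)
    consumer_base_uri = res.json()["base_uri"]
    try:
        yield consumer_base_uri
    finally:
        # Runs even if the body of the with-block raises.
        session.delete(url=consumer_base_uri, headers=HEADERS)


# Usage sketch (needs a running Pandaproxy, so it is left as a comment):
#
#   import requests
#   with rest_consumer(requests.Session(), "http://localhost:8082",
#                      "my_group", "my_consumer") as consumer_base_uri:
#       ...  # subscribe, consume, and commit as shown earlier
```

Passing the session in keeps the helper easy to exercise with a stand-in object when no broker is available.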
Now we can clean up:
docker stop redpanda
docker rm redpanda
docker volume remove redpanda
docker network remove redpanda
Pandaproxy Status
We'll be adding more endpoints and more encodings. For an up-to-date list of features and their status see the Pandaproxy features meta-issue on GitHub.
Pandaproxy is built on the same principles as Redpanda, but has not yet been optimized for performance. We are continuing to work on Pandaproxy, so make sure you join our Slack Community to get updates on the progress!
To learn more about Pandaproxy, read the latest in our documentation.