MapR Ecosystem Package 2.0 (MEP) 带来了一些与 MapR Streams 相关的新功能:
MapR 生态系统包 (MEP) 是一种提供与核心升级分离的生态系统升级的方法——允许您独立于融合数据平台升级您的工具。您可以在这篇文章中了解更多关于 MEP 2.0 的信息。
在此博客中,我们描述了如何使用 REST 代理向/从 MapR 流发布和使用消息。 REST 代理是 MapR 融合数据平台的重要补充,允许任何编程语言使用 MapR 流。
随 MapR Streams 工具一起提供的 Kafka REST 代理可以与 MapR Streams(默认)一起使用,但也可以与 Apache Kafka 混合使用。在本文中,我们将重点关注 MapR Streams。
流是主题的集合,您可以通过以下方式将其作为一个组进行管理:
您可以在文档 中找到有关 MapR Streams 概念的更多信息。
在您的 Mapr 集群或沙盒上,运行以下命令:
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p $ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3 $ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3
打开两个终端窗口并使用以下命令运行消费者 Kafka 实用程序:
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary
这两个终端窗口将允许您查看发布在不同主题上的消息
端点 /topics/[topic_name]
允许您获取有关该主题的一些信息。在 MapR Streams 中,主题是 stream 的一部分,由路径标识;要通过 REST API 使用主题,您必须使用完整路径,并将其编码在 URL 中;例如:
/apps/iot-stream:sensor-json
将被编码为 %2Fapps%2Fiot-stream%3Asensor-json
运行以下命令,获取有关 sensor-json
主题的信息
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
注意:为简单起见,我从运行 Kafka REST 代理的节点运行命令,因此可以使用 localhost
。
您可以通过添加 Python 命令以漂亮的方式打印 JSON,例如:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool
默认流
如上所述,Stream 路径是您必须在命令中使用的主题名称的一部分;然而,可以将 MapR Kafka REST 代理配置为使用默认流。为此,您应该在 /opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties
文件中添加以下属性:
streams.default.stream=/apps/iot-stream
当您更改 Kafka REST 代理配置时,您必须使用 maprcli 或 MCS 重新启动服务。使用 streams.default.stream
属性的主要原因是为了简化 URL例如,应用程序使用
streams.default.stream
你可以使用curl -X GET http://localhost:8082/topics/
http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
在本文中,所有 URL 都包含编码流名称,这样您就可以在不更改配置的情况下开始使用 Kafka REST 代理,并且还可以将其用于不同的流。
MapR Streams 的 Kafka REST 代理允许应用程序将消息发布到 MapR Streams。消息可以作为 JSON 或二进制内容(base64 编码)发送。
发送 JSON 消息:
POST
application/vnd.kafka.json.v1+json
{ "records": [ { "value": { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] }
完整的请求是:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
您应该会在运行 /apps/iot-stream:sensor-json
使用者的终端窗口中看到打印的消息。
发送二进制消息:
POST
application/vnd.kafka.binary.v1+json
{ "records": [ { "value":"SGVsbG8gV29ybGQ=" } ] }
请注意,SGVsbG8gV29ybGQ=
是以 Base64 编码的字符串“Hello World”。
完整的请求是:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary
您应该会在运行 /apps/iot-stream:sensor-binary
使用者的终端窗口中看到打印的消息。
发送多条消息
HTTP Body 的 records
字段允许您发送多条消息,例如您可以发送:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
此命令将发送 2 条消息,并将偏移量增加 2。您可以对二进制内容执行相同的操作,只需在 JSON 数组中添加新元素即可;例如:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary
您可能知道,可以为消息设置一个键,以确保具有相同键的所有消息都将到达同一分区。为此,将 key
属性添加到消息中,如下所示:
{ "records": [ { "key": "K001", "value": { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] }
现在您知道如何使用 REST 代理将消息发布到 MapR Stream 主题,让我们看看如何使用消息。
REST 代理也可用于消费来自主题的消息;为此你需要:
创建消费者实例
以下请求创建消费者实例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json
来自服务器的响应如下所示:
{ "instance_id":"iot_json_consumer", "base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }
请注意,我们使用了 /consumers/[topic_name]
来创建消费者。
base_uri
将被后续请求用于从主题获取消息。像任何 MapR Streams/Kafka 消费者一样,auto.offset.reset
定义了它的行为。在此示例中,该值设置为 earliest
,这意味着消费者将从头开始阅读消息。您可以在 MapR Streams 文档 中找到有关消费者配置的更多信息。
消费消息
要使用消息,只需将 Mapr Streams 主题添加到消费者实例的 URL。
以下请求使用主题中的消息:
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json
此调用返回 JSON 文档中的消息:
[ {"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1}, {"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2}, {"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]
每次调用 API 都会根据上次调用的偏移量返回发布的新消息。
请注意,消费者将被销毁:
consumer.instance.timeout.ms
设置的空闲时间后(默认值设置为 300000 毫秒/5 分钟)如果您需要使用二进制消息,方法是相同的,您需要更改格式并接受标头。
调用此 URL 为二进制主题创建消费者实例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary
然后消费消息,接受头设置为application/vnd.kafka.binary.v1+json
:
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary
此调用返回 JSON 文档中的消息,值以 Base64 编码
[ {"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1}, {"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]
如前所述,消费者将根据 REST 代理的 consumer.instance.timeout.ms
配置自动销毁;也可以使用消费者实例 URI 和 HTTP DELETE 调用销毁实例,如下所示:
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer
在本文中,您了解了如何将 Kafka REST 代理用于 MapR 流,它允许任何应用程序使用在 MapR 融合数据平台中发布的消息。
您可以在 MapR 文档 和以下资源中找到有关 Kafka REST 代理的更多信息:
标签2: Java教程地址:https://www.cundage.com/article/jcg-getting-started-kafka-rest-proxy-mapr-streams.html