MapR Ecosystem Package 2.0 (MEP) 带来了一些与 MapR Streams 相关的新功能:
MapR 生态系统包 (MEP) 是一种提供生态系统升级的方式,与核心升级分离——允许您独立于 MapR 融合数据平台升级您的工具。您可以在本文中了解有关 MEP 2.0 的更多信息。
在此博客中,我们描述了如何使用 Kafka REST 代理向/从 MapR Streams 发布和使用消息。 REST 代理是 MapR 融合数据平台的重要补充,允许任何编程语言使用 MapR 流。
随 MapR Streams 工具一起提供的 Kafka REST 代理可以与 MapR Streams(默认)以及 Apache Kafka(在混合模式下)一起使用。在本文中,我们将重点关注 MapR Streams。
流是主题的集合,您可以通过以下方式将其作为一个组进行管理:
您可以在文档 中找到有关 MapR Streams 概念的更多信息。
在您的 MapR 集群或沙箱上,运行以下命令:
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p $ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3 $ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3
打开两个终端窗口并使用以下命令运行消费者 Kafka 实用程序:
消费者
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary
这两个终端窗口将允许您查看发布在不同主题上的消息。
端点 /topics/[topic_name] 允许您获取有关该主题的一些信息。在 MapR Streams 中,主题是由路径标识的流的一部分;要通过 REST API 访问主题,您必须输入完整路径并将其编码在 URL 中;例如:
运行以下命令,获取有关 sensor-json 主题的信息:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
注意:为了简单起见,我从运行 Kafka REST 代理的节点运行命令,因此可以使用 localhost。
您可以通过添加 Python 命令以漂亮的方式打印 JSON,例如:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool
默认流
如上所述,Stream 路径是您必须在命令中使用的主题名称的一部分;但是,可以将 MapR Kafka REST 代理配置为使用默认流。对于此配置,您应该在 /opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties 文件中添加以下属性:
当您更改 Kafka REST 代理配置时,您必须使用 maprcli 或 MCS 重新启动服务。
使用 streams.default.stream 属性的主要原因是为了简化应用程序使用的 URL。例如:
在本文中,所有 URL 都包含编码流名称,因此您可以在不更改配置的情况下开始使用 Kafka REST 代理,并且还可以将其用于不同的流。
MapR Streams 的 Kafka REST 代理允许应用程序将消息发布到 MapR Streams。消息可以作为 JSON 或二进制内容(base64 编码)发送。
发送 JSON 消息:
{ "records": [ { "value": { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] }
完整的请求是:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
您应该会在终端窗口中看到打印的消息,/apps/iot-stream:sensor-json 消费者正在运行。
发送二进制消息:
{ "records": [ { "value":"SGVsbG8gV29ybGQ=" } ] }
请注意,SGVsbG8gV29ybGQ= 是以 Base64 编码的字符串“Hello World”。
完整的请求是:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary
您应该会在终端窗口中看到打印的消息,/apps/iot-stream:sensor-binary 消费者正在运行。
发送多条消息:
HTTP Body 的记录字段允许您发送多个消息;例如,您可以发送:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
此命令将发送 2 条消息并将偏移量增加 2。您可以通过在 JSON 数组中添加新元素来对二进制内容执行相同的操作;例如:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \ http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary
您可能知道,可以将密钥绑定到消息以确保具有相同密钥的所有消息都将到达同一分区。为此,请将 key 属性添加到消息中,如下所示:
{ "records": [ { "key": "K001", "value": { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] }
现在您知道如何使用 REST 代理将消息发布到 MapR Streams 主题,让我们看看如何使用消息。
REST 代理也可用于消费来自主题的消息;对于此任务,您需要:
创建消费者实例
以下请求创建消费者实例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json
来自服务器的响应如下所示:
{ "instance_id":"iot_json_consumer", "base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }
请注意,我们使用了 /consumers/[topic_name] 来创建消费者。 base_uri 将由后续请求用于从主题获取消息。与任何 MapR Streams/Kafka 消费者一样,auto.offset.reset 定义了它的行为。在本例中,该值设置为earliest,这意味着消费者将从头开始读取消息。您可以在 MapR Streams 文档 中找到有关消费者配置的更多信息。
消费消息
要使用消息,只需将 MapR Streams 主题添加到消费者实例的 URL。
以下请求使用主题中的消息:
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json
此调用返回 JSON 文档中的消息:
[ {"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1}, {"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2}, {"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]
每次调用 API 都会根据上次调用的偏移量返回发布的新消息。
请注意,消费者将被销毁:
如果您需要使用二进制消息,方法是相同的:您需要更改格式和 Accept 标头。
调用此 URL 为二进制主题创建消费者实例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary
然后消费消息,接受头设置为application/vnd.kafka.binary.v1+json:
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary
此调用返回 JSON 文档中的消息,值以 Base64 编码:
[ {"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1}, {"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]
如前所述,消费者将根据 REST Proxy 的 consumer.instance.timeout.msconfiguration 自动销毁;也可以使用消费者实例 URI 和 HTTP DELETE 调用销毁实例,如下所示:
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer
在本文中,您了解了如何将 Kafka REST 代理用于 MapR 流,它允许任何应用程序使用在 MapR 融合数据平台中发布的消息。
您可以在 MapR 文档 和以下资源中找到有关 Kafka REST 代理的更多信息:
标签2: Java教程地址:https://www.cundage.com/article/jcg-getting-started-kafka-rest-proxy-mapr-streams-2.html