开始使用用于 MapR 流的 Kafka REST 代理

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(439)   2023-09-08 13:19:18

介绍

MapR Ecosystem Package 2.0 (MEP) 带来了一些与 MapR Streams 相关的新功能:

  • Kafka REST Proxy for MapR Streams 为 MapR Streams 和 Kafka 集群提供 RESTful 接口,以使用和生成消息并执行管理操作。
  • Kafka Connect for MapR Streams 是一种用于在 MapR Streams 与 Apache Kafka 和其他存储系统之间流式传输数据的实用程序。

MapR 生态系统包 (MEP) 是一种提供与核心升级分离的生态系统升级的方法——允许您独立于融合数据平台升级您的工具。您可以在这篇文章中了解更多关于 MEP 2.0 的信息。

在此博客中,我们描述了如何使用 REST 代理向/从 MapR 流发布和使用消息。 REST 代理是 MapR 融合数据平台的重要补充,允许任何编程语言使用 MapR 流。

随 MapR Streams 工具一起提供的 Kafka REST 代理可以与 MapR Streams(默认)一起使用,但也可以与 Apache Kafka 混合使用。在本文中,我们将重点关注 MapR Streams。

先决条件

  • MapR 融合数据平台 5.2 与 MEP 2.0
    • 使用 MapR 流工具
  • curl、wget 或任何 HTTP/REST 客户端工具

创建 MapR 流和主题

流是主题的集合,您可以通过以下方式将其作为一个组进行管理:

  1. 设置适用于该流中所有主题的安全策略
  2. 为流中创建的每个新主题设置默认分区数
  3. 为流中每个主题的消息设置生存时间

您可以在文档 中找到有关 MapR Streams 概念的更多信息。

在您的 Mapr 集群或沙盒上,运行以下命令:

$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p

$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3

$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

启动 Kafka 控制台生产者和消费者

打开两个终端窗口并使用以下命令运行消费者 Kafka 实用程序:

消费者

  • 主题传感器-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • 主题传感器二进制
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

这两个终端窗口将允许您查看发布在不同主题上的消息

使用 Kafka REST 代理

检查主题元数据

端点 /topics/[topic_name] 允许您获取有关该主题的一些信息。在 MapR Streams 中,主题是 stream 的一部分,由路径标识;要通过 REST API 使用主题,您必须使用完整路径,并将其编码在 URL 中;例如:

  • /apps/iot-stream:sensor-json 将被编码为 %2Fapps%2Fiot-stream%3Asensor-json

运行以下命令,获取有关 sensor-json 主题的信息

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

注意:为简单起见,我从运行 Kafka REST 代理的节点运行命令,因此可以使用 localhost

您可以通过添加 Python 命令以漂亮的方式打印 JSON,例如:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

默认流

如上所述,Stream 路径是您必须在命令中使用的主题名称的一部分;然而,可以将 MapR Kafka REST 代理配置为使用默认流。为此,您应该在 /opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties 文件中添加以下属性:

  • streams.default.stream=/apps/iot-stream当您更改 Kafka REST 代理配置时,您必须使用 maprcli 或 MCS 重新启动服务。使用 streams.default.stream 属性的主要原因是为了简化 URL例如,应用程序使用
    • streams.default.stream 你可以使用curl -X GET http://localhost:8082/topics/
    • 没有这个配置,或者如果你想使用一个特定的流,你必须在 URL http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json
    • 中指定它

    在本文中,所有 URL 都包含编码流名称,这样您就可以在不更改配置的情况下开始使用 Kafka REST 代理,并且还可以将其用于不同的流。

发布消息

MapR Streams 的 Kafka REST 代理允许应用程序将消息发布到 MapR Streams。消息可以作为 JSON 或二进制内容(base64 编码)发送。

发送 JSON 消息:

  • 查询应该是 HTTP POST
  • 内容类型应该是:application/vnd.kafka.json.v1+json
  • 正文:
{
  "records":
  [
    {
      "value":
      {
        "temp" : 10 ,
        "speed" : 40 ,
        "direction" : "NW"
        }  
      }
  ]
}

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

您应该会在运行 /apps/iot-stream:sensor-json 使用者的终端窗口中看到打印的消息。

发送二进制消息:

  • 查询应该是 HTTP POST
  • 内容类型应该是:application/vnd.kafka.binary.v1+json
  • 正文:
{
  "records":
  [
    {
      "value":"SGVsbG8gV29ybGQ="
    }
  ]
}

请注意,SGVsbG8gV29ybGQ= 是以 Base64 编码的字符串“Hello World”。

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您应该会在运行 /apps/iot-stream:sensor-binary 使用者的终端窗口中看到打印的消息。

发送多条消息

HTTP Body 的 records 字段允许您发送多条消息,例如您可以发送:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

此命令将发送 2 条消息,并将偏移量增加 2。您可以对二进制内容执行相同的操作,只需在 JSON 数组中添加新元素即可;例如:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您可能知道,可以为消息设置一个键,以确保具有相同键的所有消息都将到达同一分区。为此,将 key 属性添加到消息中,如下所示:

{
  "records":
  [
    {
      "key": "K001",
      "value":
      {
        "temp" : 10 ,
        "speed" : 40 ,
        "direction" : "NW"
        }  
      }
  ]
}

现在您知道如何使用 REST 代理将消息发布到 MapR Stream 主题,让我们看看如何使用消息。

消费消息

REST 代理也可用于消费来自主题的消息;为此你需要:

  1. 创建消费者实例。
  2. 使用第一次调用返回的 URL 来阅读消息。
  3. 如果需要,删除消费者实例。

创建消费者实例

以下请求创建消费者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

来自服务器的响应如下所示:

{
  "instance_id":"iot_json_consumer",
  "base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer"
}

请注意,我们使用了 /consumers/[topic_name] 来创建消费者。

base_uri 将被后续请求用于从主题获取消息。像任何 MapR Streams/Kafka 消费者一样,auto.offset.reset 定义了它的行为。在此示例中,该值设置为 earliest,这意味着消费者将从头开始阅读消息。您可以在 MapR Streams 文档 中找到有关消费者配置的更多信息。

消费消息

要使用消息,只需将 Mapr Streams 主题添加到消费者实例的 URL。

以下请求使用主题中的消息:

curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

此调用返回 JSON 文档中的消息:

[
  {"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},
  {"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},
  {"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

每次调用 API 都会根据上次调用的偏移量返回发布的新消息。

请注意,消费者将被销毁:

  • consumer.instance.timeout.ms 设置的空闲时间后(默认值设置为 300000 毫秒/5 分钟)
  • 使用 REST API 调用销毁它(见下文)。

使用二进制格式消息

如果您需要使用二进制消息,方法是相同的,您需要更改格式并接受标头。

调用此 URL 为二进制主题创建消费者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

然后消费消息,接受头设置为application/vnd.kafka.binary.v1+json

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

此调用返回 JSON 文档中的消息,值以 Base64 编码

[
  {"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},
  {"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

删除消费者实例

如前所述,消费者将根据 REST 代理的 consumer.instance.timeout.ms 配置自动销毁;也可以使用消费者实例 URI 和 HTTP DELETE 调用销毁实例,如下所示:

curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

总结

在本文中,您了解了如何将 Kafka REST 代理用于 MapR 流,它允许任何应用程序使用在 MapR 融合数据平台中发布的消息。

您可以在 MapR 文档 和以下资源中找到有关 Kafka REST 代理的更多信息:

  • MapR 流入门
  • “流式架构:使用 Apache Kafka 和 MapR 流的新设计”电子书,作者 Ted Dunning 和 Ellen Friedman
标签2: Java教程
地址:https://www.cundage.com/article/jcg-getting-started-kafka-rest-proxy-mapr-streams.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...