文档

将事件发布到 Kafka

MinIO 支持将存储桶通知事件发布到Kafka服务端点。

MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama兼容性和 API 稳定性部分。

向 MinIO 部署添加 Kafka 端点

以下过程为在 MinIO 部署中支持存储桶通知添加新的 Kafka 服务端点。

先决条件

Kafka 最小版本和支持的版本

MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama兼容性和 API 稳定性部分。

MinIO mc 命令行工具

此过程使用mc命令行工具执行某些操作。有关安装说明,请参阅mc快速入门

1) 将 Kafka 端点添加到 MinIO

您可以使用环境变量通过设置运行时配置设置来配置新的 Kafka 服务端点。

MinIO 支持使用环境变量指定 Kafka 服务端点和相关的配置设置。 minio server 进程会在下次启动时应用指定的设置。

以下示例代码设置了与配置 Kafka 服务端点相关的所有环境变量。 最小 *必需* 变量为 MINIO_NOTIFY_KAFKA_ENABLEMINIO_NOTIFY_KAFKA_BROKERS

set MINIO_NOTIFY_KAFKA_ENABLE_<IDENTIFIER>="on"
set MINIO_NOTIFY_KAFKA_BROKERS_<IDENTIFIER>="<ENDPOINT>"
set MINIO_NOTIFY_KAFKA_TOPIC_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_USERNAME_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_PASSWORD_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_MECHANISM_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_DIR_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_LIMIT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_VERSION_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_COMMENT_<IDENTIFIER>="<string>"
  • <IDENTIFIER> 替换为 Kafka 服务端点的唯一描述性字符串。 对与新目标服务端点相关的所有环境变量使用相同的 <IDENTIFIER> 值。 以下示例假设标识符为 PRIMARY

    如果指定的 <IDENTIFIER> 与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将 *覆盖* 该端点的任何现有设置。 使用 mc admin config get notify_kafka 查看 MinIO 部署中当前配置的 Kafka 端点。

  • <ENDPOINT> 替换为 Kafka 代理的逗号分隔列表。 例如

    "kafka1.example.com:2021,kafka2.example.com:2021"

有关每个环境变量的完整文档,请参阅 用于 Bucket 通知 的 Kafka 服务

MinIO 支持使用 mc admin config set 命令和 notify_kafka 配置键在正在运行的 minio server 进程上添加或更新 Kafka 端点。 您必须重新启动 minio server 进程才能应用任何新的或更新的配置设置。

以下示例代码设置了与配置 Kafka 服务端点相关的所有设置。 最小 *必需* 设置为 notify_kafka brokers

mc admin config set ALIAS/ notify_kafka:IDENTIFIER \
   brokers="<ENDPOINT>" \
   topic="<string>" \
   sasl_username="<string>" \
   sasl_password="<string>" \
   sasl_mechanism="<string>" \
   tls_client_auth="<string>" \
   tls="<string>" \
   tls_skip_verify="<string>" \
   client_tls_cert="<string>" \
   client_tls_key="<string>" \
   version="<string>" \
   queue_dir="<string>" \
   queue_limit="<string>" \
   comment="<string>"
  • IDENTIFIER 替换为 Kafka 服务端点的唯一描述性字符串。 本过程中的以下示例假设标识符为 PRIMARY

    如果指定的 IDENTIFIER 与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将 *覆盖* 该端点的任何现有设置。 使用 mc admin config get notify_kafka 查看 MinIO 部署中当前配置的 Kafka 端点。

  • ENDPOINT 替换为 Kafka 代理的逗号分隔列表。 例如

    "kafka1.example.com:2021,kafka2.example.com:2021"

有关每个设置的完整文档,请参阅 Kafka Bucket 通知配置设置

2) 重新启动 MinIO 部署

您必须重新启动 MinIO 部署才能应用配置更改。 使用 mc admin service restart 命令重新启动部署。

mc admin service restart ALIAS

ALIAS 替换为要重新启动的部署的 别名

minio server 进程在启动时会为每个配置的 Kafka 目标打印一行,类似于以下内容

SQS ARNs: arn:minio:sqs::primary:kafka

将关联的 Kafka 部署作为目标配置 Bucket 通知时,必须指定 ARN 资源。

识别 Bucket 通知 的 ARN

在之前创建端点时,您定义了要分配给 Bucket 通知目标 ARN 的 <IDENTIFIER>。 以下步骤返回部署中配置的 ARN。 通过查找您指定的 <IDENTIFIER> 来识别之前创建的 ARN。

查看 JSON 输出

  1. 复制并运行以下命令,将 ALIAS 替换为部署的 别名

    mc admin info --json ALIAS
    
  2. 在 JSON 输出中,查找键 info.sqsARN

    您需要的 ARN 是与您指定的 <IDENTIFIER> 匹配的该键的值。

    例如,arn:minio:sqs::primary:kafka

使用 jq 解析 JSON 以获取值

  1. 安装 jq

  2. 复制并运行以下命令,将 ALIAS 替换为部署的 别名

    mc admin info --json ALIAS | jq  .info.sqsARN
    

    这将返回用于通知的 ARN,例如 arn:minio:sqs::primary:kafka

3) 使用 Kafka 端点作为目标配置 Bucket 通知

使用 mc event add 命令添加一个新的 Bucket 通知事件,并将配置的 Kafka 服务作为目标

mc event add ALIAS/BUCKET arn:minio:sqs::primary:kafka \
  --event EVENTS
  • ALIAS 替换为 MinIO 部署的 别名

  • BUCKET 替换为要在其中配置事件的 Bucket 的名称。

  • EVENTS 替换为 MinIO 触发通知的 事件 的逗号分隔列表。

使用 mc event ls 查看给定通知目标的所有配置的 Bucket 事件

mc event ls ALIAS/BUCKET arn:minio:sqs::primary:kafka

4) 验证配置的事件

对您为其配置新事件的 Bucket 执行操作,并在 Kafka 服务中检查通知数据。 所需的操作取决于在配置 Bucket 通知时指定的 事件

例如,如果 Bucket 通知配置包含 s3:ObjectCreated:Put 事件,则可以使用 mc cp 命令在 Bucket 中创建一个新对象并触发通知。

mc cp ~/data/new-object.txt ALIAS/BUCKET

更新 MinIO 部署中的 Kafka 端点

以下过程更新现有的 Kafka 服务端点,以支持 MinIO 部署中的 Bucket 通知

先决条件

Kafka 最小版本和支持版本

MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama兼容性和 API 稳定性部分。

MinIO mc 命令行工具

此过程使用mc命令行工具执行某些操作。有关安装说明,请参阅mc快速入门

1) 列出部署中配置的 Kafka 端点

使用 mc admin config get 命令列出部署中当前配置的 Kafka 服务端点。

mc admin config get ALIAS/ notify_kafka

ALIAS 替换为 MinIO 部署的 别名

命令输出类似如下所示

notify_kafka:primary tls_skip_verify="off"  queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka:secondary tls_skip_verify="off"  queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""

notify_kafka 密钥是 Kafka 通知设置的顶级配置密钥。 brokers 密钥指定了给定 notify_kafka 密钥的 Kafka 服务端点。 notify_kafka:<IDENTIFIER> 后缀描述了该 Kafka 服务端点的唯一标识符。

注意您想要在下一步中更新的 Kafka 服务端点的标识符。

2) 更新 Kafka 端点

使用 mc admin config set 命令设置 Kafka 服务端点的新配置。

mc admin config set ALIAS/ notify_kafka:<IDENTIFIER> \
   brokers="https://kafka1.example.net:9200, https://kafka2.example.net:9200" \
   topic="<string>" \
   sasl_username="<string>" \
   sasl_password="<string>" \
   sasl_mechanism="<string>" \
   tls_client_auth="<string>" \
   tls="<string>" \
   tls_skip_verify="<string>" \
   client_tls_cert="<string>" \
   client_tls_key="<string>" \
   version="<string>" \
   queue_dir="<string>" \
   queue_limit="<string>" \
   comment="<string>"

notify_kafka brokers 配置设置是 Kafka 服务端点的最低要求。所有其他配置设置都是可选的。请参阅 Kafka 通知设置 以获取 Kafka 配置设置的完整列表。

3) 重启 MinIO 部署

您必须重新启动 MinIO 部署才能应用配置更改。 使用 mc admin service restart 命令重新启动部署。

mc admin service restart ALIAS

ALIAS 替换为要重新启动的部署的 别名

minio server 进程在启动时会为每个配置的 Kafka 目标打印一行,类似于以下内容

SQS ARNs: arn:minio:sqs::primary:kafka

4) 验证更改

对使用更新的 Kafka 服务端点具有事件配置的存储桶执行操作,并检查 Kafka 服务的通知数据。所需的操作取决于配置存储桶通知时指定的 事件

例如,如果 Bucket 通知配置包含 s3:ObjectCreated:Put 事件,则可以使用 mc cp 命令在 Bucket 中创建一个新对象并触发通知。

mc cp ~/data/new-object.txt ALIAS/BUCKET