将事件发布到 Kafka
MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
向 MinIO 部署添加 Kafka 端点
以下过程为在 MinIO 部署中支持存储桶通知添加新的 Kafka 服务端点。
先决条件
Kafka 最小版本和支持的版本
MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
MinIO mc
命令行工具
1) 将 Kafka 端点添加到 MinIO
您可以使用环境变量或通过设置运行时配置设置来配置新的 Kafka 服务端点。
MinIO 支持使用环境变量指定 Kafka 服务端点和相关的配置设置。 minio server
进程会在下次启动时应用指定的设置。
以下示例代码设置了与配置 Kafka 服务端点相关的所有环境变量。 最小 *必需* 变量为 MINIO_NOTIFY_KAFKA_ENABLE
和 MINIO_NOTIFY_KAFKA_BROKERS
set MINIO_NOTIFY_KAFKA_ENABLE_<IDENTIFIER>="on"
set MINIO_NOTIFY_KAFKA_BROKERS_<IDENTIFIER>="<ENDPOINT>"
set MINIO_NOTIFY_KAFKA_TOPIC_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_USERNAME_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_PASSWORD_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_MECHANISM_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_DIR_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_LIMIT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_VERSION_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_COMMENT_<IDENTIFIER>="<string>"
将
<IDENTIFIER>
替换为 Kafka 服务端点的唯一描述性字符串。 对与新目标服务端点相关的所有环境变量使用相同的<IDENTIFIER>
值。 以下示例假设标识符为PRIMARY
。如果指定的
<IDENTIFIER>
与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将 *覆盖* 该端点的任何现有设置。 使用mc admin config get notify_kafka
查看 MinIO 部署中当前配置的 Kafka 端点。将
<ENDPOINT>
替换为 Kafka 代理的逗号分隔列表。 例如"kafka1.example.com:2021,kafka2.example.com:2021"
有关每个环境变量的完整文档,请参阅 用于 Bucket 通知 的 Kafka 服务。
MinIO 支持使用 mc admin config set
命令和 notify_kafka
配置键在正在运行的 minio server
进程上添加或更新 Kafka 端点。 您必须重新启动 minio server
进程才能应用任何新的或更新的配置设置。
以下示例代码设置了与配置 Kafka 服务端点相关的所有设置。 最小 *必需* 设置为 notify_kafka brokers
mc admin config set ALIAS/ notify_kafka:IDENTIFIER \
brokers="<ENDPOINT>" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
将
IDENTIFIER
替换为 Kafka 服务端点的唯一描述性字符串。 本过程中的以下示例假设标识符为PRIMARY
。如果指定的
IDENTIFIER
与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将 *覆盖* 该端点的任何现有设置。 使用mc admin config get notify_kafka
查看 MinIO 部署中当前配置的 Kafka 端点。将
ENDPOINT
替换为 Kafka 代理的逗号分隔列表。 例如"kafka1.example.com:2021,kafka2.example.com:2021"
有关每个设置的完整文档,请参阅 Kafka Bucket 通知配置设置。
2) 重新启动 MinIO 部署
您必须重新启动 MinIO 部署才能应用配置更改。 使用 mc admin service restart
命令重新启动部署。
mc admin service restart ALIAS
将 ALIAS
替换为要重新启动的部署的 别名。
minio server
进程在启动时会为每个配置的 Kafka 目标打印一行,类似于以下内容
SQS ARNs: arn:minio:sqs::primary:kafka
将关联的 Kafka 部署作为目标配置 Bucket 通知时,必须指定 ARN 资源。
识别 Bucket 通知 的 ARN
在之前创建端点时,您定义了要分配给 Bucket 通知目标 ARN 的 <IDENTIFIER>
。 以下步骤返回部署中配置的 ARN。 通过查找您指定的 <IDENTIFIER>
来识别之前创建的 ARN。
查看 JSON 输出
复制并运行以下命令,将
ALIAS
替换为部署的 别名。mc admin info --json ALIAS
在 JSON 输出中,查找键
info.sqsARN
。您需要的 ARN 是与您指定的
<IDENTIFIER>
匹配的该键的值。例如,
arn:minio:sqs::primary:kafka
。
使用 jq 解析 JSON 以获取值
3) 使用 Kafka 端点作为目标配置 Bucket 通知
使用 mc event add
命令添加一个新的 Bucket 通知事件,并将配置的 Kafka 服务作为目标
mc event add ALIAS/BUCKET arn:minio:sqs::primary:kafka \
--event EVENTS
使用 mc event ls
查看给定通知目标的所有配置的 Bucket 事件
mc event ls ALIAS/BUCKET arn:minio:sqs::primary:kafka
4) 验证配置的事件
对您为其配置新事件的 Bucket 执行操作,并在 Kafka 服务中检查通知数据。 所需的操作取决于在配置 Bucket 通知时指定的 事件
。
例如,如果 Bucket 通知配置包含 s3:ObjectCreated:Put
事件,则可以使用 mc cp
命令在 Bucket 中创建一个新对象并触发通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET
更新 MinIO 部署中的 Kafka 端点
以下过程更新现有的 Kafka 服务端点,以支持 MinIO 部署中的 Bucket 通知。
先决条件
Kafka 最小版本和支持版本
MinIO 依赖于https://github.com/Shopify/sarama项目进行 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
MinIO mc
命令行工具
1) 列出部署中配置的 Kafka 端点
使用 mc admin config get
命令列出部署中当前配置的 Kafka 服务端点。
mc admin config get ALIAS/ notify_kafka
将 ALIAS
替换为 MinIO 部署的 别名。
命令输出类似如下所示
notify_kafka:primary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka:secondary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka
密钥是 Kafka 通知设置的顶级配置密钥。 brokers
密钥指定了给定 notify_kafka 密钥的 Kafka 服务端点。 notify_kafka:<IDENTIFIER>
后缀描述了该 Kafka 服务端点的唯一标识符。
注意您想要在下一步中更新的 Kafka 服务端点的标识符。
2) 更新 Kafka 端点
使用 mc admin config set
命令设置 Kafka 服务端点的新配置。
mc admin config set ALIAS/ notify_kafka:<IDENTIFIER> \
brokers="https://kafka1.example.net:9200, https://kafka2.example.net:9200" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
notify_kafka brokers
配置设置是 Kafka 服务端点的最低要求。所有其他配置设置都是可选的。请参阅 Kafka 通知设置 以获取 Kafka 配置设置的完整列表。
3) 重启 MinIO 部署
您必须重新启动 MinIO 部署才能应用配置更改。 使用 mc admin service restart
命令重新启动部署。
mc admin service restart ALIAS
将 ALIAS
替换为要重新启动的部署的 别名。
minio server
进程在启动时会为每个配置的 Kafka 目标打印一行,类似于以下内容
SQS ARNs: arn:minio:sqs::primary:kafka
4) 验证更改
对使用更新的 Kafka 服务端点具有事件配置的存储桶执行操作,并检查 Kafka 服务的通知数据。所需的操作取决于配置存储桶通知时指定的 事件
。
例如,如果 Bucket 通知配置包含 s3:ObjectCreated:Put
事件,则可以使用 mc cp
命令在 Bucket 中创建一个新对象并触发通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET