将事件发布到 Kafka
MinIO 依靠https://github.com/Shopify/sarama项目实现 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
在 MinIO 部署中添加 Kafka 端点
以下步骤添加了一个新的 Kafka 服务端点,用于在 MinIO 部署中支持存储桶通知。
先决条件
Kafka 最小版本和支持的版本
MinIO 依靠https://github.com/Shopify/sarama项目实现 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
MinIO mc
命令行工具
1) 将 Kafka 端点添加到 MinIO
您可以使用环境变量或通过设置运行时配置设置来配置新的 Kafka 服务端点。
MinIO 支持使用环境变量指定 Kafka 服务端点和相关的配置设置。minio server
进程在其下一次启动时应用指定的设置。
以下示例代码设置了与配置 Kafka 服务端点相关的所有环境变量。最小必需变量是MINIO_NOTIFY_KAFKA_ENABLE
和MINIO_NOTIFY_KAFKA_BROKERS
set MINIO_NOTIFY_KAFKA_ENABLE_<IDENTIFIER>="on"
set MINIO_NOTIFY_KAFKA_BROKERS_<IDENTIFIER>="<ENDPOINT>"
set MINIO_NOTIFY_KAFKA_TOPIC_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_USERNAME_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_PASSWORD_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_MECHANISM_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_DIR_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_LIMIT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_VERSION_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_COMMENT_<IDENTIFIER>="<string>"
将
<IDENTIFIER>
替换为 Kafka 服务端点的唯一描述性字符串。对与新目标服务端点相关的所有环境变量使用相同的<IDENTIFIER>
值。以下示例假设标识符为PRIMARY
。如果指定的
<IDENTIFIER>
与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将覆盖该端点的任何现有设置。使用mc admin config get notify_kafka
查看 MinIO 部署中当前配置的 Kafka 端点。将
<ENDPOINT>
替换为 Kafka 代理的逗号分隔列表。例如"kafka1.example.com:2021,kafka2.example.com:2021"
有关每个环境变量的完整文档,请参阅存储桶通知的 Kafka 服务。
MinIO 支持在运行的 minio server
进程上添加或更新 Kafka 端点,方法是使用 mc admin config set
命令和 notify_kafka
配置键。您必须重启 minio server
进程才能应用任何新的或更新的配置设置。
以下示例代码设置了与配置 Kafka 服务端点相关的所有设置。最小必需设置是 notify_kafka brokers
mc admin config set ALIAS/ notify_kafka:IDENTIFIER \
brokers="<ENDPOINT>" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
将
IDENTIFIER
替换为 Kafka 服务端点的唯一描述性字符串。本过程中的以下示例假设标识符为PRIMARY
。如果指定的
IDENTIFIER
与 MinIO 部署中现有的 Kafka 服务端点匹配,则新设置将覆盖该端点的任何现有设置。使用mc admin config get notify_kafka
检查 MinIO 部署中当前配置的 Kafka 端点。将
ENDPOINT
替换为 Kafka 代理的逗号分隔列表。例如"kafka1.example.com:2021,kafka2.example.com:2021"
有关每个设置的完整文档,请参阅 Kafka Bucket Notification Configuration Settings。
2) 重启 MinIO 部署
您必须重启 MinIO 部署才能应用配置更改。使用 mc admin service restart
命令重启部署。
mc admin service restart ALIAS
将 ALIAS
替换为要重启的部署的 别名。
minio server
进程在启动时为每个配置的 Kafka 目标打印一行,类似于以下内容
SQS ARNs: arn:minio:sqs::primary:kafka
在使用关联的 Kafka 部署作为目标配置存储桶通知时,您必须指定 ARN 资源。
识别存储桶通知的 ARN
您之前在创建端点时定义了 <IDENTIFIER>
用于为存储桶通知的目标 ARN 赋值。以下步骤返回部署上配置的 ARN。通过查找您指定的 <IDENTIFIER>
来识别之前创建的 ARN。
查看 JSON 输出
复制并运行以下命令,将
ALIAS
替换为部署的 别名。mc admin info --json ALIAS
在 JSON 输出中,查找键
info.sqsARN
。您需要的 ARN 是与您指定的
<IDENTIFIER>
匹配的该键的值。例如,
arn:minio:sqs::primary:kafka
。
使用 jq 解析 JSON 以获取值
3) 使用 Kafka 端点作为目标配置存储桶通知
使用 mc event add
命令添加一个新的存储桶通知事件,并将配置的 Kafka 服务作为目标。
mc event add ALIAS/BUCKET arn:minio:sqs::primary:kafka \
--event EVENTS
使用 mc event ls
查看给定通知目标的所有配置的存储桶事件。
mc event ls ALIAS/BUCKET arn:minio:sqs::primary:kafka
4) 验证配置的事件
对您为其配置新事件的存储桶执行操作,并在 Kafka 服务中检查通知数据。所需的操作取决于在配置存储桶通知时指定的 事件
。
例如,如果存储桶通知配置包含 s3:ObjectCreated:Put
事件,则可以使用 mc cp
命令在存储桶中创建新对象并触发通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET
更新 MinIO 部署中的 Kafka 端点
以下过程更新现有的 Kafka 服务端点,以支持 MinIO 部署中的 存储桶通知。
先决条件
Kafka 最小版本和支持的版本
MinIO 依靠https://github.com/Shopify/sarama项目实现 Kafka 连接,并共享该项目的 Kafka 支持。有关更多详细信息,请参阅sarama
兼容性和 API 稳定性部分。
MinIO mc
命令行工具
1) 列出部署中配置的 Kafka 端点
使用 mc admin config get
命令列出部署中当前配置的 Kafka 服务端点。
mc admin config get ALIAS/ notify_kafka
将 ALIAS
替换为 MinIO 部署的 别名。
命令输出类似于以下内容
notify_kafka:primary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka:secondary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka
键是 Kafka 通知设置的顶级配置键。 brokers
键指定给定 notify_kafka 键的 Kafka 服务端点。 notify_kafka:<IDENTIFIER>
后缀描述该 Kafka 服务端点的唯一标识符。
记下您要更新的 Kafka 服务端点的标识符,以便下一步使用。
2) 更新 Kafka 端点
使用 mc admin config set
命令设置 Kafka 服务端点的新的配置。
mc admin config set ALIAS/ notify_kafka:<IDENTIFIER> \
brokers="https://kafka1.example.net:9200, https://kafka2.example.net:9200" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
notify_kafka brokers
配置设置是 Kafka 服务端点的最低要求。所有其他配置设置都是可选的。有关 Kafka 配置设置的完整列表,请参阅 Kafka 通知设置。
3) 重新启动 MinIO 部署
您必须重启 MinIO 部署才能应用配置更改。使用 mc admin service restart
命令重启部署。
mc admin service restart ALIAS
将 ALIAS
替换为要重启的部署的 别名。
minio server
进程在启动时为每个配置的 Kafka 目标打印一行,类似于以下内容
SQS ARNs: arn:minio:sqs::primary:kafka
4) 验证更改
对使用更新的 Kafka 服务端点的事件配置的存储桶执行操作,并在 Kafka 服务中检查通知数据。所需的操作取决于配置存储桶通知时指定的 events
。
例如,如果存储桶通知配置包含 s3:ObjectCreated:Put
事件,则可以使用 mc cp
命令在存储桶中创建新对象并触发通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET