Lambda Amazon Kinesis
AWS Kinesis 服务用于捕获/存储来自网站点击、日志、社交媒体源的实时跟踪数据。我们可以触发 AWS Lambda 对这些日志执行额外的处理。
要求
开始使用 Kinesis 和 AWS Lambda 的基本要求如下所示-
创建具有所需权限的角色
在 Kinesis 中创建数据流
创建 AWS Lambda 函数。
向 AWS Lambda 添加代码
将数据添加到 Kinesis 数据流
示例
让我们研究一个示例,在该示例中我们将触发 AWS Lambda 来处理来自 Kinesis 的数据流,并使用收到的数据发送邮件。
用于解释该过程的简单框图如下所示-
创建具有所需权限的角色
转到 AWS 控制台并创建角色。
在 Kinesis 中创建数据流
转到 AWS 控制台并在 kinesis 中创建数据流。
如图所示有 4 个选项。我们将在本例中创建数据流。
点击
创建数据流。在下面给出的 Kinesis 流名称中输入名称。
输入数据流的分片数。
Shards 的详细信息如下所示-
输入名称并点击底部的
创建 Kinesis 流按钮。
请注意,流激活需要一定的时间。
创建 AWS Lambda 函数
转到 AWS 控制台并单击 Lambda。创建 AWS Lambda 函数,如图所示-
点击屏幕末尾的
创建函数按钮。将 Kinesis 作为触发器添加到 AWS Lambda。
向 Kinesis 触发器添加配置详细信息-
添加触发器,现在将代码添加到 AWS Lambda。
向 AWS Lambda 添加代码
为此,我们将使用 nodejs 作为运行时。使用 Kinesis 数据流触发 AWS Lambda 后,我们将发送邮件。
const aws = require("aws-sdk");
var ses = new aws.SES({
region: 'us-east-1'
});
exports.handler = function(event, context, callback) {
let payload = "";
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
var eParams = {
Destination: {
ToAddresses: ["xxxxxxx@gmail.com"]
},
Message: {
Body: {
Text: {
Data:payload
}
},
Subject: {
Data: "Kinesis data stream"
}
},
Source: "cxxxxxxxxx@gmail.com"
};
var email = ses.sendEmail(eParams, function(err, data) {
if (err) console.log(err);
else {
console.log("===EMAIL SENT===");
console.log("EMAIL CODE END");
console.log('EMAIL: ', email);
context.succeed(event);
callback(null, "email is send");
}
});
};
事件参数具有输入到 kinesis 数据流中的数据。一旦将数据输入到 kinesis 数据流中,上述 aws lambda 代码将被激活。
将数据添加到 Kinesis 数据流
这里我们将使用 AWS CLI 添加数据 kinesis 数据流,如下所示。为此,我们可以使用以下命令-
aws kinesis put-record--stream-name kinesisdemo --data "hello world"--
partition-key "789675"
然后,AWS Lambda 被激活并发送邮件。
