DynamoDB 表活动
DynamoDB 流使您能够跟踪和响应表项更改。使用此功能创建一个应用程序,该应用程序通过更新跨源的信息来响应更改。为大型多用户系统的数千个用户同步数据。使用它向用户发送更新通知。它的应用被证明是多样的和实质性的。 DynamoDB 流是用于实现此功能的主要工具。
流捕获包含表中项目修改的时间顺序序列。他们最多保留这些数据 24 小时。应用程序使用它们几乎实时地查看原始项目和修改项目。
在表上启用的流捕获所有修改。在任何 CRUD 操作中,DynamoDB 都会使用已修改项的主键属性创建一个流记录。您可以为附加信息(例如图像前后)配置流。
Streams 有两个保证-
每条记录在流中出现一次,并且
每一项修改都会产生与修改顺序相同的流记录。
实时处理所有流,以便您将它们用于应用程序中的相关功能。
管理流
在创建表时,您可以启用流。现有表允许禁用流或更改设置。 Streams 提供了异步操作的特性,这意味着对表性能没有影响。
利用 AWS 管理控制台进行简单的流管理。首先,导航到控制台,然后选择
Tables。在概览选项卡中,选择
管理流。在窗口内,选择添加到表数据修改流中的信息。输入所有设置后,选择
启用。
如果要禁用任何现有流,请选择
管理流,然后选择
禁用。
您还可以利用 API CreateTable 和 UpdateTable 来启用或更改流。使用参数 StreamSpecification 来配置流。 StreamEnabled 指定状态,表示启用为真,禁用为假。
StreamViewType 指定添加到流中的信息:KEYS_ONLY、NEW_IMAGE、OLD_IMAGE 和 NEW_AND_OLD_IMAGES。
流阅读
通过连接到端点并发出 API 请求来读取和处理流。每个流都由流记录组成,并且每个记录都作为拥有该流的单个修改而存在。流记录包括显示发布顺序的序列号。记录属于也称为分片的组。分片充当多个记录的容器,还保存访问和遍历记录所需的信息。 24 小时后,记录自动删除。
这些分片是根据需要生成和删除的,不会持续很长时间。它们还会自动划分为多个新分片,通常是为了响应写入活动高峰。在流禁用时,打开的分片关闭。分片之间的层次关系意味着应用程序必须优先考虑父分片以获得正确的处理顺序。您可以使用 Kinesis Adapter 自动执行此操作。
注意-导致无变化的操作不写入流记录。
访问和处理记录需要执行以下任务-
确定目标流的 ARN。
确定保存目标记录的流的分片。
访问分片以检索所需的记录。
注意-最多应该有 2 个进程同时读取一个分片。如果超过 2 个进程,那么它可以限制源。
可用的流 API 操作包括
列表流
DescribeStream
GetShardIterator
获取记录
您可以查看以下流阅读示例-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;
public class StreamsExample {
private static AmazonDynamoDBClient dynamoDBClient =
new AmazonDynamoDBClient(new ProfileCredentialsProvider());
private static AmazonDynamoDBStreamsClient streamsClient =
new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());
public static void main(String args[]) {
dynamoDBClient.setEndpoint("InsertDbEndpointHere");
streamsClient.setEndpoint("InsertStreamEndpointHere");
// table creation
String tableName = "MyTestingTable";
ArrayList<AttributeDefinition> attributeDefinitions =
new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition()
.withAttributeName("ID")
.withAttributeType("N"));
ArrayList<KeySchemaElement> keySchema = new
ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement()
.withAttributeName("ID")
.withKeyType(KeyType.HASH)); //Partition key
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);
CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(1L)
.withWriteCapacityUnits(1L))
.withStreamSpecification(streamSpecification);
System.out.println("Executing CreateTable for " + tableName);
dynamoDBClient.createTable(createTableRequest);
System.out.println("Creating " + tableName);
try {
Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Get the table's stream settings
DescribeTableResult describeTableResult =
dynamoDBClient.describeTable(tableName);
String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
StreamSpecification myStreamSpec =
describeTableResult.getTable().getStreamSpecification();
System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled());
System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());
// Add an item
int numChanges = 0;
System.out.println("Making some changes to table data");
Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
item.put("ID", new AttributeValue().withN("222"));
item.put("Alert", new AttributeValue().withS("item!"));
dynamoDBClient.putItem(tableName, item);
numChanges++;
// Update the item
Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
key.put("ID", new AttributeValue().withN("222"));
Map<String, AttributeValueUpdate> attributeUpdates =
new HashMap<String, AttributeValueUpdate>();
attributeUpdates.put("Alert", new AttributeValueUpdate()
.withAction(AttributeAction.PUT)
.withValue(new AttributeValue().withS("modified item")));
dynamoDBClient.updateItem(tableName, key, attributeUpdates);
numChanges++;
// Delete the item
dynamoDBClient.deleteItem(tableName, key);
numChanges++;
// Get stream shards
DescribeStreamResult describeStreamResult =
streamsClient.describeStream(new DescribeStreamRequest()
.withStreamArn(myStreamArn));
String streamArn =
describeStreamResult.getStreamDescription().getStreamArn();
List<Shard> shards =
describeStreamResult.getStreamDescription().getShards();
// Process shards
for (Shard shard : shards) {
String shardId = shard.getShardId();
System.out.println("Processing " + shardId + " in "+ streamArn);
// Get shard iterator
GetShardIteratorRequest getShardIteratorRequest = new
GetShardIteratorRequest()
.withStreamArn(myStreamArn)
.withShardId(shardId)
.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
GetShardIteratorResult getShardIteratorResult =
streamsClient.getShardIterator(getShardIteratorRequest);
String nextItr = getShardIteratorResult.getShardIterator();
while (nextItr != null && numChanges > 0) {
// Read data records with iterator
GetRecordsResult getRecordsResult =
streamsClient.getRecords(new GetRecordsRequest().
withShardIterator(nextItr));
List<Record> records = getRecordsResult.getRecords();
System.out.println("Pulling records...");
for (Record record : records) {
System.out.println(record);
numChanges--;
}
nextItr = getRecordsResult.getNextShardIterator();
}
}
}
}