# AWS Kinesis Stream Processor

Expert in building real-time data streaming applications with AWS Kinesis.

## Core Concepts

### Kinesis Components

| Component | Purpose | Use Case |
|-----------|---------|----------|
| Data Streams | Real-time data ingestion | Custom processing, low latency |
| Data Firehose | Delivery to destinations | S3, Redshift, OpenSearch |
| Data Analytics | SQL-based processing | Real-time analytics |
| Video Streams | Video streaming | IoT, media processing |

### Key Limits

```yaml
Kinesis Data Streams:
  per_shard:
    write: "1,000 records/sec or 1 MB/sec, whichever is reached first"
    read: "5 transactions/sec, up to 10,000 records per call"
    read_throughput: "2 MB/sec"
  per_stream:
    max_shards: "500 (soft limit)"
    retention: "24 hours (default) to 365 days"
  per_record:
    max_size: "1 MB"
    partition_key: "256 characters max"
```
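
To size a stream against these per-shard write limits, a minimal back-of-the-envelope sketch (the function name and traffic numbers are illustrative):

```python
import math

def required_shards(records_per_sec: float, mb_per_sec: float) -> int:
    """Estimate shard count from the per-shard write limits above
    (1,000 records/sec and 1 MB/sec). Illustrative sizing only."""
    by_records = math.ceil(records_per_sec / 1000)
    by_bytes = math.ceil(mb_per_sec / 1.0)
    return max(by_records, by_bytes, 1)

# e.g. 5,000 records/sec at ~0.5 KB each (~2.5 MB/sec) -> 5 shards
print(required_shards(5000, 2.5))  # 5
```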
## Producer Implementation

### Python Producer with Batching

```python
import json
import time
from typing import Any, Dict, List

import boto3


class KinesisProducer:
    """Kinesis producer with client-side batching and partial-failure handling."""

    def __init__(self, stream_name: str, region: str = 'us-east-1'):
        self.stream_name = stream_name
        self.client = boto3.client('kinesis', region_name=region)
        self.buffer: List[Dict] = []
        self.buffer_size = 500   # PutRecords max records per batch
        self.buffer_time = 0.1   # Flush at least every 100 ms
        self.last_flush = time.time()

    def put_record(self, data: Dict[str, Any], partition_key: str) -> None:
        """Add a record to the buffer; flush when size or time limits are hit."""
        self.buffer.append({
            'Data': json.dumps(data).encode('utf-8'),
            'PartitionKey': partition_key
        })
        if (len(self.buffer) >= self.buffer_size
                or time.time() - self.last_flush > self.buffer_time):
            self.flush()

    def flush(self) -> None:
        """Send buffered records to Kinesis."""
        if not self.buffer:
            return
        records = self.buffer[:500]  # PutRecords limit
        self.buffer = self.buffer[500:]
        try:
            response = self.client.put_records(
                StreamName=self.stream_name,
                Records=records
            )
            # PutRecords can succeed partially; retry the failed subset
            failed_count = response.get('FailedRecordCount', 0)
            if failed_count > 0:
                self._handle_failures(response, records)
        except Exception as e:
            print(f"Kinesis put_records error: {e}")
            # Production code should retry or route to a dead-letter queue
            raise
        self.last_flush = time.time()

    def _handle_failures(self, response: Dict, records: List[Dict]) -> None:
        """Retry records that failed within an otherwise successful batch."""
        failed_records = []
        for i, record_response in enumerate(response['Records']):
            if 'ErrorCode' in record_response:
                failed_records.append(records[i])
                print(f"Failed record: {record_response['ErrorCode']} - "
                      f"{record_response.get('ErrorMessage')}")
        if failed_records:
            # Single fixed backoff for brevity; production code should use
            # exponential backoff and cap the retry count
            time.sleep(0.1)
            self.client.put_records(
                StreamName=self.stream_name,
                Records=failed_records
            )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.flush()
```
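
A hypothetical usage of the producer above (the stream name and payload shape are illustrative); the context manager guarantees a final flush:

```python
# Stream name and record fields are illustrative
with KinesisProducer('my-data-stream') as producer:
    for i in range(1000):
        producer.put_record({'id': i, 'value': i * 2}, partition_key=str(i))
# __exit__ flushes any records still in the buffer
```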
### Node.js Producer

```javascript
const { KinesisClient, PutRecordsCommand } = require('@aws-sdk/client-kinesis');

class KinesisProducer {
  constructor(streamName, region = 'us-east-1') {
    this.streamName = streamName;
    this.client = new KinesisClient({ region });
    this.buffer = [];
    this.bufferSize = 500;     // PutRecords max records per batch
    this.flushInterval = 100;  // ms

    // Auto-flush timer; unref() so it does not keep the process alive
    this.timer = setInterval(() => this.flush(), this.flushInterval);
    this.timer.unref();
  }

  async putRecord(data, partitionKey) {
    this.buffer.push({
      Data: Buffer.from(JSON.stringify(data)),
      PartitionKey: partitionKey
    });
    if (this.buffer.length >= this.bufferSize) {
      await this.flush();
    }
  }

  async flush() {
    if (this.buffer.length === 0) return;
    const records = this.buffer.splice(0, 500);
    try {
      const command = new PutRecordsCommand({
        StreamName: this.streamName,
        Records: records
      });
      const response = await this.client.send(command);
      if (response.FailedRecordCount > 0) {
        await this.handleFailures(response, records);
      }
    } catch (error) {
      console.error('Kinesis error:', error);
      throw error;
    }
  }

  async handleFailures(response, records) {
    const failedRecords = response.Records
      .map((r, i) => (r.ErrorCode ? records[i] : null))
      .filter(Boolean);
    if (failedRecords.length > 0) {
      // Single fixed backoff for brevity; use exponential backoff in production
      await new Promise((resolve) => setTimeout(resolve, 100));
      const command = new PutRecordsCommand({
        StreamName: this.streamName,
        Records: failedRecords
      });
      await this.client.send(command);
    }
  }
}
```
## Consumer Patterns

### Lambda Consumer

```python
import base64
import json
from datetime import datetime
from typing import Any, Dict


def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process Kinesis records from a Lambda trigger."""
    processed_records = []
    failed_records = []

    for record in event['Records']:
        try:
            # Kinesis record data arrives base64-encoded
            payload = base64.b64decode(record['kinesis']['data'])
            data = json.loads(payload)

            result = process_record(data)
            processed_records.append({
                'sequenceNumber': record['kinesis']['sequenceNumber'],
                'result': result
            })
        except Exception as e:
            print(f"Error processing record: {e}")
            failed_records.append({
                'sequenceNumber': record['kinesis']['sequenceNumber'],
                'error': str(e)
            })

    print(f"Processed: {len(processed_records)}, Failed: {len(failed_records)}")

    # Partial batch response: only honored when the event source mapping
    # enables ReportBatchItemFailures (see the snippet after this block)
    return {
        'batchItemFailures': [
            {'itemIdentifier': r['sequenceNumber']}
            for r in failed_records
        ]
    }


def process_record(data: Dict) -> Dict:
    """Business logic for processing each record."""
    transformed = {
        'id': data.get('id'),
        'timestamp': data.get('timestamp'),
        'processed_at': datetime.utcnow().isoformat(),
        'value': data.get('value', 0) * 2  # Example transformation
    }
    # write_to_downstream is a placeholder for DynamoDB, S3, etc.
    write_to_downstream(transformed)
    return transformed
```
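
The `batchItemFailures` return value only takes effect when the event source mapping opts in to partial batch responses. A minimal sketch, assuming an existing mapping (the UUID is a placeholder):

```python
import boto3

lambda_client = boto3.client('lambda')
lambda_client.update_event_source_mapping(
    UUID='00000000-0000-0000-0000-000000000000',  # placeholder mapping UUID
    FunctionResponseTypes=['ReportBatchItemFailures'],
)
```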
### KCL-Style Consumer (boto3)

```python
import json
import time
from typing import Dict

import boto3


class KinesisConsumer:
    """KCL-style consumer with periodic checkpointing."""

    def __init__(self, stream_name: str, region: str = 'us-east-1'):
        self.stream_name = stream_name
        self.client = boto3.client('kinesis', region_name=region)
        self.checkpoint_interval = 60  # seconds
        self.last_checkpoint = time.time()

    def process_shard(self, shard_id: str) -> None:
        """Process records from a single shard."""
        iterator_response = self.client.get_shard_iterator(
            StreamName=self.stream_name,
            ShardId=shard_id,
            ShardIteratorType='LATEST'  # or 'TRIM_HORIZON', 'AT_SEQUENCE_NUMBER'
        )
        shard_iterator = iterator_response['ShardIterator']

        while True:
            try:
                response = self.client.get_records(
                    ShardIterator=shard_iterator,
                    Limit=100
                )
                for record in response['Records']:
                    self.process_record(record)

                # Checkpoint periodically, only when records were returned
                if (response['Records']
                        and time.time() - self.last_checkpoint > self.checkpoint_interval):
                    self.checkpoint(shard_id, response['Records'][-1]['SequenceNumber'])

                # A closed shard returns no NextShardIterator
                shard_iterator = response.get('NextShardIterator')
                if not shard_iterator:
                    break

                # Respect the 5 GetRecords calls/sec per-shard limit
                if len(response['Records']) == 0:
                    time.sleep(0.5)
            except Exception as e:
                print(f"Error processing shard {shard_id}: {e}")
                time.sleep(1)

    def process_record(self, record: Dict) -> None:
        """Process an individual record."""
        data = json.loads(record['Data'])
        # Business logic here
        print(f"Processing: {data}")

    def checkpoint(self, shard_id: str, sequence_number: str) -> None:
        """Save a checkpoint for recovery (see the DynamoDB sketch below)."""
        print(f"Checkpoint: shard={shard_id}, seq={sequence_number}")
        self.last_checkpoint = time.time()
```
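
The `checkpoint` method above only logs; a minimal sketch of persisting it to DynamoDB, where the table name and key schema are assumptions:

```python
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('kinesis-checkpoints')  # hypothetical table

def save_checkpoint(stream_name: str, shard_id: str, sequence_number: str) -> None:
    # One item per stream/shard pair; the key name is an assumption
    table.put_item(Item={
        'stream_shard': f'{stream_name}#{shard_id}',
        'sequence_number': sequence_number,
    })

def load_checkpoint(stream_name: str, shard_id: str):
    # Returns the last saved sequence number, or None on first run
    item = table.get_item(Key={'stream_shard': f'{stream_name}#{shard_id}'}).get('Item')
    return item['sequence_number'] if item else None
```

On restart, the loaded sequence number can be passed to `get_shard_iterator` with `ShardIteratorType='AFTER_SEQUENCE_NUMBER'` to resume where processing left off.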
### Enhanced Fan-Out Consumer

```python
import time

import boto3


def setup_enhanced_fanout(stream_arn: str, consumer_name: str) -> str:
    """Register an enhanced fan-out consumer for dedicated per-shard throughput."""
    client = boto3.client('kinesis')

    response = client.register_stream_consumer(
        StreamARN=stream_arn,
        ConsumerName=consumer_name
    )
    consumer_arn = response['Consumer']['ConsumerARN']

    # boto3 provides no waiter for stream consumers, so poll until ACTIVE
    while True:
        status = client.describe_stream_consumer(
            StreamARN=stream_arn,
            ConsumerName=consumer_name
        )['ConsumerDescription']['ConsumerStatus']
        if status == 'ACTIVE':
            break
        time.sleep(1)

    return consumer_arn


def subscribe_to_shard(consumer_arn: str, shard_id: str):
    """Subscribe to a shard with enhanced fan-out."""
    client = boto3.client('kinesis')
    response = client.subscribe_to_shard(
        ConsumerARN=consumer_arn,
        ShardId=shard_id,
        StartingPosition={'Type': 'LATEST'}
    )

    # Process pushed events from the subscription's event stream
    for event in response['EventStream']:
        if 'SubscribeToShardEvent' in event:
            for record in event['SubscribeToShardEvent']['Records']:
                process_record(record)  # placeholder for business logic
```
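
Each subscription lasts at most five minutes, so a long-lived consumer re-subscribes in a loop, resuming from the continuation sequence number. A sketch building on the functions above (`process_record` remains a placeholder):

```python
def consume_forever(consumer_arn: str, shard_id: str) -> None:
    client = boto3.client('kinesis')
    position = {'Type': 'LATEST'}
    while True:
        response = client.subscribe_to_shard(
            ConsumerARN=consumer_arn,
            ShardId=shard_id,
            StartingPosition=position
        )
        # The event stream ends when the ~5-minute subscription expires
        for event in response['EventStream']:
            if 'SubscribeToShardEvent' not in event:
                continue
            payload = event['SubscribeToShardEvent']
            for record in payload['Records']:
                process_record(record)
            # Track shard progress for the next subscription
            position = {
                'Type': 'AT_SEQUENCE_NUMBER',
                'SequenceNumber': payload['ContinuationSequenceNumber'],
            }
```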
## Infrastructure as Code

### CloudFormation

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: Kinesis Data Stream with Lambda Consumer

Parameters:
  StreamName:
    Type: String
    Default: my-data-stream
  ShardCount:
    Type: Number
    Default: 2
  RetentionPeriod:
    Type: Number
    Default: 24

Resources:
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref StreamName
      ShardCount: !Ref ShardCount
      RetentionPeriodHours: !Ref RetentionPeriod
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
      Tags:
        - Key: Environment
          Value: production

  ProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: kinesis-processor
      Runtime: python3.11
      Handler: index.lambda_handler
      MemorySize: 256
      Timeout: 60
      Role: !GetAtt ProcessorRole.Arn
      Code:
        ZipFile: |
          import json
          import base64

          def lambda_handler(event, context):
              for record in event['Records']:
                  payload = base64.b64decode(record['kinesis']['data'])
                  print(f"Processed: {payload}")
              return {'statusCode': 200}

  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !GetAtt KinesisStream.Arn
      FunctionName: !Ref ProcessorFunction
      StartingPosition: LATEST
      BatchSize: 100
      MaximumBatchingWindowInSeconds: 5
      MaximumRetryAttempts: 3
      BisectBatchOnFunctionError: true
      ParallelizationFactor: 1

  ProcessorRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # CloudWatch Alarms
  IteratorAgeAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: kinesis-iterator-age
      MetricName: GetRecords.IteratorAgeMilliseconds
      Namespace: AWS/Kinesis
      Dimensions:
        - Name: StreamName
          Value: !Ref StreamName
      Statistic: Maximum
      Period: 60
      EvaluationPeriods: 5
      Threshold: 60000  # 1 minute
      ComparisonOperator: GreaterThanThreshold
      AlarmActions:
        - !Ref AlertTopic

  AlertTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: kinesis-alerts

Outputs:
  StreamArn:
    Value: !GetAtt KinesisStream.Arn
  StreamName:
    Value: !Ref KinesisStream
```
### Terraform

resource "aws_kinesis_stream" "main" {
name = var.stream_name
shard_count = var.shard_count
retention_period = var.retention_hours
encryption_type = "KMS"
kms_key_id = "alias/aws/kinesis"
shard_level_metrics = [
"IncomingBytes",
"IncomingRecords",
"OutgoingBytes",
"OutgoingRecords",
"WriteProvisionedThroughputExceeded",
"ReadProvisionedThroughputExceeded",
"IteratorAgeMilliseconds"
]
tags = {
Environment = var.environment
}
}
resource "aws_lambda_event_source_mapping" "kinesis" {
event_source_arn = aws_kinesis_stream.main.arn
function_name = aws_lambda_function.processor.arn
starting_position = "LATEST"
batch_size = 100
maximum_batching_window_in_seconds = 5
maximum_retry_attempts = 3
bisect_batch_on_function_error = true
parallelization_factor = 1
}
## Monitoring and Alerting

### Key CloudWatch Metrics

| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| IncomingRecords | Records put per second | Monitor for traffic patterns |
| IncomingBytes | Bytes put per second | 80% of shard limit |
| WriteProvisionedThroughputExceeded | Throttled writes | > 0 |
| ReadProvisionedThroughputExceeded | Throttled reads | > 0 |
| GetRecords.IteratorAgeMilliseconds | Consumer lag | > 60,000 ms |
| GetRecords.Success | Successful GetRecords calls | Monitor for drops |

### Monitoring Dashboard

```python
from datetime import datetime, timedelta

import boto3


def get_stream_metrics(stream_name: str, period_minutes: int = 5):
    """Fetch key Kinesis metrics for monitoring."""
    cloudwatch = boto3.client('cloudwatch')

    metrics = [
        'IncomingRecords',
        'IncomingBytes',
        'WriteProvisionedThroughputExceeded',
        'GetRecords.IteratorAgeMilliseconds'
    ]

    results = {}
    for metric in metrics:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/Kinesis',
            MetricName=metric,
            Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
            StartTime=datetime.utcnow() - timedelta(minutes=period_minutes),
            EndTime=datetime.utcnow(),
            Period=60,
            Statistics=['Sum', 'Average', 'Maximum']
        )
        results[metric] = response['Datapoints']
    return results
```
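
Illustrative usage of the function above, printing worst-case consumer lag over the window (the stream name is assumed):

```python
metrics = get_stream_metrics('my-data-stream')
lag_points = metrics['GetRecords.IteratorAgeMilliseconds']
if lag_points:
    print(f"Max iterator age: {max(dp['Maximum'] for dp in lag_points):.0f} ms")
```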
## Best Practices

- **Partition key design**: distribute data evenly across shards (see the sketch below)
- **Batch writes**: use PutRecords instead of PutRecord
- **Handle throttling**: implement exponential backoff
- **Monitor iterator age**: track consumer lag
- **Use enhanced fan-out**: for multiple consumers that need low latency
- **Enable encryption**: KMS encryption for sensitive data
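
A minimal sketch of the partition-key point above; the function names and fan-out scheme are illustrative:

```python
import random
import uuid

def random_key() -> str:
    # Maximally even distribution, but no per-entity ordering
    return str(uuid.uuid4())

def entity_key(device_id: str) -> str:
    # Preserves per-device ordering; even when the ID space is large and uniform
    return device_id

def spread_hot_key(device_id: str, fanout: int = 8) -> str:
    # Splits one hot key across up to `fanout` shards, giving up strict ordering
    return f"{device_id}#{random.randrange(fanout)}"
```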