S3 GeoTIFF Event Metadata Extractor

This notebook processes GeoTIFF files from S3 that contain activation events in their filenames and extracts event metadata.
Author

NASA Disasters Team

Published

March 16, 2026

This notebook processes GeoTIFF files from S3 that contain activation events in their filenames.

Workflow:

  1. Read files from AWS S3
  2. Detect activation event pattern: YYYYMM_<HAZARD>_<LOCATION>_*.tif (e.g. 202501_Fire_CA_...)
  3. Add event metadata to GeoTIFF attributes
  4. Move file to new S3 location
  5. Optionally delete from original location

Configuration

Set your S3 bucket names and paths here.

# S3 Configuration: where to read source GeoTIFFs from.
SOURCE_BUCKET = "nasa-disasters"
SOURCE_PREFIX = "drcs_activations_new/Landsat/burnRatio"  # Path within source bucket (no trailing slash needed)

# Where processed COGs are written.
DESTINATION_BUCKET = "nasa-disasters-dev"
DESTINATION_PREFIX = "ProgramData/Landsat/burnRatio/"  # Path within destination bucket; MUST end with '/' because keys are built by string concatenation

# AWS Region used for the boto3 session below.
AWS_REGION = "us-west-2"

# Processing options
DELETE_AFTER_MOVE = False  # Set to True to delete original files after successful processing (default False = copy, not move)

Metadata options

# Optional overrides for the HAZARD/LOCATION tags written into the output
# GeoTIFF. Each may be a Python list of strings (joined with commas) or
# None to fall back to the values parsed from the filename.
HAZARD = None  # Can be a python list or None
LOCATION = None  # Can be a python list or None

Import Libraries

# AWS SDK and standard library
import boto3
import re
import io  # NOTE(review): appears unused in the visible cells
from typing import Dict, Optional, Tuple, List
# Raster I/O (MemoryFile lets us open GeoTIFF bytes without touching disk)
import rasterio
from rasterio.io import MemoryFile
from datetime import datetime
# GDAL and Cloud-Optimized GeoTIFF tooling
from osgeo import gdal
from rio_cogeo.cogeo import cog_validate, cog_translate
from rio_cogeo.profiles import cog_profiles  # NOTE(review): appears unused in the visible cells

# Enable GDAL exceptions so GDAL errors raise instead of returning None
gdal.UseExceptions()

print("Libraries imported successfully")
Libraries imported successfully

AWS Session and Role Assumption

Use the credentials already active in the account (no explicit role assumption is performed; STS is only called to verify the current identity).

# Create boto3 session using current credentials (no explicit role
# assumption; whatever identity is active in the environment is used).
session = boto3.Session(region_name=AWS_REGION)

# Get STS client to verify current identity (purely informational;
# nothing below depends on the returned values).
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()

print(f"Current AWS Identity:")
print(f"  Account: {identity['Account']}")
print(f"  ARN: {identity['Arn']}")
print(f"  User ID: {identity['UserId']}")

# Create S3 client — used as a module-level global by the helper
# functions defined later in this notebook.
s3_client = session.client('s3')

print("\nS3 client created successfully")
Current AWS Identity:
  Account: 515966502221
  ARN: arn:aws:sts::515966502221:assumed-role/disasters-prod/botocore-session-1773760456
  User ID: AROAXQIQAAVGWA2LHXCPW:botocore-session-1773760456

S3 client created successfully

Helper Functions

def detect_activation_event(filename: str) -> Optional[Dict[str, str]]:
    """
    Detect the activation-event pattern in a filename.

    Pattern: YYYYMM_<HAZARD>_<LOCATION>_<remainder>.tif
    Example: 202501_Fire_CA_LC09_NBR_182831_041036_2025-01-14_day.tif
             -> hazard 'Fire', location 'CA'

    (Bug fix: the previous docstring/comment described the group order as
    <EVENT>_<HAZARD>, which contradicted the code and the real filenames.)

    Args:
        filename: The bare filename (no S3 prefix) to check.

    Returns:
        Dict with keys 'year_month', 'hazard', 'location', 'remainder'
        and 'original_filename' if the pattern matches; None otherwise.
    """
    # Groups: date (6 digits), hazard, location, then everything else
    # up to the .tif extension.
    pattern = r'^(\d{6})_([A-Za-z0-9]+)_([A-Za-z0-9]+)_(.*)\.tif$'

    match = re.match(pattern, filename)
    if match is None:
        return None

    year_month, hazard, location, remainder = match.groups()
    return {
        'year_month': year_month,
        'hazard': hazard,
        'location': location,
        'remainder': remainder,
        'original_filename': filename
    }


def list_s3_files(bucket: str, prefix: str, pattern: str = '.tif') -> List[str]:
    """
    List all object keys in an S3 bucket/prefix whose key contains `pattern`.

    Args:
        bucket: S3 bucket name
        prefix: Prefix (folder path) within bucket
        pattern: Substring to match (typically a file extension)

    Returns:
        List of matching S3 keys
    """
    matched: List[str] = []
    paginator = s3_client.get_paginator('list_objects_v2')

    # Paginate so buckets with more than 1000 objects are fully listed.
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        contents = page.get('Contents', [])
        matched.extend(obj['Key'] for obj in contents if pattern in obj['Key'])

    return matched


def read_geotiff_from_s3(bucket: str, key: str) -> Tuple[bytes, dict, dict]:
    """
    Read a GeoTIFF file from S3 into memory.

    (Bug fix: the return annotation previously said Tuple[bytes, dict]
    although the function returns a 3-tuple.)

    Args:
        bucket: S3 bucket name
        key: S3 object key

    Returns:
        Tuple of (file bytes, rasterio metadata dict, GeoTIFF tag dict)
    """
    # Download the whole object into memory (files here are ~150 MB).
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    file_bytes = obj['Body'].read()

    # Inspect metadata and tags with rasterio without touching disk.
    with MemoryFile(file_bytes) as memfile:
        with memfile.open() as dataset:
            metadata = dataset.meta.copy()
            tags = dataset.tags()

    return file_bytes, metadata, tags


def validate_cog(file_bytes: bytes, filename: str = "temp.tif") -> Tuple[bool, dict]:
    """
    Validate if a GeoTIFF is a valid Cloud-Optimized GeoTIFF (COG).

    Args:
        file_bytes: GeoTIFF file bytes
        filename: Name used for the GDAL /vsimem path (also useful for display)

    Returns:
        Tuple of (is_valid: bool, validation_info: dict)
    """
    # Bug fix: interpolate the filename into the /vsimem path — it was a
    # fixed literal before, so every call shared the same virtual path.
    vsimem_path = f'/vsimem/{filename}'

    try:
        # Write the bytes to GDAL virtual memory
        gdal.FileFromMemBuffer(vsimem_path, file_bytes)

        # Validate COG structure with rio-cogeo
        is_valid, errors, warnings = cog_validate(vsimem_path)

        # Collect detailed structural info via GDAL
        ds = gdal.Open(vsimem_path)
        if ds:
            metadata = {
                'is_cog': is_valid,
                'errors': errors,
                'warnings': warnings,
                'width': ds.RasterXSize,
                'height': ds.RasterYSize,
                'bands': ds.RasterCount,
                'compression': ds.GetMetadataItem('COMPRESSION', 'IMAGE_STRUCTURE') or 'None',
                # NOTE(review): INTERLEAVE == 'PIXEL' is only a proxy for
                # "tiled"; it does not strictly prove tiling — confirm.
                'tiled': ds.GetMetadataItem('INTERLEAVE', 'IMAGE_STRUCTURE') == 'PIXEL',
                'blocksize': (ds.GetRasterBand(1).GetBlockSize() if ds.RasterCount > 0 else (None, None)),
                'overviews': ds.GetRasterBand(1).GetOverviewCount() if ds.RasterCount > 0 else 0
            }
            ds = None  # release the GDAL dataset
        else:
            metadata = {'is_cog': False, 'errors': ['Could not open file'], 'warnings': []}

        return is_valid, metadata

    except Exception as e:
        return False, {'is_cog': False, 'errors': [str(e)], 'warnings': []}

    finally:
        # Best-effort cleanup on every path; Unlink may itself raise under
        # gdal.UseExceptions() if the buffer was never created.
        try:
            gdal.Unlink(vsimem_path)
        except Exception:
            pass


def add_metadata_and_create_cog(file_bytes: bytes, event_metadata: Dict[str, str],
                                HAZARD: Optional[List[str]] = None,
                                LOCATION: Optional[List[str]] = None) -> bytes:
    """
    Rebuild GeoTIFF as a valid COG with metadata using rio-cogeo library.

    Preserves all original compression settings (compression type, level,
    predictor) from the source file and only adds activation event metadata.

    Args:
        file_bytes: Original GeoTIFF file bytes
        event_metadata: Dictionary with event information (as returned by
            detect_activation_event)
        HAZARD: Optional list of hazard strings written to the HAZARD tag;
            None falls back to event_metadata['hazard']
        LOCATION: Optional list of location strings written to the LOCATION
            tag; None falls back to event_metadata['location']

    Returns:
        Modified GeoTIFF file bytes as a valid COG with original compression settings

    Raises:
        RuntimeError: if COG creation fails for any reason (original
        exception chained as the cause).
    """
    # Local import: the file-level import only brings in `datetime` itself.
    from datetime import timezone

    input_path = '/vsimem/input.tif'
    output_path = '/vsimem/output_cog.tif'

    try:
        # Write input to GDAL virtual memory
        gdal.FileFromMemBuffer(input_path, file_bytes)

        # Read original file settings with rasterio
        print("    Reading original file settings...")
        with MemoryFile(file_bytes) as memfile:
            with memfile.open() as src:
                original_profile = src.profile.copy()

        # Extract compression settings from original file
        compress = original_profile.get('compress', 'ZSTD')
        predictor = original_profile.get('predictor', None)

        # Get compression level if available
        compress_level = None
        if 'zstd_level' in original_profile:
            compress_level = original_profile['zstd_level']
        elif 'zlevel' in original_profile:  # For DEFLATE compression
            compress_level = original_profile['zlevel']

        print(f"    Original settings: compress={compress}, predictor={predictor}, level={compress_level}")
        print("    Rebuilding as COG (preserving original compression settings)...")

        # Prepare metadata tags. Bug fix: datetime.utcnow() is deprecated;
        # use a timezone-aware UTC timestamp instead.
        tags = {
            'ACTIVATION_EVENT': f"{event_metadata['year_month']}_{event_metadata['hazard']}_{event_metadata['location']}",
            'YEAR_MONTH': event_metadata['year_month'],
            'HAZARD': ','.join(HAZARD) if isinstance(HAZARD, list) else (HAZARD or event_metadata['hazard']),
            'LOCATION': ','.join(LOCATION) if isinstance(LOCATION, list) else (LOCATION or event_metadata['location']),
            'PROCESSING_DATE': datetime.now(timezone.utc).isoformat(),
            'PROCESSOR': 'S3 GeoTIFF COG Processor v1.0'
        }
        print(f'Hazard tag {HAZARD}')
        print(f'Location tag {LOCATION}')

        # Create COG profile using original compression settings
        cog_profile = {
            'driver': 'GTiff',
            'interleave': 'pixel',
            'tiled': True,
            'blockxsize': 512,
            'blockysize': 512,
            'compress': compress
        }

        # Add compression level if present (option name depends on codec)
        if compress_level is not None:
            if compress.upper() == 'ZSTD':
                cog_profile['zstd_level'] = compress_level
            elif compress.upper() in ['DEFLATE', 'LZW']:
                cog_profile['zlevel'] = compress_level

        # Add predictor if present in original
        if predictor is not None:
            cog_profile['predictor'] = predictor
            print(f"    Using original predictor: {predictor}")

        # Use cog_translate to create a proper COG with the tags embedded
        cog_translate(
            source=input_path,
            dst_path=output_path,
            dst_kwargs=cog_profile,
            add_mask=False,
            overview_level=5,
            overview_resampling='average',
            web_optimized=True,
            additional_cog_metadata=tags,
            quiet=False
        )

        print("    COG creation complete with embedded metadata")

        # Read output from virtual memory via the GDAL VSI file API
        vsi_file = gdal.VSIFOpenL(output_path, 'rb')
        if not vsi_file:
            raise RuntimeError("Could not read output COG from virtual memory")

        gdal.VSIFSeekL(vsi_file, 0, 2)  # Seek to end
        file_size = gdal.VSIFTellL(vsi_file)
        gdal.VSIFSeekL(vsi_file, 0, 0)  # Seek to start

        output_bytes = gdal.VSIFReadL(1, file_size, vsi_file)
        gdal.VSIFCloseL(vsi_file)

        print(f"    Output size: {len(output_bytes):,} bytes")

        return output_bytes

    except Exception as e:
        # Print full traceback for debugging
        import traceback
        print(f"\n    ERROR DETAILS:")
        traceback.print_exc()

        # Chain the original exception so the root cause is preserved
        raise RuntimeError(f"COG creation failed: {str(e)}") from e

    finally:
        # Best-effort cleanup of both virtual files on every exit path
        for _path in (input_path, output_path):
            try:
                gdal.Unlink(_path)
            except Exception:
                pass


def upload_to_s3(file_bytes: bytes, bucket: str, key: str) -> bool:
    """
    Upload file bytes to S3.

    Args:
        file_bytes: File content as bytes
        bucket: Destination S3 bucket
        key: Destination S3 key

    Returns:
        True if successful
    """
    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=file_bytes,
            ContentType='image/tiff'
        )
    except Exception as exc:
        # Report and signal failure instead of propagating
        print(f"Error uploading to S3: {exc}")
        return False
    return True


def delete_from_s3(bucket: str, key: str) -> bool:
    """
    Delete file from S3.

    Args:
        bucket: S3 bucket name
        key: S3 object key

    Returns:
        True if successful
    """
    try:
        s3_client.delete_object(Bucket=bucket, Key=key)
    except Exception as exc:
        # Report and signal failure instead of propagating
        print(f"Error deleting from S3: {exc}")
        return False
    return True


print("Helper functions defined successfully")
Helper functions defined successfully

Test Pattern Detection

Test the filename pattern detection with some examples.

# Test cases for the filename pattern detector
test_filenames = [
    "202302_Earthquake_Turkiye_Turkey_UNW_2022-04-06_to_2023-02-08_day.tif"
]

print("Testing pattern detection:\n")
for filename in test_filenames:
    result = detect_activation_event(filename)
    if result:
        # Bug fix: only index into `result` after confirming a match;
        # previously result["remainder"] ran first and raised TypeError
        # for any non-matching filename (result is None).
        print(f'Remainder = {result["remainder"]}')
        print(f"βœ“ {filename}")
        print(f"  Year/Month: {result['year_month']}")
        print(f"  Location: {result['location']}")
        print(f"  Hazard: {result['hazard']}")
        print(f"  Remainder: {result['remainder']}")
    else:
        print(f"βœ— {filename} - No match")
    print()
Testing pattern detection:

Remainder = Turkey_UNW_2022-04-06_to_2023-02-08_day
βœ“ 202302_Earthquake_Turkiye_Turkey_UNW_2022-04-06_to_2023-02-08_day.tif
  Year/Month: 202302
  Location: Turkiye
  Hazard: Earthquake
  Remainder: Turkey_UNW_2022-04-06_to_2023-02-08_day

List Files in Source Bucket

# List every .tif key under the source prefix and preview which ones
# carry the activation-event naming pattern.
print(f"Listing files in s3://{SOURCE_BUCKET}/{SOURCE_PREFIX}\n")

tif_files = list_s3_files(SOURCE_BUCKET, SOURCE_PREFIX, '.tif')

print(f"Found {len(tif_files)} .tif files:")
for key in tif_files:
    # Pattern detection works on the bare filename, not the full key
    info = detect_activation_event(key.rsplit('/', 1)[-1])
    if info:
        print(f"  βœ“ {key} - Location: {info['location']}, Hazard: {info['hazard']}")
    else:
        print(f"  βœ— {key} - No activation event pattern")
Listing files in s3://nasa-disasters/drcs_activations_new/Landsat/burnRatio

Found 1 .tif files:
  βœ“ drcs_activations_new/Landsat/burnRatio/202501_Fire_CA_LC09_NBR_182831_041036_2025-01-14_day.tif - Location: CA, Hazard: Fire

Process Files

Main processing loop: detect activation events, add metadata, move to destination.

def process_file(source_key: str) -> Dict[str, object]:
    """
    Process a single GeoTIFF file end to end.

    Steps: detect the activation-event pattern in the filename, read the
    file from S3, validate/rebuild it as a COG with event metadata tags,
    upload to the destination bucket, optionally delete the source.

    Args:
        source_key: Full S3 key of the source object.

    Returns:
        Dictionary with processing results ('filename', 'source_key',
        'success', 'message', plus COG/upload details when applicable).
    """
    filename = source_key.split('/')[-1]
    result = {
        'filename': filename,
        'source_key': source_key,
        'success': False,
        'message': ''
    }
    
    # Step 1: Detect activation event pattern
    event_metadata = detect_activation_event(filename)
    
    if not event_metadata:
        result['message'] = 'No activation event pattern detected - skipping'
        return result
    
    # Bug fix: build the destination filename only AFTER the None check;
    # previously this indexed into None for non-matching files and raised
    # TypeError instead of cleanly skipping.
    final_filename = f'{event_metadata["remainder"]}.tif'
    
    print(f"\nProcessing: {filename}")
    print(f"  Location: {event_metadata['location']}, Hazard: {event_metadata['hazard']}")
    
    try:
        # Step 2: Read file from S3
        print("  Reading from S3...")
        file_bytes, metadata, tags = read_geotiff_from_s3(SOURCE_BUCKET, source_key)
        print(f"    Size: {len(file_bytes):,} bytes")
        print(f"    Dimensions: {metadata['width']}x{metadata['height']}")
        print(f"    CRS: {metadata.get('crs', 'Unknown')}")
        
        # Step 2.5: Validate input COG status
        print("  Validating input COG structure...")
        input_is_cog, input_cog_info = validate_cog(file_bytes, f"input_{filename}")
        
        if input_is_cog:
            print(f"    βœ“ Input is valid COG")
            print(f"      Compression: {input_cog_info.get('compression', 'Unknown')}")
            print(f"      Tile size: {input_cog_info.get('blocksize', (0, 0))}")
            print(f"      Overviews: {input_cog_info.get('overviews', 0)} levels")
        else:
            print(f"    ⚠ Input is NOT a valid COG - will convert")
            if input_cog_info.get('errors'):
                for error in input_cog_info['errors'][:3]:  # Show first 3 errors
                    print(f"      - {error}")
        
        # Step 3: Add metadata and create/optimize COG
        print("  Creating COG with metadata and ZSTD compression...")
        modified_bytes = add_metadata_and_create_cog(file_bytes, event_metadata, HAZARD, LOCATION)
        print(f"    Output size: {len(modified_bytes):,} bytes")
        
        size_change = ((len(modified_bytes) - len(file_bytes)) / len(file_bytes)) * 100
        print(f"    Size change: {size_change:+.1f}%")
        
        # Step 3.5: Validate output COG status
        print("  Validating output COG structure...")
        output_is_cog, output_cog_info = validate_cog(modified_bytes, f"output_{filename}")
        
        if output_is_cog:
            print(f"    βœ“ Output is valid COG")
            print(f"      Compression: {output_cog_info.get('compression', 'Unknown')}")
            print(f"      Tile size: {output_cog_info.get('blocksize', (0, 0))}")
            print(f"      Overviews: {output_cog_info.get('overviews', 0)} levels")
            result['cog_valid'] = True
        else:
            print(f"    βœ— WARNING: Output is NOT a valid COG!")
            if output_cog_info.get('errors'):
                for error in output_cog_info['errors'][:3]:
                    print(f"      - {error}")
            result['cog_valid'] = False
        
        # Step 4: Upload to destination (prefix must end with '/')
        dest_key = DESTINATION_PREFIX + final_filename
        print(f"  Uploading to s3://{DESTINATION_BUCKET}/{dest_key}...")
        
        if upload_to_s3(modified_bytes, DESTINATION_BUCKET, dest_key):
            print("    βœ“ Upload successful")
            result['destination_key'] = dest_key
            result['success'] = True
            result['message'] = 'Processed successfully'
            result['input_was_cog'] = input_is_cog
            result['output_is_cog'] = output_is_cog
            
            # Step 5: Optionally delete from source
            if DELETE_AFTER_MOVE:
                print(f"  Deleting from source...")
                if delete_from_s3(SOURCE_BUCKET, source_key):
                    print("    βœ“ Deleted from source")
                    result['deleted_from_source'] = True
                else:
                    print("    βœ— Failed to delete from source")
                    result['deleted_from_source'] = False
        else:
            result['message'] = 'Upload to destination failed'
            
    except Exception as e:
        result['message'] = f'Error: {str(e)}'
        print(f"  βœ— Error: {e}")
    
    return result


# Run every discovered key through the processing pipeline.
banner = '=' * 80
print(f"\n{banner}")
print("Starting batch processing")
print(f"{banner}")

results = [process_file(file_key) for file_key in tif_files]

print(f"\n{banner}")
print("Processing Complete")
print(f"{banner}")

================================================================================
Starting batch processing
================================================================================

Processing: 202501_Fire_CA_LC09_NBR_182831_041036_2025-01-14_day.tif
  Location: CA, Hazard: Fire
  Reading from S3...
    Size: 158,238,568 bytes
    Dimensions: 8554x7185
    CRS: EPSG:4326
  Validating input COG structure...
    βœ“ Input is valid COG
      Compression: ZSTD
      Tile size: [512, 512]
      Overviews: 4 levels
  Creating COG with metadata and ZSTD compression...
/tmp/ipykernel_205/1171966028.py:178: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
  'PROCESSING_DATE': datetime.utcnow().isoformat(),
Reading input: /vsimem/input.tif
    Reading original file settings...
    Original settings: compress=zstd, predictor=None, level=None
    Rebuilding as COG (preserving original compression settings)...
Hazard tag None
Location tag None
Adding overviews...
Updating dataset tags...
Writing output to: /vsimem/output_cog.tif
    COG creation complete with embedded metadata
    Output size: 150,769,442 bytes
    Output size: 150,769,442 bytes
    Size change: -4.7%
  Validating output COG structure...
    βœ“ Output is valid COG
      Compression: ZSTD
      Tile size: [512, 512]
      Overviews: 5 levels
  Uploading to s3://nasa-disasters-dev/ProgramData/Landsat/burnRatio/LC09_NBR_182831_041036_2025-01-14_day.tif...
    βœ“ Upload successful

================================================================================
Processing Complete
================================================================================

Summary Report

# Summary statistics over the batch results
total_files = len(results)
successful = len([r for r in results if r['success']])
skipped = len([r for r in results if 'skipping' in r['message']])
failed = total_files - successful - skipped

# COG statistics
input_cogs = len([r for r in results if r.get('input_was_cog', False)])
output_cogs = len([r for r in results if r.get('output_is_cog', False)])
converted_to_cog = len([
    r for r in results
    if r.get('success') and not r.get('input_was_cog', False) and r.get('output_is_cog', False)
])

print("\nProcessing Summary:")
print(f"  Total files: {total_files}")
print(f"  βœ“ Successful: {successful}")
print(f"  β—‹ Skipped (no pattern): {skipped}")
print(f"  βœ— Failed: {failed}")

print("\nCOG Status:")
print(f"  Input COGs: {input_cogs}/{total_files}")
print(f"  Output COGs: {output_cogs}/{total_files}")
print(f"  Converted to COG: {converted_to_cog}")

if DELETE_AFTER_MOVE:
    deleted = len([r for r in results if r.get('deleted_from_source', False)])
    print(f"\n  πŸ—‘ Deleted from source: {deleted}")

print("\nDetailed Results:")
for result in results:
    # One status glyph per file: success / skipped / failed
    if result['success']:
        status = 'βœ“'
    elif 'skipping' in result['message']:
        status = 'β—‹'
    else:
        status = 'βœ—'
    cog_status = ''
    if result.get('success'):
        cog_status = ' [COG βœ“]' if result.get('output_is_cog') else ' [COG βœ—]'
    print(f"  {status} {result['filename']}: {result['message']}{cog_status}")

Verify Metadata in Processed Files

Read a processed file and verify the metadata was added correctly.

# Spot-check: re-download the first successfully processed file and
# confirm the event metadata tags were embedded and the output is a COG.
successful_results = [r for r in results if r['success']]

if successful_results:
    sample_result = successful_results[0]
    dest_key = sample_result['destination_key']
    
    print(f"Verifying metadata in: s3://{DESTINATION_BUCKET}/{dest_key}\n")
    
    # Read processed file back from the destination bucket
    file_bytes, metadata, tags = read_geotiff_from_s3(DESTINATION_BUCKET, dest_key)
    
    print("=" * 60)
    print("GeoTIFF Metadata Tags:")
    print("=" * 60)
    # Only show tags written by this pipeline (ACTIVATION_EVENT,
    # PROCESSING_DATE, PROCESSOR, etc.), not every GDAL tag.
    for key, value in tags.items():
        if 'ACTIVATION' in key or 'PROCESSING' in key or 'ORIGINAL' in key or 'PROCESSOR' in key or 'COG' in key:
            print(f"  {key}: {value}")
    
    print("\n" + "=" * 60)
    print("COG Validation Results:")
    print("=" * 60)
    
    # Validate COG structure of the round-tripped bytes
    is_cog, cog_info = validate_cog(file_bytes, dest_key.split('/')[-1])
    
    if is_cog:
        print("βœ“ File is a VALID Cloud-Optimized GeoTIFF (COG)\n")
        
        print("Structure Details:")
        print(f"  Dimensions: {cog_info.get('width')}x{cog_info.get('height')}")
        print(f"  Bands: {cog_info.get('bands')}")
        print(f"  Compression: {cog_info.get('compression')}")
        print(f"  Tiled: {cog_info.get('tiled')}")
        print(f"  Block/Tile Size: {cog_info.get('blocksize')}")
        print(f"  Overview Levels: {cog_info.get('overviews')}")
        
        if cog_info.get('warnings'):
            print("\nWarnings:")
            for warning in cog_info['warnings']:
                print(f"  ⚠ {warning}")
    else:
        print("βœ— File is NOT a valid COG\n")
        
        if cog_info.get('errors'):
            print("Errors:")
            for error in cog_info['errors']:
                print(f"  βœ— {error}")
        
        if cog_info.get('warnings'):
            print("\nWarnings:")
            for warning in cog_info['warnings']:
                print(f"  ⚠ {warning}")
    
    print("\n" + "=" * 60)
    
else:
    print("No successfully processed files to verify")
Back to top