# S3 Configuration
SOURCE_BUCKET = "nasa-disasters"
SOURCE_PREFIX = "drcs_activations_new/Landsat/burnRatio" # Path within source bucket
DESTINATION_BUCKET = "nasa-disasters-dev"
DESTINATION_PREFIX = "ProgramData/Landsat/burnRatio/" # Path within destination bucket (must have / at the very end of it!!!)
# AWS Region
AWS_REGION = "us-west-2"
# Processing options
DELETE_AFTER_MOVE = False # Set to True to delete original files after successful processing
This notebook processes GeoTIFF files from S3 whose filenames contain activation events, extracts the event metadata, and embeds it in the GeoTIFF attributes.
Workflow:
- Read files from AWS S3
- Detect activation event pattern:
YYYYMM_<EVENT>_<HAZARD>_*.tif - Add event metadata to GeoTIFF attributes
- Move file to new S3 location
- Optionally delete from original location
Configuration
Set your S3 bucket names and paths here.
Metadata options
HAZARD = None # Can be a python list or None; None falls back to the hazard parsed from the filename
LOCATION = None # Can be a python list or None; None falls back to the location parsed from the filename
import io
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import boto3
import rasterio
from osgeo import gdal
from rasterio.io import MemoryFile
from rio_cogeo.cogeo import cog_validate, cog_translate
from rio_cogeo.profiles import cog_profiles
# Enable GDAL exceptions so GDAL errors raise Python exceptions instead of
# returning None silently.
gdal.UseExceptions()
print("Libraries imported successfully")
AWS Session and Role Assumption
Assume the current role that's already active in the account.
# Create a boto3 session using whatever credentials are already active
# (environment, instance profile, or an assumed role).
session = boto3.Session(region_name=AWS_REGION)
# Verify the current identity via STS before touching any buckets.
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()
print(f"Current AWS Identity:")
print(f" Account: {identity['Account']}")
print(f" ARN: {identity['Arn']}")
print(f" User ID: {identity['UserId']}")
# Create S3 client used by all helper functions below.
s3_client = session.client('s3')
print("\nS3 client created successfully")
Account: 515966502221
ARN: arn:aws:sts::515966502221:assumed-role/disasters-prod/botocore-session-1773760456
User ID: AROAXQIQAAVGWA2LHXCPW:botocore-session-1773760456
S3 client created successfully
Helper Functions
def detect_activation_event(filename: str) -> Optional[Dict[str, str]]:
    """
    Detect the activation-event pattern in a filename.

    Pattern: YYYYMM_<HAZARD>_<LOCATION>_<remainder>.tif
    Example: 202302_Earthquake_Turkiye_Turkey_UNW_..._day.tif
             -> year_month=202302, hazard=Earthquake, location=Turkiye

    (The original docstring said YYYYMM_<EVENT>_<HAZARD>, which contradicted
    the capture-group order actually used below.)

    Args:
        filename: The filename to check (basename only, no path).

    Returns:
        Dictionary with event metadata if the pattern matches, None otherwise.
    """
    # Groups: 6-digit date stamp, hazard token, location token, free-form
    # remainder (may itself contain underscores).
    pattern = r'^(\d{6})_([A-Za-z0-9]+)_([A-Za-z0-9]+)_(.*)\.tif$'
    match = re.match(pattern, filename)
    if match:
        year_month, hazard, location, remainder = match.groups()
        return {
            'year_month': year_month,
            'hazard': hazard,
            'location': location,
            'remainder': remainder,
            'original_filename': filename
        }
    return None
def list_s3_files(bucket: str, prefix: str, pattern: str = '.tif') -> List[str]:
    """
    List every object key under a prefix whose key contains the pattern.

    Note: this is a plain substring match, so '.tif' also matches '.tiff'
    keys or keys with '.tif' anywhere in the path.

    Args:
        bucket: S3 bucket name
        prefix: Prefix (folder path) within bucket
        pattern: File extension or substring to match

    Returns:
        List of matching S3 keys
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    # Paginate so buckets with more than 1000 objects are fully listed.
    return [
        obj['Key']
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get('Contents', [])
        if pattern in obj['Key']
    ]
def read_geotiff_from_s3(bucket: str, key: str) -> Tuple[bytes, dict, dict]:
    """
    Read a GeoTIFF file from S3 into memory.

    Args:
        bucket: S3 bucket name
        key: S3 object key

    Returns:
        Tuple of (file bytes, rasterio metadata dict, GeoTIFF tag dict).
        (The original annotation declared a 2-tuple but three values are
        returned.)
    """
    # Download the whole object into memory (no temp file needed).
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    file_bytes = obj['Body'].read()
    # Read metadata and tags with rasterio from the in-memory buffer.
    with MemoryFile(file_bytes) as memfile:
        with memfile.open() as dataset:
            metadata = dataset.meta.copy()
            tags = dataset.tags()
    return file_bytes, metadata, tags
def validate_cog(file_bytes: bytes, filename: str = "temp.tif") -> Tuple[bool, dict]:
    """
    Validate whether a GeoTIFF is a valid Cloud-Optimized GeoTIFF (COG).

    Args:
        file_bytes: GeoTIFF file bytes
        filename: Name used for the GDAL in-memory (/vsimem) path; pass a
            unique name per call so concurrent validations do not collide.

    Returns:
        Tuple of (is_valid: bool, validation_info: dict)
    """
    # Use the caller-supplied name (the parameter was previously ignored and
    # a fixed path was used, which made the argument dead and calls collide).
    vsimem_path = f'/vsimem/{filename}'
    try:
        # Write to GDAL's virtual in-memory filesystem.
        gdal.FileFromMemBuffer(vsimem_path, file_bytes)
        # Validate COG structure (errors make it invalid; warnings do not).
        is_valid, errors, warnings = cog_validate(vsimem_path)
        # Collect detailed structural info for reporting.
        ds = gdal.Open(vsimem_path)
        if ds:
            metadata = {
                'is_cog': is_valid,
                'errors': errors,
                'warnings': warnings,
                'width': ds.RasterXSize,
                'height': ds.RasterYSize,
                'bands': ds.RasterCount,
                'compression': ds.GetMetadataItem('COMPRESSION', 'IMAGE_STRUCTURE') or 'None',
                'tiled': ds.GetMetadataItem('INTERLEAVE', 'IMAGE_STRUCTURE') == 'PIXEL',
                'blocksize': (ds.GetRasterBand(1).GetBlockSize() if ds.RasterCount > 0 else (None, None)),
                'overviews': ds.GetRasterBand(1).GetOverviewCount() if ds.RasterCount > 0 else 0
            }
            ds = None  # release the GDAL dataset handle
        else:
            metadata = {'is_cog': False, 'errors': ['Could not open file'], 'warnings': []}
        # Clean up the virtual file.
        gdal.Unlink(vsimem_path)
        return is_valid, metadata
    except Exception as e:
        # Best-effort cleanup: Unlink itself raises (with exceptions enabled)
        # if the vsimem file was never created, which would mask the real
        # error, so guard it.
        try:
            gdal.Unlink(vsimem_path)
        except Exception:
            pass
        return False, {'is_cog': False, 'errors': [str(e)], 'warnings': []}
def add_metadata_and_create_cog(file_bytes: bytes, event_metadata: Dict[str, str], HAZARD: Optional[List[str]], LOCATION: Optional[List[str]]) -> bytes:
    """
    Rebuild a GeoTIFF as a valid COG with metadata using the rio-cogeo library.

    Preserves all original compression settings (compression type, level,
    predictor) from the source file and only adds activation event metadata.

    Args:
        file_bytes: Original GeoTIFF file bytes
        event_metadata: Dictionary with event information, as returned by
            detect_activation_event (keys: year_month, hazard, location, ...)
        HAZARD: Optional list of hazard names to record in the HAZARD tag;
            when None, falls back to the hazard parsed from the filename.
        LOCATION: Optional list of location names for the LOCATION tag;
            when None, falls back to the location parsed from the filename.

    Returns:
        Modified GeoTIFF file bytes as a valid COG with original compression
        settings.

    Raises:
        Exception: if COG creation fails for any reason (original error is
            chained as the cause).
    """
    input_path = '/vsimem/input.tif'
    output_path = '/vsimem/output_cog.tif'
    try:
        # Write input to GDAL's virtual in-memory filesystem.
        gdal.FileFromMemBuffer(input_path, file_bytes)
        # Read original file settings with rasterio.
        print(" Reading original file settings...")
        with MemoryFile(file_bytes) as memfile:
            with memfile.open() as src:
                original_profile = src.profile.copy()
        # Extract compression settings from the original file (default ZSTD).
        compress = original_profile.get('compress', 'ZSTD')
        predictor = original_profile.get('predictor', None)
        # Get the compression level if the driver exposed one.
        compress_level = None
        if 'zstd_level' in original_profile:
            compress_level = original_profile['zstd_level']
        elif 'zlevel' in original_profile:  # For DEFLATE compression
            compress_level = original_profile['zlevel']
        print(f" Original settings: compress={compress}, predictor={predictor}, level={compress_level}")
        print(" Rebuilding as COG (preserving original compression settings)...")
        # Prepare metadata tags to embed in the output COG.
        tags = {
            'ACTIVATION_EVENT': f"{event_metadata['year_month']}_{event_metadata['hazard']}_{event_metadata['location']}",
            'YEAR_MONTH': event_metadata['year_month'],
            'HAZARD': ','.join(HAZARD) if isinstance(HAZARD, list) else (HAZARD or event_metadata['hazard']),
            'LOCATION': ','.join(LOCATION) if isinstance(LOCATION, list) else (LOCATION or event_metadata['location']),
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
            'PROCESSING_DATE': datetime.now(timezone.utc).isoformat(),
            'PROCESSOR': 'S3 GeoTIFF COG Processor v1.0'
        }
        print(f'Hazard tag {HAZARD}')
        print(f'Location tag {LOCATION}')
        # Create COG profile using original compression settings.
        cog_profile = {
            'driver': 'GTiff',
            'interleave': 'pixel',
            'tiled': True,
            'blockxsize': 512,
            'blockysize': 512,
            'compress': compress
        }
        # Add compression level if present (option name differs per codec).
        if compress_level is not None:
            if compress.upper() == 'ZSTD':
                cog_profile['zstd_level'] = compress_level
            elif compress.upper() in ['DEFLATE', 'LZW']:
                cog_profile['zlevel'] = compress_level
        # Add predictor if present in the original.
        if predictor is not None:
            cog_profile['predictor'] = predictor
            print(f" Using original predictor: {predictor}")
        # Use cog_translate to create a proper COG with embedded tags.
        # NOTE(review): web_optimized=True reprojects to web-mercator tiling
        # and is deprecated in newer rio-cogeo releases — confirm intended.
        cog_translate(
            source=input_path,
            dst_path=output_path,
            dst_kwargs=cog_profile,
            add_mask=False,
            overview_level=5,
            overview_resampling='average',
            web_optimized=True,
            additional_cog_metadata=tags,
            quiet=False
        )
        print(" COG creation complete with embedded metadata")
        # Read the output back from virtual memory via the VSIF API.
        vsi_file = gdal.VSIFOpenL(output_path, 'rb')
        if not vsi_file:
            raise Exception("Could not read output COG from virtual memory")
        gdal.VSIFSeekL(vsi_file, 0, 2)  # Seek to end to measure size
        file_size = gdal.VSIFTellL(vsi_file)
        gdal.VSIFSeekL(vsi_file, 0, 0)  # Seek back to start
        output_bytes = gdal.VSIFReadL(1, file_size, vsi_file)
        gdal.VSIFCloseL(vsi_file)
        # Clean up virtual files.
        gdal.Unlink(input_path)
        gdal.Unlink(output_path)
        print(f" Output size: {len(output_bytes):,} bytes")
        return output_bytes
    except Exception as e:
        # Print full traceback for debugging.
        import traceback
        print(f"\n ERROR DETAILS:")
        traceback.print_exc()
        # Best-effort cleanup of virtual files (either may not exist yet).
        for path in (input_path, output_path):
            try:
                gdal.Unlink(path)
            except Exception:
                pass
        # Chain the original exception so the root cause is preserved.
        raise Exception(f"COG creation failed: {str(e)}") from e
def upload_to_s3(file_bytes: bytes, bucket: str, key: str) -> bool:
    """
    Upload file bytes to an S3 object with a TIFF content type.

    Args:
        file_bytes: File content as bytes
        bucket: Destination S3 bucket
        key: Destination S3 key

    Returns:
        True on success, False if the upload raised an error.
    """
    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=file_bytes,
            ContentType='image/tiff'
        )
    except Exception as e:
        # Best-effort: report and signal failure rather than raising.
        print(f"Error uploading to S3: {e}")
        return False
    return True
def delete_from_s3(bucket: str, key: str) -> bool:
    """
    Delete a single object from S3.

    Args:
        bucket: S3 bucket name
        key: S3 object key

    Returns:
        True on success, False if the delete raised an error.
    """
    try:
        s3_client.delete_object(Bucket=bucket, Key=key)
    except Exception as e:
        # Best-effort: report and signal failure rather than raising.
        print(f"Error deleting from S3: {e}")
        return False
    return True
print("Helper functions defined successfully")
Test Pattern Detection
Test the filename pattern detection with some examples.
# Test cases for the filename pattern detector.
test_filenames = [
    "202302_Earthquake_Turkiye_Turkey_UNW_2022-04-06_to_2023-02-08_day.tif"
]
print("Testing pattern detection:\n")
for filename in test_filenames:
    result = detect_activation_event(filename)
    if result:
        # Only dereference the result after confirming a match; the original
        # printed result["remainder"] before the None check, which raised
        # TypeError for any non-matching filename.
        print(f'Remainder = {result["remainder"]}')
        print(f"✓ {filename}")
        print(f" Year/Month: {result['year_month']}")
        print(f" Location: {result['location']}")
        print(f" Hazard: {result['hazard']}")
        print(f" Remainder: {result['remainder']}")
    else:
        print(f"✗ {filename} - No match")
    print()
Remainder = Turkey_UNW_2022-04-06_to_2023-02-08_day
β 202302_Earthquake_Turkiye_Turkey_UNW_2022-04-06_to_2023-02-08_day.tif
Year/Month: 202302
Location: Turkiye
Hazard: Earthquake
Remainder: Turkey_UNW_2022-04-06_to_2023-02-08_day
List Files in Source Bucket
# List all .tif files in the source bucket and report which ones carry an
# activation-event pattern in their filename.
print(f"Listing files in s3://{SOURCE_BUCKET}/{SOURCE_PREFIX}\n")
tif_files = list_s3_files(SOURCE_BUCKET, SOURCE_PREFIX, '.tif')
print(f"Found {len(tif_files)} .tif files:")
for file_key in tif_files:
    filename = file_key.split('/')[-1]
    event_info = detect_activation_event(filename)
    if event_info:
        print(f" ✓ {file_key} - Location: {event_info['location']}, Hazard: {event_info['hazard']}")
    else:
        print(f" ✗ {file_key} - No activation event pattern")
Found 1 .tif files:
β drcs_activations_new/Landsat/burnRatio/202501_Fire_CA_LC09_NBR_182831_041036_2025-01-14_day.tif - Location: CA, Hazard: Fire
Process Files
Main processing loop: detect activation events, add metadata, move to destination.
def process_file(source_key: str) -> Dict[str, Any]:
    """
    Process a single GeoTIFF file: detect the activation event in its name,
    embed event metadata, rebuild as a COG, upload to the destination bucket,
    and optionally delete the source object.

    Args:
        source_key: S3 key of the file in SOURCE_BUCKET.

    Returns:
        Dictionary with processing results (filename, source_key, success,
        message, plus destination/COG fields when applicable).
    """
    filename = source_key.split('/')[-1]
    result = {
        'filename': filename,
        'source_key': source_key,
        'success': False,
        'message': ''
    }
    # Step 1: Detect activation event pattern.
    event_metadata = detect_activation_event(filename)
    if not event_metadata:
        result['message'] = 'No activation event pattern detected - skipping'
        return result
    # Build the destination filename only after the None check; the original
    # dereferenced event_metadata["remainder"] before checking, so any file
    # without the pattern crashed instead of being skipped.
    final_filename = f'{event_metadata["remainder"]}.tif'
    print(f"\nProcessing: {filename}")
    print(f" Location: {event_metadata['location']}, Hazard: {event_metadata['hazard']}")
    try:
        # Step 2: Read file from S3.
        print(" Reading from S3...")
        file_bytes, metadata, tags = read_geotiff_from_s3(SOURCE_BUCKET, source_key)
        print(f" Size: {len(file_bytes):,} bytes")
        print(f" Dimensions: {metadata['width']}x{metadata['height']}")
        print(f" CRS: {metadata.get('crs', 'Unknown')}")
        # Step 2.5: Validate input COG status.
        print(" Validating input COG structure...")
        input_is_cog, input_cog_info = validate_cog(file_bytes, f"input_{filename}")
        if input_is_cog:
            print(f" ✓ Input is valid COG")
            print(f" Compression: {input_cog_info.get('compression', 'Unknown')}")
            print(f" Tile size: {input_cog_info.get('blocksize', (0, 0))}")
            print(f" Overviews: {input_cog_info.get('overviews', 0)} levels")
        else:
            print(f" ✗ Input is NOT a valid COG - will convert")
            if input_cog_info.get('errors'):
                for error in input_cog_info['errors'][:3]:  # Show first 3 errors
                    print(f" - {error}")
        # Step 3: Add metadata and create/optimize COG.
        print(" Creating COG with metadata and ZSTD compression...")
        modified_bytes = add_metadata_and_create_cog(file_bytes, event_metadata, HAZARD, LOCATION)
        print(f" Output size: {len(modified_bytes):,} bytes")
        size_change = ((len(modified_bytes) - len(file_bytes)) / len(file_bytes)) * 100
        print(f" Size change: {size_change:+.1f}%")
        # Step 3.5: Validate output COG status.
        print(" Validating output COG structure...")
        output_is_cog, output_cog_info = validate_cog(modified_bytes, f"output_{filename}")
        if output_is_cog:
            print(f" ✓ Output is valid COG")
            print(f" Compression: {output_cog_info.get('compression', 'Unknown')}")
            print(f" Tile size: {output_cog_info.get('blocksize', (0, 0))}")
            print(f" Overviews: {output_cog_info.get('overviews', 0)} levels")
            result['cog_valid'] = True
        else:
            print(f" ✗ WARNING: Output is NOT a valid COG!")
            if output_cog_info.get('errors'):
                for error in output_cog_info['errors'][:3]:
                    print(f" - {error}")
            result['cog_valid'] = False
        # Step 4: Upload to destination.
        dest_key = DESTINATION_PREFIX + final_filename
        print(f" Uploading to s3://{DESTINATION_BUCKET}/{dest_key}...")
        if upload_to_s3(modified_bytes, DESTINATION_BUCKET, dest_key):
            print(" ✓ Upload successful")
            result['destination_key'] = dest_key
            result['success'] = True
            result['message'] = 'Processed successfully'
            result['input_was_cog'] = input_is_cog
            result['output_is_cog'] = output_is_cog
            # Step 5: Optionally delete from source.
            if DELETE_AFTER_MOVE:
                print(f" Deleting from source...")
                if delete_from_s3(SOURCE_BUCKET, source_key):
                    print(" ✓ Deleted from source")
                    result['deleted_from_source'] = True
                else:
                    print(" ✗ Failed to delete from source")
                    result['deleted_from_source'] = False
        else:
            result['message'] = 'Upload to destination failed'
    except Exception as e:
        # Record the failure on the result rather than aborting the batch.
        result['message'] = f'Error: {str(e)}'
        print(f" ✗ Error: {e}")
    return result
# Run every discovered file through the processing pipeline.
BANNER = '=' * 80
print(f"\n{BANNER}")
print("Starting batch processing")
print(f"{BANNER}")
results = [process_file(file_key) for file_key in tif_files]
print(f"\n{BANNER}")
print("Processing Complete")
print(f"{BANNER}")
================================================================================
Starting batch processing
================================================================================
Processing: 202501_Fire_CA_LC09_NBR_182831_041036_2025-01-14_day.tif
Location: CA, Hazard: Fire
Reading from S3...
Size: 158,238,568 bytes
Dimensions: 8554x7185
CRS: EPSG:4326
Validating input COG structure...
β Input is valid COG
Compression: ZSTD
Tile size: [512, 512]
Overviews: 4 levels
Creating COG with metadata and ZSTD compression...
/tmp/ipykernel_205/1171966028.py:178: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
'PROCESSING_DATE': datetime.utcnow().isoformat(),
Reading input: /vsimem/input.tif
Reading original file settings...
Original settings: compress=zstd, predictor=None, level=None
Rebuilding as COG (preserving original compression settings)...
Hazard tag None
Location tag None
Adding overviews...
Updating dataset tags...
Writing output to: /vsimem/output_cog.tif
COG creation complete with embedded metadata
Output size: 150,769,442 bytes
Output size: 150,769,442 bytes
Size change: -4.7%
Validating output COG structure...
β Output is valid COG
Compression: ZSTD
Tile size: [512, 512]
Overviews: 5 levels
Uploading to s3://nasa-disasters-dev/ProgramData/Landsat/burnRatio/LC09_NBR_182831_041036_2025-01-14_day.tif...
β Upload successful
================================================================================
Processing Complete
================================================================================
Summary Report
# Summary statistics over the batch results.
total_files = len(results)
successful = sum(1 for r in results if r['success'])
skipped = sum(1 for r in results if 'skipping' in r['message'])
failed = total_files - successful - skipped
# COG statistics (missing keys count as False via dict.get).
input_cogs = sum(1 for r in results if r.get('input_was_cog', False))
output_cogs = sum(1 for r in results if r.get('output_is_cog', False))
converted_to_cog = sum(1 for r in results if r.get('success') and not r.get('input_was_cog', False) and r.get('output_is_cog', False))
print("\nProcessing Summary:")
print(f" Total files: {total_files}")
print(f" ✓ Successful: {successful}")
print(f" - Skipped (no pattern): {skipped}")
print(f" ✗ Failed: {failed}")
print("\nCOG Status:")
print(f" Input COGs: {input_cogs}/{total_files}")
print(f" Output COGs: {output_cogs}/{total_files}")
print(f" Converted to COG: {converted_to_cog}")
if DELETE_AFTER_MOVE:
    deleted = sum(1 for r in results if r.get('deleted_from_source', False))
    print(f"\n Deleted from source: {deleted}")
print("\nDetailed Results:")
for result in results:
    # One status glyph per file: success, skipped (no pattern), or failure.
    status = '✓' if result['success'] else ('-' if 'skipping' in result['message'] else '✗')
    cog_status = ''
    if result.get('success'):
        if result.get('output_is_cog'):
            cog_status = ' [COG ✓]'
        else:
            cog_status = ' [COG ✗]'
    print(f" {status} {result['filename']}: {result['message']}{cog_status}")
Read a processed file and verify the metadata was added correctly.
# Get first successful result and re-read it from the destination bucket to
# confirm the metadata tags and COG structure survived the round trip.
successful_results = [r for r in results if r['success']]
if successful_results:
    sample_result = successful_results[0]
    dest_key = sample_result['destination_key']
    print(f"Verifying metadata in: s3://{DESTINATION_BUCKET}/{dest_key}\n")
    # Read processed file back from S3 (bytes, rasterio meta, GeoTIFF tags).
    file_bytes, metadata, tags = read_geotiff_from_s3(DESTINATION_BUCKET, dest_key)
    print("=" * 60)
    print("GeoTIFF Metadata Tags:")
    print("=" * 60)
    # Show only the tags this pipeline writes (activation/processing info).
    for key, value in tags.items():
        if 'ACTIVATION' in key or 'PROCESSING' in key or 'ORIGINAL' in key or 'PROCESSOR' in key or 'COG' in key:
            print(f" {key}: {value}")
    print("\n" + "=" * 60)
    print("COG Validation Results:")
    print("=" * 60)
    # Validate COG structure of the uploaded file.
    is_cog, cog_info = validate_cog(file_bytes, dest_key.split('/')[-1])
    if is_cog:
        print("β File is a VALID Cloud-Optimized GeoTIFF (COG)\n")
        print("Structure Details:")
        print(f" Dimensions: {cog_info.get('width')}x{cog_info.get('height')}")
        print(f" Bands: {cog_info.get('bands')}")
        print(f" Compression: {cog_info.get('compression')}")
        print(f" Tiled: {cog_info.get('tiled')}")
        print(f" Block/Tile Size: {cog_info.get('blocksize')}")
        print(f" Overview Levels: {cog_info.get('overviews')}")
        # Warnings are non-fatal COG advisories from cog_validate.
        if cog_info.get('warnings'):
            print("\nWarnings:")
            for warning in cog_info['warnings']:
                print(f" β {warning}")
    else:
        print("β File is NOT a valid COG\n")
        if cog_info.get('errors'):
            print("Errors:")
            for error in cog_info['errors']:
                print(f" β {error}")
        if cog_info.get('warnings'):
            print("\nWarnings:")
            for warning in cog_info['warnings']:
                print(f" β {warning}")
    print("\n" + "=" * 60)
else:
    print("No successfully processed files to verify")