Data Processing
BING provides comprehensive tools for processing ocean color remote sensing data, with special emphasis on NASA’s PACE mission and integration with Argo BGC floats.
PACE Data Processing
Loading PACE OCI Data
from ocpy.pace import io as pace_io
import os
# Load PACE L2 file
pace_file = os.path.join(
os.getenv('OS_COLOR'),
'PACE/L2_AOP/PACE_OCI.20240315T183000.L2.OC_AOP.V2.nc'
)
# Load with quality flags
xds, flags = pace_io.load_oci_l2(pace_file)
# Available variables
print(xds.data_vars.keys())
# Rrs, Rrs_unc, chlor_a, poc, ...
Extracting Rrs Spectra
import numpy as np
def extract_pace_rrs(xds, lat, lon, window=3):
"""Extract Rrs spectrum at a location with spatial averaging
Parameters
----------
xds : xarray.Dataset
PACE L2 dataset
lat, lon : float
Target coordinates
window : int
Spatial window size (pixels)
Returns
-------
dict
Wavelengths, Rrs, uncertainties
"""
# Find nearest pixel
idx_lat = np.argmin(np.abs(xds.latitude.values - lat))
idx_lon = np.argmin(np.abs(xds.longitude.values - lon))
# Define window
lat_slice = slice(max(0, idx_lat-window), idx_lat+window+1)
lon_slice = slice(max(0, idx_lon-window), idx_lon+window+1)
# Extract and average
rrs_window = xds.Rrs[lat_slice, lon_slice, :]
rrs_mean = rrs_window.mean(dim=['latitude', 'longitude'])
rrs_std = rrs_window.std(dim=['latitude', 'longitude'])
# Get uncertainties
unc_window = xds.Rrs_unc[lat_slice, lon_slice, :]
unc_mean = np.sqrt((unc_window**2).mean(dim=['latitude', 'longitude']))
return {
'wavelength': xds.wavelength.values,
'Rrs': rrs_mean.values,
'Rrs_unc': unc_mean.values,
'Rrs_std': rrs_std.values,
'n_pixels': np.sum(~np.isnan(rrs_window.values))
}
Quality Control
def quality_control_pace(xds, flags):
"""Apply quality control to PACE data
Parameters
----------
xds : xarray.Dataset
PACE L2 dataset
flags : dict
Quality flags from pace_io.load_oci_l2
Returns
-------
xarray.Dataset
Masked dataset
"""
# Define flag bits to check
flag_bits = {
'ATMFAIL': 1, # Atmospheric correction failure
'LAND': 2, # Land
'CLOUD': 4, # Cloud
'GLINT': 8, # Sun glint
'HISOLZEN': 256, # High solar zenith
'LOWLW': 512, # Low water-leaving radiance
'CHLFAIL': 1024 # Chlorophyll algorithm failure
}
# Create mask
mask = np.zeros_like(flags['l2_flags'], dtype=bool)
for flag_name, bit_val in flag_bits.items():
mask |= (flags['l2_flags'] & bit_val) > 0
# Apply mask
xds_clean = xds.where(~mask)
return xds_clean
Argo BGC Integration
Matching Satellite and Float Data
import pandas as pd
from datetime import timedelta
def match_argo_pace(argo_profiles, pace_granules,
max_time_diff=3, max_distance=25):
"""Match Argo profiles with PACE granules
Parameters
----------
argo_profiles : pd.DataFrame
Argo profile locations and times
pace_granules : list
PACE granule metadata
max_time_diff : float
Maximum time difference in hours
max_distance : float
Maximum distance in km
Returns
-------
pd.DataFrame
Matched profiles with granule information
"""
matches = []
for idx, profile in argo_profiles.iterrows():
for granule in pace_granules:
# Time check
time_diff = abs(profile['datetime'] - granule['datetime'])
if time_diff > timedelta(hours=max_time_diff):
continue
# Distance check
distance = haversine_distance(
profile['lat'], profile['lon'],
granule['center_lat'], granule['center_lon']
)
if distance <= max_distance:
matches.append({
'profile_id': profile['id'],
'granule_id': granule['id'],
'time_diff': time_diff.total_seconds() / 3600,
'distance': distance,
'profile_lat': profile['lat'],
'profile_lon': profile['lon']
})
return pd.DataFrame(matches)
Processing Argo Profiles
def process_argo_bgc(profile_file):
"""Process Argo BGC profile data
Parameters
----------
profile_file : str
Path to Argo profile netCDF file
Returns
-------
dict
Processed profile data
"""
import xarray as xr
# Load profile
ds = xr.open_dataset(profile_file)
# Extract BGC variables
data = {
'pressure': ds.PRES.values,
'temperature': ds.TEMP.values,
'salinity': ds.PSAL.values,
'chlorophyll': ds.CHLA.values if 'CHLA' in ds else None,
'bbp': ds.BBP700.values if 'BBP700' in ds else None,
'oxygen': ds.DOXY.values if 'DOXY' in ds else None
}
# Quality control
for var in ['chlorophyll', 'bbp', 'oxygen']:
if data[var] is not None:
qc_var = f'{var}_QC'
if qc_var in ds:
# Keep only good data (QC flags 1, 2)
mask = ds[qc_var].values > 2
data[var][mask] = np.nan
# Calculate derived variables
if data['bbp'] is not None:
# Estimate POC from bbp
data['poc'] = 31600 * data['bbp'] - 1.18 # Stramski et al. 2008
return data
Data Organization
Directory Structure
Recommended directory structure for BING projects:
project/
├── data/
│ ├── pace/
│ │ ├── L2_AOP/ # PACE Level 2 files
│ │ └── matchups/ # Matched PACE-Argo data
│ ├── argo/
│ │ ├── profiles/ # Argo profile files
│ │ └── metadata/ # Profile metadata
│ └── auxiliary/
│ ├── bathymetry/ # Bathymetry data
│ └── climatology/ # Climatological data
├── results/
│ ├── fits/ # Fitting results
│ ├── figures/ # Generated plots
│ └── statistics/ # Statistical analyses
└── notebooks/ # Jupyter notebooks
Data Management
class BingDataManager:
"""Centralized data management for BING workflows"""
def __init__(self, base_dir):
self.base_dir = base_dir
self.pace_dir = os.path.join(base_dir, 'data/pace')
self.argo_dir = os.path.join(base_dir, 'data/argo')
self.results_dir = os.path.join(base_dir, 'results')
# Create directories if needed
for dir_path in [self.pace_dir, self.argo_dir, self.results_dir]:
os.makedirs(dir_path, exist_ok=True)
def get_pace_file(self, date, product='L2_AOP'):
"""Get PACE file path for a given date"""
pattern = f"PACE_OCI.{date:%Y%m%d}*.{product}.nc"
files = glob.glob(os.path.join(self.pace_dir, product, pattern))
return files[0] if files else None
def save_results(self, results, identifier):
"""Save processing results with metadata"""
timestamp = datetime.now().isoformat()
results['metadata'] = {
'identifier': identifier,
'timestamp': timestamp,
'version': bing.__version__
}
outfile = os.path.join(
self.results_dir,
f"{identifier}_{timestamp[:10]}.npz"
)
np.savez(outfile, **results)
return outfile
Batch Processing
Parallel Processing
from joblib import Parallel, delayed
import multiprocessing
def process_pace_granule(granule_file, config):
"""Process single PACE granule"""
xds, flags = pace_io.load_oci_l2(granule_file)
# ... processing logic ...
return results
# Parallel processing
n_cores = multiprocessing.cpu_count() - 1
results = Parallel(n_jobs=n_cores)(
delayed(process_pace_granule)(f, config)
for f in granule_files
)
Pipeline Example
def bing_processing_pipeline(pace_file, argo_file, output_dir):
"""Complete BING processing pipeline
Parameters
----------
pace_file : str
PACE L2 file path
argo_file : str
Argo profile file path
output_dir : str
Output directory
"""
# Step 1: Load and QC PACE data
xds, flags = pace_io.load_oci_l2(pace_file)
xds_clean = quality_control_pace(xds, flags)
# Step 2: Load Argo profile
argo_data = process_argo_bgc(argo_file)
# Step 3: Extract matchup
rrs_data = extract_pace_rrs(
xds_clean,
argo_data['lat'],
argo_data['lon']
)
# Step 4: Fit models
from bing.parameters import standard
from bing.models import utils as model_utils
from bing.fitting import chisq_fit
params = standard.expb_pow()
models = model_utils.init(
params.model_names,
rrs_data['wavelength']
)
fit_result = chisq_fit.fit(
models,
rrs_data['wavelength'],
rrs_data['Rrs'],
rrs_data['Rrs_unc']
)
# Step 5: Save results
output = {
'pace_data': rrs_data,
'argo_data': argo_data,
'fit_result': fit_result,
'metadata': {
'pace_file': pace_file,
'argo_file': argo_file,
'processing_date': datetime.now().isoformat()
}
}
outfile = os.path.join(output_dir, 'pipeline_results.npz')
np.savez(outfile, **output)
return output
Data Formats
Input Formats
BING supports various input data formats:
NetCDF: PACE L2, Argo profiles
CSV: Matchup tables, metadata
NPZ: Numpy compressed arrays
HDF5: Large dataset storage
JSON: Configuration and metadata
Output Formats
Standard output formats:
# Fitting results format
fit_results = {
'wavelength': np.array([...]), # Wavelengths
'Rrs': np.array([...]), # Measured Rrs
'Rrs_unc': np.array([...]), # Uncertainties
'fitted_params': np.array([...]), # Best-fit parameters
'chains': np.array([...]), # MCMC chains
'statistics': { # Fit statistics
'chi2': float,
'rmse': float,
'r2': float
},
'metadata': { # Processing metadata
'timestamp': str,
'version': str,
'config': dict
}
}
Best Practices
Quality Control: Always apply quality flags to satellite data
Spatial Averaging: Use spatial windows to reduce noise
Temporal Matching: Consider time differences for matchups
Data Organization: Maintain consistent directory structure
Metadata: Always save processing metadata with results
Version Control: Track data and code versions
Validation: Compare with in-situ measurements when available