Data Processing

BING provides comprehensive tools for processing ocean color remote sensing data, with special emphasis on NASA’s PACE mission and integration with Argo BGC floats.

PACE Data Processing

Loading PACE OCI Data

from ocpy.pace import io as pace_io
import os

# Load PACE L2 file
pace_file = os.path.join(
    os.getenv('OS_COLOR'),
    'PACE/L2_AOP/PACE_OCI.20240315T183000.L2.OC_AOP.V2.nc'
)

# Load with quality flags
xds, flags = pace_io.load_oci_l2(pace_file)

# Available variables
print(xds.data_vars.keys())
# Rrs, Rrs_unc, chlor_a, poc, ...

Extracting Rrs Spectra

import numpy as np

def extract_pace_rrs(xds, lat, lon, window=3):
    """Extract Rrs spectrum at a location with spatial averaging

    Parameters
    ----------
    xds : xarray.Dataset
        PACE L2 dataset
    lat, lon : float
        Target coordinates
    window : int
        Spatial window size (pixels)

    Returns
    -------
    dict
        Wavelengths, Rrs, uncertainties
    """
    # Find nearest pixel
    idx_lat = np.argmin(np.abs(xds.latitude.values - lat))
    idx_lon = np.argmin(np.abs(xds.longitude.values - lon))

    # Define window
    lat_slice = slice(max(0, idx_lat-window), idx_lat+window+1)
    lon_slice = slice(max(0, idx_lon-window), idx_lon+window+1)

    # Extract and average
    rrs_window = xds.Rrs[lat_slice, lon_slice, :]
    rrs_mean = rrs_window.mean(dim=['latitude', 'longitude'])
    rrs_std = rrs_window.std(dim=['latitude', 'longitude'])

    # Get uncertainties
    unc_window = xds.Rrs_unc[lat_slice, lon_slice, :]
    unc_mean = np.sqrt((unc_window**2).mean(dim=['latitude', 'longitude']))

    return {
        'wavelength': xds.wavelength.values,
        'Rrs': rrs_mean.values,
        'Rrs_unc': unc_mean.values,
        'Rrs_std': rrs_std.values,
        'n_pixels': np.sum(~np.isnan(rrs_window.values))
    }

Quality Control

def quality_control_pace(xds, flags):
    """Apply quality control to PACE data

    Parameters
    ----------
    xds : xarray.Dataset
        PACE L2 dataset
    flags : dict
        Quality flags from pace_io.load_oci_l2

    Returns
    -------
    xarray.Dataset
        Masked dataset
    """
    # Define flag bits to check
    flag_bits = {
        'ATMFAIL': 1,      # Atmospheric correction failure
        'LAND': 2,         # Land
        'CLOUD': 4,        # Cloud
        'GLINT': 8,        # Sun glint
        'HISOLZEN': 256,   # High solar zenith
        'LOWLW': 512,      # Low water-leaving radiance
        'CHLFAIL': 1024    # Chlorophyll algorithm failure
    }

    # Create mask
    mask = np.zeros_like(flags['l2_flags'], dtype=bool)
    for flag_name, bit_val in flag_bits.items():
        mask |= (flags['l2_flags'] & bit_val) > 0

    # Apply mask
    xds_clean = xds.where(~mask)

    return xds_clean

Argo BGC Integration

Matching Satellite and Float Data

import pandas as pd
from datetime import timedelta

def match_argo_pace(argo_profiles, pace_granules,
                    max_time_diff=3, max_distance=25):
    """Match Argo profiles with PACE granules

    Parameters
    ----------
    argo_profiles : pd.DataFrame
        Argo profile locations and times
    pace_granules : list
        PACE granule metadata
    max_time_diff : float
        Maximum time difference in hours
    max_distance : float
        Maximum distance in km

    Returns
    -------
    pd.DataFrame
        Matched profiles with granule information
    """
    matches = []

    for idx, profile in argo_profiles.iterrows():
        for granule in pace_granules:
            # Time check
            time_diff = abs(profile['datetime'] - granule['datetime'])
            if time_diff > timedelta(hours=max_time_diff):
                continue

            # Distance check
            distance = haversine_distance(
                profile['lat'], profile['lon'],
                granule['center_lat'], granule['center_lon']
            )

            if distance <= max_distance:
                matches.append({
                    'profile_id': profile['id'],
                    'granule_id': granule['id'],
                    'time_diff': time_diff.total_seconds() / 3600,
                    'distance': distance,
                    'profile_lat': profile['lat'],
                    'profile_lon': profile['lon']
                })

    return pd.DataFrame(matches)

Processing Argo Profiles

def process_argo_bgc(profile_file):
    """Process Argo BGC profile data

    Parameters
    ----------
    profile_file : str
        Path to Argo profile netCDF file

    Returns
    -------
    dict
        Processed profile data
    """
    import xarray as xr

    # Load profile
    ds = xr.open_dataset(profile_file)

    # Extract BGC variables
    data = {
        'pressure': ds.PRES.values,
        'temperature': ds.TEMP.values,
        'salinity': ds.PSAL.values,
        'chlorophyll': ds.CHLA.values if 'CHLA' in ds else None,
        'bbp': ds.BBP700.values if 'BBP700' in ds else None,
        'oxygen': ds.DOXY.values if 'DOXY' in ds else None
    }

    # Quality control
    for var in ['chlorophyll', 'bbp', 'oxygen']:
        if data[var] is not None:
            qc_var = f'{var}_QC'
            if qc_var in ds:
                # Keep only good data (QC flags 1, 2)
                mask = ds[qc_var].values > 2
                data[var][mask] = np.nan

    # Calculate derived variables
    if data['bbp'] is not None:
        # Estimate POC from bbp
        data['poc'] = 31600 * data['bbp'] - 1.18  # Stramski et al. 2008

    return data

Data Organization

Directory Structure

Recommended directory structure for BING projects:

project/
├── data/
│   ├── pace/
│   │   ├── L2_AOP/         # PACE Level 2 files
│   │   └── matchups/       # Matched PACE-Argo data
│   ├── argo/
│   │   ├── profiles/       # Argo profile files
│   │   └── metadata/       # Profile metadata
│   └── auxiliary/
│       ├── bathymetry/     # Bathymetry data
│       └── climatology/    # Climatological data
├── results/
│   ├── fits/              # Fitting results
│   ├── figures/           # Generated plots
│   └── statistics/        # Statistical analyses
└── notebooks/             # Jupyter notebooks

Data Management

class BingDataManager:
    """Centralized data management for BING workflows"""

    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.pace_dir = os.path.join(base_dir, 'data/pace')
        self.argo_dir = os.path.join(base_dir, 'data/argo')
        self.results_dir = os.path.join(base_dir, 'results')

        # Create directories if needed
        for dir_path in [self.pace_dir, self.argo_dir, self.results_dir]:
            os.makedirs(dir_path, exist_ok=True)

    def get_pace_file(self, date, product='L2_AOP'):
        """Get PACE file path for a given date"""
        pattern = f"PACE_OCI.{date:%Y%m%d}*.{product}.nc"
        files = glob.glob(os.path.join(self.pace_dir, product, pattern))
        return files[0] if files else None

    def save_results(self, results, identifier):
        """Save processing results with metadata"""
        timestamp = datetime.now().isoformat()
        results['metadata'] = {
            'identifier': identifier,
            'timestamp': timestamp,
            'version': bing.__version__
        }

        outfile = os.path.join(
            self.results_dir,
            f"{identifier}_{timestamp[:10]}.npz"
        )
        np.savez(outfile, **results)
        return outfile

Batch Processing

Parallel Processing

from joblib import Parallel, delayed
import multiprocessing

def process_pace_granule(granule_file, config):
    """Process single PACE granule"""
    xds, flags = pace_io.load_oci_l2(granule_file)
    # ... processing logic ...
    return results

# Parallel processing
n_cores = multiprocessing.cpu_count() - 1

results = Parallel(n_jobs=n_cores)(
    delayed(process_pace_granule)(f, config)
    for f in granule_files
)

Pipeline Example

def bing_processing_pipeline(pace_file, argo_file, output_dir):
    """Complete BING processing pipeline

    Parameters
    ----------
    pace_file : str
        PACE L2 file path
    argo_file : str
        Argo profile file path
    output_dir : str
        Output directory
    """
    # Step 1: Load and QC PACE data
    xds, flags = pace_io.load_oci_l2(pace_file)
    xds_clean = quality_control_pace(xds, flags)

    # Step 2: Load Argo profile
    argo_data = process_argo_bgc(argo_file)

    # Step 3: Extract matchup
    rrs_data = extract_pace_rrs(
        xds_clean,
        argo_data['lat'],
        argo_data['lon']
    )

    # Step 4: Fit models
    from bing.parameters import standard
    from bing.models import utils as model_utils
    from bing.fitting import chisq_fit

    params = standard.expb_pow()
    models = model_utils.init(
        params.model_names,
        rrs_data['wavelength']
    )

    fit_result = chisq_fit.fit(
        models,
        rrs_data['wavelength'],
        rrs_data['Rrs'],
        rrs_data['Rrs_unc']
    )

    # Step 5: Save results
    output = {
        'pace_data': rrs_data,
        'argo_data': argo_data,
        'fit_result': fit_result,
        'metadata': {
            'pace_file': pace_file,
            'argo_file': argo_file,
            'processing_date': datetime.now().isoformat()
        }
    }

    outfile = os.path.join(output_dir, 'pipeline_results.npz')
    np.savez(outfile, **output)

    return output

Data Formats

Input Formats

BING supports various input data formats:

  • NetCDF: PACE L2, Argo profiles

  • CSV: Matchup tables, metadata

  • NPZ: Numpy compressed arrays

  • HDF5: Large dataset storage

  • JSON: Configuration and metadata

Output Formats

Standard output formats:

# Fitting results format
fit_results = {
    'wavelength': np.array([...]),      # Wavelengths
    'Rrs': np.array([...]),             # Measured Rrs
    'Rrs_unc': np.array([...]),         # Uncertainties
    'fitted_params': np.array([...]),   # Best-fit parameters
    'chains': np.array([...]),          # MCMC chains
    'statistics': {                     # Fit statistics
        'chi2': float,
        'rmse': float,
        'r2': float
    },
    'metadata': {                       # Processing metadata
        'timestamp': str,
        'version': str,
        'config': dict
    }
}

Best Practices

  1. Quality Control: Always apply quality flags to satellite data

  2. Spatial Averaging: Use spatial windows to reduce noise

  3. Temporal Matching: Consider time differences for matchups

  4. Data Organization: Maintain consistent directory structure

  5. Metadata: Always save processing metadata with results

  6. Version Control: Track data and code versions

  7. Validation: Compare with in-situ measurements when available