Multi-Syntax Data Processing

Overview

Grapa's multi-syntax capabilities enable data processing workflows that combine SQL, JSON, XML, and custom syntaxes in a single program. This use case demonstrates how to build sophisticated data pipelines using multiple native syntaxes.

Scenario: ETL Pipeline with Multi-Syntax Support

Problem Statement

You need to process data from multiple sources:

  • CSV files with user data
  • JSON APIs with product information
  • XML feeds with inventory data
  • Database tables with transaction history

The goal is to create a unified data processing pipeline that can handle all these formats natively.

Solution: Multi-Syntax ETL Pipeline

/* Multi-Syntax ETL Pipeline Example */

/* Step 1: Define SQL syntax for database operations using custom_command and custom_function as variables */
custom_command = rule select $STR from $STR {op(fields:$2,table_name:$4){
    ("SQL: SELECT " + fields + " FROM " + table_name).echo();
    /* Database query implementation */
}};

custom_function = rule count '(' $STR ')' from $STR {op(field:$3,table_name:$6){
    records = get_table(table_name).ls();
    return records.len();
}};

/* Step 2: Define JSON syntax for API data */
custom_function = rule $STR '->' $STR {op(json:$1,path:$3){
    /* JSON path extraction */
    return json.json().getfield(path);
}};

custom_function = rule json_extract '(' $STR ',' $STR ')' {op(json:$3,path:$5){
    /* JSON extraction with error handling */
    try {
        return json.json().getfield(path);
    } catch (error) {
        return $err("JSON extraction failed");
    };
}};
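
For readers less familiar with Grapa, the JSON-path extraction with error handling above can be sketched in Python. This is a rough analogue only: the `json_extract` name and dot-separated path convention are illustrative, and returning `None` stands in for the `$err(...)` result.

```python
import json

def json_extract(text, path):
    # Descend key by key along a dot-separated path; any parse or
    # lookup failure maps to None (analogous to $err in the Grapa rule).
    try:
        node = json.loads(text)
        for key in path.split("."):
            node = node[key]
        return node
    except (json.JSONDecodeError, KeyError, TypeError):
        return None

api_response = '{"products": {"name": "widget"}}'
print(json_extract(api_response, "products.name"))  # widget
```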

/* Step 3: Define XML syntax for feed processing */
custom_function = rule xpath '(' $STR ',' $STR ')' {op(xml:$3,xpath_expr:$5){
    /* XPath evaluation */
    return xml.xml().xpath(xpath_expr);
}};

custom_function = rule xml_extract '(' $STR ',' $STR ')' {op(xml:$3,path:$5){
    /* XML extraction with validation */
    if (xml.xml().valid()) {
        return xml.xml().extract(path);
    } else {
        return $err("Invalid XML");
    };
}};
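
The XPath rules above combine extraction with validation. A minimal Python sketch of the same idea, using the standard-library ElementTree (which supports a subset of XPath); the `xpath_extract` helper is illustrative, with `None` standing in for `$err("Invalid XML")`:

```python
import xml.etree.ElementTree as ET

def xpath_extract(xml_text, path):
    # Validate by attempting a parse, then evaluate the path expression.
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return None  # analogous to $err("Invalid XML") above
    return [el.text for el in root.findall(path)]

feed = "<inventory><item><name>bolt</name></item><item><name>nut</name></item></inventory>"
print(xpath_extract(feed, ".//item/name"))  # ['bolt', 'nut']
```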

/* Step 4: Define custom pipeline syntax */
custom_command = rule pipeline '{' <$pipeline_steps> '}' {op(steps:$3){
    ("Pipeline execution started").echo();
    result = null;
    i = 0;
    while (i < steps.len()) {
        step = steps[i];
        ("Executing step: " + step).echo();
        result = execute_pipeline_step(step, result);
        i += 1;
    };
    ("Pipeline completed").echo();
    return result;
}};
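
The step-threading loop in the pipeline rule is essentially a left fold: each step receives the previous result and returns the next one. A rough Python analogue, with the step functions invented purely for illustration:

```python
from functools import reduce

def run_pipeline(steps, initial=None):
    # Thread the running result through each step in order,
    # mirroring the while-loop in the Grapa pipeline rule.
    return reduce(lambda result, step: step(result), steps, initial)

steps = [
    lambda _: ["raw"],                 # load
    lambda rows: rows + ["joined"],    # join
    lambda rows: rows + ["validated"], # validate
]
print(run_pipeline(steps))  # ['raw', 'joined', 'validated']
```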

/* Step 5: Execute multi-syntax ETL pipeline */
("=== Multi-Syntax ETL Pipeline ===").echo();

/* Load and process CSV data */
("Loading CSV data...").echo();
csv_data = $file().read("users.csv");
users = parse_csv(csv_data);

/* Query database for transactions */
("Querying database...").echo();
op(parse)("select user_id, amount from transactions")();
transaction_count = op(parse)("count(*) from transactions")();

/* Extract data from JSON API */
("Processing JSON API data...").echo();
api_response = fetch_api("https://api.example.com/products");
product_names = op(parse)('api_response->"products"->"name"')();
product_count = op(parse)('json_extract(api_response, "$.products.length")')();

/* Parse XML inventory feed */
("Processing XML inventory...").echo();
xml_feed = fetch_xml("https://feed.example.com/inventory");
inventory_items = op(parse)('xpath(xml_feed, "//item/name")')();
total_items = op(parse)('xml_extract(xml_feed, "//inventory/@total")')();

/* Execute unified pipeline */
("Executing unified pipeline...").echo();
result = op(parse)('pipeline { 
    load "users.csv" | 
    join "transactions" | 
    enrich "products.json" | 
    validate "inventory.xml" | 
    output "final_report.json" 
}')();

("=== Pipeline Complete ===").echo();

Advanced Multi-Syntax Patterns

1. SQL with JSON Fields

/* Query database with JSON field filtering */
custom_command = rule select $STR from $STR where $STR '->' $STR '=' $STR {
    op(fields:$2,table:$4,json_field:$6,path:$8,value:$10){
        ("SQL with JSON: " + fields + " FROM " + table + " WHERE " + json_field + "->" + path + " = " + value).echo();

        records = get_table(table).ls();
        filtered = records.filter(op(record) {
            json_data = record.getfield(json_field);
            return json_data.json().getfield(path) == value;
        });

        display_results(fields, filtered);
    }
};

/* Execute SQL with JSON filtering */
op(parse)('select * from users where metadata->"city" = "New York"')();
op(parse)('select name, age from users where preferences->"theme" = "dark"')();
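
The JSON-field WHERE clause amounts to parsing each record's JSON column and comparing one nested value. A small Python sketch of that filter, using made-up sample records:

```python
import json

records = [
    {"name": "Ann", "metadata": json.dumps({"city": "New York"})},
    {"name": "Bob", "metadata": json.dumps({"city": "Boston"})},
]

def where_json_eq(rows, json_field, path, value):
    # Keep rows whose JSON column has the given value at the given key,
    # as in: select * from users where metadata->"city" = "New York"
    return [r for r in rows if json.loads(r[json_field]).get(path) == value]

print([r["name"] for r in where_json_eq(records, "metadata", "city", "New York")])
# ['Ann']
```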

2. XML with SQL Integration

/* Process XML and store in database */
custom_command = rule process_xml $STR into $STR {op(xml_file:$2,table_name:$4){
    ("Processing XML: " + xml_file + " into " + table_name).echo();

    xml_data = $file().read(xml_file);
    items = op(parse)('xpath(xml_data, "//item")')();

    i = 0;
    while (i < items.len()) {
        item = items[i];
        name = op(parse)('xml_extract(item, "name")')();
        price = op(parse)('xml_extract(item, "price")')();

        /* Insert into database using SQL */
        op(parse)('insert into ' + table_name + ' values (' + name + ', ' + price + ')')();
        i += 1;
    };
}};

/* Execute XML to SQL pipeline */
op(parse)('process_xml inventory.xml into products')();
(Helper functions such as parse_csv, get_table, fetch_api, and execute_pipeline_step are assumed to be defined elsewhere in the project.)

3. Custom DSL for Data Validation

/* Define validation DSL */
custom_command = rule validate $STR '{' <$validation_rules> '}' {op(data:$2,rules:$4){
    ("Validating data: " + data).echo();

    validation_result = true;
    i = 0;
    while (i < rules.len()) {
        current_rule = rules[i];
        if (!apply_validation_rule(data, current_rule)) {
            validation_result = false;
        };
        i += 1;
    };

    if (validation_result) {
        ("Validation passed").echo();
    } else {
        ("Validation failed").echo();
    };

    return validation_result;
}};

custom_command = rule field $STR required {op(field_name:$2){
    return {"type": "required", "field": field_name};
}};

custom_command = rule field $STR type $STR {op(field_name:$2,type_name:$4){
    return {"type": "type_check", "field": field_name, "expected_type": type_name};
}};

/* Execute validation DSL */
is_valid = op(parse)('validate user_data { 
    field "name" required | 
    field "age" type "integer" | 
    field "email" type "email" 
}')();
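
Each DSL rule above reduces to a plain record like {"type": "required", "field": "name"}, and validation is just applying every rule to the data. A rough Python analogue (the `apply_validation_rule` helper and the type checks are illustrative, not part of Grapa):

```python
def apply_validation_rule(data, rule):
    # Interpret one rule record, mirroring the shapes the DSL rules return.
    field = rule["field"]
    if rule["type"] == "required":
        return field in data and data[field] is not None
    if rule["type"] == "type_check":
        checks = {"integer": lambda v: isinstance(v, int),
                  "email": lambda v: isinstance(v, str) and "@" in v}
        return checks.get(rule["expected_type"], lambda v: True)(data.get(field))
    return True

def validate(data, rules):
    return all(apply_validation_rule(data, r) for r in rules)

user_data = {"name": "Ada", "age": 36, "email": "ada@example.com"}
rules = [
    {"type": "required", "field": "name"},
    {"type": "type_check", "field": "age", "expected_type": "integer"},
    {"type": "type_check", "field": "email", "expected_type": "email"},
]
print(validate(user_data, rules))  # True
```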

Performance Optimization

1. Syntax Compilation Caching

/* Cache compiled syntax for performance */
syntax_cache = {};

compile_syntax = op(syntax_name, syntax_def) {
    if (syntax_cache[syntax_name]) {
        return syntax_cache[syntax_name];
    };

    compiled = op(parse)(syntax_def);
    syntax_cache[syntax_name] = compiled;
    return compiled;
};

/* Use cached syntax */
sql_syntax = compile_syntax("sql", "custom_command = rule select...");
json_syntax = compile_syntax("json", "custom_function = rule $STR->$STR...");
xml_syntax = compile_syntax("xml", "custom_function = rule xpath...");

2. Parallel Multi-Syntax Processing

/* Process multiple data sources in parallel */
process_sources = op(sources) {
    results = sources.map(op(source) {
        if (source.type == "csv") {
            return process_csv(source.file);
        } else if (source.type == "json") {
            return op(parse)('json_extract(' + source.data + ', "$")')();
        } else if (source.type == "xml") {
            return op(parse)('xpath(' + source.data + ', "//root")')();
        } else if (source.type == "sql") {
            return op(parse)('select * from ' + source.table)();
        };
    }, 4);  /* 4 threads */

    return results;
};

/* Execute parallel processing */
sources = [
    {"type": "csv", "file": "users.csv"},
    {"type": "json", "data": api_response},
    {"type": "xml", "data": xml_feed},
    {"type": "sql", "table": "transactions"}
];

results = process_sources(sources);
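
The same dispatch-then-parallelize shape can be sketched in Python with a thread pool standing in for Grapa's threaded map. The per-format handlers here are trivial placeholders for the real extractors:

```python
from concurrent.futures import ThreadPoolExecutor

def process_source(source):
    # Dispatch on the source type, as the if/else chain does above.
    handlers = {
        "csv":  lambda s: ("csv",  s["file"]),
        "json": lambda s: ("json", s["data"]),
        "xml":  lambda s: ("xml",  s["data"]),
        "sql":  lambda s: ("sql",  s["table"]),
    }
    return handlers[source["type"]](source)

def process_sources(sources, workers=4):
    # pool.map preserves input order, like map(op(...), 4) in the sketch above.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_source, sources))

sources = [
    {"type": "csv", "file": "users.csv"},
    {"type": "sql", "table": "transactions"},
]
print(process_sources(sources))  # [('csv', 'users.csv'), ('sql', 'transactions')]
```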

Real-World Applications

1. E-commerce Data Integration

/* E-commerce data pipeline */
custom_command = rule ecommerce_pipeline '{' <$pipeline_steps> '}' {op(steps:$3){
    ("E-commerce pipeline started").echo();

    /* Process customer data from CSV */
    customers = process_csv("customers.csv");

    /* Query order database */
    orders = op(parse)("select customer_id, total from orders")();

    /* Extract product data from JSON API */
    products = op(parse)('api_response->"products"')();

    /* Parse inventory from XML feed */
    inventory = op(parse)('xpath(inventory_feed, "//item")')();

    /* Generate unified report */
    report = generate_unified_report(customers, orders, products, inventory);

    ("E-commerce pipeline completed").echo();
    return report;
}};

/* Execute e-commerce pipeline */
report = op(parse)('ecommerce_pipeline { 
    load_customers | 
    query_orders | 
    fetch_products | 
    sync_inventory | 
    generate_report 
}')();

2. Financial Data Processing

/* Financial data processing with validation */
custom_command = rule financial_pipeline '{' <$pipeline_steps> '}' {op(steps:$3){
    ("Financial pipeline started").echo();

    /* Load transaction data */
    transactions = op(parse)("select * from transactions")();

    /* Validate transaction data */
    validation_result = op(parse)('validate transactions { 
        field "amount" required | 
        field "amount" type "decimal" | 
        field "date" type "date" 
    }')();

    if (!validation_result) {
        ("Validation failed - stopping pipeline").echo();
        return $err("Validation failed");
    };

    /* Process with JSON enrichment */
    enriched = transactions.map(op(tx) {
        customer_data = op(parse)('json_extract(customers, "$.customers[?(@.id==' + tx.customer_id + ')]")')();
        return merge_transaction_customer(tx, customer_data);
    });

    /* Generate financial report */
    report = generate_financial_report(enriched);

    ("Financial pipeline completed").echo();
    return report;
}};

Benefits of Multi-Syntax Approach

1. Unified Processing

  • Single Language: All data processing in one language
  • Consistent Syntax: Familiar patterns across different data types
  • Integrated Workflows: Seamless combination of different syntaxes

2. Performance

  • Native Implementation: Leverages existing C++ libraries
  • Compilation Caching: Syntax compiled once and reused
  • Parallel Processing: Multi-threaded execution across syntaxes

3. Maintainability

  • Clear Separation: Each syntax handles its domain
  • Modular Design: Easy to add new syntaxes
  • Error Handling: Consistent error patterns across syntaxes

4. Extensibility

  • Custom DSLs: Create domain-specific languages
  • Plugin Architecture: Add new syntaxes without core changes
  • Future-Proof: Easy to adapt to new data formats

Conclusion

Multi-syntax data processing in Grapa provides a powerful approach to handling complex data integration scenarios. By combining SQL, JSON, XML, and custom syntaxes, you can build sophisticated ETL pipelines that are:

  • Unified: All processing in one language
  • Performant: Leverages native implementations
  • Maintainable: Clear separation of concerns
  • Extensible: Easy to add new capabilities

This approach makes Grapa uniquely suited for modern data processing challenges where multiple data formats and sources need to be integrated seamlessly.

See Also