Skip to content

Advanced Concurrency and Callbacks

Overview

Grapa provides sophisticated concurrency and callback capabilities that go far beyond traditional async/await patterns. The language was designed from the ground up to be parallel by design with advanced callback systems for complex asynchronous operations.

Thread Safety and Parallel-by-Design Architecture

Universal Thread Safety

Grapa is fully thread safe in all supported environments: - Command line - Safe concurrent operations - Grapa shell - Safe interactive use - Python/GrapaPy - Safe integration

All built-in operations—including map, filter, reduce, $thread, and $net—are safe to use concurrently without special precautions.

Automatic Variable Locking

CRITICAL FEATURE: Every access to a Grapa variable automatically locks/unlocks the underlying object. This was extensively stress-tested with hundreds of threads to ensure: - No crashes under extreme concurrent access - Cross-platform thread safety - Worker object reliability - Predictable behavior under load

Advanced Callback Systems

$thread Callback Architecture

Grapa's $thread system provides sophisticated callback capabilities:

/* Basic thread with callbacks */
myRun = op(input) {
    "myRun:".echo();
    $sys().echo(@$local);
    input.c = input.a + input.b;
    "\n".echo();
    @$local;
};

myDone = op(input, result) {
    "myDone:".echo();
    $sys().echo(@$local);
    "\n".echo();
};

t = $thread();
t.start(myRun, {a:1, b:2}, myDone);

Output:

myRun:{"input":{"a":1,"b":2}}
myDone:{"input":{"a":1,"b":2,"c":3},"result":{"input":{"a":1,"b":2,"c":3}}}

Full Coroutine Support with Advanced Control

Grapa's $thread system is a complete coroutine implementation with advanced control methods:

Core Coroutine Methods

/* Create a coroutine thread */
thread = $thread(op() {
    "Starting coroutine".echo();
    thread.suspend();  /* Pause execution */
    "Resumed from suspend".echo();
    thread.wait();     /* Wait for signal */
    "Woken by signal".echo();
});

/* Control the coroutine */
thread.resume();   /* Resume from suspend */
thread.signal();   /* Wake from wait */

Advanced Synchronization Methods

/* Thread synchronization primitives */
thread = $thread(op() {
    /* Try to acquire lock */
    if (thread.trylock()) {
        "Lock acquired".echo();
        /* Critical section */
        thread.unlock();  /* Release lock */
    } else {
        "Lock busy".echo();
    };

    /* Wait for condition */
    thread.wait();
    "Condition met".echo();
});

/* Control from another thread */
thread.lock();     /* Acquire lock (blocking) */
thread.signal();   /* Wake waiting thread */
thread.unlock();   /* Release lock */

State Inspection Methods

/* Check thread state */
thread = $thread(op() {
    thread.suspend();
    "Suspended".echo();
});

/* State inspection */
if (thread.suspended()) {
    "Thread is suspended".echo();
};

if (thread.waiting()) {
    "Thread is waiting".echo();
};

Real-World Coroutine Usage

The lexer→compiler→executor pipeline demonstrates sophisticated coroutine usage:

/* Lexer thread - pauses when input queue is empty */
lexer_thread = $thread(op() {
    while (true) {
        if (input_queue.empty()) {
            lexer_thread.wait();  /* Wait for input */
        };
        token = process_input();
        compiler_queue.add(token);
        compiler_thread.signal();  /* Wake compiler */
    };
});

/* Compiler thread - pauses when lexer queue is empty */
compiler_thread = $thread(op() {
    while (true) {
        if (lexer_queue.empty()) {
            compiler_thread.wait();  /* Wait for tokens */
        };
        code = compile_tokens();
        executor_queue.add(code);
        executor_thread.signal();  /* Wake executor */
    };
});

/* Executor thread - pauses when compiler queue is empty */
executor_thread = $thread(op() {
    while (true) {
        if (compiler_queue.empty()) {
            executor_thread.wait();  /* Wait for code */
        };
        execute_code();
    };
});

Key Features: - suspend()/resume(): Direct coroutine control - wait()/signal(): Condition variable support - trylock()/lock()/unlock(): Thread synchronization primitives - waiting()/suspended(): State inspection - Queue-based coordination: Built-in support for producer-consumer patterns - Cooperative multitasking: Explicit control flow without complex async patterns

Widget Callback System

The widget system provides rich callback capabilities with object references:

/* Widget with advanced callbacks */
w = $WIDGET("double_window", 0, 0, 340, 260, "test", {color: "BLUE"});
w.show();

/* Post callback handlers */
w.set({
    on_post_start: op(o) {
        o.set({"color":"YELLOW"});
        o.redraw();
    },
    on_post_echo: op(o, data) {
        o.set({"append":data.str(), "key":"end"});
    },
    on_post_prompt: op(o, data) {
        o.set({"append":"\ngrapa> ", "key":"end"});
    },
    on_post_end: op(o, data) {
        o.set({
            "append":"\n" + data.str(),
            "key":"end",
            "color":"white",
            "cursor_state":"show",
            "cursor_color":"black"
        });
        o.redraw();
    }
});

Network Callback System

Network operations support sophisticated callback patterns:

/* Network with callbacks */
processPost = op(in) {
    {processed: in};
};

postHandler = op(in) {
    $local.data = in.split("\r").join("");
    $local.len = data.len() - data.split("\n\n")[0].len() - 2;
    if (len < 0) len = 0;
    $local.body = data.right(len);
    $local.rstr = processPost(body).str();
    "HTTP/1.1 200 OK\r\nContent-Type: text/json\r\nContent-Length: " + rstr.len().str() + "\r\n\r\n" + rstr;
};

postConnectHandler = op(netSession) {
    netSession.data = "";
};

postMessageHandler = op(netSession, message, hasmore) {
    netSession.data += message;
    if (hasmore == 0) {
        netSession.send(postHandler(netSession.data));
        netSession.data = "";
    };
};

n = $net();
n.onlisten(':12345', postMessageHandler, postConnectHandler);

Structured Concurrency with Functional Methods

Parallel Processing by Default

Grapa's functional methods provide structured concurrency that's superior to traditional async/await:

/* Parallel by default - creates one thread per item */
small_data = [1, 2, 3, 4, 5];
squares = small_data.map(op(x) { x * x; });

/* For large datasets, limit threads to avoid resource exhaustion */
large_data = (1000000).range(0,1);
squares = large_data.map(op(x) { x * x; }, 8);  /* Limit to 8 threads */

Advanced Functional Operations

/* Parallel map with additional data */
numbers = [1, 2, 3, 4, 5];
result = numbers.map(op(x, multiplier) { x * multiplier; }, 10);
/* Result: [10, 20, 30, 40, 50] */

/* Parallel filter with threshold */
filtered = numbers.filter(op(x, threshold) { x > threshold; }, 3);
/* Result: [4, 5] */

/* Parallel reduce with thread count */
sum = large_data.reduce(op(acc, x) { acc + x; }, 0, 4);

Method Chaining with Parallel Processing

/* Complex parallel processing pipeline */
result = (1000000).range(0,1)
    .filter(op(x) { x % 2 == 0; }, null, 8)      /* Parallel filter */
    .map(op(x) { x * x; }, null, 8)              /* Parallel map */
    .reduce(op(acc, x) { acc + x; }, 0, 4);      /* Parallel reduce */

Execution Tree Metaprogramming

Human-Readable Execution Trees

Grapa's execution trees are human-readable and manipulable, providing advanced metaprogramming capabilities:

/* Compile script to execution tree */
script = "x = 5; y = x * 2; y.echo();";
tree = $sys().compile(script);

/* Execution tree is human-readable */
tree.echo();
/* Output shows the structured execution tree */

Runtime Tree Manipulation

/* Create and manipulate execution trees */
custom_tree = op(x, y) { x + y; };
tree_str = custom_tree.str();  /* Human-readable tree representation */

/* Execute tree with parameters */
result = custom_tree(5, 3);  /* Result: 8 */

Advanced Concurrency Patterns

Worker Thread Coordination with Functional Methods

Grapa's .map() and .filter() methods are not just data processing tools - they're sophisticated concurrency primitives that can coordinate multiple worker threads. This enables structured concurrency patterns where all threads must complete before proceeding.

Thread Synchronization Barrier

/* Spawn multiple worker threads that all must complete */
workers = [1, 2, 3, 4, 5].map(op(worker_id) {
    /* Each worker does independent work */
    ("Worker " + worker_id.str() + " starting").echo();
    sleep(worker_id);  /* Simulate work */
    ("Worker " + worker_id.str() + " completed").echo();
    worker_id * 100;  /* Return result */
});
/* All workers complete before proceeding */
("All workers finished").echo();
/* Result: [100, 200, 300, 400, 500] */

Parallel Task Execution

/* Execute multiple independent tasks in parallel */
tasks = [
    op() { "Task A: Database query".echo(); sleep(2); "A done".echo(); },
    op() { "Task B: API call".echo(); sleep(1); "B done".echo(); },
    op() { "Task C: File processing".echo(); sleep(3); "C done".echo(); }
];

results = tasks.map(op(task) { task(); });
/* All tasks complete before proceeding */
("All tasks completed").echo();

Resource Pool Coordination

/* Coordinate multiple resource operations */
resources = ["db1", "db2", "db3", "cache1", "cache2"].map(op(resource) {
    ("Connecting to " + resource).echo();
    /* Simulate connection */
    sleep(1);
    ("Connected to " + resource).echo();
    resource + "_connection";
});
/* All resources connected before proceeding */
("All resources ready").echo();
/* Result: ["db1_connection", "db2_connection", "db3_connection", "cache1_connection", "cache2_connection"] */

System Initialization Pattern

/* Initialize multiple system components in parallel */
components = [
    op() { initialize_database(); },
    op() { initialize_cache(); },
    op() { initialize_api_server(); },
    op() { initialize_file_system(); }
];

initialized = components.map(op(init) { init(); });
/* All components initialized before proceeding */
("System ready").echo();

Microservices Coordination

/* Coordinate multiple microservice calls */
services = [
    op() { call_user_service(); },
    op() { call_payment_service(); },
    op() { call_inventory_service(); }
];
results = services.map(op(service) { service(); });
/* All services respond before proceeding */

Worker Pattern Integration

Grapa's worker pattern is deeply integrated throughout the system:

/* Worker pattern for complex operations */
worker_op = op(input) {
    /* Complex processing */
    result = input.map(op(x) { x * x; });
    result;
};

/* Execute with worker thread */
t = $thread();
t.start(worker_op, large_dataset, op(result) { result.echo(); });

Network-Based Distributed Processing

/* Distributed processing across network */
distributed_op = op(data) {
    /* Process data across multiple nodes */
    processed = data.map(op(item) { process_item(item); });
    processed;
};

/* Network callback for distributed results */
network_handler = op(message, hasmore) {
    if (hasmore == 0) {
        /* Process complete message */
        result = distributed_op(message);
        send_result(result);
    };
};

Comparison with Traditional Async/Await

Grapa's Superior Approach

Feature Traditional Async/Await Grapa's Approach
Parallel Processing Manual Promise.all() Automatic with .map()/.filter()
Thread Safety Manual synchronization Automatic variable locking
Callback Context Limited scope Rich object references
Execution Trees Not accessible Human-readable and manipulable
Structured Concurrency Complex patterns Built-in with functional methods
Network Integration Separate libraries Native $net() with callbacks

Why Grapa's Approach is Superior

  1. Automatic Parallelism: .map() and .filter() are parallel by default
  2. Rich Callbacks: Object references provide full context access
  3. Execution Trees: Human-readable, manipulable execution structures
  4. Built-in Thread Safety: No manual synchronization required
  5. Network Integration: Native distributed processing capabilities

Best Practices

Thread Count Management

/* For small datasets - let Grapa handle parallelism */
small_data = [1, 2, 3, 4, 5];
result = small_data.map(op(x) { x * x; });

/* For large datasets - specify thread count */
large_data = (1000000).range(0,1);
result = large_data.map(op(x) { x * x; }, 8);  /* Limit to 8 threads */

Performance Considerations: Parallel vs Sequential

Important: .map() and .filter() must copy results from worker threads, which can be expensive for large datasets. Consider the trade-offs:

/* For large datasets with simple operations - use sequential */
big_data = (1000000).range();
result = [];
i = 0;
while (i < big_data.len()) {
    result += big_data[i] * 2;
    i += 1;
}

/* For smaller datasets with complex work - use parallel */
small_data = (1000).range();
result = small_data.map(op(x) { 
    /* Complex calculation that benefits from parallelization */
    complex_calculation = x * x + x.sqrt() + x.sin();
    complex_calculation;
}, 4);

/* .reduce() is more efficient - can use PTR references */
sum = big_data.reduce(op(acc, x) { acc + x; }, 0, 4);

When to use each approach: - Use sequential (for/while) for large datasets with simple operations - Use parallel (.map/.filter) for smaller datasets with complex operations - Use .reduce() for large datasets when possible (more efficient)

Callback Design Patterns

/* Rich callback with object reference */
widget_callback = op(o, data) {
    /* o = widget object reference */
    o.set({"text": data.str()});
    o.redraw();
};

/* Network callback with session management */
network_callback = op(session, message, hasmore) {
    session.data += message;
    if (hasmore == 0) {
        process_complete_message(session.data);
        session.data = "";
    };
};