Advanced Concurrency and Callbacks
Overview
Grapa provides concurrency and callback facilities that cover more than async/await-style task scheduling. The language was designed from the ground up for parallel execution, with callback systems for complex asynchronous operations.
Thread Safety and Parallel-by-Design Architecture
Universal Thread Safety
Grapa is fully thread safe in all supported environments:
- Command line - Safe concurrent operations
- Grapa shell - Safe interactive use
- Python/GrapaPy - Safe integration
All built-in operations, including map, filter, reduce, $thread, and $net, are safe to use concurrently without special precautions.
Automatic Variable Locking
CRITICAL FEATURE: Every access to a Grapa variable automatically locks/unlocks the underlying object. This was extensively stress-tested with hundreds of threads to ensure:
- No crashes under extreme concurrent access
- Cross-platform thread safety
- Worker object reliability
- Predictable behavior under load
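Grapa's internal locking is not shown in this document, so the following is only a Python analogue of the idea: a wrapper whose every read and write takes a lock. The `LockedVar` class and `hammer` function are hypothetical names introduced for illustration. Whether a compound read-modify-write is atomic in Grapa is not stated here, so the sketch performs the increment inside a single `update` call.

```python
import threading


class LockedVar:
    """A value whose every access takes a lock (hypothetical helper)."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._value

    def set(self, value):
        with self._lock:
            self._value = value

    def update(self, fn):
        # Read-modify-write performed under one lock acquisition.
        with self._lock:
            self._value = fn(self._value)
            return self._value


def hammer(counter, threads=16, iterations=1000):
    """Increment the counter from many threads and return the final value."""

    def work():
        for _ in range(iterations):
            counter.update(lambda v: v + 1)

    pool = [threading.Thread(target=work) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return counter.get()


total = hammer(LockedVar(0))
```

With the lock in place, no increment is lost regardless of how the threads interleave.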
Advanced Callback Systems
$thread Callback Architecture
Grapa's $thread system runs an operation on a worker thread and then invokes a completion callback with the input and the result:
/* Basic thread with callbacks */
myRun = op(input) {
"myRun:".echo();
$sys().echo(@$local);
input.c = input.a + input.b;
"\n".echo();
@$local;
};
myDone = op(input, result) {
"myDone:".echo();
$sys().echo(@$local);
"\n".echo();
};
t = $thread();
t.start(myRun, {a:1, b:2}, myDone);
Output:
myRun:{"input":{"a":1,"b":2}}
myDone:{"input":{"a":1,"b":2,"c":3},"result":{"input":{"a":1,"b":2,"c":3}}}
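The run/done flow above can be mirrored in Python as a rough analogue. This is a sketch, not Grapa's implementation: `start`, `my_run`, and `my_done` are hypothetical names, and unlike the Grapa example (which returns `@$local`) the run function here simply returns the computed sum.

```python
import threading


def start(run, data, done):
    """Run `run(data)` on a new thread, then call `done(data, result)`."""

    def body():
        result = run(data)
        done(data, result)

    t = threading.Thread(target=body)
    t.start()
    return t


log = []


def my_run(data):
    data["c"] = data["a"] + data["b"]  # mutate the shared input
    return data["c"]


def my_done(data, result):
    # The callback sees the input as modified by my_run.
    log.append((dict(data), result))


worker = start(my_run, {"a": 1, "b": 2}, my_done)
worker.join()
```

As in the Grapa output, the completion callback observes the field that the run step added to its input.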
Full Coroutine Support with Advanced Control
Grapa's $thread system is a complete coroutine implementation with advanced control methods:
Core Coroutine Methods
/* Create a coroutine thread */
thread = $thread(op() {
"Starting coroutine".echo();
thread.suspend(); /* Pause execution */
"Resumed from suspend".echo();
thread.wait(); /* Wait for signal */
"Woken by signal".echo();
});
/* Control the coroutine */
thread.resume(); /* Resume from suspend */
thread.signal(); /* Wake from wait */
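For readers more familiar with Python, the suspend/resume and wait/signal handshake can be approximated with `threading.Event` gates. This is an analogue under stated assumptions, not a description of how Grapa implements these methods; the `Coroutine` class and its attribute names are hypothetical.

```python
import threading


class Coroutine:
    """Thread with suspend/resume and wait/signal gates (hypothetical helper)."""

    def __init__(self, body):
        self._resume = threading.Event()
        self._signal = threading.Event()
        self.suspended = threading.Event()  # set while parked in suspend()
        self.waiting = threading.Event()    # set while parked in wait()
        self._thread = threading.Thread(target=body, args=(self,))

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    # Called from inside the body.
    def suspend(self):
        self.suspended.set()
        self._resume.wait()
        self._resume.clear()
        self.suspended.clear()

    def wait(self):
        self.waiting.set()
        self._signal.wait()
        self._signal.clear()
        self.waiting.clear()

    # Called from the controlling thread.
    def resume(self):
        self._resume.set()

    def signal(self):
        self._signal.set()


events = []


def body(co):
    events.append("started")
    co.suspend()
    events.append("resumed")
    co.wait()
    events.append("signalled")


co = Coroutine(body)
co.start()
co.suspended.wait(timeout=5)  # block until the body has parked in suspend()
co.resume()
co.waiting.wait(timeout=5)    # block until the body has parked in wait()
co.signal()
co.join()
```

The `suspended` and `waiting` events double as state flags, so the controller can check where the body is parked before waking it.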
Advanced Synchronization Methods
/* Thread synchronization primitives */
thread = $thread(op() {
/* Try to acquire lock */
if (thread.trylock()) {
"Lock acquired".echo();
/* Critical section */
thread.unlock(); /* Release lock */
} else {
"Lock busy".echo();
};
/* Wait for condition */
thread.wait();
"Condition met".echo();
});
/* Control from another thread */
thread.lock(); /* Acquire lock (blocking) */
thread.signal(); /* Wake waiting thread */
thread.unlock(); /* Release lock */
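The difference between a blocking lock and a non-blocking try-lock can be illustrated in Python, where `Lock.acquire(blocking=False)` plays the role of `trylock()`. This is an analogue only; the `worker` function and `outcomes` list are hypothetical names.

```python
import threading

lock = threading.Lock()
outcomes = []


def worker():
    # Non-blocking attempt: returns immediately with True or False.
    if lock.acquire(blocking=False):
        try:
            outcomes.append("acquired")  # critical section
        finally:
            lock.release()
    else:
        outcomes.append("busy")


# Case 1: the controller holds the lock, so the worker's attempt fails.
lock.acquire()
t = threading.Thread(target=worker)
t.start()
t.join()
lock.release()

# Case 2: the lock is free, so the attempt succeeds.
t = threading.Thread(target=worker)
t.start()
t.join()
```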
State Inspection Methods
/* Check thread state */
thread = $thread(op() {
thread.suspend();
"Suspended".echo();
});
/* State inspection */
if (thread.suspended()) {
"Thread is suspended".echo();
};
if (thread.waiting()) {
"Thread is waiting".echo();
};
Real-World Coroutine Usage
The lexer→compiler→executor pipeline demonstrates sophisticated coroutine usage:
/* Lexer thread - pauses when input queue is empty */
lexer_thread = $thread(op() {
while (true) {
if (input_queue.empty()) {
lexer_thread.wait(); /* Wait for input */
};
token = process_input();
compiler_queue.add(token);
compiler_thread.signal(); /* Wake compiler */
};
});
/* Compiler thread - pauses when its input queue (compiler_queue) is empty */
compiler_thread = $thread(op() {
while (true) {
if (compiler_queue.empty()) {
compiler_thread.wait(); /* Wait for tokens from the lexer */
};
code = compile_tokens();
executor_queue.add(code);
executor_thread.signal(); /* Wake executor */
};
});
/* Executor thread - pauses when its input queue (executor_queue) is empty */
executor_thread = $thread(op() {
while (true) {
if (executor_queue.empty()) {
executor_thread.wait(); /* Wait for code from the compiler */
};
execute_code();
};
});
Key Features:
- suspend()/resume(): Direct coroutine control
- wait()/signal(): Condition variable support
- trylock()/lock()/unlock(): Thread synchronization primitives
- waiting()/suspended(): State inspection
- Queue-based coordination: Built-in support for producer-consumer patterns
- Cooperative multitasking: Explicit control flow without complex async patterns
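The three-stage producer-consumer shape described above can be sketched in Python with `queue.Queue`, whose blocking `get()` stands in for the wait/signal pair. The stage functions (split, upper-case, join) are placeholders chosen for illustration, and `stage`, `run_pipeline`, and `STOP` are hypothetical names; none of this reflects Grapa's actual lexer or compiler.

```python
import queue
import threading

STOP = object()  # sentinel that tells a stage to shut down


def stage(fn, inbox, outbox):
    """Pull items from inbox, apply fn, push results to outbox."""

    def loop():
        while True:
            item = inbox.get()  # blocks while the queue is empty
            if item is STOP:
                outbox.put(STOP)  # propagate shutdown downstream
                return
            outbox.put(fn(item))

    t = threading.Thread(target=loop)
    t.start()
    return t


def run_pipeline(lines):
    source, tokens, code, results = (queue.Queue() for _ in range(4))
    threads = [
        stage(str.split, source, tokens),                             # "lexer"
        stage(lambda toks: [tok.upper() for tok in toks], tokens, code),  # "compiler"
        stage(" ".join, code, results),                               # "executor"
    ]
    for line in lines:
        source.put(line)
    source.put(STOP)
    for t in threads:
        t.join()

    out = []
    while True:
        item = results.get()
        if item is STOP:
            return out
        out.append(item)
```

Because each stage is a single thread reading a FIFO queue, output order matches input order without extra coordination.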
Widget Callback System
The widget system provides rich callback capabilities with object references:
/* Widget with advanced callbacks */
w = $WIDGET("double_window", 0, 0, 340, 260, "test", {color: "BLUE"});
w.show();
/* Post callback handlers */
w.set({
on_post_start: op(o) {
o.set({"color":"YELLOW"});
o.redraw();
},
on_post_echo: op(o, data) {
o.set({"append":data.str(), "key":"end"});
},
on_post_prompt: op(o, data) {
o.set({"append":"\ngrapa> ", "key":"end"});
},
on_post_end: op(o, data) {
o.set({
"append":"\n" + data.str(),
"key":"end",
"color":"white",
"cursor_state":"show",
"cursor_color":"black"
});
o.redraw();
}
});
Network Callback System
Network operations support sophisticated callback patterns:
/* Network with callbacks */
processPost = op(in) {
{processed: in};
};
postHandler = op(in) {
$local.data = in.split("\r").join("");
$local.len = data.len() - data.split("\n\n")[0].len() - 2;
if (len < 0) len = 0;
$local.body = data.right(len);
$local.rstr = processPost(body).str();
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " + rstr.len().str() + "\r\n\r\n" + rstr;
};
postConnectHandler = op(netSession) {
netSession.data = "";
};
postMessageHandler = op(netSession, message, hasmore) {
netSession.data += message;
if (hasmore == 0) {
netSession.send(postHandler(netSession.data));
netSession.data = "";
};
};
n = $net();
n.onlisten(':12345', postMessageHandler, postConnectHandler);
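The handler logic above (accumulate chunks until `hasmore` is 0, split headers from body, wrap the processed body in an HTTP response) can be restated in Python to make each step explicit. This is a sketch of the same steps, not Grapa's networking layer: `Session` is a hypothetical stand-in for the session object, and the sketch counts `Content-Length` in bytes.

```python
import json


def process_post(body):
    return {"processed": body}


def post_handler(raw):
    """Build an HTTP response for a raw request string."""
    data = raw.replace("\r", "")
    # Everything after the first blank line is the body.
    _headers, _sep, body = data.partition("\n\n")
    payload = json.dumps(process_post(body))
    length = len(payload.encode("utf-8"))  # Content-Length counts bytes
    return (
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {length}\r\n"
        "\r\n" + payload
    )


class Session:
    """Hypothetical stand-in for the network session object."""

    def __init__(self):
        self.data = ""
        self.sent = []

    def send(self, text):
        self.sent.append(text)


def on_message(session, message, hasmore):
    # Buffer partial messages; respond once the final chunk arrives.
    session.data += message
    if not hasmore:
        session.send(post_handler(session.data))
        session.data = ""
```

A request delivered in two chunks produces exactly one response, sent after the second chunk.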
Structured Concurrency with Functional Methods
Parallel Processing by Default
Grapa's functional methods provide structured concurrency without the explicit task management that async/await requires:
/* Parallel by default - creates one thread per item */
small_data = [1, 2, 3, 4, 5];
squares = small_data.map(op(x) { x * x; });
/* For large datasets, limit threads to avoid resource exhaustion */
large_data = (1000000).range(0,1);
squares = large_data.map(op(x) { x * x; }, null, 8); /* Limit to 8 threads (third argument; the second is extra data passed to the op) */
Advanced Functional Operations
/* Parallel map with additional data */
numbers = [1, 2, 3, 4, 5];
result = numbers.map(op(x, multiplier) { x * multiplier; }, 10);
/* Result: [10, 20, 30, 40, 50] */
/* Parallel filter with threshold */
filtered = numbers.filter(op(x, threshold) { x > threshold; }, 3);
/* Result: [4, 5] */
/* Parallel reduce with thread count */
sum = large_data.reduce(op(acc, x) { acc + x; }, 0, 4);
Method Chaining with Parallel Processing
/* Complex parallel processing pipeline */
result = (1000000).range(0,1)
.filter(op(x) { x % 2 == 0; }, null, 8) /* Parallel filter */
.map(op(x) { x * x; }, null, 8) /* Parallel map */
.reduce(op(acc, x) { acc + x; }, 0, 4); /* Parallel reduce */
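As a point of comparison, the same filter, map, reduce chain with a bounded worker count might look like this in Python using `concurrent.futures`. The helper names are hypothetical, and the reduce step here runs sequentially; the sketch does not claim to match how Grapa schedules its workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def parallel_map(fn, items, threads=8):
    """Apply fn to every item using at most `threads` workers."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(fn, items))  # results keep input order


def parallel_filter(pred, items, threads=8):
    """Evaluate the predicate in parallel, then keep matching items."""
    flags = parallel_map(pred, items, threads)
    return [x for x, keep in zip(items, flags) if keep]


def sum_even_squares(n, threads=8):
    evens = parallel_filter(lambda x: x % 2 == 0, range(n), threads)
    squares = parallel_map(lambda x: x * x, evens, threads)
    return reduce(lambda acc, x: acc + x, squares, 0)
```

The explicit `threads` argument plays the same role as the thread-count parameter in the Grapa calls: it caps concurrency rather than creating one thread per item.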
Execution Tree Metaprogramming
Human-Readable Execution Trees
Grapa's execution trees are human-readable and manipulable, providing advanced metaprogramming capabilities:
/* Compile script to execution tree */
script = "x = 5; y = x * 2; y.echo();";
tree = $sys().compile(script);
/* Execution tree is human-readable */
tree.echo();
/* Output shows the structured execution tree */
Runtime Tree Manipulation
/* Create and manipulate execution trees */
custom_tree = op(x, y) { x + y; };
tree_str = custom_tree.str(); /* Human-readable tree representation */
/* Execute tree with parameters */
result = custom_tree(5, 3); /* Result: 8 */
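Python's standard `ast` module offers a loosely comparable workflow (parse source to a tree, inspect it as text, rewrite a node, then execute the result), which may help readers picture what tree-level metaprogramming involves. This is an analogy only and says nothing about the structure of Grapa's execution trees; the `DoubleToTriple` transformer is a hypothetical example.

```python
import ast

source = "x = 5\ny = x * 2\n"

# Parse the script into a tree and render it as text.
tree = ast.parse(source)
readable = ast.dump(tree)


class DoubleToTriple(ast.NodeTransformer):
    """Rewrite the constant 2 into 3 wherever it appears."""

    def visit_Constant(self, node):
        if node.value == 2:
            return ast.copy_location(ast.Constant(value=3), node)
        return node


# Modify the tree, then compile and run the modified version.
new_tree = ast.fix_missing_locations(DoubleToTriple().visit(tree))
scope = {}
exec(compile(new_tree, "<tree>", "exec"), scope)
```

After the rewrite, the script computes `y = x * 3` instead of `y = x * 2`.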
Advanced Concurrency Patterns
Worker Thread Coordination with Functional Methods
Grapa's .map() and .filter() methods are more than data-processing tools: they also act as concurrency primitives that coordinate multiple worker threads. This enables structured concurrency patterns where all threads must complete before proceeding.
Thread Synchronization Barrier
/* Spawn multiple worker threads that all must complete */
workers = [1, 2, 3, 4, 5].map(op(worker_id) {
/* Each worker does independent work */
("Worker " + worker_id.str() + " starting").echo();
sleep(worker_id); /* Simulate work */
("Worker " + worker_id.str() + " completed").echo();
worker_id * 100; /* Return result */
});
/* All workers complete before proceeding */
("All workers finished").echo();
/* Result: [100, 200, 300, 400, 500] */
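The barrier behaviour can be demonstrated in Python, where draining `ThreadPoolExecutor.map` and leaving the `with` block both wait for every worker. This is an analogue of the pattern, not Grapa code; the sleep durations are arbitrary and chosen so that workers tend to finish in a different order than they started.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # completion order, appended by the workers


def worker(worker_id):
    time.sleep(0.05 * (6 - worker_id))  # simulate work of varying length
    finished.append(worker_id)
    return worker_id * 100


with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(worker, [1, 2, 3, 4, 5]))

# Execution reaches this point only after all five workers have returned.
```

The results come back in input order even though the workers may complete in any order.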
Parallel Task Execution
/* Execute multiple independent tasks in parallel */
tasks = [
op() { "Task A: Database query".echo(); sleep(2); "A done".echo(); },
op() { "Task B: API call".echo(); sleep(1); "B done".echo(); },
op() { "Task C: File processing".echo(); sleep(3); "C done".echo(); }
];
results = tasks.map(op(task) { task(); });
/* All tasks complete before proceeding */
("All tasks completed").echo();
Resource Pool Coordination
/* Coordinate multiple resource operations */
resources = ["db1", "db2", "db3", "cache1", "cache2"].map(op(resource) {
("Connecting to " + resource).echo();
/* Simulate connection */
sleep(1);
("Connected to " + resource).echo();
resource + "_connection";
});
/* All resources connected before proceeding */
("All resources ready").echo();
/* Result: ["db1_connection", "db2_connection", "db3_connection", "cache1_connection", "cache2_connection"] */
System Initialization Pattern
/* Initialize multiple system components in parallel */
components = [
op() { initialize_database(); },
op() { initialize_cache(); },
op() { initialize_api_server(); },
op() { initialize_file_system(); }
];
initialized = components.map(op(init) { init(); });
/* All components initialized before proceeding */
("System ready").echo();
Microservices Coordination
/* Coordinate multiple microservice calls */
services = [
op() { call_user_service(); },
op() { call_payment_service(); },
op() { call_inventory_service(); }
];
results = services.map(op(service) { service(); });
/* All services respond before proceeding */
Worker Pattern Integration
Grapa's worker pattern is deeply integrated throughout the system:
/* Worker pattern for complex operations */
worker_op = op(input) {
/* Complex processing */
result = input.map(op(x) { x * x; });
result;
};
/* Execute with worker thread */
t = $thread();
t.start(worker_op, large_dataset, op(input, result) { result.echo(); }); /* Done callback receives (input, result), as in myDone above */
Network-Based Distributed Processing
/* Distributed processing across network */
distributed_op = op(data) {
/* Process data across multiple nodes */
processed = data.map(op(item) { process_item(item); });
processed;
};
/* Network callback for distributed results */
network_handler = op(netSession, message, hasmore) {
if (hasmore == 0) {
/* Process complete message */
result = distributed_op(message);
send_result(result);
};
};
Comparison with Traditional Async/Await
Grapa's Superior Approach
| Feature | Traditional Async/Await | Grapa's Approach |
|---|---|---|
| Parallel Processing | Manual Promise.all() | Automatic with .map()/.filter() |
| Thread Safety | Manual synchronization | Automatic variable locking |
| Callback Context | Limited scope | Rich object references |
| Execution Trees | Not accessible | Human-readable and manipulable |
| Structured Concurrency | Complex patterns | Built-in with functional methods |
| Network Integration | Separate libraries | Native $net() with callbacks |
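To make the "Parallel Processing" row concrete, here is what the manual gathering step looks like on the async/await side, using Python's `asyncio.gather` (the counterpart of `Promise.all()`). The `square` coroutine is a hypothetical placeholder for real asynchronous work.

```python
import asyncio


async def square(x):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return x * x


async def main():
    # The caller explicitly gathers the tasks; results keep input order.
    return await asyncio.gather(*(square(x) for x in [1, 2, 3, 4, 5]))


squares = asyncio.run(main())
```

The equivalent Grapa call in this document is a single `.map()` over the list, with no separate gather step.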
Why Grapa's Approach is Superior
- Automatic Parallelism: .map() and .filter() are parallel by default
- Rich Callbacks: Object references provide full context access
- Execution Trees: Human-readable, manipulable execution structures
- Built-in Thread Safety: No manual synchronization required
- Network Integration: Native distributed processing capabilities
Best Practices
Thread Count Management
/* For small datasets - let Grapa handle parallelism */
small_data = [1, 2, 3, 4, 5];
result = small_data.map(op(x) { x * x; });
/* For large datasets - specify thread count */
large_data = (1000000).range(0,1);
result = large_data.map(op(x) { x * x; }, null, 8); /* Limit to 8 threads (third argument) */
Performance Considerations: Parallel vs Sequential
Important: .map() and .filter() must copy results from worker threads, which can be expensive for large datasets. Consider the trade-offs:
/* For large datasets with simple operations - use sequential */
big_data = (1000000).range();
result = [];
i = 0;
while (i < big_data.len()) {
result += big_data[i] * 2;
i += 1;
};
/* For smaller datasets with complex work - use parallel */
small_data = (1000).range();
result = small_data.map(op(x) {
/* Complex calculation that benefits from parallelization */
complex_calculation = x * x + x.sqrt() + x.sin();
complex_calculation;
}, null, 4); /* 4 threads (third argument) */
/* .reduce() is more efficient - can use PTR references */
sum = big_data.reduce(op(acc, x) { acc + x; }, 0, 4);
When to use each approach:
- Use sequential (for/while) for large datasets with simple operations
- Use parallel (.map/.filter) for smaller datasets with complex operations
- Use .reduce() for large datasets when possible (more efficient)
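One possible middle ground, shown here in Python rather than Grapa, is to hand each worker a chunk of the data so that only one result list per worker is passed back instead of one result per item. Chunking is a general technique introduced here for illustration, not something this document attributes to Grapa; `double_sequential` and `double_chunked` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def double_sequential(items):
    """Plain loop: no thread overhead, no result hand-off."""
    return [x * 2 for x in items]


def double_chunked(items, threads=4):
    """Split into about one chunk per thread so each worker returns one list."""
    size = max(1, -(-len(items) // threads))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        parts = pool.map(double_sequential, chunks)
    # Concatenate the per-chunk results in their original order.
    return [y for part in parts for y in part]
```

Both functions return the same list; which one is faster depends on how expensive the per-item work is relative to the cost of starting workers and collecting results.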
Callback Design Patterns
/* Rich callback with object reference */
widget_callback = op(o, data) {
/* o = widget object reference */
o.set({"text": data.str()});
o.redraw();
};
/* Network callback with session management */
network_callback = op(session, message, hasmore) {
session.data += message;
if (hasmore == 0) {
process_complete_message(session.data);
session.data = "";
};
};