add event loop Channel abstraction

This is akin to channels in Go, except:
 * implemented in userland
 * they are lock-free and thread-safe
 * they integrate with the userland event loop

The self hosted compiler is changed to use a channel for events,
and made to stay alive, watching files and performing builds when
things change, however the main.zig file exits after 1 build.

Note that nothing is actually built yet, it just parses the input
and then declares that the build succeeded.

Next items to do:
 * add windows and macos support for std.event.Loop
 * improve the event loop stop() operation
 * make the event loop multiplex coroutines onto kernel threads
 * watch source file for updates, and provide AST diffs
   (at least list the top level declaration changes)
 * top level declaration analysis
This commit is contained in:
Andrew Kelley 2018-06-29 15:39:55 -04:00
parent 2759c7951d
commit a3f55aaf34
6 changed files with 416 additions and 41 deletions

View File

@ -1,6 +1,7 @@
const std = @import("std");
const builtin = @import("builtin");
const event = std.event;
const os = std.os;
const io = std.io;
const mem = std.mem;
@ -43,6 +44,9 @@ const Command = struct {
};
pub fn main() !void {
// This allocator needs to be thread-safe because we use it for the event.Loop
// which multiplexes coroutines onto kernel threads.
// libc allocator is guaranteed to have this property.
const allocator = std.heap.c_allocator;
var stdout_file = try std.io.getStdOut();
@ -380,8 +384,10 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1);
defer allocator.free(zig_lib_dir);
var loop = try event.Loop.init(allocator);
var module = try Module.create(
allocator,
&loop,
root_name,
root_source_file,
Target.Native,
@ -471,9 +477,35 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
module.emit_file_type = emit_type;
module.link_objects = link_objects;
module.assembly_files = assembly_files;
module.link_out_file = flags.single("out-file");
try module.build();
try module.link(flags.single("out-file"));
const process_build_events_handle = try async<loop.allocator> processBuildEvents(module, true);
defer cancel process_build_events_handle;
loop.run();
}
async fn processBuildEvents(module: *Module, watch: bool) void {
while (watch) {
// TODO directly awaiting async should guarantee memory allocation elision
const build_event = await (async module.events.get() catch unreachable);
switch (build_event) {
Module.Event.Ok => {
std.debug.warn("Build succeeded\n");
// for now we stop after 1
module.loop.stop();
return;
},
Module.Event.Error => |err| {
std.debug.warn("build failed: {}\n", @errorName(err));
@panic("TODO error return trace");
},
Module.Event.Fail => |errs| {
@panic("TODO print compile error messages");
},
}
}
}
fn cmdBuildExe(allocator: *Allocator, args: []const []const u8) !void {
@ -780,4 +812,3 @@ const CliPkg = struct {
self.children.deinit();
}
};

View File

@ -11,9 +11,11 @@ const warn = std.debug.warn;
const Token = std.zig.Token;
const ArrayList = std.ArrayList;
const errmsg = @import("errmsg.zig");
const ast = std.zig.ast;
const event = std.event;
pub const Module = struct {
allocator: *mem.Allocator,
loop: *event.Loop,
name: Buffer,
root_src_path: ?[]const u8,
module: llvm.ModuleRef,
@ -76,6 +78,50 @@ pub const Module = struct {
kind: Kind,
link_out_file: ?[]const u8,
events: *event.Channel(Event),
// TODO handle some of these earlier and report them in a way other than error codes
pub const BuildError = error{
OutOfMemory,
EndOfStream,
BadFd,
Io,
IsDir,
Unexpected,
SystemResources,
SharingViolation,
PathAlreadyExists,
FileNotFound,
AccessDenied,
PipeBusy,
FileTooBig,
SymLinkLoop,
ProcessFdQuotaExceeded,
NameTooLong,
SystemFdQuotaExceeded,
NoDevice,
PathNotFound,
NoSpaceLeft,
NotDir,
FileSystem,
OperationAborted,
IoPending,
BrokenPipe,
WouldBlock,
FileClosed,
DestinationAddressRequired,
DiskQuota,
InputOutput,
NoStdHandles,
};
pub const Event = union(enum) {
Ok,
Fail: []errmsg.Msg,
Error: BuildError,
};
pub const DarwinVersionMin = union(enum) {
None,
MacOS: []const u8,
@ -104,7 +150,7 @@ pub const Module = struct {
};
pub fn create(
allocator: *mem.Allocator,
loop: *event.Loop,
name: []const u8,
root_src_path: ?[]const u8,
target: *const Target,
@ -113,7 +159,7 @@ pub const Module = struct {
zig_lib_dir: []const u8,
cache_dir: []const u8,
) !*Module {
var name_buffer = try Buffer.init(allocator, name);
var name_buffer = try Buffer.init(loop.allocator, name);
errdefer name_buffer.deinit();
const context = c.LLVMContextCreate() orelse return error.OutOfMemory;
@ -125,8 +171,12 @@ pub const Module = struct {
const builder = c.LLVMCreateBuilderInContext(context) orelse return error.OutOfMemory;
errdefer c.LLVMDisposeBuilder(builder);
const module_ptr = try allocator.create(Module{
.allocator = allocator,
const events = try event.Channel(Event).create(loop, 0);
errdefer events.destroy();
return loop.allocator.create(Module{
.loop = loop,
.events = events,
.name = name_buffer,
.root_src_path = root_src_path,
.module = module,
@ -171,7 +221,7 @@ pub const Module = struct {
.link_objects = [][]const u8{},
.windows_subsystem_windows = false,
.windows_subsystem_console = false,
.link_libs_list = ArrayList(*LinkLib).init(allocator),
.link_libs_list = ArrayList(*LinkLib).init(loop.allocator),
.libc_link_lib = null,
.err_color = errmsg.Color.Auto,
.darwin_frameworks = [][]const u8{},
@ -179,9 +229,8 @@ pub const Module = struct {
.test_filters = [][]const u8{},
.test_name_prefix = null,
.emit_file_type = Emit.Binary,
.link_out_file = null,
});
errdefer allocator.destroy(module_ptr);
return module_ptr;
}
fn dump(self: *Module) void {
@ -189,58 +238,70 @@ pub const Module = struct {
}
pub fn destroy(self: *Module) void {
self.events.destroy();
c.LLVMDisposeBuilder(self.builder);
c.LLVMDisposeModule(self.module);
c.LLVMContextDispose(self.context);
self.name.deinit();
self.allocator.destroy(self);
self.a().destroy(self);
}
pub fn build(self: *Module) !void {
if (self.llvm_argv.len != 0) {
var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.allocator, [][]const []const u8{
var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.a(), [][]const []const u8{
[][]const u8{"zig (LLVM option parsing)"},
self.llvm_argv,
});
defer c_compatible_args.deinit();
// TODO this sets global state
c.ZigLLVMParseCommandLineOptions(self.llvm_argv.len + 1, c_compatible_args.ptr);
}
_ = try async<self.a()> self.buildAsync();
}
async fn buildAsync(self: *Module) void {
while (true) {
// TODO directly awaiting async should guarantee memory allocation elision
// TODO also async before suspending should guarantee memory allocation elision
(await (async self.addRootSrc() catch unreachable)) catch |err| {
await (async self.events.put(Event{ .Error = err }) catch unreachable);
return;
};
await (async self.events.put(Event.Ok) catch unreachable);
}
}
async fn addRootSrc(self: *Module) !void {
const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path");
const root_src_real_path = os.path.real(self.allocator, root_src_path) catch |err| {
const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| {
try printError("unable to get real path '{}': {}", root_src_path, err);
return err;
};
errdefer self.allocator.free(root_src_real_path);
errdefer self.a().free(root_src_real_path);
const source_code = io.readFileAlloc(self.allocator, root_src_real_path) catch |err| {
const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| {
try printError("unable to open '{}': {}", root_src_real_path, err);
return err;
};
errdefer self.allocator.free(source_code);
errdefer self.a().free(source_code);
warn("====input:====\n");
warn("{}", source_code);
warn("====parse:====\n");
var tree = try std.zig.parse(self.allocator, source_code);
var tree = try std.zig.parse(self.a(), source_code);
defer tree.deinit();
var stderr_file = try std.io.getStdErr();
var stderr_file_out_stream = std.io.FileOutStream.init(&stderr_file);
const out_stream = &stderr_file_out_stream.stream;
warn("====fmt:====\n");
_ = try std.zig.render(self.allocator, out_stream, &tree);
warn("====ir:====\n");
warn("TODO\n\n");
warn("====llvm ir:====\n");
self.dump();
//var it = tree.root_node.decls.iterator();
//while (it.next()) |decl_ptr| {
// const decl = decl_ptr.*;
// switch (decl.id) {
// ast.Node.Comptime => @panic("TODO"),
// ast.Node.VarDecl => @panic("TODO"),
// ast.Node.UseDecl => @panic("TODO"),
// ast.Node.FnDef => @panic("TODO"),
// ast.Node.TestDecl => @panic("TODO"),
// else => unreachable,
// }
//}
}
pub fn link(self: *Module, out_file: ?[]const u8) !void {
@ -263,11 +324,11 @@ pub const Module = struct {
}
}
const link_lib = try self.allocator.create(LinkLib{
const link_lib = try self.a().create(LinkLib{
.name = name,
.path = null,
.provided_explicitly = provided_explicitly,
.symbols = ArrayList([]u8).init(self.allocator),
.symbols = ArrayList([]u8).init(self.a()),
});
try self.link_libs_list.append(link_lib);
if (is_libc) {
@ -275,6 +336,10 @@ pub const Module = struct {
}
return link_lib;
}
fn a(self: Module) *mem.Allocator {
return self.loop.allocator;
}
};
fn printError(comptime format: []const u8, args: ...) !void {

View File

@ -1,4 +1,4 @@
const std = @import("std");
const std = @import("../index.zig");
const assert = std.debug.assert;
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;

View File

@ -4,6 +4,8 @@ const assert = std.debug.assert;
const event = this;
const mem = std.mem;
const posix = std.os.posix;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
pub const TcpServer = struct {
handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
@ -95,16 +97,29 @@ pub const Loop = struct {
allocator: *mem.Allocator,
epollfd: i32,
keep_running: bool,
next_tick_queue: std.atomic.QueueMpsc(promise),
fn init(allocator: *mem.Allocator) !Loop {
pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
/// The allocator must be thread-safe because we use it for multiplexing
/// coroutines onto kernel threads.
pub fn init(allocator: *mem.Allocator) !Loop {
const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
errdefer std.os.close(epollfd);
return Loop{
.keep_running = true,
.allocator = allocator,
.epollfd = epollfd,
.next_tick_queue = std.atomic.QueueMpsc(promise).init(),
};
}
/// must call stop before deinit
pub fn deinit(self: *Loop) void {
std.os.close(self.epollfd);
}
pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
var ev = std.os.linux.epoll_event{
.events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
@ -126,11 +141,21 @@ pub const Loop = struct {
pub fn stop(self: *Loop) void {
// TODO make atomic
self.keep_running = false;
// TODO activate an fd in the epoll set
// TODO activate an fd in the epoll set which should cancel all the promises
}
/// bring your own linked list node. this means it can't fail.
pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
self.next_tick_queue.put(node);
}
pub fn run(self: *Loop) void {
while (self.keep_running) {
// TODO multiplex the next tick queue and the epoll event results onto a thread pool
while (self.next_tick_queue.get()) |node| {
resume node.data;
}
if (!self.keep_running) break;
var events: [16]std.os.linux.epoll_event = undefined;
const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
for (events[0..count]) |ev| {
@ -141,6 +166,215 @@ pub const Loop = struct {
}
};
/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
/// when buffer is empty, consumers suspend and are resumed by producers
/// when buffer is full, producers suspend and are resumed by consumers
pub fn Channel(comptime T: type) type {
return struct {
loop: *Loop,
getters: std.atomic.QueueMpsc(GetNode),
putters: std.atomic.QueueMpsc(PutNode),
get_count: usize,
put_count: usize,
dispatch_lock: u8, // TODO make this a bool
need_dispatch: u8, // TODO make this a bool
// simple fixed size ring buffer
buffer_nodes: []T,
buffer_index: usize,
buffer_len: usize,
const SelfChannel = this;
const GetNode = struct {
ptr: *T,
tick_node: *Loop.NextTickNode,
};
const PutNode = struct {
data: T,
tick_node: *Loop.NextTickNode,
};
/// call destroy when done
pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
const buffer_nodes = try loop.allocator.alloc(T, capacity);
errdefer loop.allocator.free(buffer_nodes);
const self = try loop.allocator.create(SelfChannel{
.loop = loop,
.buffer_len = 0,
.buffer_nodes = buffer_nodes,
.buffer_index = 0,
.dispatch_lock = 0,
.need_dispatch = 0,
.getters = std.atomic.QueueMpsc(GetNode).init(),
.putters = std.atomic.QueueMpsc(PutNode).init(),
.get_count = 0,
.put_count = 0,
});
errdefer loop.allocator.destroy(self);
return self;
}
/// must be called when all calls to put and get have suspended and no more calls occur
pub fn destroy(self: *SelfChannel) void {
while (self.getters.get()) |get_node| {
cancel get_node.data.tick_node.data;
}
while (self.putters.get()) |put_node| {
cancel put_node.data.tick_node.data;
}
self.loop.allocator.free(self.buffer_nodes);
self.loop.allocator.destroy(self);
}
/// puts a data item in the channel. The promise completes when the value has been added to the
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
pub async fn put(self: *SelfChannel, data: T) void {
// TODO should be able to group memory allocation failure before first suspend point
// so that the async invocation catches it
var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
_ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
suspend |handle| {
var my_tick_node = Loop.NextTickNode{
.next = undefined,
.data = handle,
};
var queue_node = std.atomic.QueueMpsc(PutNode).Node{
.data = PutNode{
.tick_node = &my_tick_node,
.data = data,
},
.next = undefined,
};
self.putters.put(&queue_node);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.loop.onNextTick(dispatch_tick_node_ptr);
}
}
/// await this function to get an item from the channel. If the buffer is empty, the promise will
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
// TODO should be able to group memory allocation failure before first suspend point
// so that the async invocation catches it
var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
_ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: T = undefined;
var debug_handle: usize = undefined;
suspend |handle| {
debug_handle = @ptrToInt(handle);
var my_tick_node = Loop.NextTickNode{
.next = undefined,
.data = handle,
};
var queue_node = std.atomic.QueueMpsc(GetNode).Node{
.data = GetNode{
.ptr = &result,
.tick_node = &my_tick_node,
},
.next = undefined,
};
self.getters.put(&queue_node);
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.loop.onNextTick(dispatch_tick_node_ptr);
}
return result;
}
async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
// resumed by onNextTick
suspend |handle| {
var tick_node = Loop.NextTickNode{
.data = handle,
.next = undefined,
};
tick_node_ptr.* = &tick_node;
}
// set the "need dispatch" flag
_ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
lock: while (true) {
// set the lock flag
const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (prev_lock != 0) return;
// clear the need_dispatch flag since we're about to do it
_ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
while (true) {
one_dispatch: {
// later we correct these extra subtractions
var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
// transfer self.buffer to self.getters
while (self.buffer_len != 0) {
if (get_count == 0) break :one_dispatch;
const get_node = &self.getters.get().?.data;
get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
self.loop.onNextTick(get_node.tick_node);
self.buffer_len -= 1;
get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
// direct transfer self.putters to self.getters
while (get_count != 0 and put_count != 0) {
const get_node = &self.getters.get().?.data;
const put_node = &self.putters.get().?.data;
get_node.ptr.* = put_node.data;
self.loop.onNextTick(get_node.tick_node);
self.loop.onNextTick(put_node.tick_node);
get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
// transfer self.putters to self.buffer
while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
const put_node = &self.putters.get().?.data;
self.buffer_nodes[self.buffer_index] = put_node.data;
self.loop.onNextTick(put_node.tick_node);
self.buffer_index +%= 1;
self.buffer_len += 1;
put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
}
// undo the extra subtractions
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
// clear need-dispatch flag
const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
if (need_dispatch != 0) continue;
const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
assert(my_lock != 0);
// we have to check again now that we unlocked
if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
return;
}
}
}
};
}
pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
@ -199,6 +433,7 @@ test "listen on a port, send bytes, receive bytes" {
defer cancel p;
loop.run();
}
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
errdefer @panic("test failure");
@ -211,3 +446,43 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
assert(mem.eql(u8, msg, "hello from server\n"));
loop.stop();
}
test "std.event.Channel" {
var da = std.heap.DirectAllocator.init();
defer da.deinit();
const allocator = &da.allocator;
var loop = try Loop.init(allocator);
defer loop.deinit();
const channel = try Channel(i32).create(&loop, 0);
defer channel.destroy();
const handle = try async<allocator> testChannelGetter(&loop, channel);
defer cancel handle;
const putter = try async<allocator> testChannelPutter(channel);
defer cancel putter;
loop.run();
}
async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
errdefer @panic("test failed");
const value1_promise = try async channel.get();
const value1 = await value1_promise;
assert(value1 == 1234);
const value2_promise = try async channel.get();
const value2 = await value2_promise;
assert(value2 == 4567);
loop.stop();
}
async fn testChannelPutter(channel: *Channel(i32)) void {
await (async channel.put(1234) catch @panic("out of memory"));
await (async channel.put(4567) catch @panic("out of memory"));
}

View File

@ -130,6 +130,9 @@ pub fn formatType(
try output(context, "error.");
return output(context, @errorName(value));
},
builtin.TypeId.Promise => {
return format(context, Errors, output, "promise@{x}", @ptrToInt(value));
},
builtin.TypeId.Pointer => |ptr_info| switch (ptr_info.size) {
builtin.TypeInfo.Pointer.Size.One => switch (@typeInfo(ptr_info.child)) {
builtin.TypeId.Array => |info| {

View File

@ -38,6 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void {
}
/// This allocator makes a syscall directly for every allocation and free.
/// TODO make this thread-safe. The windows implementation will need some atomics.
pub const DirectAllocator = struct {
allocator: Allocator,
heap_handle: ?HeapHandle,