all tests passing

This commit is contained in:
Andrew Kelley 2019-08-11 19:53:10 -04:00
parent af8c6ccb4b
commit 4d8d513e16
No known key found for this signature in database
GPG Key ID: 7C5F548F728501A9
17 changed files with 240 additions and 381 deletions

View File

@ -1,10 +1,13 @@
* for loops need to spill the index. other payload captures probably also need to spill
* compile error (instead of crashing) for trying to get @Frame of generic function
* compile error (instead of crashing) for trying to async call and passing @Frame of wrong function
* `const result = (await a) + (await b);` this causes "Instruction does not dominate all uses" - need spill
* compile error for error: expected anyframe->T, found 'anyframe'
* compile error for error: expected anyframe->T, found 'i32'
* await of a non async function
* async call on a non async function
* a test where an async function destroys its own frame in a defer
* implicit cast of normal function to async function should be allowed when it is inferred to be async
* revive std.event.Loop
* @typeInfo for @Frame(func)
* peer type resolution of *@Frame(func) and anyframe
* peer type resolution of *@Frame(func) and anyframe->T when the return type matches
@ -36,3 +39,4 @@
- it can be assumed that these are always available: the awaiter ptr, return ptr if applicable,
error return trace ptr if applicable.
- it can be assumed that it is never cancelled
* fix the debug info for variables of async functions

View File

@ -770,7 +770,7 @@ fn tokenizeAndPrintRaw(docgen_tokenizer: *Tokenizer, out: var, source_token: Tok
.Keyword_or,
.Keyword_orelse,
.Keyword_packed,
.Keyword_promise,
.Keyword_anyframe,
.Keyword_pub,
.Keyword_resume,
.Keyword_return,

View File

@ -6024,13 +6024,14 @@ const assert = std.debug.assert;
var x: i32 = 1;
test "create a coroutine and cancel it" {
const p = try async<std.debug.global_allocator> simpleAsyncFn();
comptime assert(@typeOf(p) == promise->void);
cancel p;
test "call an async function" {
var frame = async simpleAsyncFn();
comptime assert(@typeOf(frame) == @Frame(simpleAsyncFn));
assert(x == 2);
}
async<*std.mem.Allocator> fn simpleAsyncFn() void {
fn simpleAsyncFn() void {
x += 1;
suspend;
x += 1;
}
{#code_end#}
@ -6041,60 +6042,33 @@ async<*std.mem.Allocator> fn simpleAsyncFn() void {
return to the caller or resumer. The following code demonstrates where control flow
goes:
</p>
{#code_begin|test#}
const std = @import("std");
const assert = std.debug.assert;
test "coroutine suspend, resume, cancel" {
seq('a');
const p = try async<std.debug.global_allocator> testAsyncSeq();
seq('c');
resume p;
seq('f');
cancel p;
seq('g');
assert(std.mem.eql(u8, points, "abcdefg"));
}
async fn testAsyncSeq() void {
defer seq('e');
seq('b');
suspend;
seq('d');
}
var points = [_]u8{0} ** "abcdefg".len;
var index: usize = 0;
fn seq(c: u8) void {
points[index] = c;
index += 1;
}
{#code_end#}
<p>
TODO another test example here
</p>
<p>
When an async function suspends itself, it must be sure that it will be
resumed or canceled somehow, for example by registering its promise handle
in an event loop. Use a suspend capture block to gain access to the
promise:
promise (TODO this is outdated):
</p>
{#code_begin|test#}
const std = @import("std");
const assert = std.debug.assert;
var the_frame: anyframe = undefined;
var result = false;
test "coroutine suspend with block" {
const p = try async<std.debug.global_allocator> testSuspendBlock();
_ = async testSuspendBlock();
std.debug.assert(!result);
resume a_promise;
resume the_frame;
std.debug.assert(result);
cancel p;
}
var a_promise: promise = undefined;
var result = false;
async fn testSuspendBlock() void {
fn testSuspendBlock() void {
suspend {
comptime assert(@typeOf(@handle()) == promise->void);
a_promise = @handle();
comptime assert(@typeOf(@frame()) == *@Frame(testSuspendBlock));
the_frame = @frame();
}
result = true;
}
@ -6124,16 +6098,13 @@ const std = @import("std");
const assert = std.debug.assert;
test "resume from suspend" {
var buf: [500]u8 = undefined;
var a = &std.heap.FixedBufferAllocator.init(buf[0..]).allocator;
var my_result: i32 = 1;
const p = try async<a> testResumeFromSuspend(&my_result);
cancel p;
_ = async testResumeFromSuspend(&my_result);
std.debug.assert(my_result == 2);
}
async fn testResumeFromSuspend(my_result: *i32) void {
suspend {
resume @handle();
resume @frame();
}
my_result.* += 1;
suspend;
@ -6172,30 +6143,30 @@ async fn testResumeFromSuspend(my_result: *i32) void {
const std = @import("std");
const assert = std.debug.assert;
var a_promise: promise = undefined;
var the_frame: anyframe = undefined;
var final_result: i32 = 0;
test "coroutine await" {
seq('a');
const p = async<std.debug.global_allocator> amain() catch unreachable;
_ = async amain();
seq('f');
resume a_promise;
resume the_frame;
seq('i');
assert(final_result == 1234);
assert(std.mem.eql(u8, seq_points, "abcdefghi"));
}
async fn amain() void {
seq('b');
const p = async another() catch unreachable;
var f = async another();
seq('e');
final_result = await p;
final_result = await f;
seq('h');
}
async fn another() i32 {
seq('c');
suspend {
seq('d');
a_promise = @handle();
the_frame = @frame();
}
seq('g');
return 1234;

View File

@ -5325,7 +5325,7 @@ static Error resolve_coro_frame(CodeGen *g, ZigType *frame_type) {
if (*instruction->name_hint == 0) {
name = buf_ptr(buf_sprintf("@local%" ZIG_PRI_usize, alloca_i));
} else {
name = instruction->name_hint;
name = buf_ptr(buf_sprintf("%s.%" ZIG_PRI_usize, instruction->name_hint, alloca_i));
}
field_names.append(name);
field_types.append(child_type);

View File

@ -535,24 +535,24 @@ static LLVMValueRef make_fn_llvm_value(CodeGen *g, ZigFn *fn) {
// use the ABI alignment, which is fine.
}
unsigned init_gen_i = 0;
if (!type_has_bits(return_type)) {
// nothing to do
} else if (type_is_nonnull_ptr(return_type)) {
addLLVMAttr(llvm_fn, 0, "nonnull");
} else if (!is_async && want_first_arg_sret(g, &fn_type->data.fn.fn_type_id)) {
// Sret pointers must not be address 0
addLLVMArgAttr(llvm_fn, 0, "nonnull");
addLLVMArgAttr(llvm_fn, 0, "sret");
if (cc_want_sret_attr(cc)) {
addLLVMArgAttr(llvm_fn, 0, "noalias");
}
init_gen_i = 1;
}
if (is_async) {
addLLVMArgAttr(llvm_fn, 0, "nonnull");
} else {
unsigned init_gen_i = 0;
if (!type_has_bits(return_type)) {
// nothing to do
} else if (type_is_nonnull_ptr(return_type)) {
addLLVMAttr(llvm_fn, 0, "nonnull");
} else if (want_first_arg_sret(g, &fn_type->data.fn.fn_type_id)) {
// Sret pointers must not be address 0
addLLVMArgAttr(llvm_fn, 0, "nonnull");
addLLVMArgAttr(llvm_fn, 0, "sret");
if (cc_want_sret_attr(cc)) {
addLLVMArgAttr(llvm_fn, 0, "noalias");
}
init_gen_i = 1;
}
// set parameter attributes
FnWalk fn_walk = {};
fn_walk.id = FnWalkIdAttrs;
@ -911,7 +911,7 @@ static Buf *panic_msg_buf(PanicMsgId msg_id) {
case PanicMsgIdBadResume:
return buf_create_from_str("resumed an async function which already returned");
case PanicMsgIdBadAwait:
return buf_create_from_str("async function awaited/canceled twice");
return buf_create_from_str("async function awaited twice");
case PanicMsgIdBadReturn:
return buf_create_from_str("async function returned twice");
case PanicMsgIdResumedAnAwaitingFn:
@ -2350,6 +2350,10 @@ static LLVMValueRef ir_render_return_begin(CodeGen *g, IrExecutable *executable,
return get_handle_value(g, g->cur_ret_ptr, operand_type, get_pointer_to_type(g, operand_type, true));
}
static void set_tail_call_if_appropriate(CodeGen *g, LLVMValueRef call_inst) {
LLVMSetTailCall(call_inst, true);
}
static LLVMValueRef ir_render_return(CodeGen *g, IrExecutable *executable, IrInstructionReturn *instruction) {
if (fn_is_async(g->cur_fn)) {
LLVMTypeRef usize_type_ref = g->builtin_types.entry_usize->llvm_type;
@ -2394,7 +2398,7 @@ static LLVMValueRef ir_render_return(CodeGen *g, IrExecutable *executable, IrIns
LLVMValueRef their_frame_ptr = LLVMBuildIntToPtr(g->builder, masked_prev_val,
get_llvm_type(g, any_frame_type), "");
LLVMValueRef call_inst = gen_resume(g, nullptr, their_frame_ptr, ResumeIdReturn, nullptr);
LLVMSetTailCall(call_inst, true);
set_tail_call_if_appropriate(g, call_inst);
LLVMBuildRetVoid(g->builder);
g->cur_is_after_return = false;
@ -4009,7 +4013,7 @@ static LLVMValueRef ir_render_call(CodeGen *g, IrExecutable *executable, IrInstr
LLVMBasicBlockRef call_bb = gen_suspend_begin(g, "CallResume");
LLVMValueRef call_inst = gen_resume(g, fn_val, frame_result_loc, ResumeIdCall, nullptr);
LLVMSetTailCall(call_inst, true);
set_tail_call_if_appropriate(g, call_inst);
LLVMBuildRetVoid(g->builder);
LLVMPositionBuilderAtEnd(g->builder, call_bb);
@ -5520,7 +5524,7 @@ static LLVMValueRef ir_render_cancel(CodeGen *g, IrExecutable *executable, IrIns
LLVMPositionBuilderAtEnd(g->builder, early_return_block);
LLVMValueRef call_inst = gen_resume(g, nullptr, target_frame_ptr, ResumeIdAwaitEarlyReturn, awaiter_ored_val);
LLVMSetTailCall(call_inst, true);
set_tail_call_if_appropriate(g, call_inst);
LLVMBuildRetVoid(g->builder);
LLVMPositionBuilderAtEnd(g->builder, resume_bb);
@ -5556,8 +5560,9 @@ static LLVMValueRef ir_render_await(CodeGen *g, IrExecutable *executable, IrInst
}
// supply the error return trace pointer
LLVMValueRef my_err_ret_trace_val = get_cur_err_ret_trace_val(g, instruction->base.scope);
if (my_err_ret_trace_val != nullptr) {
if (codegen_fn_has_err_ret_tracing_arg(g, result_type)) {
LLVMValueRef my_err_ret_trace_val = get_cur_err_ret_trace_val(g, instruction->base.scope);
assert(my_err_ret_trace_val != nullptr);
LLVMValueRef err_ret_trace_ptr_ptr = LLVMBuildStructGEP(g->builder, target_frame_ptr,
frame_index_trace_arg(g, result_type), "");
LLVMBuildStore(g->builder, my_err_ret_trace_val, err_ret_trace_ptr_ptr);
@ -5588,7 +5593,7 @@ static LLVMValueRef ir_render_await(CodeGen *g, IrExecutable *executable, IrInst
// Tail resume it now, so that it can complete.
LLVMPositionBuilderAtEnd(g->builder, early_return_block);
LLVMValueRef call_inst = gen_resume(g, nullptr, target_frame_ptr, ResumeIdAwaitEarlyReturn, awaiter_init_val);
LLVMSetTailCall(call_inst, true);
set_tail_call_if_appropriate(g, call_inst);
LLVMBuildRetVoid(g->builder);
// Rely on the target to resume us from suspension.

View File

@ -15064,6 +15064,9 @@ static IrInstruction *ir_analyze_async_call(IrAnalyze *ira, IrInstructionCallSrc
if (result_loc != nullptr && (type_is_invalid(result_loc->value.type) || instr_is_unreachable(result_loc))) {
return result_loc;
}
result_loc = ir_implicit_cast(ira, result_loc, get_pointer_to_type(ira->codegen, frame_type, false));
if (type_is_invalid(result_loc->value.type))
return ira->codegen->invalid_instruction;
return &ir_build_call_gen(ira, &call_instruction->base, fn_entry, fn_ref, arg_count,
casted_args, FnInlineAuto, true, nullptr, result_loc, frame_type)->base;
}

View File

@ -77,18 +77,19 @@ pub fn Channel(comptime T: type) type {
/// must be called when all calls to put and get have suspended and no more calls occur
pub fn destroy(self: *SelfChannel) void {
while (self.getters.get()) |get_node| {
cancel get_node.data.tick_node.data;
resume get_node.data.tick_node.data;
}
while (self.putters.get()) |put_node| {
cancel put_node.data.tick_node.data;
resume put_node.data.tick_node.data;
}
self.loop.allocator.free(self.buffer_nodes);
self.loop.allocator.destroy(self);
}
/// puts a data item in the channel. The promise completes when the value has been added to the
/// puts a data item in the channel. The function returns when the value has been added to the
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
pub async fn put(self: *SelfChannel, data: T) void {
/// Or when the channel is destroyed.
pub fn put(self: *SelfChannel, data: T) void {
var my_tick_node = Loop.NextTickNode.init(@frame());
var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
.tick_node = &my_tick_node,
@ -114,7 +115,7 @@ pub fn Channel(comptime T: type) type {
}
}
/// await this function to get an item from the channel. If the buffer is empty, the promise will
/// await this function to get an item from the channel. If the buffer is empty, the frame will
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
// TODO integrate this function with named return values

View File

@ -76,12 +76,8 @@ pub const Request = struct {
pub const PWriteVError = error{OutOfMemory} || File.WriteError;
/// data - just the inner references - must live until pwritev promise completes.
/// data - just the inner references - must live until pwritev frame completes.
pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
switch (builtin.os) {
.macosx,
.linux,
@ -109,7 +105,7 @@ pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: us
}
}
/// data must outlive the returned promise
/// data must outlive the returned frame
pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
if (data.len == 0) return;
if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable);
@ -123,15 +119,10 @@ pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, off
}
pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
.handle = @handle(),
.handle = @frame(),
.overlapped = windows.OVERLAPPED{
.Internal = 0,
.InternalHigh = 0,
@ -166,18 +157,13 @@ pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64)
}
}
/// iovecs must live until pwritev promise completes.
/// iovecs must live until pwritev frame completes.
pub async fn pwritevPosix(
loop: *Loop,
fd: fd_t,
iovecs: []const os.iovec_const,
offset: usize,
) os.WriteError!void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
var req_node = RequestNode{
.prev = null,
.next = null,
@ -194,7 +180,7 @@ pub async fn pwritevPosix(
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
.data = @frame(),
},
},
},
@ -211,13 +197,8 @@ pub async fn pwritevPosix(
pub const PReadVError = error{OutOfMemory} || File.ReadError;
/// data - just the inner references - must live until preadv promise completes.
/// data - just the inner references - must live until preadv frame completes.
pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
assert(data.len != 0);
switch (builtin.os) {
.macosx,
@ -246,7 +227,7 @@ pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PR
}
}
/// data must outlive the returned promise
/// data must outlive the returned frame
pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
assert(data.len != 0);
if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable);
@ -272,15 +253,10 @@ pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u6
}
pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
.handle = @handle(),
.handle = @frame(),
.overlapped = windows.OVERLAPPED{
.Internal = 0,
.InternalHigh = 0,
@ -314,18 +290,13 @@ pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize
return usize(bytes_transferred);
}
/// iovecs must live until preadv promise completes
/// iovecs must live until preadv frame completes
pub async fn preadvPosix(
loop: *Loop,
fd: fd_t,
iovecs: []const os.iovec,
offset: usize,
) os.ReadError!usize {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
var req_node = RequestNode{
.prev = null,
.next = null,
@ -342,7 +313,7 @@ pub async fn preadvPosix(
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
.data = @frame(),
},
},
},
@ -363,11 +334,6 @@ pub async fn openPosix(
flags: u32,
mode: File.Mode,
) File.OpenError!fd_t {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
const path_c = try std.os.toPosixPath(path);
var req_node = RequestNode{
@ -386,7 +352,7 @@ pub async fn openPosix(
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
.data = @frame(),
},
},
},
@ -643,11 +609,6 @@ async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !
}
async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
defer loop.allocator.free(path_with_null);
@ -667,7 +628,7 @@ async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
.data = @frame(),
},
},
},
@ -682,7 +643,7 @@ async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8
return req_node.data.msg.WriteFile.result;
}
/// The promise resumes when the last data has been confirmed written, but before the file handle
/// The frame resumes when the last data has been confirmed written, but before the file handle
/// is closed.
/// Caller owns returned memory.
pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
@ -734,7 +695,7 @@ pub const WatchEventId = enum {
//
// const FileTable = std.AutoHashMap([]const u8, *Put);
// const Put = struct {
// putter: promise,
// putter: anyframe,
// value_ptr: *V,
// };
// },
@ -748,21 +709,21 @@ pub const WatchEventId = enum {
// const WindowsOsData = struct {
// table_lock: event.Lock,
// dir_table: DirTable,
// all_putters: std.atomic.Queue(promise),
// all_putters: std.atomic.Queue(anyframe),
// ref_count: std.atomic.Int(usize),
//
// const DirTable = std.AutoHashMap([]const u8, *Dir);
// const FileTable = std.AutoHashMap([]const u16, V);
//
// const Dir = struct {
// putter: promise,
// putter: anyframe,
// file_table: FileTable,
// table_lock: event.Lock,
// };
// };
//
// const LinuxOsData = struct {
// putter: promise,
// putter: anyframe,
// inotify_fd: i32,
// wd_table: WdTable,
// table_lock: event.Lock,
@ -776,7 +737,7 @@ pub const WatchEventId = enum {
// };
// };
//
// const FileToHandle = std.AutoHashMap([]const u8, promise);
// const FileToHandle = std.AutoHashMap([]const u8, anyframe);
//
// const Self = @This();
//
@ -811,7 +772,7 @@ pub const WatchEventId = enum {
// .table_lock = event.Lock.init(loop),
// .dir_table = OsData.DirTable.init(loop.allocator),
// .ref_count = std.atomic.Int(usize).init(1),
// .all_putters = std.atomic.Queue(promise).init(),
// .all_putters = std.atomic.Queue(anyframe).init(),
// },
// };
// return self;
@ -926,14 +887,9 @@ pub const WatchEventId = enum {
// }
//
// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
// // TODO https://github.com/ziglang/zig/issues/1194
// suspend {
// resume @handle();
// }
//
// var value_copy = value;
// var put = OsData.Put{
// .putter = @handle(),
// .putter = @frame(),
// .value_ptr = &value_copy,
// };
// out_put.* = &put;
@ -1091,18 +1047,13 @@ pub const WatchEventId = enum {
// }
//
// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
// // TODO https://github.com/ziglang/zig/issues/1194
// suspend {
// resume @handle();
// }
//
// self.ref();
// defer self.deref();
//
// defer os.close(dir_handle);
//
// var putter_node = std.atomic.Queue(promise).Node{
// .data = @handle(),
// var putter_node = std.atomic.Queue(anyframe).Node{
// .data = @frame(),
// .prev = null,
// .next = null,
// };
@ -1112,7 +1063,7 @@ pub const WatchEventId = enum {
// var resume_node = Loop.ResumeNode.Basic{
// .base = Loop.ResumeNode{
// .id = Loop.ResumeNode.Id.Basic,
// .handle = @handle(),
// .handle = @frame(),
// .overlapped = windows.OVERLAPPED{
// .Internal = 0,
// .InternalHigh = 0,
@ -1207,17 +1158,12 @@ pub const WatchEventId = enum {
// }
//
// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
// // TODO https://github.com/ziglang/zig/issues/1194
// suspend {
// resume @handle();
// }
//
// const loop = channel.loop;
//
// var watch = Self{
// .channel = channel,
// .os_data = OsData{
// .putter = @handle(),
// .putter = @frame(),
// .inotify_fd = inotify_fd,
// .wd_table = OsData.WdTable.init(loop.allocator),
// .table_lock = event.Lock.init(loop),

View File

@ -2,8 +2,6 @@ const std = @import("../std.zig");
const assert = std.debug.assert;
const testing = std.testing;
const builtin = @import("builtin");
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Lock = std.event.Lock;
const Loop = std.event.Loop;
@ -23,7 +21,7 @@ pub fn Future(comptime T: type) type {
available: u8,
const Self = @This();
const Queue = std.atomic.Queue(promise);
const Queue = std.atomic.Queue(anyframe);
pub fn init(loop: *Loop) Self {
return Self{
@ -37,10 +35,10 @@ pub fn Future(comptime T: type) type {
/// available.
/// Thread-safe.
pub async fn get(self: *Self) *T {
if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
return &self.data;
}
const held = await (async self.lock.acquire() catch unreachable);
const held = self.lock.acquire();
held.release();
return &self.data;
@ -49,7 +47,7 @@ pub fn Future(comptime T: type) type {
/// Gets the data without waiting for it. If it's available, a pointer is
/// returned. Otherwise, null is returned.
pub fn getOrNull(self: *Self) ?*T {
if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
return &self.data;
} else {
return null;
@ -62,10 +60,10 @@ pub fn Future(comptime T: type) type {
/// It's not required to call start() before resolve() but it can be useful since
/// this method is thread-safe.
pub async fn start(self: *Self) ?*T {
const state = @cmpxchgStrong(u8, &self.available, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null;
const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null;
switch (state) {
1 => {
const held = await (async self.lock.acquire() catch unreachable);
const held = self.lock.acquire();
held.release();
return &self.data;
},
@ -77,7 +75,7 @@ pub fn Future(comptime T: type) type {
/// Make the data become available. May be called only once.
/// Before calling this, modify the `data` property.
pub fn resolve(self: *Self) void {
const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst);
const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst);
assert(prev == 0 or prev == 1); // resolve() called twice
Lock.Held.release(Lock.Held{ .lock = &self.lock });
}
@ -86,7 +84,7 @@ pub fn Future(comptime T: type) type {
test "std.event.Future" {
// https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded or builtin.os != builtin.Os.linux) return error.SkipZigTest;
if (builtin.single_threaded) return error.SkipZigTest;
const allocator = std.heap.direct_allocator;
@ -94,38 +92,33 @@ test "std.event.Future" {
try loop.initMultiThreaded(allocator);
defer loop.deinit();
const handle = try async<allocator> testFuture(&loop);
defer cancel handle;
const handle = async testFuture(&loop);
loop.run();
}
async fn testFuture(loop: *Loop) void {
suspend {
resume @handle();
}
var future = Future(i32).init(loop);
const a = async waitOnFuture(&future) catch @panic("memory");
const b = async waitOnFuture(&future) catch @panic("memory");
const c = async resolveFuture(&future) catch @panic("memory");
const a = async waitOnFuture(&future);
const b = async waitOnFuture(&future);
const c = async resolveFuture(&future);
// TODO make this work:
//const result = (await a) + (await b);
const a_result = await a;
const b_result = await b;
const result = a_result + b_result;
const result = (await a) + (await b);
cancel c;
testing.expect(result == 12);
}
async fn waitOnFuture(future: *Future(i32)) i32 {
suspend {
resume @handle();
}
return (await (async future.get() catch @panic("memory"))).*;
return future.get().*;
}
async fn resolveFuture(future: *Future(i32)) void {
suspend {
resume @handle();
}
future.data = 6;
future.resolve();
}

View File

@ -2,8 +2,6 @@ const std = @import("../std.zig");
const builtin = @import("builtin");
const Lock = std.event.Lock;
const Loop = std.event.Loop;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const testing = std.testing;
/// ReturnType must be `void` or `E!void`
@ -16,10 +14,10 @@ pub fn Group(comptime ReturnType: type) type {
const Self = @This();
const Error = switch (@typeInfo(ReturnType)) {
builtin.TypeId.ErrorUnion => |payload| payload.error_set,
.ErrorUnion => |payload| payload.error_set,
else => void,
};
const Stack = std.atomic.Stack(promise->ReturnType);
const Stack = std.atomic.Stack(anyframe->ReturnType);
pub fn init(loop: *Loop) Self {
return Self{
@ -29,7 +27,7 @@ pub fn Group(comptime ReturnType: type) type {
};
}
/// Cancel all the outstanding promises. Can be called even if wait was already called.
/// Cancel all the outstanding frames. Can be called even if wait was already called.
pub fn deinit(self: *Self) void {
while (self.coro_stack.pop()) |node| {
cancel node.data;
@ -40,8 +38,8 @@ pub fn Group(comptime ReturnType: type) type {
}
}
/// Add a promise to the group. Thread-safe.
pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
/// Add a frame to the group. Thread-safe.
pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) {
const node = try self.lock.loop.allocator.create(Stack.Node);
node.* = Stack.Node{
.next = undefined,
@ -51,7 +49,7 @@ pub fn Group(comptime ReturnType: type) type {
}
/// Add a node to the group. Thread-safe. Cannot fail.
/// `node.data` should be the promise handle to add to the group.
/// `node.data` should be the frame handle to add to the group.
/// The node's memory should be in the coroutine frame of
/// the handle that is in the node, or somewhere guaranteed to live
/// at least as long.
@ -59,40 +57,11 @@ pub fn Group(comptime ReturnType: type) type {
self.coro_stack.push(node);
}
/// This is equivalent to an async call, but the async function is added to the group, instead
/// of returning a promise. func must be async and have return type ReturnType.
/// Thread-safe.
pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
const S = struct {
async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType {
// TODO this is a hack to make the memory following be inside the coro frame
suspend {
var my_node: Stack.Node = undefined;
node.* = &my_node;
resume @handle();
}
// TODO this allocation elision should be guaranteed because we await it in
// this coro frame
return await (async func(args2) catch unreachable);
}
};
var node: *Stack.Node = undefined;
const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args);
node.* = Stack.Node{
.next = undefined,
.data = handle,
};
self.coro_stack.push(node);
}
/// Wait for all the calls and promises of the group to complete.
/// Thread-safe.
/// Safe to call any number of times.
pub async fn wait(self: *Self) ReturnType {
// TODO catch unreachable because the allocation can be grouped with
// the coro frame allocation
const held = await (async self.lock.acquire() catch unreachable);
const held = self.lock.acquire();
defer held.release();
while (self.coro_stack.pop()) |node| {
@ -131,8 +100,7 @@ test "std.event.Group" {
try loop.initMultiThreaded(allocator);
defer loop.deinit();
const handle = try async<allocator> testGroup(&loop);
defer cancel handle;
const handle = async testGroup(&loop);
loop.run();
}
@ -140,26 +108,30 @@ test "std.event.Group" {
async fn testGroup(loop: *Loop) void {
var count: usize = 0;
var group = Group(void).init(loop);
group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory");
group.call(increaseByTen, &count) catch @panic("memory");
await (async group.wait() catch @panic("memory"));
var sleep_a_little_frame = async sleepALittle(&count);
group.add(&sleep_a_little_frame) catch @panic("memory");
var increase_by_ten_frame = async increaseByTen(&count);
group.add(&increase_by_ten_frame) catch @panic("memory");
group.wait();
testing.expect(count == 11);
var another = Group(anyerror!void).init(loop);
another.add(async somethingElse() catch @panic("memory")) catch @panic("memory");
another.call(doSomethingThatFails) catch @panic("memory");
testing.expectError(error.ItBroke, await (async another.wait() catch @panic("memory")));
var something_else_frame = async somethingElse();
another.add(&something_else_frame) catch @panic("memory");
var something_that_fails_frame = async doSomethingThatFails();
another.add(&something_that_fails_frame) catch @panic("memory");
testing.expectError(error.ItBroke, another.wait());
}
async fn sleepALittle(count: *usize) void {
std.time.sleep(1 * std.time.millisecond);
_ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}
async fn increaseByTen(count: *usize) void {
var i: usize = 0;
while (i < 10) : (i += 1) {
_ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}
}

View File

@ -1,6 +1,5 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const mem = std.mem;
@ -12,13 +11,13 @@ pub fn InStream(comptime ReadError: type) type {
/// Return the number of bytes read. It may be less than buffer.len.
/// If the number of bytes read is 0, it means end of stream.
/// End of stream is not an error condition.
readFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!usize,
readFn: async fn (self: *Self, buffer: []u8) Error!usize,
/// Return the number of bytes read. It may be less than buffer.len.
/// If the number of bytes read is 0, it means end of stream.
/// End of stream is not an error condition.
pub async fn read(self: *Self, buffer: []u8) !usize {
return await (async self.readFn(self, buffer) catch unreachable);
return self.readFn(self, buffer);
}
/// Return the number of bytes read. If it is less than buffer.len
@ -26,7 +25,7 @@ pub fn InStream(comptime ReadError: type) type {
pub async fn readFull(self: *Self, buffer: []u8) !usize {
var index: usize = 0;
while (index != buf.len) {
const amt_read = try await (async self.read(buf[index..]) catch unreachable);
const amt_read = try self.read(buf[index..]);
if (amt_read == 0) return index;
index += amt_read;
}
@ -35,25 +34,25 @@ pub fn InStream(comptime ReadError: type) type {
/// Same as `readFull` but end of stream returns `error.EndOfStream`.
pub async fn readNoEof(self: *Self, buf: []u8) !void {
const amt_read = try await (async self.readFull(buf[index..]) catch unreachable);
const amt_read = try self.readFull(buf[index..]);
if (amt_read < buf.len) return error.EndOfStream;
}
pub async fn readIntLittle(self: *Self, comptime T: type) !T {
var bytes: [@sizeOf(T)]u8 = undefined;
try await (async self.readNoEof(bytes[0..]) catch unreachable);
try self.readNoEof(bytes[0..]);
return mem.readIntLittle(T, &bytes);
}
pub async fn readIntBe(self: *Self, comptime T: type) !T {
var bytes: [@sizeOf(T)]u8 = undefined;
try await (async self.readNoEof(bytes[0..]) catch unreachable);
try self.readNoEof(bytes[0..]);
return mem.readIntBig(T, &bytes);
}
pub async fn readInt(self: *Self, comptime T: type, endian: builtin.Endian) !T {
var bytes: [@sizeOf(T)]u8 = undefined;
try await (async self.readNoEof(bytes[0..]) catch unreachable);
try self.readNoEof(bytes[0..]);
return mem.readInt(T, &bytes, endian);
}
@ -61,7 +60,7 @@ pub fn InStream(comptime ReadError: type) type {
// Only extern and packed structs have defined in-memory layout.
comptime assert(@typeInfo(T).Struct.layout != builtin.TypeInfo.ContainerLayout.Auto);
var res: [1]T = undefined;
try await (async self.readNoEof(@sliceToBytes(res[0..])) catch unreachable);
try self.readNoEof(@sliceToBytes(res[0..]));
return res[0];
}
};
@ -72,6 +71,6 @@ pub fn OutStream(comptime WriteError: type) type {
const Self = @This();
pub const Error = WriteError;
writeFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!void,
writeFn: async fn (self: *Self, buffer: []u8) Error!void,
};
}

View File

@ -3,8 +3,6 @@ const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;
/// Thread-safe async/await lock.
@ -17,7 +15,7 @@ pub const Lock = struct {
queue: Queue,
queue_empty_bit: u8, // TODO make this a bool
const Queue = std.atomic.Queue(promise);
const Queue = std.atomic.Queue(anyframe);
pub const Held = struct {
lock: *Lock,
@ -30,19 +28,19 @@ pub const Lock = struct {
}
// We need to release the lock.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
return;
}
while (true) {
const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst);
if (old_bit != 0) {
// We did not obtain the lock. Great, the queue is someone else's problem.
return;
@ -55,11 +53,11 @@ pub const Lock = struct {
}
// Release the lock again.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
// Find out if we can be done.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
return;
}
}
@ -88,15 +86,11 @@ pub const Lock = struct {
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
assert(self.shared_bit == 0);
while (self.queue.get()) |node| cancel node.data;
while (self.queue.get()) |node| resume node.data;
}
pub async fn acquire(self: *Lock) Held {
// TODO explicitly put this memory in the coroutine frame #1194
suspend {
resume @handle();
}
var my_tick_node = Loop.NextTickNode.init(@handle());
var my_tick_node = Loop.NextTickNode.init(@frame());
errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
suspend {
@ -107,9 +101,9 @@ pub const Lock = struct {
// We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
// will attempt to grab the lock.
_ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.queue_empty_bit, .Xchg, 0, .SeqCst);
const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst);
if (old_bit == 0) {
if (self.queue.get()) |node| {
// Whether this node is us or someone else, we tail resume it.
@ -123,8 +117,7 @@ pub const Lock = struct {
};
test "std.event.Lock" {
// TODO https://github.com/ziglang/zig/issues/2377
if (true) return error.SkipZigTest;
// TODO https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
const allocator = std.heap.direct_allocator;
@ -136,39 +129,34 @@ test "std.event.Lock" {
var lock = Lock.init(&loop);
defer lock.deinit();
const handle = try async<allocator> testLock(&loop, &lock);
defer cancel handle;
_ = async testLock(&loop, &lock);
loop.run();
testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data);
}
async fn testLock(loop: *Loop, lock: *Lock) void {
// TODO explicitly put next tick node memory in the coroutine frame #1194
suspend {
resume @handle();
}
const handle1 = async lockRunner(lock) catch @panic("out of memory");
const handle1 = async lockRunner(lock);
var tick_node1 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle1,
.data = &handle1,
};
loop.onNextTick(&tick_node1);
const handle2 = async lockRunner(lock) catch @panic("out of memory");
const handle2 = async lockRunner(lock);
var tick_node2 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle2,
.data = &handle2,
};
loop.onNextTick(&tick_node2);
const handle3 = async lockRunner(lock) catch @panic("out of memory");
const handle3 = async lockRunner(lock);
var tick_node3 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle3,
.data = &handle3,
};
loop.onNextTick(&tick_node3);
@ -185,7 +173,7 @@ async fn lockRunner(lock: *Lock) void {
var i: usize = 0;
while (i < shared_test_data.len) : (i += 1) {
const lock_promise = async lock.acquire() catch @panic("out of memory");
const lock_promise = async lock.acquire();
const handle = await lock_promise;
defer handle.release();

View File

@ -457,7 +457,7 @@ pub const Loop = struct {
var resume_node = ResumeNode.Basic{
.base = ResumeNode{
.id = ResumeNode.Id.Basic,
.handle = @handle(),
.handle = @frame(),
.overlapped = ResumeNode.overlapped_init,
},
};
@ -469,7 +469,7 @@ pub const Loop = struct {
var resume_node = ResumeNode.Basic{
.base = ResumeNode{
.id = ResumeNode.Id.Basic,
.handle = @handle(),
.handle = @frame(),
.overlapped = ResumeNode.overlapped_init,
},
.kev = undefined,

View File

@ -9,17 +9,17 @@ const File = std.fs.File;
const fd_t = os.fd_t;
pub const Server = struct {
handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void,
handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
loop: *Loop,
sockfd: ?i32,
accept_coro: ?promise,
accept_coro: ?anyframe,
listen_address: std.net.Address,
waiting_for_emfile_node: PromiseNode,
listen_resume_node: event.Loop.ResumeNode,
const PromiseNode = std.TailQueue(promise).Node;
const PromiseNode = std.TailQueue(anyframe).Node;
pub fn init(loop: *Loop) Server {
// TODO can't initialize handler coroutine here because we need well defined copy elision
@ -41,7 +41,7 @@ pub const Server = struct {
pub fn listen(
self: *Server,
address: *const std.net.Address,
handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void,
handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
) !void {
self.handleRequestFn = handleRequestFn;
@ -53,7 +53,7 @@ pub const Server = struct {
try os.listen(sockfd, os.SOMAXCONN);
self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd));
self.accept_coro = try async<self.loop.allocator> Server.handler(self);
self.accept_coro = async Server.handler(self);
errdefer cancel self.accept_coro.?;
self.listen_resume_node.handle = self.accept_coro.?;
@ -86,12 +86,7 @@ pub const Server = struct {
continue;
}
var socket = File.openHandle(accepted_fd);
_ = async<self.loop.allocator> self.handleRequestFn(self, &accepted_addr, socket) catch |err| switch (err) {
error.OutOfMemory => {
socket.close();
continue;
},
};
self.handleRequestFn(self, &accepted_addr, socket);
} else |err| switch (err) {
error.ProcessFdQuotaExceeded => @panic("TODO handle this error"),
error.ConnectionAborted => continue,
@ -124,7 +119,7 @@ pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 {
mem.copy(u8, sock_addr.path[0..], path);
const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len);
try os.connect_async(sockfd, &sock_addr, size);
try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
try os.getsockoptError(sockfd);
return sockfd;
@ -149,7 +144,7 @@ pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize
.iov_len = buffer.len,
};
const iovs: *const [1]os.iovec = &iov;
return await (async readvPosix(loop, fd, iovs, 1) catch unreachable);
return readvPosix(loop, fd, iovs, 1);
}
pub const WriteError = error{};
@ -160,7 +155,7 @@ pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteErr
.iov_len = buffer.len,
};
const iovs: *const [1]os.iovec_const = &iov;
return await (async writevPosix(loop, fd, iovs, 1) catch unreachable);
return writevPosix(loop, fd, iovs, 1);
}
pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void {
@ -174,7 +169,7 @@ pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, cou
os.EINVAL => unreachable,
os.EFAULT => unreachable,
os.EAGAIN => {
try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT) catch unreachable);
try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT);
continue;
},
os.EBADF => unreachable, // always a race condition
@ -205,7 +200,7 @@ pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count:
os.EINVAL => unreachable,
os.EFAULT => unreachable,
os.EAGAIN => {
try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN) catch unreachable);
try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN);
continue;
},
os.EBADF => unreachable, // always a race condition
@ -232,7 +227,7 @@ pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void {
};
}
return await (async writevPosix(loop, fd, iovecs.ptr, data.len) catch unreachable);
return writevPosix(loop, fd, iovecs.ptr, data.len);
}
pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize {
@ -246,7 +241,7 @@ pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize {
};
}
return await (async readvPosix(loop, fd, iovecs.ptr, data.len) catch unreachable);
return readvPosix(loop, fd, iovecs.ptr, data.len);
}
pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File {
@ -256,7 +251,7 @@ pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File {
errdefer os.close(sockfd);
try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in));
try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
try os.getsockoptError(sockfd);
return File.openHandle(sockfd);
@ -275,17 +270,16 @@ test "listen on a port, send bytes, receive bytes" {
tcp_server: Server,
const Self = @This();
async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void {
async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void {
const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592
defer socket.close();
// TODO guarantee elision of this allocation
const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
(await next_handler) catch |err| {
const next_handler = errorableHandler(self, _addr, socket) catch |err| {
std.debug.panic("unable to handle connection: {}\n", err);
};
suspend {
cancel @handle();
cancel @frame();
}
}
async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void {
@ -306,15 +300,14 @@ test "listen on a port, send bytes, receive bytes" {
defer server.tcp_server.deinit();
try server.tcp_server.listen(&addr, MyServer.handler);
const p = try async<std.debug.global_allocator> doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server);
defer cancel p;
_ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server);
loop.run();
}
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
errdefer @panic("test failure");
var socket_file = try await try async connect(loop, address);
var socket_file = try connect(loop, address);
defer socket_file.close();
var buf: [512]u8 = undefined;
@ -340,9 +333,9 @@ pub const OutStream = struct {
};
}
async<*mem.Allocator> fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
const self = @fieldParentPtr(OutStream, "stream", out_stream);
return await (async write(self.loop, self.fd, bytes) catch unreachable);
return write(self.loop, self.fd, bytes);
}
};
@ -362,8 +355,8 @@ pub const InStream = struct {
};
}
async<*mem.Allocator> fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
const self = @fieldParentPtr(InStream, "stream", in_stream);
return await (async read(self.loop, self.fd, bytes) catch unreachable);
return read(self.loop, self.fd, bytes);
}
};

View File

@ -3,8 +3,6 @@ const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;
/// Thread-safe async/await lock.
@ -28,19 +26,19 @@ pub const RwLock = struct {
const ReadLock = 2;
};
const Queue = std.atomic.Queue(promise);
const Queue = std.atomic.Queue(anyframe);
pub const HeldRead = struct {
lock: *RwLock,
pub fn release(self: HeldRead) void {
// If other readers still hold the lock, we're done.
if (@atomicRmw(usize, &self.lock.reader_lock_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) != 1) {
if (@atomicRmw(usize, &self.lock.reader_lock_count, .Sub, 1, .SeqCst) != 1) {
return;
}
_ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
_ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
// Didn't unlock. Someone else's problem.
return;
}
@ -61,17 +59,17 @@ pub const RwLock = struct {
}
// We need to release the write lock. Check if any readers are waiting to grab the lock.
if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
// Switch to a read lock.
_ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.ReadLock, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst);
while (self.lock.reader_queue.get()) |node| {
self.lock.loop.onNextTick(node);
}
return;
}
_ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
self.lock.commonPostUnlock();
}
@ -93,17 +91,16 @@ pub const RwLock = struct {
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *RwLock) void {
assert(self.shared_state == State.Unlocked);
while (self.writer_queue.get()) |node| cancel node.data;
while (self.reader_queue.get()) |node| cancel node.data;
while (self.writer_queue.get()) |node| resume node.data;
while (self.reader_queue.get()) |node| resume node.data;
}
pub async fn acquireRead(self: *RwLock) HeldRead {
_ = @atomicRmw(usize, &self.reader_lock_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &self.reader_lock_count, .Add, 1, .SeqCst);
suspend {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = @handle(),
.data = @frame(),
.prev = undefined,
.next = undefined,
};
@ -115,10 +112,10 @@ pub const RwLock = struct {
// We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1,
// some actor will attempt to grab the lock.
_ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst);
// Here we don't care if we are the one to do the locking or if it was already locked for reading.
const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst)) |old_state| old_state == State.ReadLock else true;
const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true;
if (have_read_lock) {
// Give out all the read locks.
if (self.reader_queue.get()) |first_node| {
@ -134,9 +131,8 @@ pub const RwLock = struct {
pub async fn acquireWrite(self: *RwLock) HeldWrite {
suspend {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = @handle(),
.data = @frame(),
.prev = undefined,
.next = undefined,
};
@ -148,10 +144,10 @@ pub const RwLock = struct {
// We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1,
// some actor will attempt to grab the lock.
_ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst);
// Here we must be the one to acquire the write lock. It cannot already be locked.
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) == null) {
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) {
// We now have a write lock.
if (self.writer_queue.get()) |node| {
// Whether this node is us or someone else, we tail resume it.
@ -169,8 +165,8 @@ pub const RwLock = struct {
// obtain the lock.
// But if there's a writer_queue item or a reader_queue item,
// we are the actor which must loop and attempt to grab the lock again.
if (@atomicLoad(u8, &self.writer_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) {
// We did not obtain the lock. Great, the queues are someone else's problem.
return;
}
@ -180,13 +176,13 @@ pub const RwLock = struct {
return;
}
// Release the lock again.
_ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
_ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst);
continue;
}
if (@atomicLoad(u8, &self.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) {
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) {
// We did not obtain the lock. Great, the queues are someone else's problem.
return;
}
@ -199,8 +195,8 @@ pub const RwLock = struct {
return;
}
// Release the lock again.
_ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) {
_ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
// Didn't unlock. Someone else's problem.
return;
}
@ -215,6 +211,9 @@ test "std.event.RwLock" {
// https://github.com/ziglang/zig/issues/2377
if (true) return error.SkipZigTest;
// https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
const allocator = std.heap.direct_allocator;
var loop: Loop = undefined;
@ -224,8 +223,7 @@ test "std.event.RwLock" {
var lock = RwLock.init(&loop);
defer lock.deinit();
const handle = try async<allocator> testLock(&loop, &lock);
defer cancel handle;
const handle = testLock(&loop, &lock);
loop.run();
const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
@ -233,28 +231,31 @@ test "std.event.RwLock" {
}
async fn testLock(loop: *Loop, lock: *RwLock) void {
// TODO explicitly put next tick node memory in the coroutine frame #1194
suspend {
resume @handle();
}
var read_nodes: [100]Loop.NextTickNode = undefined;
for (read_nodes) |*read_node| {
read_node.data = async readRunner(lock) catch @panic("out of memory");
const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory");
read_node.data = frame;
frame.* = async readRunner(lock);
loop.onNextTick(read_node);
}
var write_nodes: [shared_it_count]Loop.NextTickNode = undefined;
for (write_nodes) |*write_node| {
write_node.data = async writeRunner(lock) catch @panic("out of memory");
const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory");
write_node.data = frame;
frame.* = async writeRunner(lock);
loop.onNextTick(write_node);
}
for (write_nodes) |*write_node| {
await @ptrCast(promise->void, write_node.data);
const casted = @ptrCast(*const @Frame(writeRunner), write_node.data);
await casted;
loop.allocator.destroy(casted);
}
for (read_nodes) |*read_node| {
await @ptrCast(promise->void, read_node.data);
const casted = @ptrCast(*const @Frame(readRunner), read_node.data);
await casted;
loop.allocator.destroy(casted);
}
}
@ -269,7 +270,7 @@ async fn writeRunner(lock: *RwLock) void {
var i: usize = 0;
while (i < shared_test_data.len) : (i += 1) {
std.time.sleep(100 * std.time.microsecond);
const lock_promise = async lock.acquireWrite() catch @panic("out of memory");
const lock_promise = async lock.acquireWrite();
const handle = await lock_promise;
defer handle.release();
@ -287,7 +288,7 @@ async fn readRunner(lock: *RwLock) void {
var i: usize = 0;
while (i < shared_test_data.len) : (i += 1) {
const lock_promise = async lock.acquireRead() catch @panic("out of memory");
const lock_promise = async lock.acquireRead();
const handle = await lock_promise;
defer handle.release();

View File

@ -1183,7 +1183,7 @@ test "zig fmt: resume from suspend block" {
try testCanonical(
\\fn foo() void {
\\ suspend {
\\ resume @handle();
\\ resume @frame();
\\ }
\\}
\\

View File

@ -1403,24 +1403,14 @@ pub fn addCases(cases: *tests.CompileErrorContext) void {
);
cases.add(
"@handle() called outside of function definition",
\\var handle_undef: promise = undefined;
\\var handle_dummy: promise = @handle();
"@frame() called outside of function definition",
\\var handle_undef: anyframe = undefined;
\\var handle_dummy: anyframe = @frame();
\\export fn entry() bool {
\\ return handle_undef == handle_dummy;
\\}
,
"tmp.zig:2:29: error: @handle() called outside of function definition",
);
cases.add(
"@handle() in non-async function",
\\export fn entry() bool {
\\ var handle_undef: promise = undefined;
\\ return handle_undef == @handle();
\\}
,
"tmp.zig:3:28: error: @handle() in non-async function",
"tmp.zig:2:30: error: @frame() called outside of function definition",
);
cases.add(
@ -1796,15 +1786,9 @@ pub fn addCases(cases: *tests.CompileErrorContext) void {
cases.add(
"suspend inside suspend block",
\\const std = @import("std",);
\\
\\export fn entry() void {
\\ var buf: [500]u8 = undefined;
\\ var a = &std.heap.FixedBufferAllocator.init(buf[0..]).allocator;
\\ const p = (async<a> foo()) catch unreachable;
\\ cancel p;
\\ _ = async foo();
\\}
\\
\\async fn foo() void {
\\ suspend {
\\ suspend {
@ -1812,8 +1796,8 @@ pub fn addCases(cases: *tests.CompileErrorContext) void {
\\ }
\\}
,
"tmp.zig:12:9: error: cannot suspend inside suspend block",
"tmp.zig:11:5: note: other suspend block here",
"tmp.zig:6:9: error: cannot suspend inside suspend block",
"tmp.zig:5:5: note: other suspend block here",
);
cases.add(
@ -1854,15 +1838,14 @@ pub fn addCases(cases: *tests.CompileErrorContext) void {
cases.add(
"returning error from void async function",
\\const std = @import("std",);
\\export fn entry() void {
\\ const p = async<std.debug.global_allocator> amain() catch unreachable;
\\ _ = async amain();
\\}
\\async fn amain() void {
\\ return error.ShouldBeCompileError;
\\}
,
"tmp.zig:6:17: error: expected type 'void', found 'error{ShouldBeCompileError}'",
"tmp.zig:5:17: error: expected type 'void', found 'error{ShouldBeCompileError}'",
);
cases.add(