Mirror of https://github.com/ziglang/zig.git (synced 2025-02-11 07:00:16 +00:00)

commit 5194fc57d1 (parent 7e5b234b8b)

    use enum with atomics in std lib
@@ -12,12 +12,13 @@ pub fn Future(comptime T: type) type {
     return struct {
         lock: Lock,
         data: T,
+        available: Available,
 
-        /// TODO make this an enum
-        /// 0 - not started
-        /// 1 - started
-        /// 2 - finished
-        available: u8,
+        const Available = enum(u8) {
+            NotStarted,
+            Started,
+            Finished,
+        };
 
         const Self = @This();
         const Queue = std.atomic.Queue(anyframe);
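
Note: this commit works because Zig's atomic builtins (@atomicLoad, @atomicRmw, @cmpxchgStrong/@cmpxchgWeak) accept any type with a well-defined integer representation, so an enum(u8) can replace the hand-numbered u8 states directly. A minimal standalone sketch of the pattern, illustrative only and not part of this commit:

    const Available = enum(u8) {
        NotStarted,
        Started,
        Finished,
    };

    var available: Available = .NotStarted;

    fn demo() void {
        // .Xchg returns the previous value; unused here.
        _ = @atomicRmw(Available, &available, .Xchg, .Finished, .SeqCst);
        // Comparisons now use named tags instead of magic numbers like 2.
        if (@atomicLoad(Available, &available, .SeqCst) == .Finished) {
            // data is ready
        }
    }
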
@@ -34,7 +35,7 @@ pub fn Future(comptime T: type) type {
         /// available.
         /// Thread-safe.
         pub async fn get(self: *Self) *T {
-            if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                 return &self.data;
             }
             const held = self.lock.acquire();
@@ -46,7 +47,7 @@ pub fn Future(comptime T: type) type {
         /// Gets the data without waiting for it. If it's available, a pointer is
         /// returned. Otherwise, null is returned.
         pub fn getOrNull(self: *Self) ?*T {
-            if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                 return &self.data;
             } else {
                 return null;
@@ -59,7 +60,7 @@ pub fn Future(comptime T: type) type {
         /// It's not required to call start() before resolve() but it can be useful since
         /// this method is thread-safe.
         pub async fn start(self: *Self) ?*T {
-            const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null;
+            const state = @cmpxchgStrong(Available, &self.available, .NotStarted, .Started, .SeqCst, .SeqCst) orelse return null;
             switch (state) {
                 1 => {
                     const held = self.lock.acquire();
@@ -74,8 +75,8 @@ pub fn Future(comptime T: type) type {
         /// Make the data become available. May be called only once.
         /// Before calling this, modify the `data` property.
         pub fn resolve(self: *Self) void {
-            const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst);
-            assert(prev == 0 or prev == 1); // resolve() called twice
+            const prev = @atomicRmw(Available, &self.available, .Xchg, .Finished, .SeqCst);
+            assert(prev != .Finished); // resolve() called twice
             Lock.Held.release(Lock.Held{ .lock = &self.lock });
         }
     };
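
For context, a hypothetical use of this Future; the init() constructor is assumed from the surrounding std.event API and is not shown in this diff:

    const std = @import("std");
    const Future = std.event.Future;

    var future = Future(i32).init();

    fn producer() void {
        future.data = 42; // write the payload first...
        future.resolve(); // ...then publish .Finished exactly once
    }

    async fn consumer() i32 {
        const ptr = future.get(); // suspends until resolve() runs
        return ptr.*;
    }
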

@@ -13,17 +13,17 @@ const Loop = std.event.Loop;
 /// When a write lock is held, it will not be released until the writer queue is empty.
 /// TODO: make this API also work in blocking I/O mode
 pub const RwLock = struct {
-    shared_state: u8, // TODO make this an enum
+    shared_state: State,
     writer_queue: Queue,
     reader_queue: Queue,
     writer_queue_empty_bit: u8, // TODO make this a bool
     reader_queue_empty_bit: u8, // TODO make this a bool
     reader_lock_count: usize,
 
-    const State = struct {
-        const Unlocked = 0;
-        const WriteLock = 1;
-        const ReadLock = 2;
+    const State = enum(u8) {
+        Unlocked,
+        WriteLock,
+        ReadLock,
     };
 
     const Queue = std.atomic.Queue(anyframe);
@@ -41,7 +41,7 @@ pub const RwLock = struct {
            }
 
            _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
-            if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+            if (@cmpxchgStrong(State, &self.lock.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                // Didn't unlock. Someone else's problem.
                return;
            }
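
A reading aid for the RwLock hunks: @cmpxchgStrong returns null when the swap succeeds and the observed value when it fails, so `== null` means "we performed the transition" and `!= null` means "someone else got there first". A reduced sketch, illustrative only:

    const State = enum(u8) {
        Unlocked,
        WriteLock,
        ReadLock,
    };

    fn tryWriteLock(shared_state: *State) bool {
        // A null result means the swap happened: we now hold the write lock.
        return @cmpxchgStrong(State, shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null;
    }
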
@@ -64,7 +64,7 @@ pub const RwLock = struct {
            // We need to release the write lock. Check if any readers are waiting to grab the lock.
            if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
                // Switch to a read lock.
-                _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst);
+                _ = @atomicRmw(State, &self.lock.shared_state, .Xchg, .ReadLock, .SeqCst);
                while (self.lock.reader_queue.get()) |node| {
                    global_event_loop.onNextTick(node);
                }
@@ -72,7 +72,7 @@ pub const RwLock = struct {
            }
 
            _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
-            _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
+            _ = @atomicRmw(State, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
 
            self.lock.commonPostUnlock();
        }
@@ -80,7 +80,7 @@ pub const RwLock = struct {
 
    pub fn init() RwLock {
        return RwLock{
-            .shared_state = State.Unlocked,
+            .shared_state = .Unlocked,
            .writer_queue = Queue.init(),
            .writer_queue_empty_bit = 1,
            .reader_queue = Queue.init(),
@@ -92,7 +92,7 @@ pub const RwLock = struct {
    /// Must be called when not locked. Not thread safe.
    /// All calls to acquire() and release() must complete before calling deinit().
    pub fn deinit(self: *RwLock) void {
-        assert(self.shared_state == State.Unlocked);
+        assert(self.shared_state == .Unlocked);
        while (self.writer_queue.get()) |node| resume node.data;
        while (self.reader_queue.get()) |node| resume node.data;
    }
@@ -116,7 +116,7 @@ pub const RwLock = struct {
        _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst);
 
        // Here we don't care if we are the one to do the locking or if it was already locked for reading.
-        const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true;
+        const have_read_lock = if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == .ReadLock else true;
        if (have_read_lock) {
            // Give out all the read locks.
            if (self.reader_queue.get()) |first_node| {
@@ -147,7 +147,7 @@ pub const RwLock = struct {
        _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst);
 
        // Here we must be the one to acquire the write lock. It cannot already be locked.
-        if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) {
+        if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null) {
            // We now have a write lock.
            if (self.writer_queue.get()) |node| {
                // Whether this node is us or someone else, we tail resume it.
@@ -166,7 +166,7 @@ pub const RwLock = struct {
            // But if there's a writer_queue item or a reader_queue item,
            // we are the actor which must loop and attempt to grab the lock again.
            if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
-                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) {
+                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
@@ -177,12 +177,12 @@ pub const RwLock = struct {
                }
                // Release the lock again.
                _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
-                _ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst);
+                _ = @atomicRmw(State, &self.shared_state, .Xchg, .Unlocked, .SeqCst);
                continue;
            }
 
            if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
-                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) {
+                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
@@ -196,7 +196,7 @@ pub const RwLock = struct {
            }
            // Release the lock again.
            _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
-            if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+            if (@cmpxchgStrong(State, &self.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                // Didn't unlock. Someone else's problem.
                return;
            }
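
The writer_queue_empty_bit and reader_queue_empty_bit fields stay u8 for now (see the remaining TODOs). The apparent protocol is that a waiter flips the bit to 0 before enqueueing itself, so an unlocker that later reads 1 can skip the queue without missing anyone. A reduced sketch of that handshake, inferred from the hunks above rather than taken from this commit:

    const std = @import("std");
    const RwLock = std.event.RwLock;
    const Queue = std.atomic.Queue(anyframe);

    fn enqueueWriter(lock: *RwLock, node: *Queue.Node) void {
        // Flip the bit *before* enqueueing, so an unlocker that reads
        // a 1 afterwards cannot be missing this waiter.
        _ = @atomicRmw(u8, &lock.writer_queue_empty_bit, .Xchg, 0, .SeqCst);
        lock.writer_queue.put(node);
    }
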

@@ -1,24 +1,26 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
 const assert = std.debug.assert;
 const testing = std.testing;
-const AtomicRmwOp = builtin.AtomicRmwOp;
-const AtomicOrder = builtin.AtomicOrder;
 
 /// Thread-safe initialization of global data.
 /// TODO use a mutex instead of a spinlock
 pub fn lazyInit(comptime T: type) LazyInit(T) {
     return LazyInit(T){
         .data = undefined,
-        .state = 0,
     };
 }
 
 fn LazyInit(comptime T: type) type {
     return struct {
-        state: u8, // TODO make this an enum
+        state: State = .NotResolved,
         data: Data,
 
+        const State = enum(u8) {
+            NotResolved,
+            Resolving,
+            Resolved,
+        };
+
         const Self = @This();
 
         // TODO this isn't working for void, investigate and then remove this special case
@@ -30,14 +32,14 @@ fn LazyInit(comptime T: type) type {
         /// perform the initialization and then call resolve().
         pub fn get(self: *Self) ?Ptr {
             while (true) {
-                var state = @cmpxchgWeak(u8, &self.state, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null;
+                var state = @cmpxchgWeak(State, &self.state, .NotResolved, .Resolving, .SeqCst, .SeqCst) orelse return null;
                 switch (state) {
-                    0 => continue,
-                    1 => {
+                    .NotResolved => continue,
+                    .Resolving => {
                         // TODO mutex instead of a spinlock
                         continue;
                     },
-                    2 => {
+                    .Resolved => {
                         if (@sizeOf(T) == 0) {
                             return @as(T, undefined);
                         } else {
@@ -50,8 +52,8 @@ fn LazyInit(comptime T: type) type {
         }
 
         pub fn resolve(self: *Self) void {
-            const prev = @atomicRmw(u8, &self.state, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst);
-            assert(prev == 1); // resolve() called twice
+            const prev = @atomicRmw(State, &self.state, .Xchg, .Resolved, .SeqCst);
+            assert(prev != .Resolved); // resolve() called twice
         }
     };
 }
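
The get()/resolve() contract: a null from get() means the caller won the race and must initialize the data and then call resolve(); any other caller spins (see the TODOs) until it can return the pointer. A hypothetical use, with names assumed for illustration:

    const std = @import("std");

    var global_counter = std.lazyInit(i32);

    fn getCounter() i32 {
        if (global_counter.get()) |ptr| return ptr.*; // already resolved
        // null from get(): we won the race, so initialize and publish once.
        global_counter.data = 0;
        global_counter.resolve();
        return global_counter.data;
    }
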

@@ -39,12 +39,14 @@ pub const Mutex = if (builtin.single_threaded)
     }
 else
     struct {
-        state: u32, // TODO: make this an enum
+        state: State,
         parker: ThreadParker,
 
-        const Unlocked = 0;
-        const Sleeping = 1;
-        const Locked = 2;
+        const State = enum(u32) {
+            Unlocked,
+            Sleeping,
+            Locked,
+        };
 
         /// number of iterations to spin yielding the cpu
         const SPIN_CPU = 4;
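
State is enum(u32) rather than enum(u8) here because ThreadParker parks and unparks on a raw u32 word; the explicit tag type pins the enum to exactly that representation, which is what makes the @ptrCast(*const u32, ...) in the hunks below well-defined. A minimal sketch in the same era's syntax, illustrative only:

    const std = @import("std");

    const State = enum(u32) {
        Unlocked,
        Sleeping,
        Locked,
    };

    test "enum(u32) shares its integer representation" {
        var state: State = .Sleeping;
        // Well-defined because the tag type is explicit.
        const word = @ptrCast(*const u32, &state);
        std.testing.expect(word.* == @enumToInt(State.Sleeping));
    }
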
@@ -57,7 +59,7 @@ pub const Mutex = if (builtin.single_threaded)
 
         pub fn init() Mutex {
             return Mutex{
-                .state = Unlocked,
+                .state = .Unlocked,
                 .parker = ThreadParker.init(),
             };
         }
@@ -70,10 +72,10 @@ pub const Mutex = if (builtin.single_threaded)
             mutex: *Mutex,
 
             pub fn release(self: Held) void {
-                switch (@atomicRmw(u32, &self.mutex.state, .Xchg, Unlocked, .Release)) {
-                    Locked => {},
-                    Sleeping => self.mutex.parker.unpark(&self.mutex.state),
-                    Unlocked => unreachable, // unlocking an unlocked mutex
+                switch (@atomicRmw(State, &self.mutex.state, .Xchg, .Unlocked, .Release)) {
+                    .Locked => {},
+                    .Sleeping => self.mutex.parker.unpark(@ptrCast(*const u32, &self.mutex.state)),
+                    .Unlocked => unreachable, // unlocking an unlocked mutex
                     else => unreachable, // should never be anything else
                 }
             }
@@ -83,34 +85,34 @@ pub const Mutex = if (builtin.single_threaded)
             // Try and speculatively grab the lock.
             // If it fails, the state is either Locked or Sleeping
             // depending on if theres a thread stuck sleeping below.
-            var state = @atomicRmw(u32, &self.state, .Xchg, Locked, .Acquire);
-            if (state == Unlocked)
+            var state = @atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire);
+            if (state == .Unlocked)
                 return Held{ .mutex = self };
 
             while (true) {
                 // try and acquire the lock using cpu spinning on failure
                 var spin: usize = 0;
                 while (spin < SPIN_CPU) : (spin += 1) {
-                    var value = @atomicLoad(u32, &self.state, .Monotonic);
-                    while (value == Unlocked)
-                        value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+                    var value = @atomicLoad(State, &self.state, .Monotonic);
+                    while (value == .Unlocked)
+                        value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
                     SpinLock.yield(SPIN_CPU_COUNT);
                 }
 
                 // try and acquire the lock using thread rescheduling on failure
                 spin = 0;
                 while (spin < SPIN_THREAD) : (spin += 1) {
-                    var value = @atomicLoad(u32, &self.state, .Monotonic);
-                    while (value == Unlocked)
-                        value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+                    var value = @atomicLoad(State, &self.state, .Monotonic);
+                    while (value == .Unlocked)
+                        value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
                     std.os.sched_yield() catch std.time.sleep(1);
                 }
 
                 // failed to acquire the lock, go to sleep until woken up by `Held.release()`
-                if (@atomicRmw(u32, &self.state, .Xchg, Sleeping, .Acquire) == Unlocked)
+                if (@atomicRmw(State, &self.state, .Xchg, .Sleeping, .Acquire) == .Unlocked)
                     return Held{ .mutex = self };
-                state = Sleeping;
-                self.parker.park(&self.state, Sleeping);
+                state = .Sleeping;
+                self.parker.park(@ptrCast(*const u32, &self.state), @enumToInt(State.Sleeping));
             }
         }
     };
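
The Mutex interface itself is unchanged: callers still acquire and release a Held token. A hypothetical caller, for completeness:

    const std = @import("std");

    var mutex = std.Mutex.init();
    var counter: usize = 0;

    fn increment() void {
        const held = mutex.acquire();
        defer held.release(); // swaps back to .Unlocked, unparking a sleeper if any
        counter += 1;
    }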