diff --git a/include/linux/bpf.h b/include/linux/bpf.h index e0dbe0c0a17e..33e543b86e1a 100644 --- a/include/linux/bpf.h +++ b/include/linux/bpf.h @@ -451,7 +451,7 @@ enum bpf_type_flag { /* DYNPTR points to memory local to the bpf program. */ DYNPTR_TYPE_LOCAL = BIT(8 + BPF_BASE_TYPE_BITS), - /* DYNPTR points to a ringbuf record. */ + /* DYNPTR points to a kernel-produced ringbuf record. */ DYNPTR_TYPE_RINGBUF = BIT(9 + BPF_BASE_TYPE_BITS), /* Size is known at compile time. */ @@ -656,6 +656,7 @@ enum bpf_reg_type { PTR_TO_MEM, /* reg points to valid memory region */ PTR_TO_BUF, /* reg points to a read/write buffer */ PTR_TO_FUNC, /* reg points to a bpf program function */ + PTR_TO_DYNPTR, /* reg points to a dynptr */ __BPF_REG_TYPE_MAX, /* Extended reg_types. */ @@ -1394,6 +1395,11 @@ struct bpf_array { #define BPF_MAP_CAN_READ BIT(0) #define BPF_MAP_CAN_WRITE BIT(1) +/* Maximum number of user-producer ring buffer samples that can be drained in + * a call to bpf_user_ringbuf_drain(). + */ +#define BPF_MAX_USER_RINGBUF_SAMPLES (128 * 1024) + static inline u32 bpf_map_flags_to_cap(struct bpf_map *map) { u32 access_flags = map->map_flags & (BPF_F_RDONLY_PROG | BPF_F_WRONLY_PROG); @@ -2495,6 +2501,7 @@ extern const struct bpf_func_proto bpf_loop_proto; extern const struct bpf_func_proto bpf_copy_from_user_task_proto; extern const struct bpf_func_proto bpf_set_retval_proto; extern const struct bpf_func_proto bpf_get_retval_proto; +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto; const struct bpf_func_proto *tracing_prog_func_proto( enum bpf_func_id func_id, const struct bpf_prog *prog); @@ -2639,7 +2646,7 @@ enum bpf_dynptr_type { BPF_DYNPTR_TYPE_INVALID, /* Points to memory that is local to the bpf program */ BPF_DYNPTR_TYPE_LOCAL, - /* Underlying data is a ringbuf record */ + /* Underlying data is a kernel-produced ringbuf record */ BPF_DYNPTR_TYPE_RINGBUF, }; diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h index e18c85324db6..ead35f39f185 100644 --- a/include/uapi/linux/bpf.h +++ b/include/uapi/linux/bpf.h @@ -5388,6 +5388,43 @@ union bpf_attr { * Return * Current *ktime*. * + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags) + * Description + * Drain samples from the specified user ring buffer, and invoke + * the provided callback for each such sample: + * + * long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx); + * + * If **callback_fn** returns 0, the helper will continue to try + * and drain the next sample, up to a maximum of + * BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1, + * the helper will skip the rest of the samples and return. Other + * return values are not used now, and will be rejected by the + * verifier. + * Return + * The number of drained samples if no error was encountered while + * draining samples, or 0 if no samples were present in the ring + * buffer. If a user-space producer was epoll-waiting on this map, + * and at least one sample was drained, they will receive an event + * notification notifying them of available space in the ring + * buffer. If the BPF_RB_NO_WAKEUP flag is passed to this + * function, no wakeup notification will be sent. If the + * BPF_RB_FORCE_WAKEUP flag is passed, a wakeup notification will + * be sent even if no sample was drained. + * + * On failure, the returned value is one of the following: + * + * **-EBUSY** if the ring buffer is contended, and another calling + * context was concurrently draining the ring buffer. + * + * **-EINVAL** if user-space is not properly tracking the ring + * buffer due to the producer position not being aligned to 8 + * bytes, a sample not being aligned to 8 bytes, or the producer + * position not matching the advertised length of a sample. + * + * **-E2BIG** if user-space has tried to publish a sample which is + * larger than the size of the ring buffer, or which cannot fit + * within a struct bpf_dynptr. */ #define __BPF_FUNC_MAPPER(FN) \ FN(unspec), \ @@ -5599,6 +5636,7 @@ union bpf_attr { FN(tcp_raw_check_syncookie_ipv4), \ FN(tcp_raw_check_syncookie_ipv6), \ FN(ktime_get_tai_ns), \ + FN(user_ringbuf_drain), \ /* */ /* integer value in 'imm' field of BPF_CALL instruction selects which helper diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c index 41aeaf3862ec..cb5564c77482 100644 --- a/kernel/bpf/helpers.c +++ b/kernel/bpf/helpers.c @@ -1659,6 +1659,8 @@ bpf_base_func_proto(enum bpf_func_id func_id) return &bpf_for_each_map_elem_proto; case BPF_FUNC_loop: return &bpf_loop_proto; + case BPF_FUNC_user_ringbuf_drain: + return &bpf_user_ringbuf_drain_proto; default: break; } diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c index 754e915748fb..9e832acf4692 100644 --- a/kernel/bpf/ringbuf.c +++ b/kernel/bpf/ringbuf.c @@ -38,6 +38,22 @@ struct bpf_ringbuf { struct page **pages; int nr_pages; spinlock_t spinlock ____cacheline_aligned_in_smp; + /* For user-space producer ring buffers, an atomic_t busy bit is used + * to synchronize access to the ring buffers in the kernel, rather than + * the spinlock that is used for kernel-producer ring buffers. This is + * done because the ring buffer must hold a lock across a BPF program's + * callback: + * + * __bpf_user_ringbuf_peek() // lock acquired + * -> program callback_fn() + * -> __bpf_user_ringbuf_sample_release() // lock released + * + * It is unsafe and incorrect to hold an IRQ spinlock across what could + * be a long execution window, so we instead simply disallow concurrent + * access to the ring buffer by kernel consumers, and return -EBUSY from + * __bpf_user_ringbuf_peek() if the busy bit is held by another task. + */ + atomic_t busy ____cacheline_aligned_in_smp; /* Consumer and producer counters are put into separate pages to * allow each position to be mapped with different permissions. * This prevents a user-space application from modifying the @@ -153,6 +169,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node) return NULL; spin_lock_init(&rb->spinlock); + atomic_set(&rb->busy, 0); init_waitqueue_head(&rb->waitq); init_irq_work(&rb->work, bpf_ringbuf_notify); @@ -288,8 +305,13 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb) return prod_pos - cons_pos; } -static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp, - struct poll_table_struct *pts) +static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb) +{ + return rb->mask + 1; +} + +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp, + struct poll_table_struct *pts) { struct bpf_ringbuf_map *rb_map; @@ -301,13 +323,26 @@ static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp, return 0; } +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp, + struct poll_table_struct *pts) +{ + struct bpf_ringbuf_map *rb_map; + + rb_map = container_of(map, struct bpf_ringbuf_map, map); + poll_wait(filp, &rb_map->rb->waitq, pts); + + if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb)) + return EPOLLOUT | EPOLLWRNORM; + return 0; +} + BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map) const struct bpf_map_ops ringbuf_map_ops = { .map_meta_equal = bpf_map_meta_equal, .map_alloc = ringbuf_map_alloc, .map_free = ringbuf_map_free, .map_mmap = ringbuf_map_mmap_kern, - .map_poll = ringbuf_map_poll, + .map_poll = ringbuf_map_poll_kern, .map_lookup_elem = ringbuf_map_lookup_elem, .map_update_elem = ringbuf_map_update_elem, .map_delete_elem = ringbuf_map_delete_elem, @@ -321,6 +356,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = { .map_alloc = ringbuf_map_alloc, .map_free = ringbuf_map_free, .map_mmap = ringbuf_map_mmap_user, + .map_poll = ringbuf_map_poll_user, .map_lookup_elem = ringbuf_map_lookup_elem, .map_update_elem = ringbuf_map_update_elem, .map_delete_elem = ringbuf_map_delete_elem, @@ -362,7 +398,7 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size) return NULL; len = round_up(size + BPF_RINGBUF_HDR_SZ, 8); - if (len > rb->mask + 1) + if (len > ringbuf_total_data_sz(rb)) return NULL; cons_pos = smp_load_acquire(&rb->consumer_pos); @@ -509,7 +545,7 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags) case BPF_RB_AVAIL_DATA: return ringbuf_avail_data_sz(rb); case BPF_RB_RING_SIZE: - return rb->mask + 1; + return ringbuf_total_data_sz(rb); case BPF_RB_CONS_POS: return smp_load_acquire(&rb->consumer_pos); case BPF_RB_PROD_POS: @@ -603,3 +639,138 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = { .arg1_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE, .arg2_type = ARG_ANYTHING, }; + +static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size) +{ + int err; + u32 hdr_len, sample_len, total_len, flags, *hdr; + u64 cons_pos, prod_pos; + + /* Synchronizes with smp_store_release() in user-space producer. */ + prod_pos = smp_load_acquire(&rb->producer_pos); + if (prod_pos % 8) + return -EINVAL; + + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */ + cons_pos = smp_load_acquire(&rb->consumer_pos); + if (cons_pos >= prod_pos) + return -ENODATA; + + hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask)); + /* Synchronizes with smp_store_release() in user-space producer. */ + hdr_len = smp_load_acquire(hdr); + flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT); + sample_len = hdr_len & ~flags; + total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8); + + /* The sample must fit within the region advertised by the producer position. */ + if (total_len > prod_pos - cons_pos) + return -EINVAL; + + /* The sample must fit within the data region of the ring buffer. */ + if (total_len > ringbuf_total_data_sz(rb)) + return -E2BIG; + + /* The sample must fit into a struct bpf_dynptr. */ + err = bpf_dynptr_check_size(sample_len); + if (err) + return -E2BIG; + + if (flags & BPF_RINGBUF_DISCARD_BIT) { + /* If the discard bit is set, the sample should be skipped. + * + * Update the consumer pos, and return -EAGAIN so the caller + * knows to skip this sample and try to read the next one. + */ + smp_store_release(&rb->consumer_pos, cons_pos + total_len); + return -EAGAIN; + } + + if (flags & BPF_RINGBUF_BUSY_BIT) + return -ENODATA; + + *sample = (void *)((uintptr_t)rb->data + + (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask)); + *size = sample_len; + return 0; +} + +static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags) +{ + u64 consumer_pos; + u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8); + + /* Using smp_load_acquire() is unnecessary here, as the busy-bit + * prevents another task from writing to consumer_pos after it was read + * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek(). + */ + consumer_pos = rb->consumer_pos; + /* Synchronizes with smp_load_acquire() in user-space producer. */ + smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size); +} + +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map, + void *, callback_fn, void *, callback_ctx, u64, flags) +{ + struct bpf_ringbuf *rb; + long samples, discarded_samples = 0, ret = 0; + bpf_callback_t callback = (bpf_callback_t)callback_fn; + u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP; + int busy = 0; + + if (unlikely(flags & ~wakeup_flags)) + return -EINVAL; + + rb = container_of(map, struct bpf_ringbuf_map, map)->rb; + + /* If another consumer is already consuming a sample, wait for them to finish. */ + if (!atomic_try_cmpxchg(&rb->busy, &busy, 1)) + return -EBUSY; + + for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) { + int err; + u32 size; + void *sample; + struct bpf_dynptr_kern dynptr; + + err = __bpf_user_ringbuf_peek(rb, &sample, &size); + if (err) { + if (err == -ENODATA) { + break; + } else if (err == -EAGAIN) { + discarded_samples++; + continue; + } else { + ret = err; + goto schedule_work_return; + } + } + + bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size); + ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0); + __bpf_user_ringbuf_sample_release(rb, size, flags); + } + ret = samples - discarded_samples; + +schedule_work_return: + /* Prevent the clearing of the busy-bit from being reordered before the + * storing of any rb consumer or producer positions. + */ + smp_mb__before_atomic(); + atomic_set(&rb->busy, 0); + + if (flags & BPF_RB_FORCE_WAKEUP) + irq_work_queue(&rb->work); + else if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0) + irq_work_queue(&rb->work); + return ret; +} + +const struct bpf_func_proto bpf_user_ringbuf_drain_proto = { + .func = bpf_user_ringbuf_drain, + .ret_type = RET_INTEGER, + .arg1_type = ARG_CONST_MAP_PTR, + .arg2_type = ARG_PTR_TO_FUNC, + .arg3_type = ARG_PTR_TO_STACK_OR_NULL, + .arg4_type = ARG_ANYTHING, +}; diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c index 83710b60e708..c76fa45a5906 100644 --- a/kernel/bpf/verifier.c +++ b/kernel/bpf/verifier.c @@ -563,6 +563,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env, [PTR_TO_BUF] = "buf", [PTR_TO_FUNC] = "func", [PTR_TO_MAP_KEY] = "map_key", + [PTR_TO_DYNPTR] = "dynptr_ptr", }; if (type & PTR_MAYBE_NULL) { @@ -5688,6 +5689,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK } static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } }; static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } }; static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } }; +static const struct bpf_reg_types dynptr_types = { + .types = { + PTR_TO_STACK, + PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL, + } +}; static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = { [ARG_PTR_TO_MAP_KEY] = &map_key_value_types, @@ -5714,7 +5721,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = { [ARG_PTR_TO_CONST_STR] = &const_str_ptr_types, [ARG_PTR_TO_TIMER] = &timer_types, [ARG_PTR_TO_KPTR] = &kptr_types, - [ARG_PTR_TO_DYNPTR] = &stack_ptr_types, + [ARG_PTR_TO_DYNPTR] = &dynptr_types, }; static int check_reg_type(struct bpf_verifier_env *env, u32 regno, @@ -6066,6 +6073,13 @@ skip_type_check: err = check_mem_size_reg(env, reg, regno, true, meta); break; case ARG_PTR_TO_DYNPTR: + /* We only need to check for initialized / uninitialized helper + * dynptr args if the dynptr is not PTR_TO_DYNPTR, as the + * assumption is that if it is, that a helper function + * initialized the dynptr on behalf of the BPF program. + */ + if (base_type(reg->type) == PTR_TO_DYNPTR) + break; if (arg_type & MEM_UNINIT) { if (!is_dynptr_reg_valid_uninit(env, reg)) { verbose(env, "Dynptr has to be an uninitialized dynptr\n"); @@ -6241,7 +6255,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env, goto error; break; case BPF_MAP_TYPE_USER_RINGBUF: - goto error; + if (func_id != BPF_FUNC_user_ringbuf_drain) + goto error; + break; case BPF_MAP_TYPE_STACK_TRACE: if (func_id != BPF_FUNC_get_stackid) goto error; @@ -6361,6 +6377,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env, if (map->map_type != BPF_MAP_TYPE_RINGBUF) goto error; break; + case BPF_FUNC_user_ringbuf_drain: + if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF) + goto error; + break; case BPF_FUNC_get_stackid: if (map->map_type != BPF_MAP_TYPE_STACK_TRACE) goto error; @@ -6887,6 +6907,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env, return 0; } +static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env, + struct bpf_func_state *caller, + struct bpf_func_state *callee, + int insn_idx) +{ + /* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void + * callback_ctx, u64 flags); + * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx); + */ + __mark_reg_not_init(env, &callee->regs[BPF_REG_0]); + callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL; + __mark_reg_known_zero(&callee->regs[BPF_REG_1]); + callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3]; + + /* unused */ + __mark_reg_not_init(env, &callee->regs[BPF_REG_3]); + __mark_reg_not_init(env, &callee->regs[BPF_REG_4]); + __mark_reg_not_init(env, &callee->regs[BPF_REG_5]); + + callee->in_callback_fn = true; + return 0; +} + static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx) { struct bpf_verifier_state *state = env->cur_state; @@ -7346,12 +7389,18 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn case BPF_FUNC_dynptr_data: for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) { if (arg_type_is_dynptr(fn->arg_type[i])) { + struct bpf_reg_state *reg = ®s[BPF_REG_1 + i]; + if (meta.ref_obj_id) { verbose(env, "verifier internal error: meta.ref_obj_id already set\n"); return -EFAULT; } - /* Find the id of the dynptr we're tracking the reference of */ - meta.ref_obj_id = stack_slot_get_id(env, ®s[BPF_REG_1 + i]); + + if (base_type(reg->type) != PTR_TO_DYNPTR) + /* Find the id of the dynptr we're + * tracking the reference of + */ + meta.ref_obj_id = stack_slot_get_id(env, reg); break; } } @@ -7360,6 +7409,10 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn return -EFAULT; } break; + case BPF_FUNC_user_ringbuf_drain: + err = __check_func_call(env, insn, insn_idx_p, meta.subprogno, + set_user_ringbuf_callback_state); + break; } if (err) diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h index e18c85324db6..ead35f39f185 100644 --- a/tools/include/uapi/linux/bpf.h +++ b/tools/include/uapi/linux/bpf.h @@ -5388,6 +5388,43 @@ union bpf_attr { * Return * Current *ktime*. * + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags) + * Description + * Drain samples from the specified user ring buffer, and invoke + * the provided callback for each such sample: + * + * long (\*callback_fn)(struct bpf_dynptr \*dynptr, void \*ctx); + * + * If **callback_fn** returns 0, the helper will continue to try + * and drain the next sample, up to a maximum of + * BPF_MAX_USER_RINGBUF_SAMPLES samples. If the return value is 1, + * the helper will skip the rest of the samples and return. Other + * return values are not used now, and will be rejected by the + * verifier. + * Return + * The number of drained samples if no error was encountered while + * draining samples, or 0 if no samples were present in the ring + * buffer. If a user-space producer was epoll-waiting on this map, + * and at least one sample was drained, they will receive an event + * notification notifying them of available space in the ring + * buffer. If the BPF_RB_NO_WAKEUP flag is passed to this + * function, no wakeup notification will be sent. If the + * BPF_RB_FORCE_WAKEUP flag is passed, a wakeup notification will + * be sent even if no sample was drained. + * + * On failure, the returned value is one of the following: + * + * **-EBUSY** if the ring buffer is contended, and another calling + * context was concurrently draining the ring buffer. + * + * **-EINVAL** if user-space is not properly tracking the ring + * buffer due to the producer position not being aligned to 8 + * bytes, a sample not being aligned to 8 bytes, or the producer + * position not matching the advertised length of a sample. + * + * **-E2BIG** if user-space has tried to publish a sample which is + * larger than the size of the ring buffer, or which cannot fit + * within a struct bpf_dynptr. */ #define __BPF_FUNC_MAPPER(FN) \ FN(unspec), \ @@ -5599,6 +5636,7 @@ union bpf_attr { FN(tcp_raw_check_syncookie_ipv4), \ FN(tcp_raw_check_syncookie_ipv6), \ FN(ktime_get_tai_ns), \ + FN(user_ringbuf_drain), \ /* */ /* integer value in 'imm' field of BPF_CALL instruction selects which helper