ANDROID: ring-buffer: Introducing external writer support

The ring buffer is convenient: it has a page granularity and its format
is already supported by userspace tools such as trace-cmd. It is a
natural solution for storing events that come from outside the kernel,
such as from a hypervisor.

In that case, where the writer is external to the kernel, the kernel is
only responsible for allocating the ring buffer and reading it back.

The allocation is done with a newly introduced function, which only
needs a size and a set of callbacks (note that only the overwrite mode
is supported at the moment):

  ring_buffer_alloc_ext(unsigned long size,
                        struct ring_buffer_ext_cb *cb)

The callbacks given to this allocator enable communication with the
external writer (a usage sketch follows the list):

  (*swap_reader)(int cpu):    Ask the writer to swap the current reader
                              page with the head.

  (*update_footers)(int cpu): Ask the writer to update material in the
                              page footers.
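
As a minimal sketch of the kernel side, assuming a hypervisor-backed
writer (the my_hyp_* names and the callback bodies are hypothetical;
only ring_buffer_alloc_ext() and struct ring_buffer_ext_cb come from
this patch):

  static int my_hyp_swap_reader(int cpu)
  {
          /* e.g. issue a hypercall asking the writer to swap the reader page */
          return 0;
  }

  static int my_hyp_update_footers(int cpu)
  {
          /* e.g. issue a hypercall asking the writer to refresh the footers */
          return 0;
  }

  static struct ring_buffer_ext_cb my_hyp_cb = {
          .swap_reader    = my_hyp_swap_reader,
          .update_footers = my_hyp_update_footers,
  };

  static struct trace_buffer *my_hyp_trace_buffer;

  static int my_hyp_trace_init(unsigned long size)
  {
          /* Only the overwrite mode is supported for an external writer */
          my_hyp_trace_buffer = ring_buffer_alloc_ext(size, &my_hyp_cb);

          return my_hyp_trace_buffer ? 0 : -ENOMEM;
  }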

Each page of the ring buffer indeed has a footer from which statistics
and the page status can be retrieved. This allows the kernel to update
its view of the ring buffer after a reader page swap or a footer
update.
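
For example, given a pointer anywhere inside one of the shared data
pages, the footer can be reached with the new rb_ext_page_get_footer()
helper. A minimal sketch (my_read_page_stats() is a hypothetical
helper; the patch itself does this, with proper locking, in
ring_buffer_update_view()):

  static void my_read_page_stats(void *page_data)
  {
          struct rb_ext_page_footer *footer = rb_ext_page_get_footer(page_data);

          pr_debug("entries=%llu overrun=%lu pages_touched=%lu\n",
                   footer->stats.entries, footer->stats.overrun,
                   footer->stats.pages_touched);
  }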

After the trace_buffer is allocated, a helper serializes the relevant
information into a structure that can be easily sent to the external
writer:

  trace_buffer_pack(struct trace_buffer *trace_buffer,
                    struct trace_buffer_pack *pack)
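
A minimal sketch of that sequence (my_pack_trace_buffer() is a
hypothetical caller; how the pack and the pages it describes are then
shared with the writer is outside the scope of this patch):

  static struct trace_buffer_pack *my_pack_trace_buffer(struct trace_buffer *buffer)
  {
          struct trace_buffer_pack *pack;

          pack = kzalloc(trace_buffer_pack_size(buffer), GFP_KERNEL);
          if (!pack)
                  return NULL;

          if (trace_buffer_pack(buffer, pack)) {
                  kfree(pack);
                  return NULL;
          }

          /* ... hand 'pack' (and the pages it describes) to the writer ... */
          return pack;
  }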

The footer and pack description can be found in the newly introduced
header file include/linux/ring_buffer_ext.h.
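
On the writer side, which lives outside the kernel, the pack can be
walked with the accessors from that header to rebuild a per-CPU view.
This is purely illustrative, assuming the writer is built against the
same header; writer_map_page() is a hypothetical function of that
writer:

  static void writer_import_pack(struct trace_buffer_pack *trace_pack)
  {
          struct ring_buffer_pack *rb_pack;
          int cpu;

          for_each_ring_buffer_pack(rb_pack, cpu, trace_pack) {
                  unsigned long i;

                  writer_map_page(rb_pack->cpu, rb_pack->reader_page_va);
                  for (i = 0; i < rb_pack->nr_pages; i++)
                          writer_map_page(rb_pack->cpu, rb_pack->page_va[i]);
          }
  }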

When the kernel writes to the ring buffer, it can easily wake up the
reader. That is not the case when the writer is external. A new
function allows poking the ring buffer, which refreshes the kernel's
view and wakes up any waiting reader:

  ring_buffer_poke(struct trace_buffer *buffer, int cpu)
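
How the kernel learns that the writer produced data is left to the
integration (a doorbell interrupt, a timer, ...). A minimal sketch,
with my_hyp_trace_poke_all() being a hypothetical helper:

  static void my_hyp_trace_poke_all(struct trace_buffer *buffer)
  {
          int cpu;

          for_each_possible_cpu(cpu)
                  ring_buffer_poke(buffer, cpu);
  }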

A ring buffer allocated for an external writer forbids any kernel-side
writing (the whole design of the ring buffer mandates a single writer)
and also prevents resizing the buffer or extracting pages.

When I presented this work at the Tracing Summit, rostedt@ told me he
saw some overlap with an idea he had to let him map the tracing buffers
into userspace. Together we designed a solution that would enable both
features. The problem now is that, while development of that new design
has already started, it would impose a significant revamp of this
patchset, which then wouldn't make it into Android 14. Nothing here is
technically wrong, but sending it to LKML wouldn't make sense, as I
already know it isn't as "reusable" as the version agreed upon.

Bug: 229972309
Change-Id: Iafcc1e2683a7460c94de3db116878c303601df64
Signed-off-by: Vincent Donnefort <vdonnefort@google.com>

include/linux/ring_buffer.h

@@ -4,6 +4,7 @@
#include <linux/mm.h>
#include <linux/poll.h>
#include <linux/ring_buffer_ext.h>
#include <linux/seq_file.h>
#include <asm/local.h>
@@ -140,6 +141,14 @@ __ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *k
__ring_buffer_alloc((size), (flags), &__key); \
})
struct ring_buffer_ext_cb {
int (*update_footers)(int cpu);
int (*swap_reader)(int cpu);
};
struct trace_buffer *
ring_buffer_alloc_ext(unsigned long size, struct ring_buffer_ext_cb *cb);
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full);
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
struct file *filp, poll_table *poll_table, int full);
@@ -254,4 +263,8 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
#define trace_rb_cpu_prepare NULL
#endif
size_t trace_buffer_pack_size(struct trace_buffer *trace_buffer);
int trace_buffer_pack(struct trace_buffer *trace_buffer, struct trace_buffer_pack *pack);
int ring_buffer_poke(struct trace_buffer *buffer, int cpu);
#endif /* _LINUX_RING_BUFFER_H */

include/linux/ring_buffer_ext.h

@@ -0,0 +1,79 @@
/* SPDX-License-Identifier: GPL-2.0 */
#ifndef _LINUX_RING_BUFFER_EXT_H
#define _LINUX_RING_BUFFER_EXT_H

#include <linux/mm.h>
#include <linux/types.h>

struct rb_ext_stats {
        u64             entries;
        unsigned long   pages_touched;
        unsigned long   overrun;
};

#define RB_PAGE_FT_HEAD         (1 << 0)
#define RB_PAGE_FT_READER       (1 << 1)
#define RB_PAGE_FT_COMMIT       (1 << 2)

/*
 * The pages where the events are stored are the only shared elements between
 * the reader and the external writer. They are convenient to enable
 * communication from the writer to the reader. The data will be used by the
 * reader to update its view on the ring buffer.
 */
struct rb_ext_page_footer {
        atomic_t                writer_status;
        atomic_t                reader_status;
        struct rb_ext_stats     stats;
};

static inline struct rb_ext_page_footer *rb_ext_page_get_footer(void *page)
{
        struct rb_ext_page_footer *footer;
        unsigned long page_va = (unsigned long)page;

        page_va = ALIGN_DOWN(page_va, PAGE_SIZE);

        return (struct rb_ext_page_footer *)(page_va + PAGE_SIZE -
                                             sizeof(*footer));
}

#define BUF_EXT_PAGE_SIZE (BUF_PAGE_SIZE - sizeof(struct rb_ext_page_footer))

/*
 * An external writer can't rely on the internal struct ring_buffer_per_cpu.
 * Instead, allow to pack the relevant information into struct
 * ring_buffer_pack which can be sent to the writer. The latter can then create
 * its own view on the ring buffer.
 */
struct ring_buffer_pack {
        int             cpu;
        unsigned long   reader_page_va;
        unsigned long   nr_pages;
        unsigned long   page_va[];
};

struct trace_buffer_pack {
        int             nr_cpus;
        unsigned long   total_pages;
        char            __data[];       /* contains ring_buffer_pack */
};

static inline
struct ring_buffer_pack *__next_ring_buffer_pack(struct ring_buffer_pack *rb_pack)
{
        size_t len;

        len = offsetof(struct ring_buffer_pack, page_va) +
              sizeof(unsigned long) * rb_pack->nr_pages;

        return (struct ring_buffer_pack *)((void *)rb_pack + len);
}

/*
 * Accessor for ring_buffer_pack's within trace_buffer_pack
 */
#define for_each_ring_buffer_pack(rb_pack, cpu, trace_pack)            \
        for (rb_pack = (struct ring_buffer_pack *)&trace_pack->__data[0], cpu = 0; \
             cpu < trace_pack->nr_cpus;                                \
             cpu++, rb_pack = __next_ring_buffer_pack(rb_pack))

#endif

kernel/trace/ring_buffer.c

@@ -5,6 +5,7 @@
* Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
*/
#include <linux/trace_recursion.h>
#include <linux/ring_buffer_ext.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
@@ -516,6 +517,8 @@ struct trace_buffer {
struct rb_irq_work irq_work;
bool time_stamp_abs;
struct ring_buffer_ext_cb *ext_cb;
};
struct ring_buffer_iter {
@@ -721,6 +724,16 @@ static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
}
#endif
static inline bool has_ext_writer(struct trace_buffer *buffer)
{
return !!buffer->ext_cb;
}
static inline bool rb_has_ext_writer(struct ring_buffer_per_cpu *cpu_buffer)
{
return has_ext_writer(cpu_buffer->buffer);
}
/*
* Enable this to make sure that the event passed to
* ring_buffer_event_time_stamp() is not committed and also
@@ -1856,6 +1869,26 @@ struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
struct trace_buffer *ring_buffer_alloc_ext(unsigned long size,
struct ring_buffer_ext_cb *cb)
{
struct trace_buffer *buffer;
if (!cb || !cb->update_footers || !cb->swap_reader)
return NULL;
buffer = ring_buffer_alloc(size, RB_FL_OVERWRITE);
if (!buffer)
return NULL;
WARN_ON(cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE,
&buffer->node));
buffer->ext_cb = cb;
atomic_set(&buffer->record_disabled, 1);
return buffer;
}
/**
* ring_buffer_free - free a ring buffer.
* @buffer: the buffer to free.
@@ -1865,7 +1898,9 @@ ring_buffer_free(struct trace_buffer *buffer)
{
int cpu;
cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
if (!has_ext_writer(buffer))
cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE,
&buffer->node);
for_each_buffer_cpu(buffer, cpu)
rb_free_cpu_buffer(buffer->buffers[cpu]);
@@ -2134,6 +2169,8 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
unsigned long nr_pages;
int cpu, err;
if (unlikely(has_ext_writer(buffer)))
return -EINVAL;
/*
* Always succeed at resizing a non-existent buffer:
*/
@@ -3859,6 +3896,9 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer,
struct ring_buffer_per_cpu *cpu_buffer;
int cpu;
if (unlikely(has_ext_writer(buffer)))
return;
/* The event is discarded regardless */
rb_event_discard(event);
@@ -4014,6 +4054,9 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
*/
void ring_buffer_record_enable(struct trace_buffer *buffer)
{
if (unlikely(has_ext_writer(buffer)))
return;
atomic_dec(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
@@ -4057,6 +4100,9 @@ void ring_buffer_record_on(struct trace_buffer *buffer)
unsigned int rd;
unsigned int new_rd;
if (unlikely(has_ext_writer(buffer)))
return;
do {
rd = atomic_read(&buffer->record_disabled);
new_rd = rd & ~RB_BUFFER_OFF;
@@ -4501,49 +4547,119 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
return;
}
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
static void __set_head_page_flag(struct buffer_page *head, int flag)
{
struct buffer_page *reader = NULL;
unsigned long overwrite;
unsigned long flags;
int nr_loops = 0;
int ret;
struct list_head *prev = head->list.prev;
local_irq_save(flags);
arch_spin_lock(&cpu_buffer->lock);
prev->next = (struct list_head *)(((unsigned long)prev->next & ~RB_FLAG_MASK) | flag);
}
static int __read_footer_reader_status(struct buffer_page *bpage)
{
struct rb_ext_page_footer *footer = rb_ext_page_get_footer(bpage->page);
return atomic_read(&footer->reader_status);
}
static int __read_footer_writer_status(struct buffer_page *bpage)
{
struct rb_ext_page_footer *footer = rb_ext_page_get_footer(bpage->page);
return atomic_read(&footer->writer_status);
}
static struct buffer_page *
ring_buffer_search_footer(struct buffer_page *start, unsigned long flag)
{
bool search_writer = flag == RB_PAGE_FT_COMMIT;
struct buffer_page *bpage = start;
unsigned long status;
int cnt = 0;
again:
do {
status = search_writer ? __read_footer_writer_status(bpage) :
__read_footer_reader_status(bpage);
if (flag & status)
return bpage;
rb_inc_page(&bpage);
} while (bpage != start);
again:
/*
* This should normally only loop twice. But because the
* start of the reader inserts an empty page, it causes
* a case where we will loop three times. There should be no
* reason to loop four times (that I know of).
* There's a chance the writer is in the middle of moving the flag and
* we might not find anything after a first round. Let's try again.
*/
if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
reader = NULL;
goto out;
if (cnt++ < 3)
goto again;
return NULL;
}
static struct buffer_page *
noinline rb_swap_reader_page_ext(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *new_reader, *new_rb_page, *new_head;
struct rb_ext_page_footer *footer;
unsigned long overrun;
if (cpu_buffer->buffer->ext_cb->swap_reader(cpu_buffer->cpu)) {
WARN_ON(1);
return NULL;
}
reader = cpu_buffer->reader_page;
new_rb_page = cpu_buffer->reader_page;
/* If there's more to read, return this page */
if (cpu_buffer->reader_page->read < rb_page_size(reader))
goto out;
/*
* Find what page is the new reader... starting with the latest known
* head.
*/
new_reader = ring_buffer_search_footer(cpu_buffer->head_page,
RB_PAGE_FT_READER);
if (!new_reader) {
WARN_ON(1);
return NULL;
}
/* Never should we have an index greater than the size */
if (RB_WARN_ON(cpu_buffer,
cpu_buffer->reader_page->read > rb_page_size(reader)))
goto out;
/* ... and install it into the ring buffer in place of the old head */
rb_list_head_clear(&new_reader->list);
new_rb_page->list.next = new_reader->list.next;
new_rb_page->list.prev = new_reader->list.prev;
new_rb_page->list.next->prev = &new_rb_page->list;
new_rb_page->list.prev->next = &new_rb_page->list;
/* check if we caught up to the tail */
reader = NULL;
if (cpu_buffer->commit_page == cpu_buffer->reader_page)
goto out;
cpu_buffer->reader_page = new_reader;
cpu_buffer->reader_page->read = 0;
/* Don't bother swapping if the ring buffer is empty */
if (rb_num_of_entries(cpu_buffer) == 0)
goto out;
/* Install the new head page */
new_head = new_rb_page;
rb_inc_page(&new_head);
cpu_buffer->head_page = new_head;
/*
* cpu_buffer->pages just needs to point to the buffer, it
* has no specific buffer page to point to. Lets move it out
* of our way so we don't accidentally swap it.
*/
cpu_buffer->pages = &new_head->list;
__set_head_page_flag(new_head, RB_PAGE_HEAD);
footer = rb_ext_page_get_footer(new_reader->page);
overrun = footer->stats.overrun;
if (overrun != cpu_buffer->last_overrun) {
cpu_buffer->lost_events = overrun - cpu_buffer->last_overrun;
cpu_buffer->last_overrun = overrun;
}
return new_reader;
}
static struct buffer_page *
rb_swap_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *reader;
unsigned long overwrite;
int ret;
/*
* Reset the reader page to size zero.
@@ -4559,7 +4675,8 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
*/
reader = rb_set_head_page(cpu_buffer);
if (!reader)
goto out;
return NULL;
cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
cpu_buffer->reader_page->list.prev = reader->list.prev;
@@ -4623,7 +4740,60 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->last_overrun = overwrite;
}
goto again;
return reader;
}
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *reader = NULL;
unsigned long flags;
int nr_loops = 0;
unsigned int page_size;
local_irq_save(flags);
arch_spin_lock(&cpu_buffer->lock);
again:
/*
* This should normally only loop twice. But because the
* start of the reader inserts an empty page, it causes
* a case where we will loop three times. There should be no
* reason to loop four times (that I know of).
*/
if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
reader = NULL;
goto out;
}
reader = cpu_buffer->reader_page;
/* If there's more to read, return this page */
if (cpu_buffer->reader_page->read < rb_page_size(reader))
goto out;
page_size = rb_page_size(reader);
/* Never should we have an index greater than the size */
if (RB_WARN_ON(cpu_buffer,
cpu_buffer->reader_page->read > page_size))
goto out;
/* check if we caught up to the tail */
reader = NULL;
if (cpu_buffer->commit_page == cpu_buffer->reader_page)
goto out;
/* Don't bother swapping if the ring buffer is empty */
if (rb_num_of_entries(cpu_buffer) == 0)
goto out;
if (rb_has_ext_writer(cpu_buffer))
reader = rb_swap_reader_page_ext(cpu_buffer);
else
reader = rb_swap_reader_page(cpu_buffer);
if (reader)
goto again;
out:
/* Update the read_stamp on the first event */
@@ -5043,6 +5213,73 @@ ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
static void ring_buffer_update_view(struct ring_buffer_per_cpu *cpu_buffer)
{
struct rb_ext_page_footer *footer;
struct buffer_page *bpage;
if (!rb_has_ext_writer(cpu_buffer))
return;
raw_spin_lock_irq(&cpu_buffer->reader_lock);
arch_spin_lock(&cpu_buffer->lock);
cpu_buffer->buffer->ext_cb->update_footers(cpu_buffer->cpu);
bpage = cpu_buffer->reader_page;
footer = rb_ext_page_get_footer(bpage->page);
local_set(&cpu_buffer->entries, footer->stats.entries);
local_set(&cpu_buffer->pages_touched, footer->stats.pages_touched);
local_set(&cpu_buffer->overrun, footer->stats.overrun);
/* Update the commit page */
bpage = ring_buffer_search_footer(cpu_buffer->commit_page,
RB_PAGE_FT_COMMIT);
if (!bpage) {
WARN_ON(1);
goto unlock;
}
cpu_buffer->commit_page = bpage;
/* Update the head page */
bpage = ring_buffer_search_footer(cpu_buffer->head_page,
RB_PAGE_FT_HEAD);
if (!bpage) {
WARN_ON(1);
goto unlock;
}
/* Reset the previous RB_PAGE_HEAD flag */
__set_head_page_flag(cpu_buffer->head_page, RB_PAGE_NORMAL);
/* Set RB_PAGE_HEAD flag pointing to the new head */
__set_head_page_flag(bpage, RB_PAGE_HEAD);
cpu_buffer->reader_page->list.next = &cpu_buffer->head_page->list;
cpu_buffer->head_page = bpage;
unlock:
arch_spin_unlock(&cpu_buffer->lock);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
}
int ring_buffer_poke(struct trace_buffer *buffer, int cpu)
{
struct ring_buffer_per_cpu *cpu_buffer;
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return -EINVAL;
cpu_buffer = buffer->buffers[cpu];
ring_buffer_update_view(cpu_buffer);
rb_wakeups(buffer, cpu_buffer);
return 0;
}
/**
* ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
* @buffer: The ring buffer to read from
@@ -5089,6 +5326,8 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
atomic_inc(&cpu_buffer->resize_disabled);
ring_buffer_update_view(cpu_buffer);
return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
@@ -5449,6 +5688,9 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
struct ring_buffer_per_cpu *cpu_buffer_b;
int ret = -EINVAL;
if (unlikely(has_ext_writer(buffer_a) || has_ext_writer(buffer_b)))
return -EINVAL;
if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
!cpumask_test_cpu(cpu, buffer_b->cpumask))
goto out;
@@ -5529,6 +5771,9 @@ void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
unsigned long flags;
struct page *page;
if (unlikely(has_ext_writer(buffer)))
return ERR_PTR(-EINVAL);
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return ERR_PTR(-ENODEV);
@@ -5643,6 +5888,9 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
u64 save_timestamp;
int ret = -1;
if (unlikely(has_ext_writer(buffer)))
goto out;
if (!cpumask_test_cpu(cpu, buffer->cpumask))
goto out;
@@ -5844,6 +6092,74 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
return 0;
}
#define TRACE_BUFFER_PACK_HDR_SIZE offsetof(struct trace_buffer_pack, __data)
#define RING_BUFFER_PACK_HDR_SIZE offsetof(struct ring_buffer_pack, page_va)
size_t trace_buffer_pack_size(struct trace_buffer *trace_buffer)
{
size_t size = 0;
int cpu;
for_each_buffer_cpu(trace_buffer, cpu) {
struct ring_buffer_per_cpu *rb = trace_buffer->buffers[cpu];
size += rb->nr_pages * sizeof(unsigned long);
size += RING_BUFFER_PACK_HDR_SIZE;
}
size += TRACE_BUFFER_PACK_HDR_SIZE;
return size;
}
int trace_buffer_pack(struct trace_buffer *trace_buffer,
struct trace_buffer_pack *pack)
{
struct ring_buffer_pack *cpu_pack;
int cpu = -1, pack_cpu, j;
if (!has_ext_writer(trace_buffer))
return -EINVAL;
pack->nr_cpus = cpumask_weight(trace_buffer->cpumask);
pack->total_pages = 0;
for_each_ring_buffer_pack(cpu_pack, pack_cpu, pack) {
struct ring_buffer_per_cpu *rb;
unsigned long flags, nr_pages;
struct buffer_page *bpage;
cpu = cpumask_next(cpu, trace_buffer->cpumask);
if (cpu > nr_cpu_ids) {
WARN_ON(1);
break;
}
rb = trace_buffer->buffers[cpu];
local_irq_save(flags);
arch_spin_lock(&rb->lock);
bpage = rb->head_page;
nr_pages = rb->nr_pages;
pack->total_pages += nr_pages + 1;
cpu_pack->cpu = cpu;
cpu_pack->reader_page_va = (unsigned long)rb->reader_page->page;
cpu_pack->nr_pages = nr_pages;
for (j = 0; j < nr_pages; j++) {
cpu_pack->page_va[j] = (unsigned long)bpage->page;
rb_inc_page(&bpage);
}
arch_spin_unlock(&rb->lock);
local_irq_restore(flags);
}
return 0;
}
#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
* This is a basic integrity check of the ring buffer.