1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
7#include <linux/trace_recursion.h>
8#include <linux/trace_events.h>
9#include <linux/ring_buffer.h>
10#include <linux/trace_clock.h>
11#include <linux/sched/clock.h>
12#include <linux/cacheflush.h>
13#include <linux/trace_seq.h>
14#include <linux/spinlock.h>
15#include <linux/irq_work.h>
16#include <linux/security.h>
17#include <linux/uaccess.h>
18#include <linux/hardirq.h>
19#include <linux/kthread.h>	/* for self test */
20#include <linux/module.h>
21#include <linux/percpu.h>
22#include <linux/mutex.h>
23#include <linux/delay.h>
24#include <linux/slab.h>
25#include <linux/init.h>
26#include <linux/hash.h>
27#include <linux/list.h>
28#include <linux/cpu.h>
29#include <linux/oom.h>
30#include <linux/mm.h>
31
32#include <asm/local64.h>
33#include <asm/local.h>
34
35/*
36 * The "absolute" timestamp in the buffer is only 59 bits.
37 * If a clock has the 5 MSBs set, it needs to be saved and
38 * reinserted.
39 */
40#define TS_MSB		(0xf8ULL << 56)
41#define ABS_TS_MASK	(~TS_MSB)
42
43static void update_pages_handler(struct work_struct *work);
44
45/*
 * The ring buffer header is special. We must manually keep it up to date.
47 */
48int ring_buffer_print_entry_header(struct trace_seq *s)
49{
50	trace_seq_puts(s, "# compressed entry header\n");
51	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
52	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
53	trace_seq_puts(s, "\tarray       :   32 bits\n");
54	trace_seq_putc(s, '\n');
55	trace_seq_printf(s, "\tpadding     : type == %d\n",
56			 RINGBUF_TYPE_PADDING);
57	trace_seq_printf(s, "\ttime_extend : type == %d\n",
58			 RINGBUF_TYPE_TIME_EXTEND);
59	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
60			 RINGBUF_TYPE_TIME_STAMP);
61	trace_seq_printf(s, "\tdata max type_len  == %d\n",
62			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
63
64	return !trace_seq_has_overflowed(s);
65}
66
67/*
68 * The ring buffer is made up of a list of pages. A separate list of pages is
69 * allocated for each CPU. A writer may only write to a buffer that is
70 * associated with the CPU it is currently executing on.  A reader may read
71 * from any per cpu buffer.
72 *
73 * The reader is special. For each per cpu buffer, the reader has its own
74 * reader page. When a reader has read the entire reader page, this reader
75 * page is swapped with another page in the ring buffer.
76 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
79 * again (as long as it is out of the ring buffer).
80 *
81 * Here's some silly ASCII art.
82 *
83 *   +------+
84 *   |reader|          RING BUFFER
85 *   |page  |
86 *   +------+        +---+   +---+   +---+
87 *                   |   |-->|   |-->|   |
88 *                   +---+   +---+   +---+
89 *                     ^               |
90 *                     |               |
91 *                     +---------------+
92 *
93 *
94 *   +------+
95 *   |reader|          RING BUFFER
96 *   |page  |------------------v
97 *   +------+        +---+   +---+   +---+
98 *                   |   |-->|   |-->|   |
99 *                   +---+   +---+   +---+
100 *                     ^               |
101 *                     |               |
102 *                     +---------------+
103 *
104 *
105 *   +------+
106 *   |reader|          RING BUFFER
107 *   |page  |------------------v
108 *   +------+        +---+   +---+   +---+
109 *      ^            |   |-->|   |-->|   |
110 *      |            +---+   +---+   +---+
111 *      |                              |
112 *      |                              |
113 *      +------------------------------+
114 *
115 *
116 *   +------+
117 *   |buffer|          RING BUFFER
118 *   |page  |------------------v
119 *   +------+        +---+   +---+   +---+
120 *      ^            |   |   |   |-->|   |
121 *      |   New      +---+   +---+   +---+
122 *      |  Reader------^               |
123 *      |   page                       |
124 *      +------------------------------+
125 *
126 *
127 * After we make this swap, the reader can hand this page off to the splice
128 * code and be done with it. It can even allocate a new page if it needs to
129 * and swap that into the ring buffer.
130 *
 * cmpxchg is used below to make all this lockless.
132 *
133 */
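
/*
 * A minimal conceptual sketch of the swap described above (illustrative
 * only; the real, race-aware implementation is in the reader page code
 * later in this file, see rb_get_reader_page(), which also handles the
 * HEAD flag bits stored in the pointers). "spare" is the reader's private
 * page and "head" is the current head page of the ring:
 *
 *	spare->list.next = head->list.next;
 *	spare->list.prev = head->list.prev;
 *	// Atomically redirect the page before head to point at the spare
 *	// page, which unlinks the old head from the ring:
 *	cmpxchg(&head->list.prev->next, &head->list, &spare->list);
 *	reader_page = head;	// the old head is now private to the reader
 */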
134
135/* Used for individual buffers (after the counter) */
136#define RB_BUFFER_OFF		(1 << 20)
137
138#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
139
140#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
141#define RB_ALIGNMENT		4U
142#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
143#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */
144
145#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
146# define RB_FORCE_8BYTE_ALIGNMENT	0
147# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
148#else
149# define RB_FORCE_8BYTE_ALIGNMENT	1
150# define RB_ARCH_ALIGNMENT		8U
151#endif
152
153#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)
154
155/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
156#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
157
158enum {
159	RB_LEN_TIME_EXTEND = 8,
160	RB_LEN_TIME_STAMP =  8,
161};
162
163#define skip_time_extend(event) \
164	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
165
166#define extended_time(event) \
167	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
168
169static inline bool rb_null_event(struct ring_buffer_event *event)
170{
171	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
172}
173
174static void rb_event_set_padding(struct ring_buffer_event *event)
175{
176	/* padding has a NULL time_delta */
177	event->type_len = RINGBUF_TYPE_PADDING;
178	event->time_delta = 0;
179}
180
181static unsigned
182rb_event_data_length(struct ring_buffer_event *event)
183{
184	unsigned length;
185
186	if (event->type_len)
187		length = event->type_len * RB_ALIGNMENT;
188	else
189		length = event->array[0];
190	return length + RB_EVNT_HDR_SIZE;
191}
192
193/*
194 * Return the length of the given event. Will return
195 * the length of the time extend if the event is a
196 * time extend.
197 */
198static inline unsigned
199rb_event_length(struct ring_buffer_event *event)
200{
201	switch (event->type_len) {
202	case RINGBUF_TYPE_PADDING:
203		if (rb_null_event(event))
204			/* undefined */
205			return -1;
206		return  event->array[0] + RB_EVNT_HDR_SIZE;
207
208	case RINGBUF_TYPE_TIME_EXTEND:
209		return RB_LEN_TIME_EXTEND;
210
211	case RINGBUF_TYPE_TIME_STAMP:
212		return RB_LEN_TIME_STAMP;
213
214	case RINGBUF_TYPE_DATA:
215		return rb_event_data_length(event);
216	default:
217		WARN_ON_ONCE(1);
218	}
219	/* not hit */
220	return 0;
221}
222
223/*
224 * Return total length of time extend and data,
225 *   or just the event length for all other events.
226 */
227static inline unsigned
228rb_event_ts_length(struct ring_buffer_event *event)
229{
230	unsigned len = 0;
231
232	if (extended_time(event)) {
233		/* time extends include the data event after it */
234		len = RB_LEN_TIME_EXTEND;
235		event = skip_time_extend(event);
236	}
237	return len + rb_event_length(event);
238}
239
240/**
241 * ring_buffer_event_length - return the length of the event
242 * @event: the event to get the length of
243 *
 * Returns the size of the data payload of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. The exception is
 * a TIME EXTEND, for which it still returns the size of the
 * data payload of the data event that follows it.
249 */
250unsigned ring_buffer_event_length(struct ring_buffer_event *event)
251{
252	unsigned length;
253
254	if (extended_time(event))
255		event = skip_time_extend(event);
256
257	length = rb_event_length(event);
258	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
259		return length;
260	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
263	return length;
264}
265EXPORT_SYMBOL_GPL(ring_buffer_event_length);
266
267/* inline for ring buffer fast paths */
268static __always_inline void *
269rb_event_data(struct ring_buffer_event *event)
270{
271	if (extended_time(event))
272		event = skip_time_extend(event);
273	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
274	/* If length is in len field, then array[0] has the data */
275	if (event->type_len)
276		return (void *)&event->array[0];
277	/* Otherwise length is in array[0] and array[1] has the data */
278	return (void *)&event->array[1];
279}
280
281/**
282 * ring_buffer_event_data - return the data of the event
283 * @event: the event to get the data from
284 */
285void *ring_buffer_event_data(struct ring_buffer_event *event)
286{
287	return rb_event_data(event);
288}
289EXPORT_SYMBOL_GPL(ring_buffer_event_data);
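
/*
 * Example usage (an illustrative sketch, not taken from this file): a
 * consumer that pops one event from @cpu of @buffer and looks at its
 * payload and payload size. Assumes a valid @buffer and an online @cpu.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 *	if (event) {
 *		void *payload = ring_buffer_event_data(event);
 *		unsigned int len = ring_buffer_event_length(event);
 *
 *		pr_info("event @ %llu: %u byte payload at %p\n",
 *			ts, len, payload);
 *	}
 */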
290
291#define for_each_buffer_cpu(buffer, cpu)		\
292	for_each_cpu(cpu, buffer->cpumask)
293
294#define for_each_online_buffer_cpu(buffer, cpu)		\
295	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
296
297#define TS_SHIFT	27
298#define TS_MASK		((1ULL << TS_SHIFT) - 1)
299#define TS_DELTA_TEST	(~TS_MASK)
300
301static u64 rb_event_time_stamp(struct ring_buffer_event *event)
302{
303	u64 ts;
304
305	ts = event->array[0];
306	ts <<= TS_SHIFT;
307	ts += event->time_delta;
308
309	return ts;
310}
311
312/* Flag when events were overwritten */
313#define RB_MISSED_EVENTS	(1 << 31)
314/* Missed count stored at end */
315#define RB_MISSED_STORED	(1 << 30)
316
317#define RB_MISSED_MASK		(3 << 30)
318
319struct buffer_data_page {
320	u64		 time_stamp;	/* page time stamp */
321	local_t		 commit;	/* write committed index */
322	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
323};
324
325struct buffer_data_read_page {
326	unsigned		order;	/* order of the page */
327	struct buffer_data_page	*data;	/* actual data, stored in this page */
328};
329
330/*
331 * Note, the buffer_page list must be first. The buffer pages
332 * are allocated in cache lines, which means that each buffer
333 * page will be at the beginning of a cache line, and thus
334 * the least significant bits will be zero. We use this to
335 * add flags in the list struct pointers, to make the ring buffer
336 * lockless.
337 */
338struct buffer_page {
339	struct list_head list;		/* list of buffer pages */
340	local_t		 write;		/* index for next write */
341	unsigned	 read;		/* index for next read */
342	local_t		 entries;	/* entries on this page */
343	unsigned long	 real_end;	/* real end of data */
344	unsigned	 order;		/* order of the page */
345	u32		 id;		/* ID for external mapping */
346	struct buffer_data_page *page;	/* Actual data page */
347};
348
349/*
350 * The buffer page counters, write and entries, must be reset
351 * atomically when crossing page boundaries. To synchronize this
352 * update, two counters are inserted into the number. One is
353 * the actual counter for the write position or count on the page.
354 *
355 * The other is a counter of updaters. Before an update happens
356 * the update partition of the counter is incremented. This will
357 * allow the updater to update the counter atomically.
358 *
359 * The counter is 20 bits, and the state data is 12.
360 */
361#define RB_WRITE_MASK		0xfffff
362#define RB_WRITE_INTCNT		(1 << 20)
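
/*
 * Worked example (illustrative): if the raw value of bpage->write reads
 * as 0x00300014, then the lower 20 bits give the write index and the
 * upper 12 bits count the nested updaters:
 *
 *	write index  = 0x00300014 & RB_WRITE_MASK  = 0x14  (20 bytes in)
 *	updaters     = 0x00300014 >> 20            = 3
 *
 * rb_page_write() below masks with RB_WRITE_MASK to recover the index.
 */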
363
364static void rb_init_page(struct buffer_data_page *bpage)
365{
366	local_set(&bpage->commit, 0);
367}
368
369static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
370{
371	return local_read(&bpage->page->commit);
372}
373
374static void free_buffer_page(struct buffer_page *bpage)
375{
376	free_pages((unsigned long)bpage->page, bpage->order);
377	kfree(bpage);
378}
379
380/*
381 * We need to fit the time_stamp delta into 27 bits.
382 */
383static inline bool test_time_stamp(u64 delta)
384{
385	return !!(delta & TS_DELTA_TEST);
386}
387
388struct rb_irq_work {
389	struct irq_work			work;
390	wait_queue_head_t		waiters;
391	wait_queue_head_t		full_waiters;
392	atomic_t			seq;
393	bool				waiters_pending;
394	bool				full_waiters_pending;
395	bool				wakeup_full;
396};
397
398/*
399 * Structure to hold event state and handle nested events.
400 */
401struct rb_event_info {
402	u64			ts;
403	u64			delta;
404	u64			before;
405	u64			after;
406	unsigned long		length;
407	struct buffer_page	*tail_page;
408	int			add_timestamp;
409};
410
411/*
412 * Used for the add_timestamp
413 *  NONE
414 *  EXTEND - wants a time extend
415 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
416 *  FORCE - force a full time stamp.
417 */
418enum {
419	RB_ADD_STAMP_NONE		= 0,
420	RB_ADD_STAMP_EXTEND		= BIT(1),
421	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
422	RB_ADD_STAMP_FORCE		= BIT(3)
423};
424/*
425 * Used for which event context the event is in.
426 *  TRANSITION = 0
427 *  NMI     = 1
428 *  IRQ     = 2
429 *  SOFTIRQ = 3
430 *  NORMAL  = 4
431 *
432 * See trace_recursive_lock() comment below for more details.
433 */
434enum {
435	RB_CTX_TRANSITION,
436	RB_CTX_NMI,
437	RB_CTX_IRQ,
438	RB_CTX_SOFTIRQ,
439	RB_CTX_NORMAL,
440	RB_CTX_MAX
441};
442
443struct rb_time_struct {
444	local64_t	time;
445};
446typedef struct rb_time_struct rb_time_t;
447
448#define MAX_NEST	5
449
450/*
 * If head_page == tail_page && head == tail, then the buffer is empty.
452 */
453struct ring_buffer_per_cpu {
454	int				cpu;
455	atomic_t			record_disabled;
456	atomic_t			resize_disabled;
457	struct trace_buffer	*buffer;
458	raw_spinlock_t			reader_lock;	/* serialize readers */
459	arch_spinlock_t			lock;
460	struct lock_class_key		lock_key;
461	struct buffer_data_page		*free_page;
462	unsigned long			nr_pages;
463	unsigned int			current_context;
464	struct list_head		*pages;
465	struct buffer_page		*head_page;	/* read from head */
466	struct buffer_page		*tail_page;	/* write to tail */
467	struct buffer_page		*commit_page;	/* committed pages */
468	struct buffer_page		*reader_page;
469	unsigned long			lost_events;
470	unsigned long			last_overrun;
471	unsigned long			nest;
472	local_t				entries_bytes;
473	local_t				entries;
474	local_t				overrun;
475	local_t				commit_overrun;
476	local_t				dropped_events;
477	local_t				committing;
478	local_t				commits;
479	local_t				pages_touched;
480	local_t				pages_lost;
481	local_t				pages_read;
482	long				last_pages_touch;
483	size_t				shortest_full;
484	unsigned long			read;
485	unsigned long			read_bytes;
486	rb_time_t			write_stamp;
487	rb_time_t			before_stamp;
488	u64				event_stamp[MAX_NEST];
489	u64				read_stamp;
490	/* pages removed since last reset */
491	unsigned long			pages_removed;
492
493	unsigned int			mapped;
494	struct mutex			mapping_lock;
495	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
496	struct trace_buffer_meta	*meta_page;
497
498	/* ring buffer pages to update, > 0 to add, < 0 to remove */
499	long				nr_pages_to_update;
500	struct list_head		new_pages; /* new pages to add */
501	struct work_struct		update_pages_work;
502	struct completion		update_done;
503
504	struct rb_irq_work		irq_work;
505};
506
507struct trace_buffer {
508	unsigned			flags;
509	int				cpus;
510	atomic_t			record_disabled;
511	atomic_t			resizing;
512	cpumask_var_t			cpumask;
513
514	struct lock_class_key		*reader_lock_key;
515
516	struct mutex			mutex;
517
518	struct ring_buffer_per_cpu	**buffers;
519
520	struct hlist_node		node;
521	u64				(*clock)(void);
522
523	struct rb_irq_work		irq_work;
524	bool				time_stamp_abs;
525
526	unsigned int			subbuf_size;
527	unsigned int			subbuf_order;
528	unsigned int			max_data_size;
529};
530
531struct ring_buffer_iter {
532	struct ring_buffer_per_cpu	*cpu_buffer;
533	unsigned long			head;
534	unsigned long			next_event;
535	struct buffer_page		*head_page;
536	struct buffer_page		*cache_reader_page;
537	unsigned long			cache_read;
538	unsigned long			cache_pages_removed;
539	u64				read_stamp;
540	u64				page_stamp;
541	struct ring_buffer_event	*event;
542	size_t				event_size;
543	int				missed_events;
544};
545
546int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
547{
548	struct buffer_data_page field;
549
550	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
551			 "offset:0;\tsize:%u;\tsigned:%u;\n",
552			 (unsigned int)sizeof(field.time_stamp),
553			 (unsigned int)is_signed_type(u64));
554
555	trace_seq_printf(s, "\tfield: local_t commit;\t"
556			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
557			 (unsigned int)offsetof(typeof(field), commit),
558			 (unsigned int)sizeof(field.commit),
559			 (unsigned int)is_signed_type(long));
560
561	trace_seq_printf(s, "\tfield: int overwrite;\t"
562			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
563			 (unsigned int)offsetof(typeof(field), commit),
564			 1,
565			 (unsigned int)is_signed_type(long));
566
567	trace_seq_printf(s, "\tfield: char data;\t"
568			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
569			 (unsigned int)offsetof(typeof(field), data),
570			 (unsigned int)buffer->subbuf_size,
571			 (unsigned int)is_signed_type(char));
572
573	return !trace_seq_has_overflowed(s);
574}
575
576static inline void rb_time_read(rb_time_t *t, u64 *ret)
577{
578	*ret = local64_read(&t->time);
579}
580static void rb_time_set(rb_time_t *t, u64 val)
581{
582	local64_set(&t->time, val);
583}
584
585/*
586 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that was passed in.
589 */
590//#define RB_VERIFY_EVENT
591#ifdef RB_VERIFY_EVENT
592static struct list_head *rb_list_head(struct list_head *list);
593static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
594			 void *event)
595{
596	struct buffer_page *page = cpu_buffer->commit_page;
597	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
598	struct list_head *next;
599	long commit, write;
600	unsigned long addr = (unsigned long)event;
601	bool done = false;
602	int stop = 0;
603
604	/* Make sure the event exists and is not committed yet */
605	do {
606		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
607			done = true;
608		commit = local_read(&page->page->commit);
609		write = local_read(&page->write);
610		if (addr >= (unsigned long)&page->page->data[commit] &&
611		    addr < (unsigned long)&page->page->data[write])
612			return;
613
614		next = rb_list_head(page->list.next);
615		page = list_entry(next, struct buffer_page, list);
616	} while (!done);
617	WARN_ON_ONCE(1);
618}
619#else
620static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
621			 void *event)
622{
623}
624#endif
625
626/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. rb_fix_abs_ts() takes a previous full time stamp
 * and adds the 5 MSBs of that time stamp to the saved absolute
 * time stamp. The two are then compared to handle the unlikely
 * case where the latest time stamp has incremented the 5 MSBs.
633 */
634static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
635{
636	if (save_ts & TS_MSB) {
637		abs |= save_ts & TS_MSB;
638		/* Check for overflow */
639		if (unlikely(abs < save_ts))
640			abs += 1ULL << 59;
641	}
642	return abs;
643}
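
/*
 * Worked example (illustrative): with save_ts = 0x0800000000000123 (bit 59
 * set) and a saved absolute stamp abs = 0x456 that lost its MSBs, the
 * result is 0x0800000000000456. If OR-ing the MSBs back in produced a
 * value smaller than save_ts, the clock must have rolled the 5 MSBs over,
 * so 1 << 59 is added to compensate.
 */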
644
645static inline u64 rb_time_stamp(struct trace_buffer *buffer);
646
647/**
648 * ring_buffer_event_time_stamp - return the event's current time stamp
649 * @buffer: The buffer that the event is on
650 * @event: the event to get the time stamp of
651 *
652 * Note, this must be called after @event is reserved, and before it is
653 * committed to the ring buffer. And must be called from the same
654 * context where the event was reserved (normal, softirq, irq, etc).
655 *
656 * Returns the time stamp associated with the current event.
657 * If the event has an extended time stamp, then that is used as
658 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Neither of these last
 * two cases should ever happen.
663 */
664u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
665				 struct ring_buffer_event *event)
666{
667	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
668	unsigned int nest;
669	u64 ts;
670
671	/* If the event includes an absolute time, then just use that */
672	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
673		ts = rb_event_time_stamp(event);
674		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
675	}
676
677	nest = local_read(&cpu_buffer->committing);
678	verify_event(cpu_buffer, event);
679	if (WARN_ON_ONCE(!nest))
680		goto fail;
681
682	/* Read the current saved nesting level time stamp */
683	if (likely(--nest < MAX_NEST))
684		return cpu_buffer->event_stamp[nest];
685
686	/* Shouldn't happen, warn if it does */
687	WARN_ONCE(1, "nest (%d) greater than max", nest);
688
689 fail:
690	rb_time_read(&cpu_buffer->write_stamp, &ts);
691
692	return ts;
693}
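
/*
 * Example usage (an illustrative sketch; it assumes the current
 * ring_buffer_unlock_commit() signature that takes only the buffer):
 * stamp an event's payload with its own time stamp between reserve
 * and commit, in the same context as the reservation.
 *
 *	struct ring_buffer_event *event;
 *	u64 *data;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(u64));
 *	if (event) {
 *		data = ring_buffer_event_data(event);
 *		*data = ring_buffer_event_time_stamp(buffer, event);
 *		ring_buffer_unlock_commit(buffer);
 *	}
 */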
694
695/**
696 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
697 * @buffer: The ring_buffer to get the number of pages from
698 * @cpu: The cpu of the ring_buffer to get the number of pages from
699 *
700 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
701 */
702size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
703{
704	return buffer->buffers[cpu]->nr_pages;
705}
706
707/**
708 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
709 * @buffer: The ring_buffer to get the number of pages from
710 * @cpu: The cpu of the ring_buffer to get the number of pages from
711 *
712 * Returns the number of pages that have content in the ring buffer.
713 */
714size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
715{
716	size_t read;
717	size_t lost;
718	size_t cnt;
719
720	read = local_read(&buffer->buffers[cpu]->pages_read);
721	lost = local_read(&buffer->buffers[cpu]->pages_lost);
722	cnt = local_read(&buffer->buffers[cpu]->pages_touched);
723
724	if (WARN_ON_ONCE(cnt < lost))
725		return 0;
726
727	cnt -= lost;
728
729	/* The reader can read an empty page, but not more than that */
730	if (cnt < read) {
731		WARN_ON_ONCE(read > cnt + 1);
732		return 0;
733	}
734
735	return cnt - read;
736}
737
738static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
739{
740	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
741	size_t nr_pages;
742	size_t dirty;
743
744	nr_pages = cpu_buffer->nr_pages;
745	if (!nr_pages || !full)
746		return true;
747
748	/*
749	 * Add one as dirty will never equal nr_pages, as the sub-buffer
750	 * that the writer is on is not counted as dirty.
751	 * This is needed if "buffer_percent" is set to 100.
752	 */
753	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
754
755	return (dirty * 100) >= (full * nr_pages);
756}
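
/*
 * Worked example (illustrative): with nr_pages = 10 and a watermark of
 * full = 70 (percent), full_hit() returns true once
 * (dirty * 100) >= (70 * 10), i.e. once at least 7 sub-buffers (counting
 * the +1 for the sub-buffer the writer is on) contain data.
 */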
757
758/*
759 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
760 *
 * The irq_work handler that wakes up any task that is blocked on the
 * ring buffer waiters queue.
763 */
764static void rb_wake_up_waiters(struct irq_work *work)
765{
766	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
767
768	/* For waiters waiting for the first wake up */
769	(void)atomic_fetch_inc_release(&rbwork->seq);
770
771	wake_up_all(&rbwork->waiters);
772	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
773		/* Only cpu_buffer sets the above flags */
774		struct ring_buffer_per_cpu *cpu_buffer =
775			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
776
777		/* Called from interrupt context */
778		raw_spin_lock(&cpu_buffer->reader_lock);
779		rbwork->wakeup_full = false;
780		rbwork->full_waiters_pending = false;
781
782		/* Waking up all waiters, they will reset the shortest full */
783		cpu_buffer->shortest_full = 0;
784		raw_spin_unlock(&cpu_buffer->reader_lock);
785
786		wake_up_all(&rbwork->full_waiters);
787	}
788}
789
790/**
791 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
792 * @buffer: The ring buffer to wake waiters on
793 * @cpu: The CPU buffer to wake waiters on
794 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are waiting on it.
797 */
798void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
799{
800	struct ring_buffer_per_cpu *cpu_buffer;
801	struct rb_irq_work *rbwork;
802
803	if (!buffer)
804		return;
805
806	if (cpu == RING_BUFFER_ALL_CPUS) {
807
808		/* Wake up individual ones too. One level recursion */
809		for_each_buffer_cpu(buffer, cpu)
810			ring_buffer_wake_waiters(buffer, cpu);
811
812		rbwork = &buffer->irq_work;
813	} else {
814		if (WARN_ON_ONCE(!buffer->buffers))
815			return;
816		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
817			return;
818
819		cpu_buffer = buffer->buffers[cpu];
820		/* The CPU buffer may not have been initialized yet */
821		if (!cpu_buffer)
822			return;
823		rbwork = &cpu_buffer->irq_work;
824	}
825
826	/* This can be called in any context */
827	irq_work_queue(&rbwork->work);
828}
829
830static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
831{
832	struct ring_buffer_per_cpu *cpu_buffer;
833	bool ret = false;
834
	/* Reads of all CPUs always wait for any data */
836	if (cpu == RING_BUFFER_ALL_CPUS)
837		return !ring_buffer_empty(buffer);
838
839	cpu_buffer = buffer->buffers[cpu];
840
841	if (!ring_buffer_empty_cpu(buffer, cpu)) {
842		unsigned long flags;
843		bool pagebusy;
844
845		if (!full)
846			return true;
847
848		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
849		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
850		ret = !pagebusy && full_hit(buffer, cpu, full);
851
		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full))
			cpu_buffer->shortest_full = full;
856		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
857	}
858	return ret;
859}
860
861static inline bool
862rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
863	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
864{
865	if (rb_watermark_hit(buffer, cpu, full))
866		return true;
867
868	if (cond(data))
869		return true;
870
871	/*
872	 * The events can happen in critical sections where
873	 * checking a work queue can cause deadlocks.
874	 * After adding a task to the queue, this flag is set
875	 * only to notify events to try to wake up the queue
876	 * using irq_work.
877	 *
878	 * We don't clear it even if the buffer is no longer
879	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
882	 * an event will cause an irq_work to try to wake up
883	 * an empty queue.
884	 *
885	 * There's no reason to protect this flag either, as
886	 * the work queue and irq_work logic will do the necessary
887	 * synchronization for the wake ups. The only thing
888	 * that is necessary is that the wake up happens after
	 * a task has been queued. Spurious wake ups are OK.
890	 */
891	if (full)
892		rbwork->full_waiters_pending = true;
893	else
894		rbwork->waiters_pending = true;
895
896	return false;
897}
898
899struct rb_wait_data {
900	struct rb_irq_work		*irq_work;
901	int				seq;
902};
903
904/*
 * The default wait condition for ring_buffer_wait() is to just exit the
906 * wait loop the first time it is woken up.
907 */
908static bool rb_wait_once(void *data)
909{
910	struct rb_wait_data *rdata = data;
911	struct rb_irq_work *rbwork = rdata->irq_work;
912
913	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
914}
915
916/**
917 * ring_buffer_wait - wait for input to the ring buffer
918 * @buffer: buffer to wait on
919 * @cpu: the cpu buffer to wait on
920 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
921 * @cond: condition function to break out of wait (NULL to run once)
922 * @data: the data to pass to @cond.
923 *
924 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
925 * as data is added to any of the @buffer's cpu buffers. Otherwise
926 * it will wait for data to be added to a specific cpu buffer.
927 */
928int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
929		     ring_buffer_cond_fn cond, void *data)
930{
931	struct ring_buffer_per_cpu *cpu_buffer;
932	struct wait_queue_head *waitq;
933	struct rb_irq_work *rbwork;
934	struct rb_wait_data rdata;
935	int ret = 0;
936
937	/*
938	 * Depending on what the caller is waiting for, either any
939	 * data in any cpu buffer, or a specific buffer, put the
940	 * caller on the appropriate wait queue.
941	 */
942	if (cpu == RING_BUFFER_ALL_CPUS) {
943		rbwork = &buffer->irq_work;
944		/* Full only makes sense on per cpu reads */
945		full = 0;
946	} else {
947		if (!cpumask_test_cpu(cpu, buffer->cpumask))
948			return -ENODEV;
949		cpu_buffer = buffer->buffers[cpu];
950		rbwork = &cpu_buffer->irq_work;
951	}
952
953	if (full)
954		waitq = &rbwork->full_waiters;
955	else
956		waitq = &rbwork->waiters;
957
958	/* Set up to exit loop as soon as it is woken */
959	if (!cond) {
960		cond = rb_wait_once;
961		rdata.irq_work = rbwork;
962		rdata.seq = atomic_read_acquire(&rbwork->seq);
963		data = &rdata;
964	}
965
966	ret = wait_event_interruptible((*waitq),
967				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));
968
969	return ret;
970}
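
/*
 * Example usage (an illustrative sketch): block until cpu 0 of @buffer is
 * at least half full, using the default wake-once condition (NULL @cond).
 * A return of 0 means the task was woken (or the watermark was already
 * met); a negative value means the wait was interrupted by a signal.
 *
 *	if (!ring_buffer_wait(buffer, 0, 50, NULL, NULL)) {
 *		... consume events, e.g. with ring_buffer_consume() ...
 *	}
 */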
971
972/**
973 * ring_buffer_poll_wait - poll on buffer input
974 * @buffer: buffer to wait on
975 * @cpu: the cpu buffer to wait on
976 * @filp: the file descriptor
977 * @poll_table: The poll descriptor
978 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
979 *
980 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
981 * as data is added to any of the @buffer's cpu buffers. Otherwise
982 * it will wait for data to be added to a specific cpu buffer.
983 *
984 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
985 * zero otherwise.
986 */
987__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
988			  struct file *filp, poll_table *poll_table, int full)
989{
990	struct ring_buffer_per_cpu *cpu_buffer;
991	struct rb_irq_work *rbwork;
992
993	if (cpu == RING_BUFFER_ALL_CPUS) {
994		rbwork = &buffer->irq_work;
995		full = 0;
996	} else {
997		if (!cpumask_test_cpu(cpu, buffer->cpumask))
998			return EPOLLERR;
999
1000		cpu_buffer = buffer->buffers[cpu];
1001		rbwork = &cpu_buffer->irq_work;
1002	}
1003
1004	if (full) {
1005		poll_wait(filp, &rbwork->full_waiters, poll_table);
1006
1007		if (rb_watermark_hit(buffer, cpu, full))
1008			return EPOLLIN | EPOLLRDNORM;
1009		/*
1010		 * Only allow full_waiters_pending update to be seen after
1011		 * the shortest_full is set (in rb_watermark_hit). If the
1012		 * writer sees the full_waiters_pending flag set, it will
1013		 * compare the amount in the ring buffer to shortest_full.
1014		 * If the amount in the ring buffer is greater than the
1015		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_work handler will reset shortest_full
1017		 * back to zero. That's done under the reader_lock, but
1018		 * the below smp_mb() makes sure that the update to
1019		 * full_waiters_pending doesn't leak up into the above.
1020		 */
1021		smp_mb();
1022		rbwork->full_waiters_pending = true;
1023		return 0;
1024	}
1025
1026	poll_wait(filp, &rbwork->waiters, poll_table);
1027	rbwork->waiters_pending = true;
1028
1029	/*
1030	 * There's a tight race between setting the waiters_pending and
1031	 * checking if the ring buffer is empty.  Once the waiters_pending bit
1032	 * is set, the next event will wake the task up, but we can get stuck
1033	 * if there's only a single event in.
1034	 *
1035	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
1036	 * but adding a memory barrier to all events will cause too much of a
1037	 * performance hit in the fast path.  We only need a memory barrier when
1038	 * the buffer goes from empty to having content.  But as this race is
1039	 * extremely small, and it's not a problem if another event comes in, we
1040	 * will fix it later.
1041	 */
1042	smp_mb();
1043
1044	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1045	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
1046		return EPOLLIN | EPOLLRDNORM;
1047	return 0;
1048}
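
/*
 * Example usage (an illustrative sketch): a file_operations ->poll()
 * callback for a per-cpu trace file could forward straight to
 * ring_buffer_poll_wait(). "my_buffer" and "my_cpu" are placeholders
 * for wherever the caller keeps its buffer and cpu.
 *
 *	static __poll_t my_trace_poll(struct file *filp, poll_table *pt)
 *	{
 *		return ring_buffer_poll_wait(my_buffer, my_cpu, filp, pt, 0);
 *	}
 */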
1049
1050/* buffer may be either ring_buffer or ring_buffer_per_cpu */
1051#define RB_WARN_ON(b, cond)						\
1052	({								\
1053		int _____ret = unlikely(cond);				\
1054		if (_____ret) {						\
1055			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1056				struct ring_buffer_per_cpu *__b =	\
1057					(void *)b;			\
1058				atomic_inc(&__b->buffer->record_disabled); \
1059			} else						\
1060				atomic_inc(&b->record_disabled);	\
1061			WARN_ON(1);					\
1062		}							\
1063		_____ret;						\
1064	})
1065
1066/* Up this if you want to test the TIME_EXTENTS and normalization */
1067#define DEBUG_SHIFT 0
1068
1069static inline u64 rb_time_stamp(struct trace_buffer *buffer)
1070{
1071	u64 ts;
1072
1073	/* Skip retpolines :-( */
1074	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
1075		ts = trace_clock_local();
1076	else
1077		ts = buffer->clock();
1078
1079	/* shift to debug/test normalization and TIME_EXTENTS */
1080	return ts << DEBUG_SHIFT;
1081}
1082
1083u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
1084{
1085	u64 time;
1086
1087	preempt_disable_notrace();
1088	time = rb_time_stamp(buffer);
1089	preempt_enable_notrace();
1090
1091	return time;
1092}
1093EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1094
1095void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
1096				      int cpu, u64 *ts)
1097{
1098	/* Just stupid testing the normalize function and deltas */
1099	*ts >>= DEBUG_SHIFT;
1100}
1101EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1102
1103/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, and they
 * only need to worry about interrupts, but reads can happen
 * on any CPU.
1108 *
1109 * The reader page is always off the ring buffer, but when the
1110 * reader finishes with a page, it needs to swap its page with
1111 * a new one from the buffer. The reader needs to take from
1112 * the head (writes go to the tail). But if a writer is in overwrite
1113 * mode and wraps, it must push the head page forward.
1114 *
1115 * Here lies the problem.
1116 *
1117 * The reader must be careful to replace only the head page, and
1118 * not another one. As described at the top of the file in the
1119 * ASCII art, the reader sets its old page to point to the next
1120 * page after head. It then sets the page after head to point to
1121 * the old reader page. But if the writer moves the head page
1122 * during this operation, the reader could end up with the tail.
1123 *
1124 * We use cmpxchg to help prevent this race. We also do something
1125 * special with the page before head. We set the LSB to 1.
1126 *
1127 * When the writer must push the page forward, it will clear the
1128 * bit that points to the head page, move the head, and then set
1129 * the bit that points to the new head page.
1130 *
1131 * We also don't want an interrupt coming in and moving the head
1132 * page on another writer. Thus we use the second LSB to catch
1133 * that too. Thus:
1134 *
1135 * head->list->prev->next        bit 1          bit 0
1136 *                              -------        -------
1137 * Normal page                     0              0
1138 * Points to head page             0              1
1139 * New head page                   1              0
1140 *
1141 * Note we can not trust the prev pointer of the head page, because:
1142 *
1143 * +----+       +-----+        +-----+
1144 * |    |------>|  T  |---X--->|  N  |
1145 * |    |<------|     |        |     |
1146 * +----+       +-----+        +-----+
1147 *   ^                           ^ |
1148 *   |          +-----+          | |
1149 *   +----------|  R  |----------+ |
1150 *              |     |<-----------+
1151 *              +-----+
1152 *
1153 * Key:  ---X-->  HEAD flag set in pointer
1154 *         T      Tail page
1155 *         R      Reader page
1156 *         N      Next page
1157 *
1158 * (see __rb_reserve_next() to see where this happens)
1159 *
 *  What the above shows is that the reader just swapped out
 *  the reader page with a page in the buffer, but before it
 *  could make the new head page point back to the newly added
 *  page, it was preempted by a writer. The writer moved forward
 *  onto the new page added by the reader and is about to move
 *  forward again.
1166 *
1167 *  You can see, it is legitimate for the previous pointer of
1168 *  the head (or any page) not to point back to itself. But only
1169 *  temporarily.
1170 */
1171
1172#define RB_PAGE_NORMAL		0UL
1173#define RB_PAGE_HEAD		1UL
1174#define RB_PAGE_UPDATE		2UL
1175
1176
1177#define RB_FLAG_MASK		3UL
1178
1179/* PAGE_MOVED is not part of the mask */
1180#define RB_PAGE_MOVED		4UL
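
/*
 * Worked example (illustrative): buffer pages are cache-line aligned, so
 * the LSBs of a pointer to a page's list_head are zero and can carry the
 * flags above. If &page->list is 0xffff888012345600 and the previous
 * page's ->next reads 0xffff888012345601, then:
 *
 *	rb_list_head(next)	= 0xffff888012345600  (the real pointer)
 *	next & RB_FLAG_MASK	= RB_PAGE_HEAD        (this page is head)
 */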
1181
1182/*
 * rb_list_head - strip the flag bits from a list pointer
1184 */
1185static struct list_head *rb_list_head(struct list_head *list)
1186{
1187	unsigned long val = (unsigned long)list;
1188
1189	return (struct list_head *)(val & ~RB_FLAG_MASK);
1190}
1191
1192/*
1193 * rb_is_head_page - test if the given page is the head page
1194 *
1195 * Because the reader may move the head_page pointer, we can
1196 * not trust what the head page is (it may be pointing to
1197 * the reader page). But if the next page is a header page,
1198 * its flags will be non zero.
1199 */
1200static inline int
1201rb_is_head_page(struct buffer_page *page, struct list_head *list)
1202{
1203	unsigned long val;
1204
1205	val = (unsigned long)list->next;
1206
1207	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1208		return RB_PAGE_MOVED;
1209
1210	return val & RB_FLAG_MASK;
1211}
1212
1213/*
1214 * rb_is_reader_page
1215 *
 * The unique thing about the reader page is that, if the
1217 * writer is ever on it, the previous pointer never points
1218 * back to the reader page.
1219 */
1220static bool rb_is_reader_page(struct buffer_page *page)
1221{
1222	struct list_head *list = page->list.prev;
1223
1224	return rb_list_head(list->next) != &page->list;
1225}
1226
1227/*
1228 * rb_set_list_to_head - set a list_head to be pointing to head.
1229 */
1230static void rb_set_list_to_head(struct list_head *list)
1231{
1232	unsigned long *ptr;
1233
1234	ptr = (unsigned long *)&list->next;
1235	*ptr |= RB_PAGE_HEAD;
1236	*ptr &= ~RB_PAGE_UPDATE;
1237}
1238
1239/*
1240 * rb_head_page_activate - sets up head page
1241 */
1242static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1243{
1244	struct buffer_page *head;
1245
1246	head = cpu_buffer->head_page;
1247	if (!head)
1248		return;
1249
1250	/*
1251	 * Set the previous list pointer to have the HEAD flag.
1252	 */
1253	rb_set_list_to_head(head->list.prev);
1254}
1255
1256static void rb_list_head_clear(struct list_head *list)
1257{
1258	unsigned long *ptr = (unsigned long *)&list->next;
1259
1260	*ptr &= ~RB_FLAG_MASK;
1261}
1262
1263/*
1264 * rb_head_page_deactivate - clears head page ptr (for free list)
1265 */
1266static void
1267rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1268{
1269	struct list_head *hd;
1270
1271	/* Go through the whole list and clear any pointers found. */
1272	rb_list_head_clear(cpu_buffer->pages);
1273
1274	list_for_each(hd, cpu_buffer->pages)
1275		rb_list_head_clear(hd);
1276}
1277
1278static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1279			    struct buffer_page *head,
1280			    struct buffer_page *prev,
1281			    int old_flag, int new_flag)
1282{
1283	struct list_head *list;
1284	unsigned long val = (unsigned long)&head->list;
1285	unsigned long ret;
1286
1287	list = &prev->list;
1288
1289	val &= ~RB_FLAG_MASK;
1290
1291	ret = cmpxchg((unsigned long *)&list->next,
1292		      val | old_flag, val | new_flag);
1293
1294	/* check if the reader took the page */
1295	if ((ret & ~RB_FLAG_MASK) != val)
1296		return RB_PAGE_MOVED;
1297
1298	return ret & RB_FLAG_MASK;
1299}
1300
1301static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1302				   struct buffer_page *head,
1303				   struct buffer_page *prev,
1304				   int old_flag)
1305{
1306	return rb_head_page_set(cpu_buffer, head, prev,
1307				old_flag, RB_PAGE_UPDATE);
1308}
1309
1310static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1311				 struct buffer_page *head,
1312				 struct buffer_page *prev,
1313				 int old_flag)
1314{
1315	return rb_head_page_set(cpu_buffer, head, prev,
1316				old_flag, RB_PAGE_HEAD);
1317}
1318
1319static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1320				   struct buffer_page *head,
1321				   struct buffer_page *prev,
1322				   int old_flag)
1323{
1324	return rb_head_page_set(cpu_buffer, head, prev,
1325				old_flag, RB_PAGE_NORMAL);
1326}
1327
1328static inline void rb_inc_page(struct buffer_page **bpage)
1329{
1330	struct list_head *p = rb_list_head((*bpage)->list.next);
1331
1332	*bpage = list_entry(p, struct buffer_page, list);
1333}
1334
1335static struct buffer_page *
1336rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1337{
1338	struct buffer_page *head;
1339	struct buffer_page *page;
1340	struct list_head *list;
1341	int i;
1342
1343	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1344		return NULL;
1345
1346	/* sanity check */
1347	list = cpu_buffer->pages;
1348	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1349		return NULL;
1350
1351	page = head = cpu_buffer->head_page;
1352	/*
	 * It is possible that the writer moves the head page behind
	 * where we started, and we miss it in one loop.
	 * A second loop should grab the head page, but we'll do
	 * three loops just because I'm paranoid.
1357	 */
1358	for (i = 0; i < 3; i++) {
1359		do {
1360			if (rb_is_head_page(page, page->list.prev)) {
1361				cpu_buffer->head_page = page;
1362				return page;
1363			}
1364			rb_inc_page(&page);
1365		} while (page != head);
1366	}
1367
1368	RB_WARN_ON(cpu_buffer, 1);
1369
1370	return NULL;
1371}
1372
1373static bool rb_head_page_replace(struct buffer_page *old,
1374				struct buffer_page *new)
1375{
1376	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1377	unsigned long val;
1378
1379	val = *ptr & ~RB_FLAG_MASK;
1380	val |= RB_PAGE_HEAD;
1381
1382	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
1383}
1384
1385/*
1386 * rb_tail_page_update - move the tail page forward
1387 */
1388static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1389			       struct buffer_page *tail_page,
1390			       struct buffer_page *next_page)
1391{
1392	unsigned long old_entries;
1393	unsigned long old_write;
1394
1395	/*
1396	 * The tail page now needs to be moved forward.
1397	 *
	 * We need to reset the tail page, but we must not erase
	 * data written by interrupts that have already moved the
	 * tail page and are currently on it.
1401	 *
1402	 * We add a counter to the write field to denote this.
1403	 */
1404	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1405	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1406
1407	/*
1408	 * Just make sure we have seen our old_write and synchronize
1409	 * with any interrupts that come in.
1410	 */
1411	barrier();
1412
1413	/*
1414	 * If the tail page is still the same as what we think
1415	 * it is, then it is up to us to update the tail
1416	 * pointer.
1417	 */
1418	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1419		/* Zero the write counter */
1420		unsigned long val = old_write & ~RB_WRITE_MASK;
1421		unsigned long eval = old_entries & ~RB_WRITE_MASK;
1422
1423		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. If one did, we
		 * do not want to modify it.
1427		 *
1428		 * We add (void) to let the compiler know that we do not care
1429		 * about the return value of these functions. We use the
1430		 * cmpxchg to only update if an interrupt did not already
1431		 * do it for us. If the cmpxchg fails, we don't care.
1432		 */
1433		(void)local_cmpxchg(&next_page->write, old_write, val);
1434		(void)local_cmpxchg(&next_page->entries, old_entries, eval);
1435
1436		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
1440		 */
1441		local_set(&next_page->page->commit, 0);
1442
1443		/* Either we update tail_page or an interrupt does */
1444		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
1445			local_inc(&cpu_buffer->pages_touched);
1446	}
1447}
1448
1449static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1450			  struct buffer_page *bpage)
1451{
1452	unsigned long val = (unsigned long)bpage;
1453
1454	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
1455}
1456
1457/**
1458 * rb_check_pages - integrity check of buffer pages
1459 * @cpu_buffer: CPU buffer with pages to test
1460 *
1461 * As a safety measure we check to make sure the data pages have not
1462 * been corrupted.
1463 *
1464 * Callers of this function need to guarantee that the list of pages doesn't get
1465 * modified during the check. In particular, if it's possible that the function
1466 * is invoked with concurrent readers which can swap in a new reader page then
1467 * the caller should take cpu_buffer->reader_lock.
1468 */
1469static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1470{
1471	struct list_head *head = rb_list_head(cpu_buffer->pages);
1472	struct list_head *tmp;
1473
1474	if (RB_WARN_ON(cpu_buffer,
1475			rb_list_head(rb_list_head(head->next)->prev) != head))
1476		return;
1477
1478	if (RB_WARN_ON(cpu_buffer,
1479			rb_list_head(rb_list_head(head->prev)->next) != head))
1480		return;
1481
1482	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
1483		if (RB_WARN_ON(cpu_buffer,
1484				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
1485			return;
1486
1487		if (RB_WARN_ON(cpu_buffer,
1488				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
1489			return;
1490	}
1491}
1492
1493static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1494		long nr_pages, struct list_head *pages)
1495{
1496	struct buffer_page *bpage, *tmp;
1497	bool user_thread = current->mm != NULL;
1498	gfp_t mflags;
1499	long i;
1500
1501	/*
	 * Check that the required memory is available first.
1503	 * Note, si_mem_available() only gives us a rough estimate of available
1504	 * memory. It may not be accurate. But we don't care, we just want
1505	 * to prevent doing any allocation when it is obvious that it is
1506	 * not going to succeed.
1507	 */
1508	i = si_mem_available();
1509	if (i < nr_pages)
1510		return -ENOMEM;
1511
1512	/*
1513	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1514	 * gracefully without invoking oom-killer and the system is not
1515	 * destabilized.
1516	 */
1517	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1518
1519	/*
	 * A user thread may allocate too much when si_mem_available()
	 * reports there's enough memory, even though there is not.
	 * Make sure the OOM killer kills this thread. This can happen
1523	 * even with RETRY_MAYFAIL because another task may be doing
1524	 * an allocation after this task has taken all memory.
1525	 * This is the task the OOM killer needs to take out during this
1526	 * loop, even if it was triggered by an allocation somewhere else.
1527	 */
1528	if (user_thread)
1529		set_current_oom_origin();
1530	for (i = 0; i < nr_pages; i++) {
1531		struct page *page;
1532
1533		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1534				    mflags, cpu_to_node(cpu_buffer->cpu));
1535		if (!bpage)
1536			goto free_pages;
1537
1538		rb_check_bpage(cpu_buffer, bpage);
1539
1540		list_add(&bpage->list, pages);
1541
1542		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
1543					mflags | __GFP_COMP | __GFP_ZERO,
1544					cpu_buffer->buffer->subbuf_order);
1545		if (!page)
1546			goto free_pages;
1547		bpage->page = page_address(page);
1548		bpage->order = cpu_buffer->buffer->subbuf_order;
1549		rb_init_page(bpage->page);
1550
1551		if (user_thread && fatal_signal_pending(current))
1552			goto free_pages;
1553	}
1554	if (user_thread)
1555		clear_current_oom_origin();
1556
1557	return 0;
1558
1559free_pages:
1560	list_for_each_entry_safe(bpage, tmp, pages, list) {
1561		list_del_init(&bpage->list);
1562		free_buffer_page(bpage);
1563	}
1564	if (user_thread)
1565		clear_current_oom_origin();
1566
1567	return -ENOMEM;
1568}
1569
1570static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1571			     unsigned long nr_pages)
1572{
1573	LIST_HEAD(pages);
1574
1575	WARN_ON(!nr_pages);
1576
1577	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
1578		return -ENOMEM;
1579
1580	/*
1581	 * The ring buffer page list is a circular list that does not
1582	 * start and end with a list head. All page list items point to
1583	 * other pages.
1584	 */
1585	cpu_buffer->pages = pages.next;
1586	list_del(&pages);
1587
1588	cpu_buffer->nr_pages = nr_pages;
1589
1590	rb_check_pages(cpu_buffer);
1591
1592	return 0;
1593}
1594
1595static struct ring_buffer_per_cpu *
1596rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
1597{
1598	struct ring_buffer_per_cpu *cpu_buffer;
1599	struct buffer_page *bpage;
1600	struct page *page;
1601	int ret;
1602
1603	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1604				  GFP_KERNEL, cpu_to_node(cpu));
1605	if (!cpu_buffer)
1606		return NULL;
1607
1608	cpu_buffer->cpu = cpu;
1609	cpu_buffer->buffer = buffer;
1610	raw_spin_lock_init(&cpu_buffer->reader_lock);
1611	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1612	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1613	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1614	init_completion(&cpu_buffer->update_done);
1615	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1616	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1617	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1618	mutex_init(&cpu_buffer->mapping_lock);
1619
1620	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1621			    GFP_KERNEL, cpu_to_node(cpu));
1622	if (!bpage)
1623		goto fail_free_buffer;
1624
1625	rb_check_bpage(cpu_buffer, bpage);
1626
1627	cpu_buffer->reader_page = bpage;
1628
1629	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
1630				cpu_buffer->buffer->subbuf_order);
1631	if (!page)
1632		goto fail_free_reader;
1633	bpage->page = page_address(page);
1634	rb_init_page(bpage->page);
1635
1636	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1637	INIT_LIST_HEAD(&cpu_buffer->new_pages);
1638
1639	ret = rb_allocate_pages(cpu_buffer, nr_pages);
1640	if (ret < 0)
1641		goto fail_free_reader;
1642
1643	cpu_buffer->head_page
1644		= list_entry(cpu_buffer->pages, struct buffer_page, list);
1645	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1646
1647	rb_head_page_activate(cpu_buffer);
1648
1649	return cpu_buffer;
1650
1651 fail_free_reader:
1652	free_buffer_page(cpu_buffer->reader_page);
1653
1654 fail_free_buffer:
1655	kfree(cpu_buffer);
1656	return NULL;
1657}
1658
1659static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1660{
1661	struct list_head *head = cpu_buffer->pages;
1662	struct buffer_page *bpage, *tmp;
1663
1664	irq_work_sync(&cpu_buffer->irq_work.work);
1665
1666	free_buffer_page(cpu_buffer->reader_page);
1667
1668	if (head) {
1669		rb_head_page_deactivate(cpu_buffer);
1670
1671		list_for_each_entry_safe(bpage, tmp, head, list) {
1672			list_del_init(&bpage->list);
1673			free_buffer_page(bpage);
1674		}
1675		bpage = list_entry(head, struct buffer_page, list);
1676		free_buffer_page(bpage);
1677	}
1678
1679	free_page((unsigned long)cpu_buffer->free_page);
1680
1681	kfree(cpu_buffer);
1682}
1683
1684/**
1685 * __ring_buffer_alloc - allocate a new ring_buffer
1686 * @size: the size in bytes per cpu that is needed.
1687 * @flags: attributes to set for the ring buffer.
1688 * @key: ring buffer reader_lock_key.
1689 *
1690 * Currently the only flag that is available is the RB_FL_OVERWRITE
1691 * flag. This flag means that the buffer will overwrite old data
1692 * when the buffer wraps. If this flag is not set, the buffer will
1693 * drop data when the tail hits the head.
1694 */
1695struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1696					struct lock_class_key *key)
1697{
1698	struct trace_buffer *buffer;
1699	long nr_pages;
1700	int bsize;
1701	int cpu;
1702	int ret;
1703
1704	/* keep it in its own cache line */
1705	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1706			 GFP_KERNEL);
1707	if (!buffer)
1708		return NULL;
1709
1710	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1711		goto fail_free_buffer;
1712
1713	/* Default buffer page size - one system page */
1714	buffer->subbuf_order = 0;
1715	buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;
1716
	/* Max payload is buffer page size - header (8 bytes) */
1718	buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);
1719
1720	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
1721	buffer->flags = flags;
1722	buffer->clock = trace_clock_local;
1723	buffer->reader_lock_key = key;
1724
1725	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1726	init_waitqueue_head(&buffer->irq_work.waiters);
1727
1728	/* need at least two pages */
1729	if (nr_pages < 2)
1730		nr_pages = 2;
1731
1732	buffer->cpus = nr_cpu_ids;
1733
1734	bsize = sizeof(void *) * nr_cpu_ids;
1735	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1736				  GFP_KERNEL);
1737	if (!buffer->buffers)
1738		goto fail_free_cpumask;
1739
1740	cpu = raw_smp_processor_id();
1741	cpumask_set_cpu(cpu, buffer->cpumask);
1742	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1743	if (!buffer->buffers[cpu])
1744		goto fail_free_buffers;
1745
1746	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1747	if (ret < 0)
1748		goto fail_free_buffers;
1749
1750	mutex_init(&buffer->mutex);
1751
1752	return buffer;
1753
1754 fail_free_buffers:
1755	for_each_buffer_cpu(buffer, cpu) {
1756		if (buffer->buffers[cpu])
1757			rb_free_cpu_buffer(buffer->buffers[cpu]);
1758	}
1759	kfree(buffer->buffers);
1760
1761 fail_free_cpumask:
1762	free_cpumask_var(buffer->cpumask);
1763
1764 fail_free_buffer:
1765	kfree(buffer);
1766	return NULL;
1767}
1768EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
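
/*
 * Example usage (an illustrative sketch): allocate a 1 MB per-cpu
 * overwrite buffer, write one small record into it from the current CPU,
 * and free it again. ring_buffer_alloc() is the wrapper macro from
 * <linux/ring_buffer.h> that supplies the lock class key.
 *
 *	struct trace_buffer *b;
 *	char msg[] = "hello";
 *
 *	b = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!b)
 *		return -ENOMEM;
 *	ring_buffer_write(b, sizeof(msg), msg);
 *	ring_buffer_free(b);
 */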
1769
1770/**
1771 * ring_buffer_free - free a ring buffer.
1772 * @buffer: the buffer to free.
1773 */
1774void
1775ring_buffer_free(struct trace_buffer *buffer)
1776{
1777	int cpu;
1778
1779	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1780
1781	irq_work_sync(&buffer->irq_work.work);
1782
1783	for_each_buffer_cpu(buffer, cpu)
1784		rb_free_cpu_buffer(buffer->buffers[cpu]);
1785
1786	kfree(buffer->buffers);
1787	free_cpumask_var(buffer->cpumask);
1788
1789	kfree(buffer);
1790}
1791EXPORT_SYMBOL_GPL(ring_buffer_free);
1792
1793void ring_buffer_set_clock(struct trace_buffer *buffer,
1794			   u64 (*clock)(void))
1795{
1796	buffer->clock = clock;
1797}
1798
1799void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
1800{
1801	buffer->time_stamp_abs = abs;
1802}
1803
1804bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
1805{
1806	return buffer->time_stamp_abs;
1807}
1808
1809static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1810{
1811	return local_read(&bpage->entries) & RB_WRITE_MASK;
1812}
1813
1814static inline unsigned long rb_page_write(struct buffer_page *bpage)
1815{
1816	return local_read(&bpage->write) & RB_WRITE_MASK;
1817}
1818
1819static bool
1820rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1821{
1822	struct list_head *tail_page, *to_remove, *next_page;
1823	struct buffer_page *to_remove_page, *tmp_iter_page;
1824	struct buffer_page *last_page, *first_page;
1825	unsigned long nr_removed;
1826	unsigned long head_bit;
1827	int page_entries;
1828
1829	head_bit = 0;
1830
1831	raw_spin_lock_irq(&cpu_buffer->reader_lock);
1832	atomic_inc(&cpu_buffer->record_disabled);
1833	/*
1834	 * We don't race with the readers since we have acquired the reader
1835	 * lock. We also don't race with writers after disabling recording.
1836	 * This makes it easy to figure out the first and the last page to be
1837	 * removed from the list. We unlink all the pages in between including
1838	 * the first and last pages. This is done in a busy loop so that we
1839	 * lose the least number of traces.
1840	 * The pages are freed after we restart recording and unlock readers.
1841	 */
1842	tail_page = &cpu_buffer->tail_page->list;
1843
1844	/*
1845	 * If the tail page is on the reader page, we remove the next page
1846	 * from the ring buffer instead
1847	 */
1848	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1849		tail_page = rb_list_head(tail_page->next);
1850	to_remove = tail_page;
1851
1852	/* start of pages to remove */
1853	first_page = list_entry(rb_list_head(to_remove->next),
1854				struct buffer_page, list);
1855
1856	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1857		to_remove = rb_list_head(to_remove)->next;
1858		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1859	}
1860	/* Read iterators need to reset themselves when some pages are removed */
1861	cpu_buffer->pages_removed += nr_removed;
1862
1863	next_page = rb_list_head(to_remove)->next;
1864
1865	/*
1866	 * Now we remove all pages between tail_page and next_page.
1867	 * Make sure that the head_bit value is preserved for the
1868	 * next page.
1869	 */
1870	tail_page->next = (struct list_head *)((unsigned long)next_page |
1871						head_bit);
1872	next_page = rb_list_head(next_page);
1873	next_page->prev = tail_page;
1874
1875	/* make sure pages points to a valid page in the ring buffer */
1876	cpu_buffer->pages = next_page;
1877
1878	/* update head page */
1879	if (head_bit)
1880		cpu_buffer->head_page = list_entry(next_page,
1881						struct buffer_page, list);
1882
1883	/* pages are removed, resume tracing and then free the pages */
1884	atomic_dec(&cpu_buffer->record_disabled);
1885	raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1886
1887	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1888
1889	/* last buffer page to remove */
1890	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1891				list);
1892	tmp_iter_page = first_page;
1893
1894	do {
1895		cond_resched();
1896
1897		to_remove_page = tmp_iter_page;
1898		rb_inc_page(&tmp_iter_page);
1899
1900		/* update the counters */
1901		page_entries = rb_page_entries(to_remove_page);
1902		if (page_entries) {
1903			/*
1904			 * If something was added to this page, it was full
1905			 * since it is not the tail page. So we subtract the
1906			 * bytes it consumed from the ring buffer's byte count.
1907			 * Increment overrun to account for the lost events.
1908			 */
1909			local_add(page_entries, &cpu_buffer->overrun);
1910			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
1911			local_inc(&cpu_buffer->pages_lost);
1912		}
1913
1914		/*
1915		 * We have already removed references to this list item, just
1916		 * free up the buffer_page and its page
1917		 */
1918		free_buffer_page(to_remove_page);
1919		nr_removed--;
1920
1921	} while (to_remove_page != last_page);
1922
1923	RB_WARN_ON(cpu_buffer, nr_removed);
1924
1925	return nr_removed == 0;
1926}
1927
1928static bool
1929rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1930{
1931	struct list_head *pages = &cpu_buffer->new_pages;
1932	unsigned long flags;
1933	bool success;
1934	int retries;
1935
1936	/* Can be called at early boot up, where interrupts must not be enabled */
1937	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1938	/*
1939	 * We are holding the reader lock, so the reader page won't be swapped
1940	 * in the ring buffer. Now we are racing with the writer trying to
1941	 * move head page and the tail page.
1942	 * We are going to adapt the reader page update process where:
1943	 * 1. We first splice the start and end of list of new pages between
1944	 *    the head page and its previous page.
1945	 * 2. We cmpxchg the prev_page->next to point from head page to the
1946	 *    start of new pages list.
1947	 * 3. Finally, we update the head->prev to the end of new list.
1948	 *
1949	 * We will try this process 10 times, to make sure that we don't keep
1950	 * spinning.
1951	 */
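	/*
	 * Rough sketch of the splice described above (H = head page,
	 * P = its previous page, N1..Nn = the new pages):
	 *
	 *   before:  P -> H -> ...
	 *   after:   P -> N1 -> ... -> Nn -> H -> ...
	 *
	 * Only P->next is switched with the cmpxchg (step 2); H->prev is
	 * then updated to point to Nn (step 3).
	 */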
1952	retries = 10;
1953	success = false;
1954	while (retries--) {
1955		struct list_head *head_page, *prev_page;
1956		struct list_head *last_page, *first_page;
1957		struct list_head *head_page_with_bit;
1958		struct buffer_page *hpage = rb_set_head_page(cpu_buffer);
1959
1960		if (!hpage)
1961			break;
1962		head_page = &hpage->list;
1963		prev_page = head_page->prev;
1964
1965		first_page = pages->next;
1966		last_page  = pages->prev;
1967
1968		head_page_with_bit = (struct list_head *)
1969				     ((unsigned long)head_page | RB_PAGE_HEAD);
1970
1971		last_page->next = head_page_with_bit;
1972		first_page->prev = prev_page;
1973
1974		/* caution: head_page_with_bit gets updated on cmpxchg failure */
1975		if (try_cmpxchg(&prev_page->next,
1976				&head_page_with_bit, first_page)) {
1977			/*
1978			 * yay, we replaced the page pointer to our new list,
1979			 * now we just have to update the head page's prev
1980			 * pointer to point to the end of the new list
1981			 */
1982			head_page->prev = last_page;
1983			success = true;
1984			break;
1985		}
1986	}
1987
1988	if (success)
1989		INIT_LIST_HEAD(pages);
1990	/*
1991	 * If we weren't successful in adding in new pages, warn and stop
1992	 * tracing
1993	 */
1994	RB_WARN_ON(cpu_buffer, !success);
1995	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1996
1997	/* free pages if they weren't inserted */
1998	if (!success) {
1999		struct buffer_page *bpage, *tmp;
2000		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2001					 list) {
2002			list_del_init(&bpage->list);
2003			free_buffer_page(bpage);
2004		}
2005	}
2006	return success;
2007}
2008
2009static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
2010{
2011	bool success;
2012
2013	if (cpu_buffer->nr_pages_to_update > 0)
2014		success = rb_insert_pages(cpu_buffer);
2015	else
2016		success = rb_remove_pages(cpu_buffer,
2017					-cpu_buffer->nr_pages_to_update);
2018
2019	if (success)
2020		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
2021}
2022
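/*
 * Work handler used by ring_buffer_resize() to run rb_update_pages()
 * on the CPU that owns the per-CPU buffer, then signal completion.
 */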
2023static void update_pages_handler(struct work_struct *work)
2024{
2025	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2026			struct ring_buffer_per_cpu, update_pages_work);
2027	rb_update_pages(cpu_buffer);
2028	complete(&cpu_buffer->update_done);
2029}
2030
2031/**
2032 * ring_buffer_resize - resize the ring buffer
2033 * @buffer: the buffer to resize.
2034 * @size: the new size.
2035 * @cpu_id: the cpu buffer to resize
2036 *
2037 * Minimum size is 2 * buffer->subbuf_size.
2038 *
2039 * Returns 0 on success and < 0 on failure.
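 *
 * For example, to grow every per-CPU buffer to roughly 1 MB of data
 * (a sketch; the size is rounded up to whole sub-buffers):
 *
 *	ring_buffer_resize(buffer, 1UL << 20, RING_BUFFER_ALL_CPUS);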
2040 */
2041int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
2042			int cpu_id)
2043{
2044	struct ring_buffer_per_cpu *cpu_buffer;
2045	unsigned long nr_pages;
2046	int cpu, err;
2047
2048	/*
2049	 * Always succeed at resizing a non-existent buffer:
2050	 */
2051	if (!buffer)
2052		return 0;
2053
2054	/* Make sure the requested buffer exists */
2055	if (cpu_id != RING_BUFFER_ALL_CPUS &&
2056	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
2057		return 0;
2058
2059	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
2060
2061	/* we need a minimum of two pages */
2062	if (nr_pages < 2)
2063		nr_pages = 2;
2064
2065	/* prevent another thread from changing buffer sizes */
2066	mutex_lock(&buffer->mutex);
2067	atomic_inc(&buffer->resizing);
2068
2069	if (cpu_id == RING_BUFFER_ALL_CPUS) {
2070		/*
2071		 * Don't succeed if resizing is disabled, as a reader might be
2072		 * manipulating the ring buffer and is expecting a sane state while
2073		 * this is true.
2074		 */
2075		for_each_buffer_cpu(buffer, cpu) {
2076			cpu_buffer = buffer->buffers[cpu];
2077			if (atomic_read(&cpu_buffer->resize_disabled)) {
2078				err = -EBUSY;
2079				goto out_err_unlock;
2080			}
2081		}
2082
2083		/* calculate the pages to update */
2084		for_each_buffer_cpu(buffer, cpu) {
2085			cpu_buffer = buffer->buffers[cpu];
2086
2087			cpu_buffer->nr_pages_to_update = nr_pages -
2088							cpu_buffer->nr_pages;
2089			/*
2090			 * nothing more to do when removing pages or there is no update
2091			 */
2092			if (cpu_buffer->nr_pages_to_update <= 0)
2093				continue;
2094			/*
2095			 * to add pages, make sure all new pages can be
2096			 * allocated without receiving ENOMEM
2097			 */
2098			INIT_LIST_HEAD(&cpu_buffer->new_pages);
2099			if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2100						&cpu_buffer->new_pages)) {
2101				/* not enough memory for new pages */
2102				err = -ENOMEM;
2103				goto out_err;
2104			}
2105
2106			cond_resched();
2107		}
2108
2109		cpus_read_lock();
2110		/*
2111		 * Fire off all the required work handlers
2112		 * We can't schedule on offline CPUs, but it's not necessary
2113		 * since we can change their buffer sizes without any race.
2114		 */
2115		for_each_buffer_cpu(buffer, cpu) {
2116			cpu_buffer = buffer->buffers[cpu];
2117			if (!cpu_buffer->nr_pages_to_update)
2118				continue;
2119
2120			/* Can't run something on an offline CPU. */
2121			if (!cpu_online(cpu)) {
2122				rb_update_pages(cpu_buffer);
2123				cpu_buffer->nr_pages_to_update = 0;
2124			} else {
2125				/* Run directly if possible. */
2126				migrate_disable();
2127				if (cpu != smp_processor_id()) {
2128					migrate_enable();
2129					schedule_work_on(cpu,
2130							 &cpu_buffer->update_pages_work);
2131				} else {
2132					update_pages_handler(&cpu_buffer->update_pages_work);
2133					migrate_enable();
2134				}
2135			}
2136		}
2137
2138		/* wait for all the updates to complete */
2139		for_each_buffer_cpu(buffer, cpu) {
2140			cpu_buffer = buffer->buffers[cpu];
2141			if (!cpu_buffer->nr_pages_to_update)
2142				continue;
2143
2144			if (cpu_online(cpu))
2145				wait_for_completion(&cpu_buffer->update_done);
2146			cpu_buffer->nr_pages_to_update = 0;
2147		}
2148
2149		cpus_read_unlock();
2150	} else {
2151		cpu_buffer = buffer->buffers[cpu_id];
2152
2153		if (nr_pages == cpu_buffer->nr_pages)
2154			goto out;
2155
2156		/*
2157		 * Don't succeed if resizing is disabled, as a reader might be
2158		 * manipulating the ring buffer and is expecting a sane state while
2159		 * this is true.
2160		 */
2161		if (atomic_read(&cpu_buffer->resize_disabled)) {
2162			err = -EBUSY;
2163			goto out_err_unlock;
2164		}
2165
2166		cpu_buffer->nr_pages_to_update = nr_pages -
2167						cpu_buffer->nr_pages;
2168
2169		INIT_LIST_HEAD(&cpu_buffer->new_pages);
2170		if (cpu_buffer->nr_pages_to_update > 0 &&
2171			__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2172					    &cpu_buffer->new_pages)) {
2173			err = -ENOMEM;
2174			goto out_err;
2175		}
2176
2177		cpus_read_lock();
2178
2179		/* Can't run something on an offline CPU. */
2180		if (!cpu_online(cpu_id))
2181			rb_update_pages(cpu_buffer);
2182		else {
2183			/* Run directly if possible. */
2184			migrate_disable();
2185			if (cpu_id == smp_processor_id()) {
2186				rb_update_pages(cpu_buffer);
2187				migrate_enable();
2188			} else {
2189				migrate_enable();
2190				schedule_work_on(cpu_id,
2191						 &cpu_buffer->update_pages_work);
2192				wait_for_completion(&cpu_buffer->update_done);
2193			}
2194		}
2195
2196		cpu_buffer->nr_pages_to_update = 0;
2197		cpus_read_unlock();
2198	}
2199
2200 out:
2201	/*
2202	 * The ring buffer resize can happen with the ring buffer
2203	 * enabled, so that the update disturbs the tracing as little
2204	 * as possible. But if the buffer is disabled, we do not need
2205	 * to worry about that, and we can take the time to verify
2206	 * that the buffer is not corrupt.
2207	 */
2208	if (atomic_read(&buffer->record_disabled)) {
2209		atomic_inc(&buffer->record_disabled);
2210		/*
2211		 * Even though the buffer was disabled, we must make sure
2212		 * that it is truly disabled before calling rb_check_pages.
2213		 * There could have been a race between checking
2214		 * record_disabled and incrementing it.
2215		 */
2216		synchronize_rcu();
2217		for_each_buffer_cpu(buffer, cpu) {
2218			unsigned long flags;
2219
2220			cpu_buffer = buffer->buffers[cpu];
2221			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2222			rb_check_pages(cpu_buffer);
2223			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2224		}
2225		atomic_dec(&buffer->record_disabled);
2226	}
2227
2228	atomic_dec(&buffer->resizing);
2229	mutex_unlock(&buffer->mutex);
2230	return 0;
2231
2232 out_err:
2233	for_each_buffer_cpu(buffer, cpu) {
2234		struct buffer_page *bpage, *tmp;
2235
2236		cpu_buffer = buffer->buffers[cpu];
2237		cpu_buffer->nr_pages_to_update = 0;
2238
2239		if (list_empty(&cpu_buffer->new_pages))
2240			continue;
2241
2242		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2243					list) {
2244			list_del_init(&bpage->list);
2245			free_buffer_page(bpage);
2246		}
2247	}
2248 out_err_unlock:
2249	atomic_dec(&buffer->resizing);
2250	mutex_unlock(&buffer->mutex);
2251	return err;
2252}
2253EXPORT_SYMBOL_GPL(ring_buffer_resize);
2254
2255void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
2256{
2257	mutex_lock(&buffer->mutex);
2258	if (val)
2259		buffer->flags |= RB_FL_OVERWRITE;
2260	else
2261		buffer->flags &= ~RB_FL_OVERWRITE;
2262	mutex_unlock(&buffer->mutex);
2263}
2264EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
2265
2266static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2267{
2268	return bpage->page->data + index;
2269}
2270
2271static __always_inline struct ring_buffer_event *
2272rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2273{
2274	return __rb_page_index(cpu_buffer->reader_page,
2275			       cpu_buffer->reader_page->read);
2276}
2277
2278static struct ring_buffer_event *
2279rb_iter_head_event(struct ring_buffer_iter *iter)
2280{
2281	struct ring_buffer_event *event;
2282	struct buffer_page *iter_head_page = iter->head_page;
2283	unsigned long commit;
2284	unsigned length;
2285
2286	if (iter->head != iter->next_event)
2287		return iter->event;
2288
2289	/*
2290	 * When the writer goes across pages, it issues a cmpxchg which
2291	 * is a mb(), which will synchronize with the rmb here.
2292	 * (see rb_tail_page_update() and __rb_reserve_next())
2293	 */
2294	commit = rb_page_commit(iter_head_page);
2295	smp_rmb();
2296
2297	/* An event needs to be at least 8 bytes in size */
2298	if (iter->head > commit - 8)
2299		goto reset;
2300
2301	event = __rb_page_index(iter_head_page, iter->head);
2302	length = rb_event_length(event);
2303
2304	/*
2305	 * READ_ONCE() doesn't work on functions and we don't want the
2306	 * compiler doing any crazy optimizations with length.
2307	 */
2308	barrier();
2309
2310	if ((iter->head + length) > commit || length > iter->event_size)
2311		/* Writer corrupted the read? */
2312		goto reset;
2313
2314	memcpy(iter->event, event, length);
2315	/*
2316	 * If the page stamp is still the same after this rmb() then the
2317	 * event was safely copied without the writer entering the page.
2318	 */
2319	smp_rmb();
2320
2321	/* Make sure the page didn't change since we read this */
2322	if (iter->page_stamp != iter_head_page->page->time_stamp ||
2323	    commit > rb_page_commit(iter_head_page))
2324		goto reset;
2325
2326	iter->next_event = iter->head + length;
2327	return iter->event;
2328 reset:
2329	/* Reset to the beginning */
2330	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2331	iter->head = 0;
2332	iter->next_event = 0;
2333	iter->missed_events = 1;
2334	return NULL;
2335}
2336
2337/* Size is determined by what has been committed */
2338static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2339{
2340	return rb_page_commit(bpage) & ~RB_MISSED_MASK;
2341}
2342
2343static __always_inline unsigned
2344rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2345{
2346	return rb_page_commit(cpu_buffer->commit_page);
2347}
2348
2349static __always_inline unsigned
2350rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event)
2351{
2352	unsigned long addr = (unsigned long)event;
2353
2354	addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1;
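	/*
	 * Masking with the sub-buffer size (PAGE_SIZE << subbuf_order)
	 * leaves the offset of the event within its sub-buffer; subtracting
	 * the sub-buffer header yields the index into the data area.
	 */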
2355
2356	return addr - BUF_PAGE_HDR_SIZE;
2357}
2358
2359static void rb_inc_iter(struct ring_buffer_iter *iter)
2360{
2361	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2362
2363	/*
2364	 * The iterator could be on the reader page (it starts there).
2365	 * But the head could have moved, since the reader was
2366	 * found. Check for this case and assign the iterator
2367	 * to the head page instead of next.
2368	 */
2369	if (iter->head_page == cpu_buffer->reader_page)
2370		iter->head_page = rb_set_head_page(cpu_buffer);
2371	else
2372		rb_inc_page(&iter->head_page);
2373
2374	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2375	iter->head = 0;
2376	iter->next_event = 0;
2377}
2378
2379/*
2380 * rb_handle_head_page - writer hit the head page
2381 *
2382 * Returns: +1 to retry page
2383 *           0 to continue
2384 *          -1 on error
2385 */
2386static int
2387rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2388		    struct buffer_page *tail_page,
2389		    struct buffer_page *next_page)
2390{
2391	struct buffer_page *new_head;
2392	int entries;
2393	int type;
2394	int ret;
2395
2396	entries = rb_page_entries(next_page);
2397
2398	/*
2399	 * The hard part is here. We need to move the head
2400	 * forward, and protect against both readers on
2401	 * other CPUs and writers coming in via interrupts.
2402	 */
2403	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2404				       RB_PAGE_HEAD);
2405
2406	/*
2407	 * type can be one of four:
2408	 *  NORMAL - an interrupt already moved it for us
2409	 *  HEAD   - we are the first to get here.
2410	 *  UPDATE - we are the interrupt interrupting
2411	 *           a current move.
2412	 *  MOVED  - a reader on another CPU moved the next
2413	 *           pointer to its reader page. Give up
2414	 *           and try again.
2415	 */
2416
2417	switch (type) {
2418	case RB_PAGE_HEAD:
2419		/*
2420		 * We changed the head to UPDATE, thus
2421		 * it is our responsibility to update
2422		 * the counters.
2423		 */
2424		local_add(entries, &cpu_buffer->overrun);
2425		local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
2426		local_inc(&cpu_buffer->pages_lost);
2427
2428		/*
2429		 * The entries will be zeroed out when we move the
2430		 * tail page.
2431		 */
2432
2433		/* still more to do */
2434		break;
2435
2436	case RB_PAGE_UPDATE:
2437		/*
2438		 * This is an interrupt that interrupted the
2439		 * previous update. Still more to do.
2440		 */
2441		break;
2442	case RB_PAGE_NORMAL:
2443		/*
2444		 * An interrupt came in before the update
2445		 * and processed this for us.
2446		 * Nothing left to do.
2447		 */
2448		return 1;
2449	case RB_PAGE_MOVED:
2450		/*
2451		 * The reader is on another CPU and just did
2452		 * a swap with our next_page.
2453		 * Try again.
2454		 */
2455		return 1;
2456	default:
2457		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2458		return -1;
2459	}
2460
2461	/*
2462	 * Now that we are here, the old head pointer is
2463	 * set to UPDATE. This will keep the reader from
2464	 * swapping the head page with the reader page.
2465	 * The reader (on another CPU) will spin till
2466	 * we are finished.
2467	 *
2468	 * We just need to protect against interrupts
2469	 * doing the job. We will set the next pointer
2470	 * to HEAD. After that, we set the old pointer
2471	 * to NORMAL, but only if it was HEAD before.
2472	 * Otherwise we are an interrupt, and only
2473	 * want the outermost commit to reset it.
2474	 */
2475	new_head = next_page;
2476	rb_inc_page(&new_head);
2477
2478	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2479				    RB_PAGE_NORMAL);
2480
2481	/*
2482	 * Valid returns are:
2483	 *  HEAD   - an interrupt came in and already set it.
2484	 *  NORMAL - One of two things:
2485	 *            1) We really set it.
2486	 *            2) A bunch of interrupts came in and moved
2487	 *               the page forward again.
2488	 */
2489	switch (ret) {
2490	case RB_PAGE_HEAD:
2491	case RB_PAGE_NORMAL:
2492		/* OK */
2493		break;
2494	default:
2495		RB_WARN_ON(cpu_buffer, 1);
2496		return -1;
2497	}
2498
2499	/*
2500	 * It is possible that an interrupt came in,
2501	 * set the head up, then more interrupts came in
2502	 * and moved it again. When we get back here,
2503	 * the page would have been set to NORMAL but we
2504	 * just set it back to HEAD.
2505	 *
2506	 * How do you detect this? Well, if that happened
2507	 * the tail page would have moved.
2508	 */
2509	if (ret == RB_PAGE_NORMAL) {
2510		struct buffer_page *buffer_tail_page;
2511
2512		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2513		/*
2514		 * If the tail had moved past next, then we need
2515		 * to reset the pointer.
2516		 */
2517		if (buffer_tail_page != tail_page &&
2518		    buffer_tail_page != next_page)
2519			rb_head_page_set_normal(cpu_buffer, new_head,
2520						next_page,
2521						RB_PAGE_HEAD);
2522	}
2523
2524	/*
2525	 * If this was the outer most commit (the one that
2526	 * changed the original pointer from HEAD to UPDATE),
2527	 * then it is up to us to reset it to NORMAL.
2528	 */
2529	if (type == RB_PAGE_HEAD) {
2530		ret = rb_head_page_set_normal(cpu_buffer, next_page,
2531					      tail_page,
2532					      RB_PAGE_UPDATE);
2533		if (RB_WARN_ON(cpu_buffer,
2534			       ret != RB_PAGE_UPDATE))
2535			return -1;
2536	}
2537
2538	return 0;
2539}
2540
2541static inline void
2542rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2543	      unsigned long tail, struct rb_event_info *info)
2544{
2545	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
2546	struct buffer_page *tail_page = info->tail_page;
2547	struct ring_buffer_event *event;
2548	unsigned long length = info->length;
2549
2550	/*
2551	 * Only the event that crossed the page boundary
2552	 * must fill the old tail_page with padding.
2553	 */
2554	if (tail >= bsize) {
2555		/*
2556		 * If the page was filled, then we still need
2557		 * to update the real_end. Reset it to zero
2558		 * and the reader will ignore it.
2559		 */
2560		if (tail == bsize)
2561			tail_page->real_end = 0;
2562
2563		local_sub(length, &tail_page->write);
2564		return;
2565	}
2566
2567	event = __rb_page_index(tail_page, tail);
2568
2569	/*
2570	 * Save the original end of data in the page meta data.
2571	 * This will be used by the reader to add the lost event
2572	 * counter.
2573	 */
2574	tail_page->real_end = tail;
2575
2576	/*
2577	 * If this event is bigger than the minimum size, then
2578	 * we need to be careful that we don't subtract the
2579	 * write counter enough to allow another writer to slip
2580	 * in on this page.
2581	 * We put in a discarded commit instead, to make sure
2582	 * that this space is not used again, and this space will
2583	 * not be accounted into 'entries_bytes'.
2584	 *
2585	 * If we are less than the minimum size, we don't need to
2586	 * worry about it.
2587	 */
2588	if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
2589		/* No room for any events */
2590
2591		/* Mark the rest of the page with padding */
2592		rb_event_set_padding(event);
2593
2594		/* Make sure the padding is visible before the write update */
2595		smp_wmb();
2596
2597		/* Set the write back to the previous setting */
2598		local_sub(length, &tail_page->write);
2599		return;
2600	}
2601
2602	/* Put in a discarded event */
2603	event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
2604	event->type_len = RINGBUF_TYPE_PADDING;
2605	/* time delta must be non zero */
2606	event->time_delta = 1;
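	/*
	 * array[0] holds the size of the padded area minus the event
	 * header, so rb_event_length() of this event covers everything
	 * from 'tail' to the end of the sub-buffer.
	 */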
2607
2608	/* account for padding bytes */
2609	local_add(bsize - tail, &cpu_buffer->entries_bytes);
2610
2611	/* Make sure the padding is visible before the tail_page->write update */
2612	smp_wmb();
2613
2614	/* Set write to end of buffer */
2615	length = (tail + length) - bsize;
2616	local_sub(length, &tail_page->write);
2617}
2618
2619static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2620
2621/*
2622 * This is the slow path, force gcc not to inline it.
2623 */
2624static noinline struct ring_buffer_event *
2625rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2626	     unsigned long tail, struct rb_event_info *info)
2627{
2628	struct buffer_page *tail_page = info->tail_page;
2629	struct buffer_page *commit_page = cpu_buffer->commit_page;
2630	struct trace_buffer *buffer = cpu_buffer->buffer;
2631	struct buffer_page *next_page;
2632	int ret;
2633
2634	next_page = tail_page;
2635
2636	rb_inc_page(&next_page);
2637
2638	/*
2639	 * If for some reason, we had an interrupt storm that made
2640	 * it all the way around the buffer, bail, and warn
2641	 * about it.
2642	 */
2643	if (unlikely(next_page == commit_page)) {
2644		local_inc(&cpu_buffer->commit_overrun);
2645		goto out_reset;
2646	}
2647
2648	/*
2649	 * This is where the fun begins!
2650	 *
2651	 * We are fighting against races between a reader that
2652	 * could be on another CPU trying to swap its reader
2653	 * page with the buffer head.
2654	 *
2655	 * We are also fighting against interrupts coming in and
2656	 * moving the head or tail on us as well.
2657	 *
2658	 * If the next page is the head page then we have filled
2659	 * the buffer, unless the commit page is still on the
2660	 * reader page.
2661	 */
2662	if (rb_is_head_page(next_page, &tail_page->list)) {
2663
2664		/*
2665		 * If the commit is not on the reader page, then
2666		 * move the head page.
2667		 */
2668		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2669			/*
2670			 * If we are not in overwrite mode,
2671			 * this is easy, just stop here.
2672			 */
2673			if (!(buffer->flags & RB_FL_OVERWRITE)) {
2674				local_inc(&cpu_buffer->dropped_events);
2675				goto out_reset;
2676			}
2677
2678			ret = rb_handle_head_page(cpu_buffer,
2679						  tail_page,
2680						  next_page);
2681			if (ret < 0)
2682				goto out_reset;
2683			if (ret)
2684				goto out_again;
2685		} else {
2686			/*
2687			 * We need to be careful here too. The
2688			 * commit page could still be on the reader
2689			 * page. We could have a small buffer, and
2690			 * have filled up the buffer with events
2691			 * from interrupts and such, and wrapped.
2692			 *
2693			 * Note, if the tail page is also on the
2694			 * reader_page, we let it move out.
2695			 */
2696			if (unlikely((cpu_buffer->commit_page !=
2697				      cpu_buffer->tail_page) &&
2698				     (cpu_buffer->commit_page ==
2699				      cpu_buffer->reader_page))) {
2700				local_inc(&cpu_buffer->commit_overrun);
2701				goto out_reset;
2702			}
2703		}
2704	}
2705
2706	rb_tail_page_update(cpu_buffer, tail_page, next_page);
2707
2708 out_again:
2709
2710	rb_reset_tail(cpu_buffer, tail, info);
2711
2712	/* Commit what we have for now. */
2713	rb_end_commit(cpu_buffer);
2714	/* rb_end_commit() decs committing */
2715	local_inc(&cpu_buffer->committing);
2716
2717	/* fail and let the caller try again */
2718	return ERR_PTR(-EAGAIN);
2719
2720 out_reset:
2721	/* reset write */
2722	rb_reset_tail(cpu_buffer, tail, info);
2723
2724	return NULL;
2725}
2726
2727/* Slow path */
2728static struct ring_buffer_event *
2729rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2730		  struct ring_buffer_event *event, u64 delta, bool abs)
2731{
2732	if (abs)
2733		event->type_len = RINGBUF_TYPE_TIME_STAMP;
2734	else
2735		event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2736
2737	/* Not the first event on the page, or not delta? */
2738	if (abs || rb_event_index(cpu_buffer, event)) {
2739		event->time_delta = delta & TS_MASK;
2740		event->array[0] = delta >> TS_SHIFT;
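		/*
		 * The delta is split across the event: the low TS_SHIFT (27)
		 * bits go into time_delta, the remaining high bits into
		 * array[0].
		 */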
2741	} else {
2742		/* nope, just zero it */
2743		event->time_delta = 0;
2744		event->array[0] = 0;
2745	}
2746
2747	return skip_time_extend(event);
2748}
2749
2750#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2751static inline bool sched_clock_stable(void)
2752{
2753	return true;
2754}
2755#endif
2756
2757static void
2758rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2759		   struct rb_event_info *info)
2760{
2761	u64 write_stamp;
2762
2763	WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
2764		  (unsigned long long)info->delta,
2765		  (unsigned long long)info->ts,
2766		  (unsigned long long)info->before,
2767		  (unsigned long long)info->after,
2768		  (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}),
2769		  sched_clock_stable() ? "" :
2770		  "If you just came from a suspend/resume,\n"
2771		  "please switch to the trace global clock:\n"
2772		  "  echo global > /sys/kernel/tracing/trace_clock\n"
2773		  "or add trace_clock=global to the kernel command line\n");
2774}
2775
2776static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2777				      struct ring_buffer_event **event,
2778				      struct rb_event_info *info,
2779				      u64 *delta,
2780				      unsigned int *length)
2781{
2782	bool abs = info->add_timestamp &
2783		(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2784
2785	if (unlikely(info->delta > (1ULL << 59))) {
2786		/*
2787		 * Some timers can use more than 59 bits, and when a timestamp
2788		 * is added to the buffer, it will lose those bits.
2789		 */
2790		if (abs && (info->ts & TS_MSB)) {
2791			info->delta &= ABS_TS_MASK;
2792
2793		/* did the clock go backwards */
2794		} else if (info->before == info->after && info->before > info->ts) {
2795			/* not interrupted */
2796			static int once;
2797
2798			/*
2799			 * This is possible with a recalibrating of the TSC.
2800			 * Do not produce a call stack, but just report it.
2801			 */
2802			if (!once) {
2803				once++;
2804				pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2805					info->before, info->ts);
2806			}
2807		} else
2808			rb_check_timestamp(cpu_buffer, info);
2809		if (!abs)
2810			info->delta = 0;
2811	}
2812	*event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs);
2813	*length -= RB_LEN_TIME_EXTEND;
2814	*delta = 0;
2815}
2816
2817/**
2818 * rb_update_event - update event type and data
2819 * @cpu_buffer: The per cpu buffer of the @event
2820 * @event: the event to update
2821 * @info: The info to update the @event with (contains length and delta)
2822 *
2823 * Update the type and data fields of the @event. The length
2824 * is the actual size that is written to the ring buffer,
2825 * and with this, we can determine what to place into the
2826 * data field.
2827 */
2828static void
2829rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2830		struct ring_buffer_event *event,
2831		struct rb_event_info *info)
2832{
2833	unsigned length = info->length;
2834	u64 delta = info->delta;
2835	unsigned int nest = local_read(&cpu_buffer->committing) - 1;
2836
2837	if (!WARN_ON_ONCE(nest >= MAX_NEST))
2838		cpu_buffer->event_stamp[nest] = info->ts;
2839
2840	/*
2841	 * If we need to add a timestamp, then we
2842	 * add it to the start of the reserved space.
2843	 */
2844	if (unlikely(info->add_timestamp))
2845		rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
2846
2847	event->time_delta = delta;
2848	length -= RB_EVNT_HDR_SIZE;
2849	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2850		event->type_len = 0;
2851		event->array[0] = length;
2852	} else
2853		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2854}
2855
2856static unsigned rb_calculate_event_length(unsigned length)
2857{
2858	struct ring_buffer_event event; /* Used only for sizeof array */
2859
2860	/* zero length can cause confusion */
2861	if (!length)
2862		length++;
2863
2864	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2865		length += sizeof(event.array[0]);
2866
2867	length += RB_EVNT_HDR_SIZE;
2868	length = ALIGN(length, RB_ARCH_ALIGNMENT);
2869
2870	/*
2871	 * In case the time delta is larger than the 27 bits for it
2872	 * in the header, we need to add a timestamp. If another
2873	 * event comes in when trying to discard this one to increase
2874	 * the length, then the timestamp will be added in the allocated
2875	 * space of this event. If length is bigger than the size needed
2876	 * for the TIME_EXTEND, then padding has to be used. The events
2877	 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2878	 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2879	 * As length is a multiple of 4, we only need to worry if it
2880	 * is 12 (RB_LEN_TIME_EXTEND + 4).
2881	 */
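	/*
	 * For example (assuming the usual 4-byte header and 4-byte
	 * alignment, with no forced 8-byte alignment): a request of
	 * 8 bytes becomes 8 + 4 = 12, which the check below bumps to 16,
	 * since a 12-byte slot could not hold a TIME_EXTEND (8 bytes)
	 * plus a minimum-size (8 byte) padding event.
	 */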
2882	if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2883		length += RB_ALIGNMENT;
2884
2885	return length;
2886}
2887
2888static inline bool
2889rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2890		  struct ring_buffer_event *event)
2891{
2892	unsigned long new_index, old_index;
2893	struct buffer_page *bpage;
2894	unsigned long addr;
2895
2896	new_index = rb_event_index(cpu_buffer, event);
2897	old_index = new_index + rb_event_ts_length(event);
2898	addr = (unsigned long)event;
2899	addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
2900
2901	bpage = READ_ONCE(cpu_buffer->tail_page);
2902
2903	/*
2904	 * Make sure the tail_page is still the same and
2905	 * the next write location is the end of this event
2906	 */
2907	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2908		unsigned long write_mask =
2909			local_read(&bpage->write) & ~RB_WRITE_MASK;
2910		unsigned long event_length = rb_event_length(event);
2911
2912		/*
2913		 * Make the before_stamp different from the write_stamp
2914		 * so that the next event adds an absolute
2915		 * timestamp and does not rely on the saved write stamp, which
2916		 * is now going to be bogus.
2917		 *
2918		 * By setting the before_stamp to zero, the next event
2919		 * is not going to use the write_stamp and will instead
2920		 * create an absolute timestamp. This means there's no
2921		 * reason to update the write_stamp!
2922		 */
2923		rb_time_set(&cpu_buffer->before_stamp, 0);
2924
2925		/*
2926		 * If an event were to come in now, it would see that the
2927		 * write_stamp and the before_stamp are different, and assume
2928		 * that this event just added itself before updating
2929		 * the write stamp. The interrupting event will fix the
2930		 * write stamp for us, and use an absolute timestamp.
2931		 */
2932
2933		/*
2934		 * This is on the tail page. It is possible that
2935		 * a write could come in and move the tail page
2936		 * and write to the next page. That is fine
2937		 * because we just shorten what is on this page.
2938		 */
2939		old_index += write_mask;
2940		new_index += write_mask;
2941
2942		/* caution: old_index gets updated on cmpxchg failure */
2943		if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
2944			/* update counters */
2945			local_sub(event_length, &cpu_buffer->entries_bytes);
2946			return true;
2947		}
2948	}
2949
2950	/* could not discard */
2951	return false;
2952}
2953
2954static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2955{
2956	local_inc(&cpu_buffer->committing);
2957	local_inc(&cpu_buffer->commits);
2958}
2959
2960static __always_inline void
2961rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2962{
2963	unsigned long max_count;
2964
2965	/*
2966	 * We only race with interrupts and NMIs on this CPU.
2967	 * If we own the commit event, then we can commit
2968	 * all others that interrupted us, since the interruptions
2969	 * are in stack format (they finish before they come
2970	 * back to us). This allows us to do a simple loop to
2971	 * assign the commit to the tail.
2972	 */
2973 again:
2974	max_count = cpu_buffer->nr_pages * 100;
2975
2976	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2977		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2978			return;
2979		if (RB_WARN_ON(cpu_buffer,
2980			       rb_is_reader_page(cpu_buffer->tail_page)))
2981			return;
2982		/*
2983		 * No need for a memory barrier here, as the update
2984		 * of the tail_page did it for this page.
2985		 */
2986		local_set(&cpu_buffer->commit_page->page->commit,
2987			  rb_page_write(cpu_buffer->commit_page));
2988		rb_inc_page(&cpu_buffer->commit_page);
2989		/* add barrier to keep gcc from optimizing too much */
2990		barrier();
2991	}
2992	while (rb_commit_index(cpu_buffer) !=
2993	       rb_page_write(cpu_buffer->commit_page)) {
2994
2995		/* Make sure the readers see the content of what is committed. */
2996		smp_wmb();
2997		local_set(&cpu_buffer->commit_page->page->commit,
2998			  rb_page_write(cpu_buffer->commit_page));
2999		RB_WARN_ON(cpu_buffer,
3000			   local_read(&cpu_buffer->commit_page->page->commit) &
3001			   ~RB_WRITE_MASK);
3002		barrier();
3003	}
3004
3005	/* again, keep gcc from optimizing */
3006	barrier();
3007
3008	/*
3009	 * If an interrupt came in just after the first while loop
3010	 * and pushed the tail page forward, we will be left with
3011	 * a dangling commit that will never go forward.
3012	 */
3013	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
3014		goto again;
3015}
3016
3017static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
3018{
3019	unsigned long commits;
3020
3021	if (RB_WARN_ON(cpu_buffer,
3022		       !local_read(&cpu_buffer->committing)))
3023		return;
3024
3025 again:
3026	commits = local_read(&cpu_buffer->commits);
3027	/* synchronize with interrupts */
3028	barrier();
3029	if (local_read(&cpu_buffer->committing) == 1)
3030		rb_set_commit_to_write(cpu_buffer);
3031
3032	local_dec(&cpu_buffer->committing);
3033
3034	/* synchronize with interrupts */
3035	barrier();
3036
3037	/*
3038	 * Need to account for interrupts coming in between the
3039	 * updating of the commit page and the clearing of the
3040	 * committing counter.
3041	 */
3042	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3043	    !local_read(&cpu_buffer->committing)) {
3044		local_inc(&cpu_buffer->committing);
3045		goto again;
3046	}
3047}
3048
3049static inline void rb_event_discard(struct ring_buffer_event *event)
3050{
3051	if (extended_time(event))
3052		event = skip_time_extend(event);
3053
3054	/* array[0] holds the actual length for the discarded event */
3055	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3056	event->type_len = RINGBUF_TYPE_PADDING;
3057	/* time delta must be non zero */
3058	if (!event->time_delta)
3059		event->time_delta = 1;
3060}
3061
3062static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
3063{
3064	local_inc(&cpu_buffer->entries);
3065	rb_end_commit(cpu_buffer);
3066}
3067
3068static __always_inline void
3069rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3070{
3071	if (buffer->irq_work.waiters_pending) {
3072		buffer->irq_work.waiters_pending = false;
3073		/* irq_work_queue() supplies its own memory barriers */
3074		irq_work_queue(&buffer->irq_work.work);
3075	}
3076
3077	if (cpu_buffer->irq_work.waiters_pending) {
3078		cpu_buffer->irq_work.waiters_pending = false;
3079		/* irq_work_queue() supplies its own memory barriers */
3080		irq_work_queue(&cpu_buffer->irq_work.work);
3081	}
3082
3083	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3084		return;
3085
3086	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3087		return;
3088
3089	if (!cpu_buffer->irq_work.full_waiters_pending)
3090		return;
3091
3092	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3093
3094	if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3095		return;
3096
3097	cpu_buffer->irq_work.wakeup_full = true;
3098	cpu_buffer->irq_work.full_waiters_pending = false;
3099	/* irq_work_queue() supplies its own memory barriers */
3100	irq_work_queue(&cpu_buffer->irq_work.work);
3101}
3102
3103#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3104# define do_ring_buffer_record_recursion()	\
3105	do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3106#else
3107# define do_ring_buffer_record_recursion() do { } while (0)
3108#endif
3109
3110/*
3111 * The lock and unlock are done within a preempt disable section.
3112 * The current_context per_cpu variable can only be modified
3113 * by the current task between lock and unlock. But it can
3114 * be modified more than once via an interrupt. To pass this
3115 * information from the lock to the unlock without having to
3116 * access the 'in_interrupt()' functions again (which do show
3117 * a bit of overhead in something as critical as function tracing),
3118 * we use a bitmask trick.
3119 *
3120 *  bit 1 =  NMI context
3121 *  bit 2 =  IRQ context
3122 *  bit 3 =  SoftIRQ context
3123 *  bit 4 =  normal context.
3124 *
3125 * This works because this is the order of contexts that can
3126 * preempt other contexts. A SoftIRQ never preempts an IRQ
3127 * context.
3128 *
3129 * When the context is determined, the corresponding bit is
3130 * checked and set (if it was set, then a recursion of that context
3131 * happened).
3132 *
3133 * On unlock, we need to clear this bit. To do so, just subtract
3134 * 1 from the current_context and AND it to itself.
3135 *
3136 * (binary)
3137 *  101 - 1 = 100
3138 *  101 & 100 = 100 (clearing bit zero)
3139 *
3140 *  1010 - 1 = 1001
3141 *  1010 & 1001 = 1000 (clearing bit 1)
3142 *
3143 * The least significant bit can be cleared this way, and it
3144 * just so happens that it is the same bit corresponding to
3145 * the current context.
3146 *
3147 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3148 * is set when a recursion is detected at the current context, and if
3149 * the TRANSITION bit is already set, it will fail the recursion.
3150 * This is needed because there's a lag between the changing of
3151 * interrupt context and updating the preempt count. In this case,
3152 * a false positive will be found. To handle this, one extra recursion
3153 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3154 * bit is already set, then it is considered a recursion and the function
3155 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3156 *
3157 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3158 * to be cleared, even if it wasn't the context that set it. That is,
3159 * if an interrupt comes in while NORMAL bit is set and the ring buffer
3160 * is called before preempt_count() is updated, since the check will
3161 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3162 * NMI then comes in, it will set the NMI bit, but when the NMI code
3163 * does the trace_recursive_unlock() it will clear the TRANSITION bit
3164 * and leave the NMI bit set. But this is fine, because the interrupt
3165 * code that set the TRANSITION bit will then clear the NMI bit when it
3166 * calls trace_recursive_unlock(). If another NMI comes in, it will
3167 * set the TRANSITION bit and continue.
3168 *
3169 * Note: The TRANSITION bit only handles a single transition between contexts.
3170 */
3171
3172static __always_inline bool
3173trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3174{
3175	unsigned int val = cpu_buffer->current_context;
3176	int bit = interrupt_context_level();
3177
3178	bit = RB_CTX_NORMAL - bit;
3179
3180	if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3181		/*
3182		 * It is possible that this was called by transitioning
3183		 * between interrupt context, and preempt_count() has not
3184		 * been updated yet. In this case, use the TRANSITION bit.
3185		 */
3186		bit = RB_CTX_TRANSITION;
3187		if (val & (1 << (bit + cpu_buffer->nest))) {
3188			do_ring_buffer_record_recursion();
3189			return true;
3190		}
3191	}
3192
3193	val |= (1 << (bit + cpu_buffer->nest));
3194	cpu_buffer->current_context = val;
3195
3196	return false;
3197}
3198
3199static __always_inline void
3200trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3201{
3202	cpu_buffer->current_context &=
3203		cpu_buffer->current_context - (1 << cpu_buffer->nest);
3204}
3205
3206/* The recursive locking above uses 5 bits */
3207#define NESTED_BITS 5
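/*
 * Each nesting level set up by ring_buffer_nest_start() shifts the
 * context bits by NESTED_BITS, so the outer reserve uses bits 0-4 of
 * current_context and a nested reserve uses bits 5-9.
 */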
3208
3209/**
3210 * ring_buffer_nest_start - Allow tracing while nested
3211 * @buffer: The ring buffer to modify
3212 *
3213 * The ring buffer has a safety mechanism to prevent recursion.
3214 * But there may be a case where a trace needs to be done while
3215 * tracing something else. In this case, calling this function
3216 * will allow another ring_buffer_lock_reserve() to nest within
3217 * the currently active ring_buffer_lock_reserve().
3218 *
3219 * Call this function before calling another ring_buffer_lock_reserve() and
3220 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
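 *
 * A typical sequence (sketch), while another reserve is already active:
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	...
 *	ring_buffer_unlock_commit(buffer);
 *	ring_buffer_nest_end(buffer);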
3221 */
3222void ring_buffer_nest_start(struct trace_buffer *buffer)
3223{
3224	struct ring_buffer_per_cpu *cpu_buffer;
3225	int cpu;
3226
3227	/* Enabled by ring_buffer_nest_end() */
3228	preempt_disable_notrace();
3229	cpu = raw_smp_processor_id();
3230	cpu_buffer = buffer->buffers[cpu];
3231	/* This is the shift value for the above recursive locking */
3232	cpu_buffer->nest += NESTED_BITS;
3233}
3234
3235/**
3236 * ring_buffer_nest_end - Allow tracing while nested
3237 * @buffer: The ring buffer to modify
3238 *
3239 * Must be called after ring_buffer_nest_start() and after the
3240 * ring_buffer_unlock_commit().
3241 */
3242void ring_buffer_nest_end(struct trace_buffer *buffer)
3243{
3244	struct ring_buffer_per_cpu *cpu_buffer;
3245	int cpu;
3246
3247	/* disabled by ring_buffer_nest_start() */
3248	cpu = raw_smp_processor_id();
3249	cpu_buffer = buffer->buffers[cpu];
3250	/* This is the shift value for the above recursive locking */
3251	cpu_buffer->nest -= NESTED_BITS;
3252	preempt_enable_notrace();
3253}
3254
3255/**
3256 * ring_buffer_unlock_commit - commit a reserved event
3257 * @buffer: The buffer to commit to
3258 *
3259 * This commits the data to the ring buffer, and releases any locks held.
3260 *
3261 * Must be paired with ring_buffer_lock_reserve.
3262 */
3263int ring_buffer_unlock_commit(struct trace_buffer *buffer)
3264{
3265	struct ring_buffer_per_cpu *cpu_buffer;
3266	int cpu = raw_smp_processor_id();
3267
3268	cpu_buffer = buffer->buffers[cpu];
3269
3270	rb_commit(cpu_buffer);
3271
3272	rb_wakeups(buffer, cpu_buffer);
3273
3274	trace_recursive_unlock(cpu_buffer);
3275
3276	preempt_enable_notrace();
3277
3278	return 0;
3279}
3280EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
3281
3282/* Special value to validate all deltas on a page. */
3283#define CHECK_FULL_PAGE		1L
3284
3285#ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
3286
3287static const char *show_irq_str(int bits)
3288{
3289	const char *type[] = {
3290		".",	// 0
3291		"s",	// 1
3292		"h",	// 2
3293		"Hs",	// 3
3294		"n",	// 4
3295		"Ns",	// 5
3296		"Nh",	// 6
3297		"NHs",	// 7
3298	};
3299
3300	return type[bits];
3301}
3302
3303/* Assume this is a trace event */
3304static const char *show_flags(struct ring_buffer_event *event)
3305{
3306	struct trace_entry *entry;
3307	int bits = 0;
3308
3309	if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3310		return "X";
3311
3312	entry = ring_buffer_event_data(event);
3313
3314	if (entry->flags & TRACE_FLAG_SOFTIRQ)
3315		bits |= 1;
3316
3317	if (entry->flags & TRACE_FLAG_HARDIRQ)
3318		bits |= 2;
3319
3320	if (entry->flags & TRACE_FLAG_NMI)
3321		bits |= 4;
3322
3323	return show_irq_str(bits);
3324}
3325
3326static const char *show_irq(struct ring_buffer_event *event)
3327{
3328	struct trace_entry *entry;
3329
3330	if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3331		return "";
3332
3333	entry = ring_buffer_event_data(event);
3334	if (entry->flags & TRACE_FLAG_IRQS_OFF)
3335		return "d";
3336	return "";
3337}
3338
3339static const char *show_interrupt_level(void)
3340{
3341	unsigned long pc = preempt_count();
3342	unsigned char level = 0;
3343
3344	if (pc & SOFTIRQ_OFFSET)
3345		level |= 1;
3346
3347	if (pc & HARDIRQ_MASK)
3348		level |= 2;
3349
3350	if (pc & NMI_MASK)
3351		level |= 4;
3352
3353	return show_irq_str(level);
3354}
3355
3356static void dump_buffer_page(struct buffer_data_page *bpage,
3357			     struct rb_event_info *info,
3358			     unsigned long tail)
3359{
3360	struct ring_buffer_event *event;
3361	u64 ts, delta;
3362	int e;
3363
3364	ts = bpage->time_stamp;
3365	pr_warn("  [%lld] PAGE TIME STAMP\n", ts);
3366
3367	for (e = 0; e < tail; e += rb_event_length(event)) {
3368
3369		event = (struct ring_buffer_event *)(bpage->data + e);
3370
3371		switch (event->type_len) {
3372
3373		case RINGBUF_TYPE_TIME_EXTEND:
3374			delta = rb_event_time_stamp(event);
3375			ts += delta;
3376			pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n",
3377				e, ts, delta);
3378			break;
3379
3380		case RINGBUF_TYPE_TIME_STAMP:
3381			delta = rb_event_time_stamp(event);
3382			ts = rb_fix_abs_ts(delta, ts);
3383			pr_warn(" 0x%x:  [%lld] absolute:%lld TIME STAMP\n",
3384				e, ts, delta);
3385			break;
3386
3387		case RINGBUF_TYPE_PADDING:
3388			ts += event->time_delta;
3389			pr_warn(" 0x%x:  [%lld] delta:%d PADDING\n",
3390				e, ts, event->time_delta);
3391			break;
3392
3393		case RINGBUF_TYPE_DATA:
3394			ts += event->time_delta;
3395			pr_warn(" 0x%x:  [%lld] delta:%d %s%s\n",
3396				e, ts, event->time_delta,
3397				show_flags(event), show_irq(event));
3398			break;
3399
3400		default:
3401			break;
3402		}
3403	}
3404	pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e);
3405}
3406
3407static DEFINE_PER_CPU(atomic_t, checking);
3408static atomic_t ts_dump;
3409
3410#define buffer_warn_return(fmt, ...)					\
3411	do {								\
3412		/* If another report is happening, ignore this one */	\
3413		if (atomic_inc_return(&ts_dump) != 1) {			\
3414			atomic_dec(&ts_dump);				\
3415			goto out;					\
3416		}							\
3417		atomic_inc(&cpu_buffer->record_disabled);		\
3418		pr_warn(fmt, ##__VA_ARGS__);				\
3419		dump_buffer_page(bpage, info, tail);			\
3420		atomic_dec(&ts_dump);					\
3421		/* There are some cases during boot up where this can happen */ \
3422		if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING))	\
3423			/* Do not re-enable checking */			\
3424			return;						\
3425	} while (0)
3426
3427/*
3428 * Check if the current event time stamp matches the deltas on
3429 * the buffer page.
3430 */
3431static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3432			 struct rb_event_info *info,
3433			 unsigned long tail)
3434{
3435	struct ring_buffer_event *event;
3436	struct buffer_data_page *bpage;
3437	u64 ts, delta;
3438	bool full = false;
3439	int e;
3440
3441	bpage = info->tail_page->page;
3442
3443	if (tail == CHECK_FULL_PAGE) {
3444		full = true;
3445		tail = local_read(&bpage->commit);
3446	} else if (info->add_timestamp &
3447		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
3448		/* Ignore events with absolute time stamps */
3449		return;
3450	}
3451
3452	/*
3453	 * Do not check the first event (skip possible extends too).
3454	 * Also do not check if previous events have not been committed.
3455	 */
3456	if (tail <= 8 || tail > local_read(&bpage->commit))
3457		return;
3458
3459	/*
3460	 * If this interrupted another event's check, do not check this one.
3461	 */
3462	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
3463		goto out;
3464
3465	ts = bpage->time_stamp;
3466
3467	for (e = 0; e < tail; e += rb_event_length(event)) {
3468
3469		event = (struct ring_buffer_event *)(bpage->data + e);
3470
3471		switch (event->type_len) {
3472
3473		case RINGBUF_TYPE_TIME_EXTEND:
3474			delta = rb_event_time_stamp(event);
3475			ts += delta;
3476			break;
3477
3478		case RINGBUF_TYPE_TIME_STAMP:
3479			delta = rb_event_time_stamp(event);
3480			delta = rb_fix_abs_ts(delta, ts);
3481			if (delta < ts) {
3482				buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n",
3483						   cpu_buffer->cpu, ts, delta);
3484			}
3485			ts = delta;
3486			break;
3487
3488		case RINGBUF_TYPE_PADDING:
3489			if (event->time_delta == 1)
3490				break;
3491			fallthrough;
3492		case RINGBUF_TYPE_DATA:
3493			ts += event->time_delta;
3494			break;
3495
3496		default:
3497			RB_WARN_ON(cpu_buffer, 1);
3498		}
3499	}
3500	if ((full && ts > info->ts) ||
3501	    (!full && ts + info->delta != info->ts)) {
3502		buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n",
3503				   cpu_buffer->cpu,
3504				   ts + info->delta, info->ts, info->delta,
3505				   info->before, info->after,
3506				   full ? " (full)" : "", show_interrupt_level());
3507	}
3508out:
3509	atomic_dec(this_cpu_ptr(&checking));
3510}
3511#else
3512static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3513			 struct rb_event_info *info,
3514			 unsigned long tail)
3515{
3516}
3517#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
3518
3519static struct ring_buffer_event *
3520__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
3521		  struct rb_event_info *info)
3522{
3523	struct ring_buffer_event *event;
3524	struct buffer_page *tail_page;
3525	unsigned long tail, write, w;
3526
3527	/* Don't let the compiler play games with cpu_buffer->tail_page */
3528	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
3529
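	/*
	 * Labels A-F below mark the ordering-sensitive steps: A reads the
	 * current write index, B publishes before_stamp, C reserves space
	 * by bumping the write index, D publishes write_stamp, and E/F
	 * re-check state on the interrupted (slow) path.
	 */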
3530 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
3531	barrier();
3532	rb_time_read(&cpu_buffer->before_stamp, &info->before);
3533	rb_time_read(&cpu_buffer->write_stamp, &info->after);
3534	barrier();
3535	info->ts = rb_time_stamp(cpu_buffer->buffer);
3536
3537	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
3538		info->delta = info->ts;
3539	} else {
3540		/*
3541		 * If interrupting an event time update, we may need an
3542		 * absolute timestamp.
3543		 * Don't bother if this is the start of a new page (w == 0).
3544		 */
3545		if (!w) {
3546			/* Use the sub-buffer timestamp */
3547			info->delta = 0;
3548		} else if (unlikely(info->before != info->after)) {
3549			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3550			info->length += RB_LEN_TIME_EXTEND;
3551		} else {
3552			info->delta = info->ts - info->after;
3553			if (unlikely(test_time_stamp(info->delta))) {
3554				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3555				info->length += RB_LEN_TIME_EXTEND;
3556			}
3557		}
3558	}
3559
3560 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);
3561
3562 /*C*/	write = local_add_return(info->length, &tail_page->write);
3563
3564	/* set write to only the index of the write */
3565	write &= RB_WRITE_MASK;
3566
3567	tail = write - info->length;
3568
3569	/* See if we shot past the end of this buffer page */
3570	if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
3571		check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
3572		return rb_move_tail(cpu_buffer, tail, info);
3573	}
3574
3575	if (likely(tail == w)) {
3576		/* Nothing interrupted us between A and C */
3577 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
3578		/*
3579		 * If something came in between C and D, the write stamp
3580		 * may now not be in sync. But that's fine as the before_stamp
3581		 * will be different and then the next event will just be forced
3582		 * to use an absolute timestamp.
3583		 */
3584		if (likely(!(info->add_timestamp &
3585			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3586			/* This did not interrupt any time update */
3587			info->delta = info->ts - info->after;
3588		else
3589			/* Just use full timestamp for interrupting event */
3590			info->delta = info->ts;
3591		check_buffer(cpu_buffer, info, tail);
3592	} else {
3593		u64 ts;
3594		/* SLOW PATH - Interrupted between A and C */
3595
3596		/* Save the old before_stamp */
3597		rb_time_read(&cpu_buffer->before_stamp, &info->before);
3598
3599		/*
3600		 * Read a new timestamp and update the before_stamp to make
3601		 * the next event after this one force using an absolute
3602		 * timestamp. This is in case an interrupt were to come in
3603		 * between E and F.
3604		 */
3605		ts = rb_time_stamp(cpu_buffer->buffer);
3606		rb_time_set(&cpu_buffer->before_stamp, ts);
3607
3608		barrier();
3609 /*E*/		rb_time_read(&cpu_buffer->write_stamp, &info->after);
3610		barrier();
3611 /*F*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3612		    info->after == info->before && info->after < ts) {
3613			/*
3614			 * Nothing came after this event between C and F, it is
3615			 * safe to use info->after for the delta as it
3616			 * matched info->before and is still valid.
3617			 */
3618			info->delta = ts - info->after;
3619		} else {
3620			/*
3621			 * Interrupted between C and F:
3622			 * Lost the previous event's time stamp. Just set the
3623			 * delta to zero, and this will be the same time as
3624			 * the event this event interrupted. And the events that
3625			 * came after this will still be correct (as they would
3626			 * have built their deltas on the previous event).
3627			 */
3628			info->delta = 0;
3629		}
3630		info->ts = ts;
3631		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
3632	}
3633
3634	/*
3635	 * If this is the first commit on the page, then it has the same
3636	 * timestamp as the page itself.
3637	 */
3638	if (unlikely(!tail && !(info->add_timestamp &
3639				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3640		info->delta = 0;
3641
3642	/* We reserved something on the buffer */
3643
3644	event = __rb_page_index(tail_page, tail);
3645	rb_update_event(cpu_buffer, event, info);
3646
3647	local_inc(&tail_page->entries);
3648
3649	/*
3650	 * If this is the first commit on the page, then update
3651	 * its timestamp.
3652	 */
3653	if (unlikely(!tail))
3654		tail_page->page->time_stamp = info->ts;
3655
3656	/* account for these added bytes */
3657	local_add(info->length, &cpu_buffer->entries_bytes);
3658
3659	return event;
3660}
3661
3662static __always_inline struct ring_buffer_event *
3663rb_reserve_next_event(struct trace_buffer *buffer,
3664		      struct ring_buffer_per_cpu *cpu_buffer,
3665		      unsigned long length)
3666{
3667	struct ring_buffer_event *event;
3668	struct rb_event_info info;
3669	int nr_loops = 0;
3670	int add_ts_default;
3671
3672	/* ring buffer does cmpxchg, make sure it is safe in NMI context */
3673	if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3674	    (unlikely(in_nmi()))) {
3675		return NULL;
3676	}
3677
3678	rb_start_commit(cpu_buffer);
3679	/* The commit page can not change after this */
3680
3681#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3682	/*
3683	 * Due to the ability to swap a cpu buffer out of a buffer,
3684	 * it is possible it was swapped before we committed
3685	 * (committing stops a swap). We check for that here, and
3686	 * if it happened, we have to fail the write.
3687	 */
3688	barrier();
3689	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3690		local_dec(&cpu_buffer->committing);
3691		local_dec(&cpu_buffer->commits);
3692		return NULL;
3693	}
3694#endif
3695
3696	info.length = rb_calculate_event_length(length);
3697
3698	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3699		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3700		info.length += RB_LEN_TIME_EXTEND;
3701		if (info.length > cpu_buffer->buffer->max_data_size)
3702			goto out_fail;
3703	} else {
3704		add_ts_default = RB_ADD_STAMP_NONE;
3705	}
3706
3707 again:
3708	info.add_timestamp = add_ts_default;
3709	info.delta = 0;
3710
3711	/*
3712	 * We allow for interrupts to reenter here and do a trace.
3713	 * If one does, it will cause this original code to loop
3714	 * back here. Even with heavy interrupts happening, this
3715	 * should only happen a few times in a row. If this happens
3716	 * 1000 times in a row, there must be either an interrupt
3717	 * storm or something buggy.
3718	 * Bail!
3719	 */
3720	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3721		goto out_fail;
3722
3723	event = __rb_reserve_next(cpu_buffer, &info);
3724
3725	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3726		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
3727			info.length -= RB_LEN_TIME_EXTEND;
3728		goto again;
3729	}
3730
3731	if (likely(event))
3732		return event;
3733 out_fail:
3734	rb_end_commit(cpu_buffer);
3735	return NULL;
3736}
3737
3738/**
3739 * ring_buffer_lock_reserve - reserve a part of the buffer
3740 * @buffer: the ring buffer to reserve from
3741 * @length: the length of the data to reserve (excluding event header)
3742 *
3743 * Returns a reserved event on the ring buffer to copy directly to.
3744 * The user of this interface will need to get the body to write into,
3745 * which can be retrieved with the ring_buffer_event_data() interface.
3746 *
3747 * The length is the length of the data needed, not the event length
3748 * which also includes the event header.
3749 *
3750 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3751 * If NULL is returned, then nothing has been allocated or locked.
3752 */
3753struct ring_buffer_event *
3754ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
3755{
3756	struct ring_buffer_per_cpu *cpu_buffer;
3757	struct ring_buffer_event *event;
3758	int cpu;
3759
3760	/* If we are tracing schedule, we don't want to recurse */
3761	preempt_disable_notrace();
3762
3763	if (unlikely(atomic_read(&buffer->record_disabled)))
3764		goto out;
3765
3766	cpu = raw_smp_processor_id();
3767
3768	if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3769		goto out;
3770
3771	cpu_buffer = buffer->buffers[cpu];
3772
3773	if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3774		goto out;
3775
3776	if (unlikely(length > buffer->max_data_size))
3777		goto out;
3778
3779	if (unlikely(trace_recursive_lock(cpu_buffer)))
3780		goto out;
3781
3782	event = rb_reserve_next_event(buffer, cpu_buffer, length);
3783	if (!event)
3784		goto out_unlock;
3785
3786	return event;
3787
3788 out_unlock:
3789	trace_recursive_unlock(cpu_buffer);
3790 out:
3791	preempt_enable_notrace();
3792	return NULL;
3793}
3794EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
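
/*
 * Example usage (a minimal sketch, not compiled here; assumes a
 * caller-defined struct my_entry and the single-argument
 * ring_buffer_unlock_commit() of recent kernels):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return -EBUSY;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer);
 */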
3795
3796/*
3797 * Decrement the entry count of the page that an event is on.
3798 * The event does not even need to exist, only the pointer
3799 * to the page it is on. This may only be called before the commit
3800 * takes place.
3801 */
3802static inline void
3803rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3804		   struct ring_buffer_event *event)
3805{
3806	unsigned long addr = (unsigned long)event;
3807	struct buffer_page *bpage = cpu_buffer->commit_page;
3808	struct buffer_page *start;
3809
3810	addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
3811
3812	/* Do the likely case first */
3813	if (likely(bpage->page == (void *)addr)) {
3814		local_dec(&bpage->entries);
3815		return;
3816	}
3817
3818	/*
3819	 * Because the commit page may be on the reader page, we
3820	 * start with the next page and loop until we get back around to it.
3821	 */
3822	rb_inc_page(&bpage);
3823	start = bpage;
3824	do {
3825		if (bpage->page == (void *)addr) {
3826			local_dec(&bpage->entries);
3827			return;
3828		}
3829		rb_inc_page(&bpage);
3830	} while (bpage != start);
3831
3832	/* commit not part of this buffer?? */
3833	RB_WARN_ON(cpu_buffer, 1);
3834}
3835
3836/**
3837 * ring_buffer_discard_commit - discard an event that has not been committed
3838 * @buffer: the ring buffer
3839 * @event: non committed event to discard
3840 *
3841 * Sometimes an event that is in the ring buffer needs to be ignored.
3842 * This function lets the user discard an event in the ring buffer
3843 * and then that event will not be read later.
3844 *
3845 * This function only works if it is called before the item has been
3846 * committed. It will try to free the event from the ring buffer
3847 * if another event has not been added behind it.
3848 *
3849 * If another event has been added behind it, it will set the event
3850 * up as discarded, and perform the commit.
3851 *
3852 * If this function is called, do not call ring_buffer_unlock_commit on
3853 * the event.
3854 */
3855void ring_buffer_discard_commit(struct trace_buffer *buffer,
3856				struct ring_buffer_event *event)
3857{
3858	struct ring_buffer_per_cpu *cpu_buffer;
3859	int cpu;
3860
3861	/* The event is discarded regardless */
3862	rb_event_discard(event);
3863
3864	cpu = smp_processor_id();
3865	cpu_buffer = buffer->buffers[cpu];
3866
3867	/*
3868	 * This must only be called if the event has not been
3869	 * committed yet. Thus we can assume that preemption
3870	 * is still disabled.
3871	 */
3872	RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3873
3874	rb_decrement_entry(cpu_buffer, event);
3875	if (rb_try_to_discard(cpu_buffer, event))
3876		goto out;
3877
3878 out:
3879	rb_end_commit(cpu_buffer);
3880
3881	trace_recursive_unlock(cpu_buffer);
3882
3883	preempt_enable_notrace();
3884
3885}
3886EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
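
/*
 * Example usage (a minimal sketch, not compiled here): discard a reserved
 * event instead of committing it, e.g. when a filter decides the data is
 * not wanted. fill_event() is a hypothetical helper; the point is that
 * exactly one of ring_buffer_unlock_commit() or ring_buffer_discard_commit()
 * must end the reservation:
 *
 *	event = ring_buffer_lock_reserve(buffer, size);
 *	if (event) {
 *		if (fill_event(ring_buffer_event_data(event)))
 *			ring_buffer_unlock_commit(buffer);
 *		else
 *			ring_buffer_discard_commit(buffer, event);
 *	}
 */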
3887
3888/**
3889 * ring_buffer_write - write data to the buffer without reserving
3890 * @buffer: The ring buffer to write to.
3891 * @length: The length of the data being written (excluding the event header)
3892 * @data: The data to write to the buffer.
3893 *
3894 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3895 * one function. If you already have the data to write to the buffer, it
3896 * may be easier to simply call this function.
3897 *
3898 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3899 * and not the length of the event which would hold the header.
3900 */
3901int ring_buffer_write(struct trace_buffer *buffer,
3902		      unsigned long length,
3903		      void *data)
3904{
3905	struct ring_buffer_per_cpu *cpu_buffer;
3906	struct ring_buffer_event *event;
3907	void *body;
3908	int ret = -EBUSY;
3909	int cpu;
3910
3911	preempt_disable_notrace();
3912
3913	if (atomic_read(&buffer->record_disabled))
3914		goto out;
3915
3916	cpu = raw_smp_processor_id();
3917
3918	if (!cpumask_test_cpu(cpu, buffer->cpumask))
3919		goto out;
3920
3921	cpu_buffer = buffer->buffers[cpu];
3922
3923	if (atomic_read(&cpu_buffer->record_disabled))
3924		goto out;
3925
3926	if (length > buffer->max_data_size)
3927		goto out;
3928
3929	if (unlikely(trace_recursive_lock(cpu_buffer)))
3930		goto out;
3931
3932	event = rb_reserve_next_event(buffer, cpu_buffer, length);
3933	if (!event)
3934		goto out_unlock;
3935
3936	body = rb_event_data(event);
3937
3938	memcpy(body, data, length);
3939
3940	rb_commit(cpu_buffer);
3941
3942	rb_wakeups(buffer, cpu_buffer);
3943
3944	ret = 0;
3945
3946 out_unlock:
3947	trace_recursive_unlock(cpu_buffer);
3948
3949 out:
3950	preempt_enable_notrace();
3951
3952	return ret;
3953}
3954EXPORT_SYMBOL_GPL(ring_buffer_write);
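
/*
 * Example usage (a minimal sketch, not compiled here): copy an existing
 * buffer straight into the ring buffer without a separate reserve/commit:
 *
 *	char msg[] = "hello";
 *
 *	if (ring_buffer_write(buffer, sizeof(msg), msg))
 *		pr_debug("ring buffer write failed\n");
 */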
3955
3956static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3957{
3958	struct buffer_page *reader = cpu_buffer->reader_page;
3959	struct buffer_page *head = rb_set_head_page(cpu_buffer);
3960	struct buffer_page *commit = cpu_buffer->commit_page;
3961
3962	/* In case of error, head will be NULL */
3963	if (unlikely(!head))
3964		return true;
3965
3966	/* Reader should exhaust content in reader page */
3967	if (reader->read != rb_page_size(reader))
3968		return false;
3969
3970	/*
3971	 * If writers are committing on the reader page, then since all
3972	 * committed content has been read, the ring buffer is empty.
3973	 */
3974	if (commit == reader)
3975		return true;
3976
3977	/*
3978	 * If writers are committing on a page other than reader page
3979	 * and head page, there should always be content to read.
3980	 */
3981	if (commit != head)
3982		return false;
3983
3984	/*
3985	 * Writers are committing on the head page, so we just need
3986	 * to check whether there is committed data; the reader will
3987	 * swap the reader page with the head page when it needs to read data.
3988	 */
3989	return rb_page_commit(commit) == 0;
3990}
3991
3992/**
3993 * ring_buffer_record_disable - stop all writes into the buffer
3994 * @buffer: The ring buffer to stop writes to.
3995 *
3996 * This prevents all writes to the buffer. Any attempt to write
3997 * to the buffer after this will fail and return NULL.
3998 *
3999 * The caller should call synchronize_rcu() after this.
4000 */
4001void ring_buffer_record_disable(struct trace_buffer *buffer)
4002{
4003	atomic_inc(&buffer->record_disabled);
4004}
4005EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
4006
4007/**
4008 * ring_buffer_record_enable - enable writes to the buffer
4009 * @buffer: The ring buffer to enable writes
4010 *
4011 * Note, multiple disables will need the same number of enables
4012 * to truly enable the writing (much like preempt_disable).
4013 */
4014void ring_buffer_record_enable(struct trace_buffer *buffer)
4015{
4016	atomic_dec(&buffer->record_disabled);
4017}
4018EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
4019
4020/**
4021 * ring_buffer_record_off - stop all writes into the buffer
4022 * @buffer: The ring buffer to stop writes to.
4023 *
4024 * This prevents all writes to the buffer. Any attempt to write
4025 * to the buffer after this will fail and return NULL.
4026 *
4027 * This is different from ring_buffer_record_disable() as
4028 * it works like an on/off switch, whereas the disable() version
4029 * must be paired with an enable().
4030 */
4031void ring_buffer_record_off(struct trace_buffer *buffer)
4032{
4033	unsigned int rd;
4034	unsigned int new_rd;
4035
4036	rd = atomic_read(&buffer->record_disabled);
4037	do {
4038		new_rd = rd | RB_BUFFER_OFF;
4039	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4040}
4041EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4042
4043/**
4044 * ring_buffer_record_on - restart writes into the buffer
4045 * @buffer: The ring buffer to start writes to.
4046 *
4047 * This enables all writes to the buffer that was disabled by
4048 * ring_buffer_record_off().
4049 *
4050 * This is different from ring_buffer_record_enable() as
4051 * it works like an on/off switch, whereas the enable() version
4052 * must be paired with a disable().
4053 */
4054void ring_buffer_record_on(struct trace_buffer *buffer)
4055{
4056	unsigned int rd;
4057	unsigned int new_rd;
4058
4059	rd = atomic_read(&buffer->record_disabled);
4060	do {
4061		new_rd = rd & ~RB_BUFFER_OFF;
4062	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4063}
4064EXPORT_SYMBOL_GPL(ring_buffer_record_on);
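
/*
 * Example (a minimal sketch, not compiled here) contrasting the two ways of
 * stopping writers: the counting disable/enable pair, and the sticky
 * RB_BUFFER_OFF latch set by ring_buffer_record_off():
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();			// wait for in-flight writers
 *	// ... read or reset the buffer ...
 *	ring_buffer_record_enable(buffer);	// must balance the disable
 *
 *	ring_buffer_record_off(buffer);		// latches the buffer off
 *	ring_buffer_record_on(buffer);		// turns it back on, no counting
 */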
4065
4066/**
4067 * ring_buffer_record_is_on - return true if the ring buffer can write
4068 * @buffer: The ring buffer to see if write is enabled
4069 *
4070 * Returns true if the ring buffer is in a state that it accepts writes.
4071 */
4072bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4073{
4074	return !atomic_read(&buffer->record_disabled);
4075}
4076
4077/**
4078 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4079 * @buffer: The ring buffer to see if write is set enabled
4080 *
4081 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4082 * Note that this does NOT mean it is in a writable state.
4083 *
4084 * It may return true when the ring buffer has been disabled by
4085 * ring_buffer_record_disable(), as that is a temporary disabling of
4086 * the ring buffer.
4087 */
4088bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4089{
4090	return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4091}
4092
4093/**
4094 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4095 * @buffer: The ring buffer to stop writes to.
4096 * @cpu: The CPU buffer to stop
4097 *
4098 * This prevents all writes to the buffer. Any attempt to write
4099 * to the buffer after this will fail and return NULL.
4100 *
4101 * The caller should call synchronize_rcu() after this.
4102 */
4103void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
4104{
4105	struct ring_buffer_per_cpu *cpu_buffer;
4106
4107	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4108		return;
4109
4110	cpu_buffer = buffer->buffers[cpu];
4111	atomic_inc(&cpu_buffer->record_disabled);
4112}
4113EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
4114
4115/**
4116 * ring_buffer_record_enable_cpu - enable writes to the buffer
4117 * @buffer: The ring buffer to enable writes
4118 * @cpu: The CPU to enable.
4119 *
4120 * Note, multiple disables will need the same number of enables
4121 * to truly enable the writing (much like preempt_disable).
4122 */
4123void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
4124{
4125	struct ring_buffer_per_cpu *cpu_buffer;
4126
4127	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4128		return;
4129
4130	cpu_buffer = buffer->buffers[cpu];
4131	atomic_dec(&cpu_buffer->record_disabled);
4132}
4133EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
4134
4135/*
4136 * The total entries in the ring buffer is the running counter
4137 * of entries entered into the ring buffer, minus the sum of
4138 * the entries read from the ring buffer and the number of
4139 * entries that were overwritten.
4140 */
4141static inline unsigned long
4142rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
4143{
4144	return local_read(&cpu_buffer->entries) -
4145		(local_read(&cpu_buffer->overrun) + cpu_buffer->read);
4146}
4147
4148/**
4149 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
4150 * @buffer: The ring buffer
4151 * @cpu: The per CPU buffer to read from.
4152 */
4153u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
4154{
4155	unsigned long flags;
4156	struct ring_buffer_per_cpu *cpu_buffer;
4157	struct buffer_page *bpage;
4158	u64 ret = 0;
4159
4160	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4161		return 0;
4162
4163	cpu_buffer = buffer->buffers[cpu];
4164	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4165	/*
4166	 * If the tail is on the reader_page, the oldest time stamp is on the
4167	 * reader page.
4168	 */
4169	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
4170		bpage = cpu_buffer->reader_page;
4171	else
4172		bpage = rb_set_head_page(cpu_buffer);
4173	if (bpage)
4174		ret = bpage->page->time_stamp;
4175	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4176
4177	return ret;
4178}
4179EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
4180
4181/**
4182 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
4183 * @buffer: The ring buffer
4184 * @cpu: The per CPU buffer to read from.
4185 */
4186unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
4187{
4188	struct ring_buffer_per_cpu *cpu_buffer;
4189	unsigned long ret;
4190
4191	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4192		return 0;
4193
4194	cpu_buffer = buffer->buffers[cpu];
4195	ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
4196
4197	return ret;
4198}
4199EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
4200
4201/**
4202 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
4203 * @buffer: The ring buffer
4204 * @cpu: The per CPU buffer to get the entries from.
4205 */
4206unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
4207{
4208	struct ring_buffer_per_cpu *cpu_buffer;
4209
4210	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4211		return 0;
4212
4213	cpu_buffer = buffer->buffers[cpu];
4214
4215	return rb_num_of_entries(cpu_buffer);
4216}
4217EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
4218
4219/**
4220 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
4221 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
4222 * @buffer: The ring buffer
4223 * @cpu: The per CPU buffer to get the number of overruns from
4224 */
4225unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
4226{
4227	struct ring_buffer_per_cpu *cpu_buffer;
4228	unsigned long ret;
4229
4230	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4231		return 0;
4232
4233	cpu_buffer = buffer->buffers[cpu];
4234	ret = local_read(&cpu_buffer->overrun);
4235
4236	return ret;
4237}
4238EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
4239
4240/**
4241 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4242 * commits failing due to the buffer wrapping around while there are uncommitted
4243 * events, such as during an interrupt storm.
4244 * @buffer: The ring buffer
4245 * @cpu: The per CPU buffer to get the number of overruns from
4246 */
4247unsigned long
4248ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
4249{
4250	struct ring_buffer_per_cpu *cpu_buffer;
4251	unsigned long ret;
4252
4253	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4254		return 0;
4255
4256	cpu_buffer = buffer->buffers[cpu];
4257	ret = local_read(&cpu_buffer->commit_overrun);
4258
4259	return ret;
4260}
4261EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4262
4263/**
4264 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4265 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4266 * @buffer: The ring buffer
4267 * @cpu: The per CPU buffer to get the number of overruns from
4268 */
4269unsigned long
4270ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
4271{
4272	struct ring_buffer_per_cpu *cpu_buffer;
4273	unsigned long ret;
4274
4275	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4276		return 0;
4277
4278	cpu_buffer = buffer->buffers[cpu];
4279	ret = local_read(&cpu_buffer->dropped_events);
4280
4281	return ret;
4282}
4283EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4284
4285/**
4286 * ring_buffer_read_events_cpu - get the number of events successfully read
4287 * @buffer: The ring buffer
4288 * @cpu: The per CPU buffer to get the number of events read
4289 */
4290unsigned long
4291ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
4292{
4293	struct ring_buffer_per_cpu *cpu_buffer;
4294
4295	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4296		return 0;
4297
4298	cpu_buffer = buffer->buffers[cpu];
4299	return cpu_buffer->read;
4300}
4301EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
4302
4303/**
4304 * ring_buffer_entries - get the number of entries in a buffer
4305 * @buffer: The ring buffer
4306 *
4307 * Returns the total number of entries in the ring buffer
4308 * (all CPU entries)
4309 */
4310unsigned long ring_buffer_entries(struct trace_buffer *buffer)
4311{
4312	struct ring_buffer_per_cpu *cpu_buffer;
4313	unsigned long entries = 0;
4314	int cpu;
4315
4316	/* if you care about this being correct, lock the buffer */
4317	for_each_buffer_cpu(buffer, cpu) {
4318		cpu_buffer = buffer->buffers[cpu];
4319		entries += rb_num_of_entries(cpu_buffer);
4320	}
4321
4322	return entries;
4323}
4324EXPORT_SYMBOL_GPL(ring_buffer_entries);
4325
4326/**
4327 * ring_buffer_overruns - get the number of overruns in buffer
4328 * @buffer: The ring buffer
4329 *
4330 * Returns the total number of overruns in the ring buffer
4331 * (all CPU entries)
4332 */
4333unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
4334{
4335	struct ring_buffer_per_cpu *cpu_buffer;
4336	unsigned long overruns = 0;
4337	int cpu;
4338
4339	/* if you care about this being correct, lock the buffer */
4340	for_each_buffer_cpu(buffer, cpu) {
4341		cpu_buffer = buffer->buffers[cpu];
4342		overruns += local_read(&cpu_buffer->overrun);
4343	}
4344
4345	return overruns;
4346}
4347EXPORT_SYMBOL_GPL(ring_buffer_overruns);
4348
4349static void rb_iter_reset(struct ring_buffer_iter *iter)
4350{
4351	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4352
4353	/* Iterator usage is expected to have record disabled */
4354	iter->head_page = cpu_buffer->reader_page;
4355	iter->head = cpu_buffer->reader_page->read;
4356	iter->next_event = iter->head;
4357
4358	iter->cache_reader_page = iter->head_page;
4359	iter->cache_read = cpu_buffer->read;
4360	iter->cache_pages_removed = cpu_buffer->pages_removed;
4361
4362	if (iter->head) {
4363		iter->read_stamp = cpu_buffer->read_stamp;
4364		iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4365	} else {
4366		iter->read_stamp = iter->head_page->page->time_stamp;
4367		iter->page_stamp = iter->read_stamp;
4368	}
4369}
4370
4371/**
4372 * ring_buffer_iter_reset - reset an iterator
4373 * @iter: The iterator to reset
4374 *
4375 * Resets the iterator, so that it will start from the beginning
4376 * again.
4377 */
4378void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4379{
4380	struct ring_buffer_per_cpu *cpu_buffer;
4381	unsigned long flags;
4382
4383	if (!iter)
4384		return;
4385
4386	cpu_buffer = iter->cpu_buffer;
4387
4388	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4389	rb_iter_reset(iter);
4390	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4391}
4392EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
4393
4394/**
4395 * ring_buffer_iter_empty - check if an iterator has no more to read
4396 * @iter: The iterator to check
4397 */
4398int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4399{
4400	struct ring_buffer_per_cpu *cpu_buffer;
4401	struct buffer_page *reader;
4402	struct buffer_page *head_page;
4403	struct buffer_page *commit_page;
4404	struct buffer_page *curr_commit_page;
4405	unsigned commit;
4406	u64 curr_commit_ts;
4407	u64 commit_ts;
4408
4409	cpu_buffer = iter->cpu_buffer;
4410	reader = cpu_buffer->reader_page;
4411	head_page = cpu_buffer->head_page;
4412	commit_page = READ_ONCE(cpu_buffer->commit_page);
4413	commit_ts = commit_page->page->time_stamp;
4414
4415	/*
4416	 * When the writer goes across pages, it issues a cmpxchg which
4417	 * is a mb(), which will synchronize with the rmb here.
4418	 * (see rb_tail_page_update())
4419	 */
4420	smp_rmb();
4421	commit = rb_page_commit(commit_page);
4422	/* We want to make sure that the commit page doesn't change */
4423	smp_rmb();
4424
4425	/* Make sure commit page didn't change */
4426	curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4427	curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4428
4429	/* If the commit page changed, then there's more data */
4430	if (curr_commit_page != commit_page ||
4431	    curr_commit_ts != commit_ts)
4432		return 0;
4433
4434	/* Still racy, as it may return a false positive, but that's OK */
4435	return ((iter->head_page == commit_page && iter->head >= commit) ||
4436		(iter->head_page == reader && commit_page == head_page &&
4437		 head_page->read == commit &&
4438		 iter->head == rb_page_size(cpu_buffer->reader_page)));
4439}
4440EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
4441
4442static void
4443rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4444		     struct ring_buffer_event *event)
4445{
4446	u64 delta;
4447
4448	switch (event->type_len) {
4449	case RINGBUF_TYPE_PADDING:
4450		return;
4451
4452	case RINGBUF_TYPE_TIME_EXTEND:
4453		delta = rb_event_time_stamp(event);
4454		cpu_buffer->read_stamp += delta;
4455		return;
4456
4457	case RINGBUF_TYPE_TIME_STAMP:
4458		delta = rb_event_time_stamp(event);
4459		delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp);
4460		cpu_buffer->read_stamp = delta;
4461		return;
4462
4463	case RINGBUF_TYPE_DATA:
4464		cpu_buffer->read_stamp += event->time_delta;
4465		return;
4466
4467	default:
4468		RB_WARN_ON(cpu_buffer, 1);
4469	}
4470}
4471
4472static void
4473rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4474			  struct ring_buffer_event *event)
4475{
4476	u64 delta;
4477
4478	switch (event->type_len) {
4479	case RINGBUF_TYPE_PADDING:
4480		return;
4481
4482	case RINGBUF_TYPE_TIME_EXTEND:
4483		delta = rb_event_time_stamp(event);
4484		iter->read_stamp += delta;
4485		return;
4486
4487	case RINGBUF_TYPE_TIME_STAMP:
4488		delta = rb_event_time_stamp(event);
4489		delta = rb_fix_abs_ts(delta, iter->read_stamp);
4490		iter->read_stamp = delta;
4491		return;
4492
4493	case RINGBUF_TYPE_DATA:
4494		iter->read_stamp += event->time_delta;
4495		return;
4496
4497	default:
4498		RB_WARN_ON(iter->cpu_buffer, 1);
4499	}
4500}
4501
4502static struct buffer_page *
4503rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4504{
4505	struct buffer_page *reader = NULL;
4506	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
4507	unsigned long overwrite;
4508	unsigned long flags;
4509	int nr_loops = 0;
4510	bool ret;
4511
4512	local_irq_save(flags);
4513	arch_spin_lock(&cpu_buffer->lock);
4514
4515 again:
4516	/*
4517	 * This should normally only loop twice. But because the
4518	 * start of the reader inserts an empty page, it causes
4519	 * a case where we will loop three times. There should be no
4520	 * reason to loop four times (that I know of).
4521	 */
4522	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4523		reader = NULL;
4524		goto out;
4525	}
4526
4527	reader = cpu_buffer->reader_page;
4528
4529	/* If there's more to read, return this page */
4530	if (cpu_buffer->reader_page->read < rb_page_size(reader))
4531		goto out;
4532
4533	/* Never should we have an index greater than the size */
4534	if (RB_WARN_ON(cpu_buffer,
4535		       cpu_buffer->reader_page->read > rb_page_size(reader)))
4536		goto out;
4537
4538	/* check if we caught up to the tail */
4539	reader = NULL;
4540	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4541		goto out;
4542
4543	/* Don't bother swapping if the ring buffer is empty */
4544	if (rb_num_of_entries(cpu_buffer) == 0)
4545		goto out;
4546
4547	/*
4548	 * Reset the reader page to size zero.
4549	 */
4550	local_set(&cpu_buffer->reader_page->write, 0);
4551	local_set(&cpu_buffer->reader_page->entries, 0);
4552	local_set(&cpu_buffer->reader_page->page->commit, 0);
4553	cpu_buffer->reader_page->real_end = 0;
4554
4555 spin:
4556	/*
4557	 * Splice the empty reader page into the list around the head.
4558	 */
4559	reader = rb_set_head_page(cpu_buffer);
4560	if (!reader)
4561		goto out;
4562	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4563	cpu_buffer->reader_page->list.prev = reader->list.prev;
4564
4565	/*
4566	 * cpu_buffer->pages just needs to point to the buffer, it
4567	 * has no specific buffer page to point to. Let's move it out
4568	 * of our way so we don't accidentally swap it.
4569	 */
4570	cpu_buffer->pages = reader->list.prev;
4571
4572	/* The reader page will be pointing to the new head */
4573	rb_set_list_to_head(&cpu_buffer->reader_page->list);
4574
4575	/*
4576	 * We want to make sure we read the overruns after we set up our
4577	 * pointers to the next object. The writer side does a
4578	 * cmpxchg to cross pages which acts as the mb on the writer
4579	 * side. Note, the reader will constantly fail the swap
4580	 * while the writer is updating the pointers, so this
4581	 * guarantees that the overwrite recorded here is the one we
4582	 * want to compare with the last_overrun.
4583	 */
4584	smp_mb();
4585	overwrite = local_read(&(cpu_buffer->overrun));
4586
4587	/*
4588	 * Here's the tricky part.
4589	 *
4590	 * We need to move the pointer past the header page.
4591	 * But we can only do that if a writer is not currently
4592	 * moving it. The page before the header page has the
4593	 * flag bit '1' set if it is pointing to the page we want,
4594	 * but if the writer is in the process of moving it
4595	 * then it will be '2' or already moved '0'.
4596	 */
4597
4598	ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4599
4600	/*
4601	 * If we did not convert it, then we must try again.
4602	 */
4603	if (!ret)
4604		goto spin;
4605
4606	/*
4607	 * Yay! We succeeded in replacing the page.
4608	 *
4609	 * Now make the new head point back to the reader page.
4610	 */
4611	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
4612	rb_inc_page(&cpu_buffer->head_page);
4613
4614	local_inc(&cpu_buffer->pages_read);
4615
4616	/* Finally update the reader page to the new head */
4617	cpu_buffer->reader_page = reader;
4618	cpu_buffer->reader_page->read = 0;
4619
4620	if (overwrite != cpu_buffer->last_overrun) {
4621		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4622		cpu_buffer->last_overrun = overwrite;
4623	}
4624
4625	goto again;
4626
4627 out:
4628	/* Update the read_stamp on the first event */
4629	if (reader && reader->read == 0)
4630		cpu_buffer->read_stamp = reader->page->time_stamp;
4631
4632	arch_spin_unlock(&cpu_buffer->lock);
4633	local_irq_restore(flags);
4634
4635	/*
4636	 * The writer has preemption disabled, so wait for it. But not forever,
4637	 * although 1 second is pretty much "forever".
4638	 */
4639#define USECS_WAIT	1000000
4640	for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4641		/* If the write is past the end of page, a writer is still updating it */
4642		if (likely(!reader || rb_page_write(reader) <= bsize))
4643			break;
4644
4645		udelay(1);
4646
4647		/* Get the latest version of the reader write value */
4648		smp_rmb();
4649	}
4650
4651	/* The writer is not moving forward? Something is wrong */
4652	if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4653		reader = NULL;
4654
4655	/*
4656	 * Make sure we see any padding after the write update
4657	 * (see rb_reset_tail()).
4658	 *
4659	 * In addition, a writer may be writing on the reader page
4660	 * if the page has not been fully filled, so the read barrier
4661	 * is also needed to make sure we see the content of what is
4662	 * committed by the writer (see rb_set_commit_to_write()).
4663	 */
4664	smp_rmb();
4665
4666
4667	return reader;
4668}
4669
4670static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4671{
4672	struct ring_buffer_event *event;
4673	struct buffer_page *reader;
4674	unsigned length;
4675
4676	reader = rb_get_reader_page(cpu_buffer);
4677
4678	/* This function should not be called when buffer is empty */
4679	if (RB_WARN_ON(cpu_buffer, !reader))
4680		return;
4681
4682	event = rb_reader_event(cpu_buffer);
4683
4684	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4685		cpu_buffer->read++;
4686
4687	rb_update_read_stamp(cpu_buffer, event);
4688
4689	length = rb_event_length(event);
4690	cpu_buffer->reader_page->read += length;
4691	cpu_buffer->read_bytes += length;
4692}
4693
4694static void rb_advance_iter(struct ring_buffer_iter *iter)
4695{
4696	struct ring_buffer_per_cpu *cpu_buffer;
4697
4698	cpu_buffer = iter->cpu_buffer;
4699
4700	/* If head == next_event then we need to jump to the next event */
4701	if (iter->head == iter->next_event) {
4702		/* If the event gets overwritten again, there's nothing to do */
4703		if (rb_iter_head_event(iter) == NULL)
4704			return;
4705	}
4706
4707	iter->head = iter->next_event;
4708
4709	/*
4710	 * Check if we are at the end of the buffer.
4711	 */
4712	if (iter->next_event >= rb_page_size(iter->head_page)) {
4713		/* discarded commits can make the page empty */
4714		if (iter->head_page == cpu_buffer->commit_page)
4715			return;
4716		rb_inc_iter(iter);
4717		return;
4718	}
4719
4720	rb_update_iter_read_stamp(iter, iter->event);
4721}
4722
4723static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4724{
4725	return cpu_buffer->lost_events;
4726}
4727
4728static struct ring_buffer_event *
4729rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4730	       unsigned long *lost_events)
4731{
4732	struct ring_buffer_event *event;
4733	struct buffer_page *reader;
4734	int nr_loops = 0;
4735
4736	if (ts)
4737		*ts = 0;
4738 again:
4739	/*
4740	 * We repeat when a time extend is encountered.
4741	 * Since the time extend is always attached to a data event,
4742	 * we should never loop more than once
4743	 * (and so never hit the following condition more than twice).
4744	 */
4745	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4746		return NULL;
4747
4748	reader = rb_get_reader_page(cpu_buffer);
4749	if (!reader)
4750		return NULL;
4751
4752	event = rb_reader_event(cpu_buffer);
4753
4754	switch (event->type_len) {
4755	case RINGBUF_TYPE_PADDING:
4756		if (rb_null_event(event))
4757			RB_WARN_ON(cpu_buffer, 1);
4758		/*
4759		 * Because the writer could be discarding every
4760		 * event it creates (which would probably be bad),
4761		 * if we were to go back to "again" then we may never
4762		 * catch up, and will trigger the warn on, or lock
4763		 * up the box. Return the padding, and we will release
4764		 * the current locks, and try again.
4765		 */
4766		return event;
4767
4768	case RINGBUF_TYPE_TIME_EXTEND:
4769		/* Internal data, OK to advance */
4770		rb_advance_reader(cpu_buffer);
4771		goto again;
4772
4773	case RINGBUF_TYPE_TIME_STAMP:
4774		if (ts) {
4775			*ts = rb_event_time_stamp(event);
4776			*ts = rb_fix_abs_ts(*ts, reader->page->time_stamp);
4777			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4778							 cpu_buffer->cpu, ts);
4779		}
4780		/* Internal data, OK to advance */
4781		rb_advance_reader(cpu_buffer);
4782		goto again;
4783
4784	case RINGBUF_TYPE_DATA:
4785		if (ts && !(*ts)) {
4786			*ts = cpu_buffer->read_stamp + event->time_delta;
4787			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4788							 cpu_buffer->cpu, ts);
4789		}
4790		if (lost_events)
4791			*lost_events = rb_lost_events(cpu_buffer);
4792		return event;
4793
4794	default:
4795		RB_WARN_ON(cpu_buffer, 1);
4796	}
4797
4798	return NULL;
4799}
4800EXPORT_SYMBOL_GPL(ring_buffer_peek);
4801
4802static struct ring_buffer_event *
4803rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4804{
4805	struct trace_buffer *buffer;
4806	struct ring_buffer_per_cpu *cpu_buffer;
4807	struct ring_buffer_event *event;
4808	int nr_loops = 0;
4809
4810	if (ts)
4811		*ts = 0;
4812
4813	cpu_buffer = iter->cpu_buffer;
4814	buffer = cpu_buffer->buffer;
4815
4816	/*
4817	 * Check if someone performed a consuming read on the buffer
4818	 * or removed some pages from the buffer. In these cases,
4819	 * the iterator was invalidated and we need to reset it.
4820	 */
4821	if (unlikely(iter->cache_read != cpu_buffer->read ||
4822		     iter->cache_reader_page != cpu_buffer->reader_page ||
4823		     iter->cache_pages_removed != cpu_buffer->pages_removed))
4824		rb_iter_reset(iter);
4825
4826 again:
4827	if (ring_buffer_iter_empty(iter))
4828		return NULL;
4829
4830	/*
4831	 * As the writer can mess with what the iterator is trying
4832	 * to read, just give up if we fail to get an event after
4833	 * three tries. The iterator is not as reliable when reading
4834	 * the ring buffer with an active write as the consumer is.
4835	 * Do not warn if three failures are reached.
4836	 */
4837	if (++nr_loops > 3)
4838		return NULL;
4839
4840	if (rb_per_cpu_empty(cpu_buffer))
4841		return NULL;
4842
4843	if (iter->head >= rb_page_size(iter->head_page)) {
4844		rb_inc_iter(iter);
4845		goto again;
4846	}
4847
4848	event = rb_iter_head_event(iter);
4849	if (!event)
4850		goto again;
4851
4852	switch (event->type_len) {
4853	case RINGBUF_TYPE_PADDING:
4854		if (rb_null_event(event)) {
4855			rb_inc_iter(iter);
4856			goto again;
4857		}
4858		rb_advance_iter(iter);
4859		return event;
4860
4861	case RINGBUF_TYPE_TIME_EXTEND:
4862		/* Internal data, OK to advance */
4863		rb_advance_iter(iter);
4864		goto again;
4865
4866	case RINGBUF_TYPE_TIME_STAMP:
4867		if (ts) {
4868			*ts = rb_event_time_stamp(event);
4869			*ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
4870			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4871							 cpu_buffer->cpu, ts);
4872		}
4873		/* Internal data, OK to advance */
4874		rb_advance_iter(iter);
4875		goto again;
4876
4877	case RINGBUF_TYPE_DATA:
4878		if (ts && !(*ts)) {
4879			*ts = iter->read_stamp + event->time_delta;
4880			ring_buffer_normalize_time_stamp(buffer,
4881							 cpu_buffer->cpu, ts);
4882		}
4883		return event;
4884
4885	default:
4886		RB_WARN_ON(cpu_buffer, 1);
4887	}
4888
4889	return NULL;
4890}
4891EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4892
4893static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4894{
4895	if (likely(!in_nmi())) {
4896		raw_spin_lock(&cpu_buffer->reader_lock);
4897		return true;
4898	}
4899
4900	/*
4901	 * If an NMI die dumps out the content of the ring buffer,
4902	 * a trylock must be used to prevent a deadlock if the NMI
4903	 * preempted a task that holds the ring buffer locks. If
4904	 * we get the lock then all is fine; if not, then continue
4905	 * to do the read, but this can corrupt the ring buffer,
4906	 * so it must be permanently disabled from future writes.
4907	 * Reading from NMI is a one-shot deal.
4908	 */
4909	if (raw_spin_trylock(&cpu_buffer->reader_lock))
4910		return true;
4911
4912	/* Continue without locking, but disable the ring buffer */
4913	atomic_inc(&cpu_buffer->record_disabled);
4914	return false;
4915}
4916
4917static inline void
4918rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4919{
4920	if (likely(locked))
4921		raw_spin_unlock(&cpu_buffer->reader_lock);
4922}
4923
4924/**
4925 * ring_buffer_peek - peek at the next event to be read
4926 * @buffer: The ring buffer to read
4927 * @cpu: The cpu to peek at
4928 * @ts: The timestamp counter of this event.
4929 * @lost_events: a variable to store if events were lost (may be NULL)
4930 *
4931 * This will return the event that will be read next, but does
4932 * not consume the data.
4933 */
4934struct ring_buffer_event *
4935ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4936		 unsigned long *lost_events)
4937{
4938	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4939	struct ring_buffer_event *event;
4940	unsigned long flags;
4941	bool dolock;
4942
4943	if (!cpumask_test_cpu(cpu, buffer->cpumask))
4944		return NULL;
4945
4946 again:
4947	local_irq_save(flags);
4948	dolock = rb_reader_lock(cpu_buffer);
4949	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4950	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4951		rb_advance_reader(cpu_buffer);
4952	rb_reader_unlock(cpu_buffer, dolock);
4953	local_irq_restore(flags);
4954
4955	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4956		goto again;
4957
4958	return event;
4959}
4960
4961/** ring_buffer_iter_dropped - report if there are dropped events
4962 * @iter: The ring buffer iterator
4963 *
4964 * Returns true if there were dropped events since the last peek.
4965 */
4966bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4967{
4968	bool ret = iter->missed_events != 0;
4969
4970	iter->missed_events = 0;
4971	return ret;
4972}
4973EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
4974
4975/**
4976 * ring_buffer_iter_peek - peek at the next event to be read
4977 * @iter: The ring buffer iterator
4978 * @ts: The timestamp counter of this event.
4979 *
4980 * This will return the event that will be read next, but does
4981 * not increment the iterator.
4982 */
4983struct ring_buffer_event *
4984ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4985{
4986	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4987	struct ring_buffer_event *event;
4988	unsigned long flags;
4989
4990 again:
4991	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4992	event = rb_iter_peek(iter, ts);
4993	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4994
4995	if (event && event->type_len == RINGBUF_TYPE_PADDING)
4996		goto again;
4997
4998	return event;
4999}
5000
5001/**
5002 * ring_buffer_consume - return an event and consume it
5003 * @buffer: The ring buffer to get the next event from
5004 * @cpu: the cpu to read the buffer from
5005 * @ts: a variable to store the timestamp (may be NULL)
5006 * @lost_events: a variable to store if events were lost (may be NULL)
5007 *
5008 * Returns the next event in the ring buffer, and that event is consumed.
5009 * Meaning that sequential reads will keep returning a different event,
5010 * and will eventually empty the ring buffer if the producer is slower.
5011 */
5012struct ring_buffer_event *
5013ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
5014		    unsigned long *lost_events)
5015{
5016	struct ring_buffer_per_cpu *cpu_buffer;
5017	struct ring_buffer_event *event = NULL;
5018	unsigned long flags;
5019	bool dolock;
5020
5021 again:
5022	/* might be called in atomic */
5023	preempt_disable();
5024
5025	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5026		goto out;
5027
5028	cpu_buffer = buffer->buffers[cpu];
5029	local_irq_save(flags);
5030	dolock = rb_reader_lock(cpu_buffer);
5031
5032	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5033	if (event) {
5034		cpu_buffer->lost_events = 0;
5035		rb_advance_reader(cpu_buffer);
5036	}
5037
5038	rb_reader_unlock(cpu_buffer, dolock);
5039	local_irq_restore(flags);
5040
5041 out:
5042	preempt_enable();
5043
5044	if (event && event->type_len == RINGBUF_TYPE_PADDING)
5045		goto again;
5046
5047	return event;
5048}
5049EXPORT_SYMBOL_GPL(ring_buffer_consume);
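
/*
 * Example usage (a minimal sketch, not compiled here): drain one CPU's
 * buffer with consuming reads. process() is a hypothetical consumer:
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		if (lost)
 *			pr_debug("lost %lu events\n", lost);
 *		process(ring_buffer_event_data(event),
 *			ring_buffer_event_length(event), ts);
 *	}
 */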
5050
5051/**
5052 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
5053 * @buffer: The ring buffer to read from
5054 * @cpu: The cpu buffer to iterate over
5055 * @flags: gfp flags to use for memory allocation
5056 *
5057 * This performs the initial preparations necessary to iterate
5058 * through the buffer.  Memory is allocated, buffer resizing
5059 * is disabled, and the iterator pointer is returned to the caller.
5060 *
5061 * After a sequence of ring_buffer_read_prepare calls, the user is
5062 * expected to make at least one call to ring_buffer_read_prepare_sync.
5063 * Afterwards, ring_buffer_read_start is invoked to get things going
5064 * for real.
5065 *
5066 * This overall must be paired with ring_buffer_read_finish.
5067 */
5068struct ring_buffer_iter *
5069ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
5070{
5071	struct ring_buffer_per_cpu *cpu_buffer;
5072	struct ring_buffer_iter *iter;
5073
5074	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5075		return NULL;
5076
5077	iter = kzalloc(sizeof(*iter), flags);
5078	if (!iter)
5079		return NULL;
5080
5081	/* Holds the entire event: data and meta data */
5082	iter->event_size = buffer->subbuf_size;
5083	iter->event = kmalloc(iter->event_size, flags);
5084	if (!iter->event) {
5085		kfree(iter);
5086		return NULL;
5087	}
5088
5089	cpu_buffer = buffer->buffers[cpu];
5090
5091	iter->cpu_buffer = cpu_buffer;
5092
5093	atomic_inc(&cpu_buffer->resize_disabled);
5094
5095	return iter;
5096}
5097EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5098
5099/**
5100 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5101 *
5102 * All previously invoked ring_buffer_read_prepare calls to prepare
5103 * iterators will be synchronized.  Afterwards, ring_buffer_read_start
5104 * calls on those iterators are allowed.
5105 */
5106void
5107ring_buffer_read_prepare_sync(void)
5108{
5109	synchronize_rcu();
5110}
5111EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5112
5113/**
5114 * ring_buffer_read_start - start a non consuming read of the buffer
5115 * @iter: The iterator returned by ring_buffer_read_prepare
5116 *
5117 * This finalizes the startup of an iteration through the buffer.
5118 * The iterator comes from a call to ring_buffer_read_prepare and
5119 * an intervening ring_buffer_read_prepare_sync must have been
5120 * performed.
5121 *
5122 * Must be paired with ring_buffer_read_finish.
5123 */
5124void
5125ring_buffer_read_start(struct ring_buffer_iter *iter)
5126{
5127	struct ring_buffer_per_cpu *cpu_buffer;
5128	unsigned long flags;
5129
5130	if (!iter)
5131		return;
5132
5133	cpu_buffer = iter->cpu_buffer;
5134
5135	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5136	arch_spin_lock(&cpu_buffer->lock);
5137	rb_iter_reset(iter);
5138	arch_spin_unlock(&cpu_buffer->lock);
5139	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5140}
5141EXPORT_SYMBOL_GPL(ring_buffer_read_start);
5142
5143/**
5144 * ring_buffer_read_finish - finish reading the iterator of the buffer
5145 * @iter: The iterator retrieved by ring_buffer_read_prepare
5146 *
5147 * This re-enables resizing of the buffer, and frees the iterator.
5148 */
5149void
5150ring_buffer_read_finish(struct ring_buffer_iter *iter)
5151{
5152	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5153	unsigned long flags;
5154
5155	/* Use this opportunity to check the integrity of the ring buffer. */
5156	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5157	rb_check_pages(cpu_buffer);
5158	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5159
5160	atomic_dec(&cpu_buffer->resize_disabled);
5161	kfree(iter->event);
5162	kfree(iter);
5163}
5164EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
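
/*
 * Example usage (a minimal sketch, not compiled here) of the non consuming
 * iterator protocol described above. process() is a hypothetical consumer:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		process(event, ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);
 */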
5165
5166/**
5167 * ring_buffer_iter_advance - advance the iterator to the next location
5168 * @iter: The ring buffer iterator
5169 *
5170 * Move the location of the iterator such that the next read will
5171 * be the next location of the iterator.
5172 */
5173void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
5174{
5175	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5176	unsigned long flags;
5177
5178	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5179
5180	rb_advance_iter(iter);
5181
5182	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5183}
5184EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
5185
5186/**
5187 * ring_buffer_size - return the size of the ring buffer (in bytes)
5188 * @buffer: The ring buffer.
5189 * @cpu: The CPU to get ring buffer size from.
5190 */
5191unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
5192{
5193	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5194		return 0;
5195
5196	return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages;
5197}
5198EXPORT_SYMBOL_GPL(ring_buffer_size);
5199
5200/**
5201 * ring_buffer_max_event_size - return the max data size of an event
5202 * @buffer: The ring buffer.
5203 *
5204 * Returns the maximum size an event can be.
5205 */
5206unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer)
5207{
5208	/* If abs timestamp is requested, events have a timestamp too */
5209	if (ring_buffer_time_stamp_abs(buffer))
5210		return buffer->max_data_size - RB_LEN_TIME_EXTEND;
5211	return buffer->max_data_size;
5212}
5213EXPORT_SYMBOL_GPL(ring_buffer_max_event_size);
5214
5215static void rb_clear_buffer_page(struct buffer_page *page)
5216{
5217	local_set(&page->write, 0);
5218	local_set(&page->entries, 0);
5219	rb_init_page(page->page);
5220	page->read = 0;
5221}
5222
5223static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
5224{
5225	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
5226
5227	meta->reader.read = cpu_buffer->reader_page->read;
5228	meta->reader.id = cpu_buffer->reader_page->id;
5229	meta->reader.lost_events = cpu_buffer->lost_events;
5230
5231	meta->entries = local_read(&cpu_buffer->entries);
5232	meta->overrun = local_read(&cpu_buffer->overrun);
5233	meta->read = cpu_buffer->read;
5234
5235	/* Some archs do not have data cache coherency between kernel and user-space */
5236	flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page));
5237}
5238
5239static void
5240rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
5241{
5242	struct buffer_page *page;
5243
5244	rb_head_page_deactivate(cpu_buffer);
5245
5246	cpu_buffer->head_page
5247		= list_entry(cpu_buffer->pages, struct buffer_page, list);
5248	rb_clear_buffer_page(cpu_buffer->head_page);
5249	list_for_each_entry(page, cpu_buffer->pages, list) {
5250		rb_clear_buffer_page(page);
5251	}
5252
5253	cpu_buffer->tail_page = cpu_buffer->head_page;
5254	cpu_buffer->commit_page = cpu_buffer->head_page;
5255
5256	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5257	INIT_LIST_HEAD(&cpu_buffer->new_pages);
5258	rb_clear_buffer_page(cpu_buffer->reader_page);
5259
5260	local_set(&cpu_buffer->entries_bytes, 0);
5261	local_set(&cpu_buffer->overrun, 0);
5262	local_set(&cpu_buffer->commit_overrun, 0);
5263	local_set(&cpu_buffer->dropped_events, 0);
5264	local_set(&cpu_buffer->entries, 0);
5265	local_set(&cpu_buffer->committing, 0);
5266	local_set(&cpu_buffer->commits, 0);
5267	local_set(&cpu_buffer->pages_touched, 0);
5268	local_set(&cpu_buffer->pages_lost, 0);
5269	local_set(&cpu_buffer->pages_read, 0);
5270	cpu_buffer->last_pages_touch = 0;
5271	cpu_buffer->shortest_full = 0;
5272	cpu_buffer->read = 0;
5273	cpu_buffer->read_bytes = 0;
5274
5275	rb_time_set(&cpu_buffer->write_stamp, 0);
5276	rb_time_set(&cpu_buffer->before_stamp, 0);
5277
5278	memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
5279
5280	cpu_buffer->lost_events = 0;
5281	cpu_buffer->last_overrun = 0;
5282
5283	if (cpu_buffer->mapped)
5284		rb_update_meta_page(cpu_buffer);
5285
5286	rb_head_page_activate(cpu_buffer);
5287	cpu_buffer->pages_removed = 0;
5288}
5289
5290/* Must have disabled the cpu buffer then done a synchronize_rcu */
5291static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5292{
5293	unsigned long flags;
5294
5295	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5296
5297	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5298		goto out;
5299
5300	arch_spin_lock(&cpu_buffer->lock);
5301
5302	rb_reset_cpu(cpu_buffer);
5303
5304	arch_spin_unlock(&cpu_buffer->lock);
5305
5306 out:
5307	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5308}
5309
5310/**
5311 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5312 * @buffer: The ring buffer to reset a per cpu buffer of
5313 * @cpu: The CPU buffer to be reset
5314 */
5315void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
5316{
5317	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5318
5319	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5320		return;
5321
5322	/* prevent another thread from changing buffer sizes */
5323	mutex_lock(&buffer->mutex);
5324
5325	atomic_inc(&cpu_buffer->resize_disabled);
5326	atomic_inc(&cpu_buffer->record_disabled);
5327
5328	/* Make sure all commits have finished */
5329	synchronize_rcu();
5330
5331	reset_disabled_cpu_buffer(cpu_buffer);
5332
5333	atomic_dec(&cpu_buffer->record_disabled);
5334	atomic_dec(&cpu_buffer->resize_disabled);
5335
5336	mutex_unlock(&buffer->mutex);
5337}
5338EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
5339
5340/* Flag to ensure proper resetting of atomic variables */
5341#define RESET_BIT	(1 << 30)
5342
5343/**
5344 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer
5345 * @buffer: The ring buffer to reset a per cpu buffer of
5346 */
5347void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5348{
5349	struct ring_buffer_per_cpu *cpu_buffer;
5350	int cpu;
5351
5352	/* prevent another thread from changing buffer sizes */
5353	mutex_lock(&buffer->mutex);
5354
5355	for_each_online_buffer_cpu(buffer, cpu) {
5356		cpu_buffer = buffer->buffers[cpu];
5357
5358		atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
5359		atomic_inc(&cpu_buffer->record_disabled);
5360	}
5361
5362	/* Make sure all commits have finished */
5363	synchronize_rcu();
5364
5365	for_each_buffer_cpu(buffer, cpu) {
5366		cpu_buffer = buffer->buffers[cpu];
5367
5368		/*
5369		 * If a CPU came online during the synchronize_rcu(), then
5370		 * ignore it.
5371		 */
5372		if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5373			continue;
5374
5375		reset_disabled_cpu_buffer(cpu_buffer);
5376
5377		atomic_dec(&cpu_buffer->record_disabled);
5378		atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
5379	}
5380
5381	mutex_unlock(&buffer->mutex);
5382}
5383
5384/**
5385 * ring_buffer_reset - reset a ring buffer
5386 * @buffer: The ring buffer to reset all cpu buffers
5387 */
5388void ring_buffer_reset(struct trace_buffer *buffer)
5389{
5390	struct ring_buffer_per_cpu *cpu_buffer;
5391	int cpu;
5392
5393	/* prevent another thread from changing buffer sizes */
5394	mutex_lock(&buffer->mutex);
5395
5396	for_each_buffer_cpu(buffer, cpu) {
5397		cpu_buffer = buffer->buffers[cpu];
5398
5399		atomic_inc(&cpu_buffer->resize_disabled);
5400		atomic_inc(&cpu_buffer->record_disabled);
5401	}
5402
5403	/* Make sure all commits have finished */
5404	synchronize_rcu();
5405
5406	for_each_buffer_cpu(buffer, cpu) {
5407		cpu_buffer = buffer->buffers[cpu];
5408
5409		reset_disabled_cpu_buffer(cpu_buffer);
5410
5411		atomic_dec(&cpu_buffer->record_disabled);
5412		atomic_dec(&cpu_buffer->resize_disabled);
5413	}
5414
5415	mutex_unlock(&buffer->mutex);
5416}
5417EXPORT_SYMBOL_GPL(ring_buffer_reset);
5418
5419/**
5420 * ring_buffer_empty - is the ring buffer empty?
5421 * @buffer: The ring buffer to test
5422 */
5423bool ring_buffer_empty(struct trace_buffer *buffer)
5424{
5425	struct ring_buffer_per_cpu *cpu_buffer;
5426	unsigned long flags;
5427	bool dolock;
5428	bool ret;
5429	int cpu;
5430
5431	/* yes this is racy, but if you don't like the race, lock the buffer */
5432	for_each_buffer_cpu(buffer, cpu) {
5433		cpu_buffer = buffer->buffers[cpu];
5434		local_irq_save(flags);
5435		dolock = rb_reader_lock(cpu_buffer);
5436		ret = rb_per_cpu_empty(cpu_buffer);
5437		rb_reader_unlock(cpu_buffer, dolock);
5438		local_irq_restore(flags);
5439
5440		if (!ret)
5441			return false;
5442	}
5443
5444	return true;
5445}
5446EXPORT_SYMBOL_GPL(ring_buffer_empty);
5447
5448/**
5449 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5450 * @buffer: The ring buffer
5451 * @cpu: The CPU buffer to test
5452 */
5453bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
5454{
5455	struct ring_buffer_per_cpu *cpu_buffer;
5456	unsigned long flags;
5457	bool dolock;
5458	bool ret;
5459
5460	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5461		return true;
5462
5463	cpu_buffer = buffer->buffers[cpu];
5464	local_irq_save(flags);
5465	dolock = rb_reader_lock(cpu_buffer);
5466	ret = rb_per_cpu_empty(cpu_buffer);
5467	rb_reader_unlock(cpu_buffer, dolock);
5468	local_irq_restore(flags);
5469
5470	return ret;
5471}
5472EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
5473
5474#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
5475/**
5476 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5477 * @buffer_a: One buffer to swap with
5478 * @buffer_b: The other buffer to swap with
5479 * @cpu: the CPU of the buffers to swap
5480 *
5481 * This function is useful for tracers that want to take a "snapshot"
5482 * of a CPU buffer and have another backup buffer lying around.
5483 * It is expected that the tracer handles the cpu buffer not being
5484 * used at the moment.
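 *
 * A minimal usage sketch (assuming buffer_b was allocated with the same
 * size and sub-buffer order as buffer_a, and process_snapshot() is a
 * hypothetical helper that reads out the swapped buffer):
 *
 *	ret = ring_buffer_swap_cpu(buffer_a, buffer_b, cpu);
 *	if (!ret)
 *		process_snapshot(buffer_b, cpu);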
5485 */
5486int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5487			 struct trace_buffer *buffer_b, int cpu)
5488{
5489	struct ring_buffer_per_cpu *cpu_buffer_a;
5490	struct ring_buffer_per_cpu *cpu_buffer_b;
5491	int ret = -EINVAL;
5492
5493	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5494	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
5495		goto out;
5496
5497	cpu_buffer_a = buffer_a->buffers[cpu];
5498	cpu_buffer_b = buffer_b->buffers[cpu];
5499
5500	/* It's up to the callers to not try to swap mapped buffers */
5501	if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) {
5502		ret = -EBUSY;
5503		goto out;
5504	}
5505
5506	/* At least make sure the two buffers are somewhat the same */
5507	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
5508		goto out;
5509
5510	if (buffer_a->subbuf_order != buffer_b->subbuf_order)
5511		goto out;
5512
5513	ret = -EAGAIN;
5514
5515	if (atomic_read(&buffer_a->record_disabled))
5516		goto out;
5517
5518	if (atomic_read(&buffer_b->record_disabled))
5519		goto out;
5520
5521	if (atomic_read(&cpu_buffer_a->record_disabled))
5522		goto out;
5523
5524	if (atomic_read(&cpu_buffer_b->record_disabled))
5525		goto out;
5526
5527	/*
5528	 * We can't do a synchronize_rcu here because this
5529	 * function can be called in atomic context.
5530	 * Normally this will be called from the same CPU as cpu.
5531	 * If not it's up to the caller to protect this.
5532	 */
5533	atomic_inc(&cpu_buffer_a->record_disabled);
5534	atomic_inc(&cpu_buffer_b->record_disabled);
5535
5536	ret = -EBUSY;
5537	if (local_read(&cpu_buffer_a->committing))
5538		goto out_dec;
5539	if (local_read(&cpu_buffer_b->committing))
5540		goto out_dec;
5541
5542	/*
5543	 * When resize is in progress, we cannot swap it because
5544	 * it will mess the state of the cpu buffer.
5545	 */
5546	if (atomic_read(&buffer_a->resizing))
5547		goto out_dec;
5548	if (atomic_read(&buffer_b->resizing))
5549		goto out_dec;
5550
5551	buffer_a->buffers[cpu] = cpu_buffer_b;
5552	buffer_b->buffers[cpu] = cpu_buffer_a;
5553
5554	cpu_buffer_b->buffer = buffer_a;
5555	cpu_buffer_a->buffer = buffer_b;
5556
5557	ret = 0;
5558
5559out_dec:
5560	atomic_dec(&cpu_buffer_a->record_disabled);
5561	atomic_dec(&cpu_buffer_b->record_disabled);
5562out:
5563	return ret;
5564}
5565EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
5566#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
5567
5568/**
5569 * ring_buffer_alloc_read_page - allocate a page to read from buffer
5570 * @buffer: the buffer to allocate for.
5571 * @cpu: the cpu buffer to allocate.
5572 *
5573 * This function is used in conjunction with ring_buffer_read_page.
5574 * When reading a full page from the ring buffer, these functions
5575 * can be used to speed up the process. The calling function should
5576 * allocate a few pages first with this function. Then when it
5577 * needs to get pages from the ring buffer, it passes the result
5578 * of this function into ring_buffer_read_page, which will swap
5579 * the page that was allocated, with the read page of the buffer.
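 *
 * A hedged sketch of pre-allocating a small pool of read pages (NR_POOL is
 * an arbitrary, illustrative constant; error unwinding is elided):
 *
 *	struct buffer_data_read_page *pool[NR_POOL];
 *	int i;
 *
 *	for (i = 0; i < NR_POOL; i++) {
 *		pool[i] = ring_buffer_alloc_read_page(buffer, cpu);
 *		if (IS_ERR(pool[i]))
 *			break;
 *	}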
5580 *
5581 * Returns:
5582 *  The page allocated, or ERR_PTR
5583 */
5584struct buffer_data_read_page *
5585ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
5586{
5587	struct ring_buffer_per_cpu *cpu_buffer;
5588	struct buffer_data_read_page *bpage = NULL;
5589	unsigned long flags;
5590	struct page *page;
5591
5592	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5593		return ERR_PTR(-ENODEV);
5594
5595	bpage = kzalloc(sizeof(*bpage), GFP_KERNEL);
5596	if (!bpage)
5597		return ERR_PTR(-ENOMEM);
5598
5599	bpage->order = buffer->subbuf_order;
5600	cpu_buffer = buffer->buffers[cpu];
5601	local_irq_save(flags);
5602	arch_spin_lock(&cpu_buffer->lock);
5603
5604	if (cpu_buffer->free_page) {
5605		bpage->data = cpu_buffer->free_page;
5606		cpu_buffer->free_page = NULL;
5607	}
5608
5609	arch_spin_unlock(&cpu_buffer->lock);
5610	local_irq_restore(flags);
5611
5612	if (bpage->data)
5613		goto out;
5614
5615	page = alloc_pages_node(cpu_to_node(cpu),
5616				GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO,
5617				cpu_buffer->buffer->subbuf_order);
5618	if (!page) {
5619		kfree(bpage);
5620		return ERR_PTR(-ENOMEM);
5621	}
5622
5623	bpage->data = page_address(page);
5624
5625 out:
5626	rb_init_page(bpage->data);
5627
5628	return bpage;
5629}
5630EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
5631
5632/**
5633 * ring_buffer_free_read_page - free an allocated read page
5634 * @buffer: the buffer the page was allocated for
5635 * @cpu: the cpu buffer the page came from
5636 * @data_page: the page to free
5637 *
5638 * Free a page allocated from ring_buffer_alloc_read_page.
5639 */
5640void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu,
5641				struct buffer_data_read_page *data_page)
5642{
5643	struct ring_buffer_per_cpu *cpu_buffer;
5644	struct buffer_data_page *bpage = data_page->data;
5645	struct page *page = virt_to_page(bpage);
5646	unsigned long flags;
5647
5648	if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5649		return;
5650
5651	cpu_buffer = buffer->buffers[cpu];
5652
5653	/*
5654	 * If the page is still in use someplace else, or the order of the page
5655	 * is different from the sub-buffer order of the buffer, then
5656	 * we cannot reuse it.
5657	 */
5658	if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order)
5659		goto out;
5660
5661	local_irq_save(flags);
5662	arch_spin_lock(&cpu_buffer->lock);
5663
5664	if (!cpu_buffer->free_page) {
5665		cpu_buffer->free_page = bpage;
5666		bpage = NULL;
5667	}
5668
5669	arch_spin_unlock(&cpu_buffer->lock);
5670	local_irq_restore(flags);
5671
5672 out:
5673	free_pages((unsigned long)bpage, data_page->order);
5674	kfree(data_page);
5675}
5676EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
5677
5678/**
5679 * ring_buffer_read_page - extract a page from the ring buffer
5680 * @buffer: buffer to extract from
5681 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
5682 * @len: amount to extract
5683 * @cpu: the cpu of the buffer to extract
5684 * @full: should the extraction only happen when the page is full.
5685 *
5686 * This function will pull out a page from the ring buffer and consume it.
5687 * @data_page must be the address of the variable that was returned
5688 * from ring_buffer_alloc_read_page. This is because the page might be used
5689 * to swap with a page in the ring buffer.
5690 *
5691 * for example:
5692 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
5693 *	if (IS_ERR(rpage))
5694 *		return PTR_ERR(rpage);
5695 *	ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0);
5696 *	if (ret >= 0)
5697 *		process_page(ring_buffer_read_page_data(rpage), ret);
5698 *	ring_buffer_free_read_page(buffer, cpu, rpage);
5699 *
5700 * When @full is set, the function will not return any data unless
5701 * the writer is off the reader page.
5702 *
5703 * Note: it is up to the calling functions to handle sleeps and wakeups.
5704 *  The ring buffer can be used anywhere in the kernel and can not
5705 *  blindly call wake_up. The layer that uses the ring buffer must be
5706 *  responsible for that.
5707 *
5708 * Returns:
5709 *  >=0 if data has been transferred, returns the offset of consumed data.
5710 *  <0 if no data has been transferred.
5711 */
5712int ring_buffer_read_page(struct trace_buffer *buffer,
5713			  struct buffer_data_read_page *data_page,
5714			  size_t len, int cpu, int full)
5715{
5716	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5717	struct ring_buffer_event *event;
5718	struct buffer_data_page *bpage;
5719	struct buffer_page *reader;
5720	unsigned long missed_events;
5721	unsigned long flags;
5722	unsigned int commit;
5723	unsigned int read;
5724	u64 save_timestamp;
5725	int ret = -1;
5726
5727	if (!cpumask_test_cpu(cpu, buffer->cpumask))
5728		goto out;
5729
5730	/*
5731	 * If len is not big enough to hold the page header, then
5732	 * we can not copy anything.
5733	 */
5734	if (len <= BUF_PAGE_HDR_SIZE)
5735		goto out;
5736
5737	len -= BUF_PAGE_HDR_SIZE;
5738
5739	if (!data_page || !data_page->data)
5740		goto out;
5741	if (data_page->order != buffer->subbuf_order)
5742		goto out;
5743
5744	bpage = data_page->data;
5745	if (!bpage)
5746		goto out;
5747
5748	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5749
5750	reader = rb_get_reader_page(cpu_buffer);
5751	if (!reader)
5752		goto out_unlock;
5753
5754	event = rb_reader_event(cpu_buffer);
5755
5756	read = reader->read;
5757	commit = rb_page_size(reader);
5758
5759	/* Check if any events were dropped */
5760	missed_events = cpu_buffer->lost_events;
5761
5762	/*
5763	 * If this page has been partially read or
5764	 * if len is not big enough to read the rest of the page or
5765	 * a writer is still on the page, or the buffer is memory mapped
5766	 * to user space, then we must copy the data from the page to the buffer.
5767	 * Otherwise, we can simply swap the page with the one passed in.
5768	 */
5769	if (read || (len < (commit - read)) ||
5770	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
5771	    cpu_buffer->mapped) {
5772		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
5773		unsigned int rpos = read;
5774		unsigned int pos = 0;
5775		unsigned int size;
5776
5777		/*
5778		 * If a full page is expected, it can still be returned here
5779		 * only if there has been a previous partial read, the rest of
5780		 * the page fits in @len, and the commit page has moved off
5781		 * the reader page.
5782		 */
5783		if (full &&
5784		    (!read || (len < (commit - read)) ||
5785		     cpu_buffer->reader_page == cpu_buffer->commit_page))
5786			goto out_unlock;
5787
5788		if (len > (commit - read))
5789			len = (commit - read);
5790
5791		/* Always keep the time extend and data together */
5792		size = rb_event_ts_length(event);
5793
5794		if (len < size)
5795			goto out_unlock;
5796
5797		/* save the current timestamp, since the user will need it */
5798		save_timestamp = cpu_buffer->read_stamp;
5799
5800		/* Need to copy one event at a time */
5801		do {
5802			/* We need the size of one event, because
5803			 * rb_advance_reader only advances by one event,
5804			 * whereas rb_event_ts_length may include the size of
5805			 * one or two events.
5806			 * We have already ensured there's enough space if this
5807			 * is a time extend. */
5808			size = rb_event_length(event);
5809			memcpy(bpage->data + pos, rpage->data + rpos, size);
5810
5811			len -= size;
5812
5813			rb_advance_reader(cpu_buffer);
5814			rpos = reader->read;
5815			pos += size;
5816
5817			if (rpos >= commit)
5818				break;
5819
5820			event = rb_reader_event(cpu_buffer);
5821			/* Always keep the time extend and data together */
5822			size = rb_event_ts_length(event);
5823		} while (len >= size);
5824
5825		/* update bpage */
5826		local_set(&bpage->commit, pos);
5827		bpage->time_stamp = save_timestamp;
5828
5829		/* we copied everything to the beginning */
5830		read = 0;
5831	} else {
5832		/* update the entry counter */
5833		cpu_buffer->read += rb_page_entries(reader);
5834		cpu_buffer->read_bytes += rb_page_size(reader);
5835
5836		/* swap the pages */
5837		rb_init_page(bpage);
5838		bpage = reader->page;
5839		reader->page = data_page->data;
5840		local_set(&reader->write, 0);
5841		local_set(&reader->entries, 0);
5842		reader->read = 0;
5843		data_page->data = bpage;
5844
5845		/*
5846		 * Use the real_end for the data size,
5847		 * This gives us a chance to store the lost events
5848		 * on the page.
5849		 */
5850		if (reader->real_end)
5851			local_set(&bpage->commit, reader->real_end);
5852	}
5853	ret = read;
5854
5855	cpu_buffer->lost_events = 0;
5856
5857	commit = local_read(&bpage->commit);
5858	/*
5859	 * Set a flag in the commit field if we lost events
5860	 */
5861	if (missed_events) {
5862		/* If there is room at the end of the page to save the
5863		 * missed events, then record it there.
5864		 */
5865		if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
5866			memcpy(&bpage->data[commit], &missed_events,
5867			       sizeof(missed_events));
5868			local_add(RB_MISSED_STORED, &bpage->commit);
5869			commit += sizeof(missed_events);
5870		}
5871		local_add(RB_MISSED_EVENTS, &bpage->commit);
5872	}
5873
5874	/*
5875	 * This page may be off to user land. Zero it out here.
5876	 */
5877	if (commit < buffer->subbuf_size)
5878		memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);
5879
5880 out_unlock:
5881	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5882
5883 out:
5884	return ret;
5885}
5886EXPORT_SYMBOL_GPL(ring_buffer_read_page);
5887
5888/**
5889 * ring_buffer_read_page_data - get pointer to the data in the page.
5890 * @page:  the page to get the data from
5891 *
5892 * Returns pointer to the actual data in this page.
5893 */
5894void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
5895{
5896	return page->data;
5897}
5898EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);
5899
5900/**
5901 * ring_buffer_subbuf_size_get - get size of the sub buffer.
5902 * @buffer: the buffer to get the sub buffer size from
5903 *
5904 * Returns size of the sub buffer, in bytes.
5905 */
5906int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
5907{
5908	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
5909}
5910EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
5911
5912/**
5913 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
5914 * @buffer: The ring_buffer to get the system sub page order from
5915 *
5916 * By default, one ring buffer sub page equals one system page. This parameter
5917 * is configurable per ring buffer. The size of the ring buffer sub page can be
5918 * extended, but must be a power-of-two multiple of the system page size.
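 *
 * For example, the total sub-buffer size in bytes (as also reported by
 * ring_buffer_subbuf_size_get()) can be recovered from the returned order:
 *
 *	order = ring_buffer_subbuf_order_get(buffer);
 *	if (order >= 0)
 *		subbuf_bytes = PAGE_SIZE << order;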
5919 *
5920 * Returns the order of buffer sub page size, in system pages:
5921 * 0 means the sub buffer size is 1 system page and so forth.
5922 * In case of an error < 0 is returned.
5923 */
5924int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
5925{
5926	if (!buffer)
5927		return -EINVAL;
5928
5929	return buffer->subbuf_order;
5930}
5931EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
5932
5933/**
5934 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
5935 * @buffer: The ring_buffer to set the new page size.
5936 * @order: Order of the system pages in one sub buffer page
5937 *
5938 * By default, one ring buffer page equals one system page. This API can be
5939 * used to set a new size of the ring buffer page. The size must be a power of
5940 * two of the system page size, which is why the input parameter @order is the
5941 * order of system pages that are allocated for one ring buffer page:
5942 *  0 - 1 system page
5943 *  1 - 2 system pages
5944 *  2 - 4 system pages
5945 *  ...
5946 *
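 * A hedged usage sketch, growing the sub-buffers to two system pages. Note
 * that the code below reallocates every sub-buffer, so any data already in
 * the ring buffer is discarded:
 *
 *	err = ring_buffer_subbuf_order_set(buffer, 1);
 *	if (err)
 *		pr_warn("failed to set sub-buffer order: %d\n", err);
 *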
5947 * Returns 0 on success or < 0 in case of an error.
5948 */
5949int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
5950{
5951	struct ring_buffer_per_cpu *cpu_buffer;
5952	struct buffer_page *bpage, *tmp;
5953	int old_order, old_size;
5954	int nr_pages;
5955	int psize;
5956	int err;
5957	int cpu;
5958
5959	if (!buffer || order < 0)
5960		return -EINVAL;
5961
5962	if (buffer->subbuf_order == order)
5963		return 0;
5964
5965	psize = (1 << order) * PAGE_SIZE;
5966	if (psize <= BUF_PAGE_HDR_SIZE)
5967		return -EINVAL;
5968
5969	/* Size of a subbuf cannot be greater than the write counter */
5970	if (psize > RB_WRITE_MASK + 1)
5971		return -EINVAL;
5972
5973	old_order = buffer->subbuf_order;
5974	old_size = buffer->subbuf_size;
5975
5976	/* prevent another thread from changing buffer sizes */
5977	mutex_lock(&buffer->mutex);
5978	atomic_inc(&buffer->record_disabled);
5979
5980	/* Make sure all commits have finished */
5981	synchronize_rcu();
5982
5983	buffer->subbuf_order = order;
5984	buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;
5985
5986	/* Make sure all new buffers are allocated, before deleting the old ones */
5987	for_each_buffer_cpu(buffer, cpu) {
5988
5989		if (!cpumask_test_cpu(cpu, buffer->cpumask))
5990			continue;
5991
5992		cpu_buffer = buffer->buffers[cpu];
5993
5994		if (cpu_buffer->mapped) {
5995			err = -EBUSY;
5996			goto error;
5997		}
5998
5999		/* Update the number of pages to match the new size */
6000		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
6001		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
6002
6003		/* we need a minimum of two pages */
6004		if (nr_pages < 2)
6005			nr_pages = 2;
6006
6007		cpu_buffer->nr_pages_to_update = nr_pages;
6008
6009		/* Include the reader page */
6010		nr_pages++;
6011
6012		/* Allocate the new size buffer */
6013		INIT_LIST_HEAD(&cpu_buffer->new_pages);
6014		if (__rb_allocate_pages(cpu_buffer, nr_pages,
6015					&cpu_buffer->new_pages)) {
6016			/* not enough memory for new pages */
6017			err = -ENOMEM;
6018			goto error;
6019		}
6020	}
6021
6022	for_each_buffer_cpu(buffer, cpu) {
6023
6024		if (!cpumask_test_cpu(cpu, buffer->cpumask))
6025			continue;
6026
6027		cpu_buffer = buffer->buffers[cpu];
6028
6029		/* Clear the head bit to make the link list normal to read */
6030		rb_head_page_deactivate(cpu_buffer);
6031
6032		/* Now walk the list and free all the old sub buffers */
6033		list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) {
6034			list_del_init(&bpage->list);
6035			free_buffer_page(bpage);
6036		}
6037		/* The above loop stopped at the last page needing to be freed */
6038		bpage = list_entry(cpu_buffer->pages, struct buffer_page, list);
6039		free_buffer_page(bpage);
6040
6041		/* Free the current reader page */
6042		free_buffer_page(cpu_buffer->reader_page);
6043
6044		/* One page was allocated for the reader page */
6045		cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next,
6046						     struct buffer_page, list);
6047		list_del_init(&cpu_buffer->reader_page->list);
6048
6049		/* The cpu_buffer pages are a linked list with no head */
6050		cpu_buffer->pages = cpu_buffer->new_pages.next;
6051		cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev;
6052		cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next;
6053
6054		/* Clear the new_pages list */
6055		INIT_LIST_HEAD(&cpu_buffer->new_pages);
6056
6057		cpu_buffer->head_page
6058			= list_entry(cpu_buffer->pages, struct buffer_page, list);
6059		cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
6060
6061		cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update;
6062		cpu_buffer->nr_pages_to_update = 0;
6063
6064		free_pages((unsigned long)cpu_buffer->free_page, old_order);
6065		cpu_buffer->free_page = NULL;
6066
6067		rb_head_page_activate(cpu_buffer);
6068
6069		rb_check_pages(cpu_buffer);
6070	}
6071
6072	atomic_dec(&buffer->record_disabled);
6073	mutex_unlock(&buffer->mutex);
6074
6075	return 0;
6076
6077error:
6078	buffer->subbuf_order = old_order;
6079	buffer->subbuf_size = old_size;
6080
6081	atomic_dec(&buffer->record_disabled);
6082	mutex_unlock(&buffer->mutex);
6083
6084	for_each_buffer_cpu(buffer, cpu) {
6085		cpu_buffer = buffer->buffers[cpu];
6086
6087		if (!cpu_buffer->nr_pages_to_update)
6088			continue;
6089
6090		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) {
6091			list_del_init(&bpage->list);
6092			free_buffer_page(bpage);
6093		}
6094	}
6095
6096	return err;
6097}
6098EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
6099
6100static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
6101{
6102	struct page *page;
6103
6104	if (cpu_buffer->meta_page)
6105		return 0;
6106
6107	page = alloc_page(GFP_USER | __GFP_ZERO);
6108	if (!page)
6109		return -ENOMEM;
6110
6111	cpu_buffer->meta_page = page_to_virt(page);
6112
6113	return 0;
6114}
6115
6116static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
6117{
6118	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
6119
6120	free_page(addr);
6121	cpu_buffer->meta_page = NULL;
6122}
6123
6124static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
6125				   unsigned long *subbuf_ids)
6126{
6127	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
6128	unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
6129	struct buffer_page *first_subbuf, *subbuf;
6130	int id = 0;
6131
6132	subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
6133	cpu_buffer->reader_page->id = id++;
6134
6135	first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
6136	do {
6137		if (WARN_ON(id >= nr_subbufs))
6138			break;
6139
6140		subbuf_ids[id] = (unsigned long)subbuf->page;
6141		subbuf->id = id;
6142
6143		rb_inc_page(&subbuf);
6144		id++;
6145	} while (subbuf != first_subbuf);
6146
6147	/* install subbuf ID to kern VA translation */
6148	cpu_buffer->subbuf_ids = subbuf_ids;
6149
6150	meta->meta_page_size = PAGE_SIZE;
6151	meta->meta_struct_len = sizeof(*meta);
6152	meta->nr_subbufs = nr_subbufs;
6153	meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
6154
6155	rb_update_meta_page(cpu_buffer);
6156}
6157
6158static struct ring_buffer_per_cpu *
6159rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
6160{
6161	struct ring_buffer_per_cpu *cpu_buffer;
6162
6163	if (!cpumask_test_cpu(cpu, buffer->cpumask))
6164		return ERR_PTR(-EINVAL);
6165
6166	cpu_buffer = buffer->buffers[cpu];
6167
6168	mutex_lock(&cpu_buffer->mapping_lock);
6169
6170	if (!cpu_buffer->mapped) {
6171		mutex_unlock(&cpu_buffer->mapping_lock);
6172		return ERR_PTR(-ENODEV);
6173	}
6174
6175	return cpu_buffer;
6176}
6177
6178static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
6179{
6180	mutex_unlock(&cpu_buffer->mapping_lock);
6181}
6182
6183/*
6184 * Fast-path for ring_buffer_(un)map(). Called whenever the meta-page doesn't need
6185 * to be set-up or torn-down.
6186 */
6187static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer,
6188			       bool inc)
6189{
6190	unsigned long flags;
6191
6192	lockdep_assert_held(&cpu_buffer->mapping_lock);
6193
6194	if (inc && cpu_buffer->mapped == UINT_MAX)
6195		return -EBUSY;
6196
6197	if (WARN_ON(!inc && cpu_buffer->mapped == 0))
6198		return -EINVAL;
6199
6200	mutex_lock(&cpu_buffer->buffer->mutex);
6201	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6202
6203	if (inc)
6204		cpu_buffer->mapped++;
6205	else
6206		cpu_buffer->mapped--;
6207
6208	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6209	mutex_unlock(&cpu_buffer->buffer->mutex);
6210
6211	return 0;
6212}
6213
6214/*
6215 *   +--------------+  pgoff == 0
6216 *   |   meta page  |
6217 *   +--------------+  pgoff == 1
6218 *   | subbuffer 0  |
6219 *   |              |
6220 *   +--------------+  pgoff == (1 + (1 << subbuf_order))
6221 *   | subbuffer 1  |
6222 *   |              |
6223 *         ...
6224 */
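/*
 * A minimal user-space sketch of consuming that layout, assuming fd is an
 * open per-CPU trace_pipe_raw file, page_size is the system page size, and
 * the fields come from the struct trace_buffer_meta exposed in the meta page:
 *
 *	meta = mmap(NULL, page_size, PROT_READ, MAP_SHARED, fd, 0);
 *	data = mmap(NULL, (size_t)meta->subbuf_size * meta->nr_subbufs,
 *		    PROT_READ, MAP_SHARED, fd, meta->meta_page_size);
 */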
6225#ifdef CONFIG_MMU
6226static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
6227			struct vm_area_struct *vma)
6228{
6229	unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff;
6230	unsigned int subbuf_pages, subbuf_order;
6231	struct page **pages;
6232	int p = 0, s = 0;
6233	int err;
6234
6235	/* Refuse MAP_PRIVATE or writable mappings */
6236	if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC ||
6237	    !(vma->vm_flags & VM_MAYSHARE))
6238		return -EPERM;
6239
6240	/*
6241	 * Make sure the mapping cannot become writable later. Also tell the VM
6242	 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND).
6243	 */
6244	vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP,
6245		     VM_MAYWRITE);
6246
6247	lockdep_assert_held(&cpu_buffer->mapping_lock);
6248
6249	subbuf_order = cpu_buffer->buffer->subbuf_order;
6250	subbuf_pages = 1 << subbuf_order;
6251
6252	nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */
6253	nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */
6254
6255	vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT;
6256	if (!vma_pages || vma_pages > nr_pages)
6257		return -EINVAL;
6258
6259	nr_pages = vma_pages;
6260
6261	pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
6262	if (!pages)
6263		return -ENOMEM;
6264
6265	if (!pgoff) {
6266		pages[p++] = virt_to_page(cpu_buffer->meta_page);
6267
6268		/*
6269		 * TODO: Align sub-buffers on their size, once
6270		 * vm_insert_pages() supports the zero-page.
6271		 */
6272	} else {
6273		/* Skip the meta-page */
6274		pgoff--;
6275
6276		if (pgoff % subbuf_pages) {
6277			err = -EINVAL;
6278			goto out;
6279		}
6280
6281		s += pgoff / subbuf_pages;
6282	}
6283
6284	while (p < nr_pages) {
6285		struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]);
6286		int off = 0;
6287
6288		if (WARN_ON_ONCE(s >= nr_subbufs)) {
6289			err = -EINVAL;
6290			goto out;
6291		}
6292
6293		for (; off < (1 << (subbuf_order)); off++, page++) {
6294			if (p >= nr_pages)
6295				break;
6296
6297			pages[p++] = page;
6298		}
6299		s++;
6300	}
6301
6302	err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages);
6303
6304out:
6305	kfree(pages);
6306
6307	return err;
6308}
6309#else
6310static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
6311			struct vm_area_struct *vma)
6312{
6313	return -EOPNOTSUPP;
6314}
6315#endif
6316
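/*
 * ring_buffer_map - map the meta page and sub-buffers of a per-CPU buffer
 * into a user VMA. The first mapping sets up the meta page and the
 * subbuf_ids translation table and disables resizing; subsequent mappings
 * reuse that state, map the new VMA, and increment the mapped count.
 */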
6317int ring_buffer_map(struct trace_buffer *buffer, int cpu,
6318		    struct vm_area_struct *vma)
6319{
6320	struct ring_buffer_per_cpu *cpu_buffer;
6321	unsigned long flags, *subbuf_ids;
6322	int err = 0;
6323
6324	if (!cpumask_test_cpu(cpu, buffer->cpumask))
6325		return -EINVAL;
6326
6327	cpu_buffer = buffer->buffers[cpu];
6328
6329	mutex_lock(&cpu_buffer->mapping_lock);
6330
6331	if (cpu_buffer->mapped) {
6332		err = __rb_map_vma(cpu_buffer, vma);
6333		if (!err)
6334			err = __rb_inc_dec_mapped(cpu_buffer, true);
6335		mutex_unlock(&cpu_buffer->mapping_lock);
6336		return err;
6337	}
6338
6339	/* prevent another thread from changing buffer/sub-buffer sizes */
6340	mutex_lock(&buffer->mutex);
6341
6342	err = rb_alloc_meta_page(cpu_buffer);
6343	if (err)
6344		goto unlock;
6345
6346	/* subbuf_ids include the reader while nr_pages does not */
6347	subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL);
6348	if (!subbuf_ids) {
6349		rb_free_meta_page(cpu_buffer);
6350		err = -ENOMEM;
6351		goto unlock;
6352	}
6353
6354	atomic_inc(&cpu_buffer->resize_disabled);
6355
6356	/*
6357	 * Lock all readers to block any subbuf swap until the subbuf IDs are
6358	 * assigned.
6359	 */
6360	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6361	rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
6362	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6363
6364	err = __rb_map_vma(cpu_buffer, vma);
6365	if (!err) {
6366		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6367		cpu_buffer->mapped = 1;
6368		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6369	} else {
6370		kfree(cpu_buffer->subbuf_ids);
6371		cpu_buffer->subbuf_ids = NULL;
6372		rb_free_meta_page(cpu_buffer);
6373	}
6374
6375unlock:
6376	mutex_unlock(&buffer->mutex);
6377	mutex_unlock(&cpu_buffer->mapping_lock);
6378
6379	return err;
6380}
6381
6382int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
6383{
6384	struct ring_buffer_per_cpu *cpu_buffer;
6385	unsigned long flags;
6386	int err = 0;
6387
6388	if (!cpumask_test_cpu(cpu, buffer->cpumask))
6389		return -EINVAL;
6390
6391	cpu_buffer = buffer->buffers[cpu];
6392
6393	mutex_lock(&cpu_buffer->mapping_lock);
6394
6395	if (!cpu_buffer->mapped) {
6396		err = -ENODEV;
6397		goto out;
6398	} else if (cpu_buffer->mapped > 1) {
6399		__rb_inc_dec_mapped(cpu_buffer, false);
6400		goto out;
6401	}
6402
6403	mutex_lock(&buffer->mutex);
6404	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6405
6406	cpu_buffer->mapped = 0;
6407
6408	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6409
6410	kfree(cpu_buffer->subbuf_ids);
6411	cpu_buffer->subbuf_ids = NULL;
6412	rb_free_meta_page(cpu_buffer);
6413	atomic_dec(&cpu_buffer->resize_disabled);
6414
6415	mutex_unlock(&buffer->mutex);
6416
6417out:
6418	mutex_unlock(&cpu_buffer->mapping_lock);
6419
6420	return err;
6421}
6422
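/*
 * ring_buffer_map_get_reader - advance the reader for a user-space mapping.
 * Whatever is left on the current reader page is consumed; once the page has
 * been fully read, the next reader page is swapped in and any lost-event
 * count is stored at the end of the page when there is room. The meta page
 * is updated before returning.
 */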
6423int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
6424{
6425	struct ring_buffer_per_cpu *cpu_buffer;
6426	struct buffer_page *reader;
6427	unsigned long missed_events;
6428	unsigned long reader_size;
6429	unsigned long flags;
6430
6431	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
6432	if (IS_ERR(cpu_buffer))
6433		return (int)PTR_ERR(cpu_buffer);
6434
6435	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
6436
6437consume:
6438	if (rb_per_cpu_empty(cpu_buffer))
6439		goto out;
6440
6441	reader_size = rb_page_size(cpu_buffer->reader_page);
6442
6443	/*
6444	 * There is data to be read on the current reader page, so we can
6445	 * return to the caller. But before that, assume the caller will read
6446	 * everything. Update the kernel reader accordingly.
6447	 */
6448	if (cpu_buffer->reader_page->read < reader_size) {
6449		while (cpu_buffer->reader_page->read < reader_size)
6450			rb_advance_reader(cpu_buffer);
6451		goto out;
6452	}
6453
6454	reader = rb_get_reader_page(cpu_buffer);
6455	if (WARN_ON(!reader))
6456		goto out;
6457
6458	/* Check if any events were dropped */
6459	missed_events = cpu_buffer->lost_events;
6460
6461	if (cpu_buffer->reader_page != cpu_buffer->commit_page) {
6462		if (missed_events) {
6463			struct buffer_data_page *bpage = reader->page;
6464			unsigned int commit;
6465			/*
6466			 * Use the real_end for the data size,
6467			 * This gives us a chance to store the lost events
6468			 * on the page.
6469			 */
6470			if (reader->real_end)
6471				local_set(&bpage->commit, reader->real_end);
6472			/*
6473			 * If there is room at the end of the page to save the
6474			 * missed events, then record it there.
6475			 */
6476			commit = rb_page_size(reader);
6477			if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
6478				memcpy(&bpage->data[commit], &missed_events,
6479				       sizeof(missed_events));
6480				local_add(RB_MISSED_STORED, &bpage->commit);
6481			}
6482			local_add(RB_MISSED_EVENTS, &bpage->commit);
6483		}
6484	} else {
6485		/*
6486		 * There really shouldn't be any missed events if the commit
6487		 * is on the reader page.
6488		 */
6489		WARN_ON_ONCE(missed_events);
6490	}
6491
6492	cpu_buffer->lost_events = 0;
6493
6494	goto consume;
6495
6496out:
6497	/* Some archs do not have data cache coherency between kernel and user-space */
6498	flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page));
6499
6500	rb_update_meta_page(cpu_buffer);
6501
6502	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
6503	rb_put_mapped_buffer(cpu_buffer);
6504
6505	return 0;
6506}
6507
6508/*
6509 * We only allocate new buffers, never free them if the CPU goes down.
6510 * If we were to free the buffer, then the user would lose any trace that was in
6511 * the buffer.
6512 */
6513int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
6514{
6515	struct trace_buffer *buffer;
6516	long nr_pages_same;
6517	int cpu_i;
6518	unsigned long nr_pages;
6519
6520	buffer = container_of(node, struct trace_buffer, node);
6521	if (cpumask_test_cpu(cpu, buffer->cpumask))
6522		return 0;
6523
6524	nr_pages = 0;
6525	nr_pages_same = 1;
6526	/* check if all cpu sizes are same */
6527	for_each_buffer_cpu(buffer, cpu_i) {
6528		/* fill in the size from first enabled cpu */
6529		if (nr_pages == 0)
6530			nr_pages = buffer->buffers[cpu_i]->nr_pages;
6531		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
6532			nr_pages_same = 0;
6533			break;
6534		}
6535	}
6536	/* allocate minimum pages, user can later expand it */
6537	if (!nr_pages_same)
6538		nr_pages = 2;
6539	buffer->buffers[cpu] =
6540		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
6541	if (!buffer->buffers[cpu]) {
6542		WARN(1, "failed to allocate ring buffer on CPU %u\n",
6543		     cpu);
6544		return -ENOMEM;
6545	}
6546	smp_wmb();
6547	cpumask_set_cpu(cpu, buffer->cpumask);
6548	return 0;
6549}
6550
6551#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
6552/*
6553 * This is a basic integrity check of the ring buffer.
6554 * Late in the boot cycle this test will run when configured in.
6555 * It will kick off a thread per CPU that will go into a loop
6556 * writing to the per cpu ring buffer various sizes of data.
6557 * Some of the data will be large items, some small.
6558 *
6559 * Another thread is created that goes into a spin, sending out
6560 * IPIs to the other CPUs to also write into the ring buffer.
6561 * This is to test the nesting ability of the buffer.
6562 *
6563 * Basic stats are recorded and reported. If something in the
6564 * ring buffer should happen that's not expected, a big warning
6565 * is displayed and all ring buffers are disabled.
6566 */
6567static struct task_struct *rb_threads[NR_CPUS] __initdata;
6568
6569struct rb_test_data {
6570	struct trace_buffer *buffer;
6571	unsigned long		events;
6572	unsigned long		bytes_written;
6573	unsigned long		bytes_alloc;
6574	unsigned long		bytes_dropped;
6575	unsigned long		events_nested;
6576	unsigned long		bytes_written_nested;
6577	unsigned long		bytes_alloc_nested;
6578	unsigned long		bytes_dropped_nested;
6579	int			min_size_nested;
6580	int			max_size_nested;
6581	int			max_size;
6582	int			min_size;
6583	int			cpu;
6584	int			cnt;
6585};
6586
6587static struct rb_test_data rb_data[NR_CPUS] __initdata;
6588
6589/* 1 meg per cpu */
6590#define RB_TEST_BUFFER_SIZE	1048576
6591
6592static char rb_string[] __initdata =
6593	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
6594	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
6595	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
6596
6597static bool rb_test_started __initdata;
6598
6599struct rb_item {
6600	int size;
6601	char str[];
6602};
6603
6604static __init int rb_write_something(struct rb_test_data *data, bool nested)
6605{
6606	struct ring_buffer_event *event;
6607	struct rb_item *item;
6608	bool started;
6609	int event_len;
6610	int size;
6611	int len;
6612	int cnt;
6613
6614	/* Have nested writes differ from what is normally written */
6615	cnt = data->cnt + (nested ? 27 : 0);
6616
6617	/* Multiply cnt by ~e, to make some unique increment */
6618	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
6619
6620	len = size + sizeof(struct rb_item);
6621
6622	started = rb_test_started;
6623	/* read rb_test_started before checking buffer enabled */
6624	smp_rmb();
6625
6626	event = ring_buffer_lock_reserve(data->buffer, len);
6627	if (!event) {
6628		/* Ignore dropped events before test starts. */
6629		if (started) {
6630			if (nested)
6631				data->bytes_dropped_nested += len;
6632			else
6633				data->bytes_dropped += len;
6634		}
6635		return len;
6636	}
6637
6638	event_len = ring_buffer_event_length(event);
6639
6640	if (RB_WARN_ON(data->buffer, event_len < len))
6641		goto out;
6642
6643	item = ring_buffer_event_data(event);
6644	item->size = size;
6645	memcpy(item->str, rb_string, size);
6646
6647	if (nested) {
6648		data->bytes_alloc_nested += event_len;
6649		data->bytes_written_nested += len;
6650		data->events_nested++;
6651		if (!data->min_size_nested || len < data->min_size_nested)
6652			data->min_size_nested = len;
6653		if (len > data->max_size_nested)
6654			data->max_size_nested = len;
6655	} else {
6656		data->bytes_alloc += event_len;
6657		data->bytes_written += len;
6658		data->events++;
6659		if (!data->min_size || len < data->min_size)
6660			data->min_size = len;
6661		if (len > data->max_size)
6662			data->max_size = len;
6663	}
6664
6665 out:
6666	ring_buffer_unlock_commit(data->buffer);
6667
6668	return 0;
6669}
6670
6671static __init int rb_test(void *arg)
6672{
6673	struct rb_test_data *data = arg;
6674
6675	while (!kthread_should_stop()) {
6676		rb_write_something(data, false);
6677		data->cnt++;
6678
6679		set_current_state(TASK_INTERRUPTIBLE);
6680		/* Now sleep between a min of 100-300us and a max of 1ms */
6681		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
6682	}
6683
6684	return 0;
6685}
6686
6687static __init void rb_ipi(void *ignore)
6688{
6689	struct rb_test_data *data;
6690	int cpu = smp_processor_id();
6691
6692	data = &rb_data[cpu];
6693	rb_write_something(data, true);
6694}
6695
6696static __init int rb_hammer_test(void *arg)
6697{
6698	while (!kthread_should_stop()) {
6699
6700		/* Send an IPI to all cpus to write data! */
6701		smp_call_function(rb_ipi, NULL, 1);
6702		/* No sleep, but on non-preempt kernels, let others run */
6703		schedule();
6704	}
6705
6706	return 0;
6707}
6708
6709static __init int test_ringbuffer(void)
6710{
6711	struct task_struct *rb_hammer;
6712	struct trace_buffer *buffer;
6713	int cpu;
6714	int ret = 0;
6715
6716	if (security_locked_down(LOCKDOWN_TRACEFS)) {
6717		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
6718		return 0;
6719	}
6720
6721	pr_info("Running ring buffer tests...\n");
6722
6723	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
6724	if (WARN_ON(!buffer))
6725		return 0;
6726
6727	/* Disable buffer so that threads can't write to it yet */
6728	ring_buffer_record_off(buffer);
6729
6730	for_each_online_cpu(cpu) {
6731		rb_data[cpu].buffer = buffer;
6732		rb_data[cpu].cpu = cpu;
6733		rb_data[cpu].cnt = cpu;
6734		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
6735						     cpu, "rbtester/%u");
6736		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
6737			pr_cont("FAILED\n");
6738			ret = PTR_ERR(rb_threads[cpu]);
6739			goto out_free;
6740		}
6741	}
6742
6743	/* Now create the rb hammer! */
6744	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
6745	if (WARN_ON(IS_ERR(rb_hammer))) {
6746		pr_cont("FAILED\n");
6747		ret = PTR_ERR(rb_hammer);
6748		goto out_free;
6749	}
6750
6751	ring_buffer_record_on(buffer);
6752	/*
6753	 * Show buffer is enabled before setting rb_test_started.
6754	 * Yes there's a small race window where events could be
6755	 * dropped and the thread won't catch it. But when a ring
6756	 * buffer gets enabled, there will always be some kind of
6757	 * delay before other CPUs see it. Thus, we don't care about
6758	 * those dropped events. We care about events dropped after
6759	 * the threads see that the buffer is active.
6760	 */
6761	smp_wmb();
6762	rb_test_started = true;
6763
6764	set_current_state(TASK_INTERRUPTIBLE);
6765	/* Just run for 10 seconds */
6766	schedule_timeout(10 * HZ);
6767
6768	kthread_stop(rb_hammer);
6769
6770 out_free:
6771	for_each_online_cpu(cpu) {
6772		if (!rb_threads[cpu])
6773			break;
6774		kthread_stop(rb_threads[cpu]);
6775	}
6776	if (ret) {
6777		ring_buffer_free(buffer);
6778		return ret;
6779	}
6780
6781	/* Report! */
6782	pr_info("finished\n");
6783	for_each_online_cpu(cpu) {
6784		struct ring_buffer_event *event;
6785		struct rb_test_data *data = &rb_data[cpu];
6786		struct rb_item *item;
6787		unsigned long total_events;
6788		unsigned long total_dropped;
6789		unsigned long total_written;
6790		unsigned long total_alloc;
6791		unsigned long total_read = 0;
6792		unsigned long total_size = 0;
6793		unsigned long total_len = 0;
6794		unsigned long total_lost = 0;
6795		unsigned long lost;
6796		int big_event_size;
6797		int small_event_size;
6798
6799		ret = -1;
6800
6801		total_events = data->events + data->events_nested;
6802		total_written = data->bytes_written + data->bytes_written_nested;
6803		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
6804		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
6805
6806		big_event_size = data->max_size + data->max_size_nested;
6807		small_event_size = data->min_size + data->min_size_nested;
6808
6809		pr_info("CPU %d:\n", cpu);
6810		pr_info("              events:    %ld\n", total_events);
6811		pr_info("       dropped bytes:    %ld\n", total_dropped);
6812		pr_info("       alloced bytes:    %ld\n", total_alloc);
6813		pr_info("       written bytes:    %ld\n", total_written);
6814		pr_info("       biggest event:    %d\n", big_event_size);
6815		pr_info("      smallest event:    %d\n", small_event_size);
6816
6817		if (RB_WARN_ON(buffer, total_dropped))
6818			break;
6819
6820		ret = 0;
6821
6822		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
6823			total_lost += lost;
6824			item = ring_buffer_event_data(event);
6825			total_len += ring_buffer_event_length(event);
6826			total_size += item->size + sizeof(struct rb_item);
6827			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
6828				pr_info("FAILED!\n");
6829				pr_info("buffer had: %.*s\n", item->size, item->str);
6830				pr_info("expected:   %.*s\n", item->size, rb_string);
6831				RB_WARN_ON(buffer, 1);
6832				ret = -1;
6833				break;
6834			}
6835			total_read++;
6836		}
6837		if (ret)
6838			break;
6839
6840		ret = -1;
6841
6842		pr_info("         read events:   %ld\n", total_read);
6843		pr_info("         lost events:   %ld\n", total_lost);
6844		pr_info("        total events:   %ld\n", total_lost + total_read);
6845		pr_info("  recorded len bytes:   %ld\n", total_len);
6846		pr_info(" recorded size bytes:   %ld\n", total_size);
6847		if (total_lost) {
6848			pr_info(" With dropped events, record len and size may not match\n"
6849				" alloced and written from above\n");
6850		} else {
6851			if (RB_WARN_ON(buffer, total_len != total_alloc ||
6852				       total_size != total_written))
6853				break;
6854		}
6855		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
6856			break;
6857
6858		ret = 0;
6859	}
6860	if (!ret)
6861		pr_info("Ring buffer PASSED!\n");
6862
6863	ring_buffer_free(buffer);
6864	return 0;
6865}
6866
6867late_initcall(test_ringbuffer);
6868#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
6869