clang linux kfifo ringbuffer


原文链接: clang linux kfifo ringbuffer

git@github.com:dennis-musk/ringbuffer.git
Description: 环形缓冲区的实现

  • Others: 1.min的妙用,(验证剩余有效空间和要求要读出或者写入空间 取最小值)
  • 2.利用unsigned int 的回环,in 和 out一直在加,加到0xffffffff则归为0,任然满足计算偏移等。
  • 3.分为2部进行copy,一为当前偏移到size-1 二为剩余部分0到(len减去一中的个数)
  • 4.unsiged int下的(in - out)始终为in和out之间的距离,(in溢出后in:0x1 - out:0xffffffff = 2任然满足)(缓冲区中未脏的数据).
  • 5.计算偏移(in) & (size - 1) <==> in%size


环形缓冲区经常被使用到,尤其在生产者和消费者的模型中,假设生产者专门用于产生数据,而消费者专门用于处理数据,由于各种原因,可能生产者和消费者产生数据和处理数据的速度不一,比如如果处理速度有慢又快,在慢的时候,消费者产生的数据来不及处理的可能被丢弃,或者强制让生产者降速等待,在快的时候,又有可能太快,而生产者供给不了,那么消费者也必须等待.正是由于快慢不一,缓冲区的存在则恰可以进行中和,协调生产者和消费者速度不一的问题.

一.内核kfifo

首先学习一下linux内核是如何设计环形缓冲区的,毕竟内核代码精炼之至,令人叹为观止.
这里是linux2.6.27的代码


1.kfifo的结构类型

struct kfifo {
    unsigned char *buffer;  /* the buffer holding the data */
    unsigned int size;  /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;   /* data is extracted from off. (out % size) */
    spinlock_t *lock;   /* protects concurrent modifications */
};

这里发现我们用in out描述put get操作fifo的位置,用的是unsigned int类型,后面如果我们想获得in实际在fifo的位置,用in&(size-1),这就是size下面要采用用2的乘方的原因.
而牵涉到in,out一起计算的时候,不需要进行&运算获取实际位置,即使有溢出问题也是满足的,可以使用补码进行验算,最后都是看成无符号的数.
在in out增加和减少,会自己溢出回归.
2.kfifo_init

struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
             gfp_t gfp_mask, spinlock_t *lock)
{
    struct kfifo *fifo;

    /* size must be a power of 2 */
    BUG_ON(!is_power_of_2(size));

    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
        return ERR_PTR(-ENOMEM);

    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
}

bool is_power_of_2(unsigned long n)
{
    return (n != 0 && ((n & (n - 1)) == 0));
}

申请分配一个kfifo的结构体指针,初始化buffer使用的是函数外部的空间,in,out为0,size其中必须为2的乘方,意义为下面size-1方便进行与运算.

3.kfifo_alloc

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;

    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        BUG_ON(size > 0x80000000);
        size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);

    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
        kfree(buffer);

    return ret;
}

这个函数主要就是申请size的buffer空间,然后调用kfifo_init初始化.
4.kfifo_free

void kfifo_free(struct kfifo *fifo)
{
    kfree(fifo->buffer);
    kfree(fifo);
}

这个函数和kfifo_alloc配合使用,用于释放内存,先释放buffer,再释放结构体指针fifo.
5.kfifo_reset

static inline void __kfifo_reset(struct kfifo *fifo)
{
    fifo->in = fifo->out = 0;
}

static inline void kfifo_reset(struct kfifo *fifo)
{
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
}

重置in out位置为0
6.kfifo_len

static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
    return fifo->in - fifo->out;
}

static inline unsigned int kfifo_len(struct kfifo *fifo)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

得到fifo中数据的长度,用fifo->in - fifo->out是没有问题的,即便在unsigned int型溢出时也是对的,具体可以使用补码进行运算.
7.kfifo_put

unsigned int __kfifo_put(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //put进去的字节数不能大于fifo剩余的字节数
    len = min(len, fifo->size - fifo->in + fifo->out);
    smp_mb();

    /*fifo->in & (fifo->size - 1)通过这个与运算,相当于把
      fifo->in是size的倍数给去掉了,得到的是在size里的位
      置,就是在这个buffer的位置.
      而l所表示的是要put进去的字节数和从in开始到buffer
      结尾字节数的小值,就是从in到buffer结尾能不能放下
      目的字节数*/
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    /*如果放不下,copy分两部分,一部分从in在位置复制l字
      节数.一部分从buffer开始复制len-l字节数,如果放得
      下,那len-l为0,一样可以*/
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);
    smp_wmb();
    //更新in所在位置
    fifo->in += len;

    return len;
}

static inline unsigned int kfifo_put(struct kfifo *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

8.kfifo_get

unsigned int __kfifo_get(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //get的字节数和fifo buffer中字节数比较,len为最终要get的字节数
    len = min(len, fifo->in - fifo->out);
    smp_rmb();

    //要get的字节数,和out所在位置到fifo buffer结尾字节数,比较
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    //和put同理,两部分,一部分copy l字节数,一部分copy len-l字节数,注意方向
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    smp_mb();
    //更新out位置
    fifo->out += len;

    return len;
}

static inline unsigned int kfifo_get(struct kfifo *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;

    spin_lock_irqsave(fifo->lock, flags);

    ret = __kfifo_get(fifo, buffer, len);
    //如果没有数据,重置
    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;

    spin_unlock_irqrestore(fifo->lock, flags)
     //返回get的字节数
    return ret;
}

二.仿造kfifo,编写的环形缓冲区ring.c ring.h


/*ring.h*/
#ifndef RING_H
#define RING_H

#include <pthread.h>

struct ring{
    unsigned char *buffer;  /* the buffer holding the data */
    unsigned int size;  /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;   /* data is extracted from off. (out % size) */
    pthread_mutex_t *lock;  /* protects concurrent modifications */
};

extern struct ring *ring_init(unsigned char *buffer, unsigned int size,pthread_mutex_t *lock);
extern struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock);
extern void ring_free(struct ring *fifo);
extern unsigned int __ring_put(struct ring *fifo,
                unsigned char *buffer, unsigned int len);
extern unsigned int __ring_get(struct ring *fifo,
                unsigned char *buffer, unsigned int len);

static inline void __ring_reset(struct ring *fifo)
{
    fifo->in = fifo->out = 0;
}
static inline void ring_reset(struct ring *fifo)
{
    unsigned long flags;

    pthread_mutex_lock(fifo->lock);

    __ring_reset(fifo);

    pthread_mutex_unlock(fifo->lock);
}

static inline unsigned int __ring_len(struct ring *fifo)
{
    return fifo->in - fifo->out;
}

static inline unsigned int ring_len(struct ring *fifo)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_len(fifo);

    pthread_mutex_unlock(fifo->lock);

    return ret;
}

static inline unsigned int ring_put(struct ring *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_put(fifo, buffer, len);

    pthread_mutex_unlock(fifo->lock);

    return ret;
}

static inline unsigned int ring_get(struct ring *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_get(fifo, buffer, len);

    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;

    pthread_mutex_unlock(fifo->lock);

    return ret;
}

#endif

/*ring.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "ring.h"

#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
#define min(x,y) ({ \
    typeof(x) _x = (x); \
    typeof(y) _y = (y); \
    (void) (&_x == &_y);    \
    _x < _y ? _x : _y; })

struct ring *ring_init(unsigned char *buffer, unsigned int size,
                        pthread_mutex_t *lock)
{
    struct ring *fifo = NULL;

    if(!is_power_of_2(size)){
        printf("size is not power of 2\n");
        return fifo;
    }

    fifo = (struct ring *)malloc(sizeof(struct ring));
    if (!fifo){
        printf("fifo malloc error\n");
        return fifo;
    }

    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
}

struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock)
{
    unsigned char *buffer = NULL;
    struct ring *ret = NULL;


    buffer = (unsigned char *)malloc(size);
    if (!buffer){
        printf("buffer malloc error\n");
        return ret;
    }

    ret = ring_init(buffer, size, lock);

    return ret;
}
void ring_free(struct ring *fifo)
{
    free(fifo->buffer);
    free(fifo);
}

unsigned int __ring_put(struct ring *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->size - fifo->in + fifo->out);

    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);

    fifo->in += len;

    return len;
}

unsigned int __ring_get(struct ring *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->in - fifo->out);

    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);

    fifo->out += len;
    return len;
}

三.测试

测试的main.c文件如下:

/*main.c*/
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "ring.h"

struct data{
    int a;
    time_t t;
};

pthread_t tid1;
pthread_t tid2;

void sig_handler(int sig)
{
    if(sig == SIGINT){

        if(pthread_cancel(tid1) != 0){
            perror("thread cancel fail");
            exit(0);
        }

        if(pthread_cancel(tid2) != 0){
            perror("thread cancel fail");
            exit(0);
        }
        printf("\n\n 两个线程取消\n");
   }
}

void * put_proc(void * arg)
{
    signal(SIGINT, sig_handler);
    if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
        perror("pthread set cancel state fail");
        pthread_exit(NULL);
        exit(0);
    }
    if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
        perror("pthread set cancel type fail");
        pthread_exit(NULL);
        exit(0);
    }

    int i = 0;
    struct data data_put;
    struct ring * ring_buf = (struct ring *)arg;
    int len = sizeof(struct data);
    int ret;
    while(1){
        data_put.a = i;
        time(&data_put.t);
        ret = ring_put(ring_buf,(unsigned char *)&data_put,len);
        printf("ret put:%d\nput data:%d\ntime:%s\n\n",ret,data_put.a,ctime(&data_put.t));
        i++;
        sleep(2);
    }
}

void * get_proc(void * arg)
{
    signal(SIGINT, sig_handler);
    if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
        perror("pthread set cancel state fail");
        pthread_exit(NULL);
        exit(0);
    }
    if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
        perror("pthread set cancel type fail");
        pthread_exit(NULL);
        exit(0);
    }

    struct ring * ring_buf = (struct ring *)arg;
    int len = sizeof(struct data);
    struct data data_get;
    int ret;
    while(1){
        ret = ring_get(ring_buf,(unsigned char *)&data_get,len);
        printf("ret get:%d\nget data:%d\ntime:%s\n\n",ret,data_get.a,ctime(&data_get.t));
        sleep(2);
    }

}

int main(int argc, char const *argv[])
{
    signal(SIGINT, sig_handler);

    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    struct ring * ring_buf = NULL;
    ring_buf = ring_alloc(32,&lock);

    int err;

    err = pthread_create(&tid1, NULL, put_proc, ring_buf);
    if(err){
        printf("fail create thread 1\n");
        goto end;
    }
    err = pthread_create(&tid2, NULL, get_proc, ring_buf);
    if(err){
        printf("fail create thread 2\n");
        goto end;
    }
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);

    printf("program end\n");
end:
    ring_free(ring_buf);
    return 0;
}

其中,建立了两个线程,一个用于向ring_buf写数据,一个用于向ring_buf读数据,数据定义时加上了时间信息便于查看.互斥量的使用主要用于线程同步,比如两个线程如果都向缓冲区写数据时,必须保证临界区的安全,当然也可以使用读写锁,其实更好一些,因为读的时候,也可以写.
信号处理函数,用于ctrl+c强制结束时,异步取消线程.

结果:

ret get:0
get data:1032341248
time:Sun Apr 21 12:30:00 4461252

ret put:16
put data:0
time:Mon Jan  8 17:35:33 2018

ret get:16
get data:0
time:Mon Jan  8 17:35:33 2018

ret put:16
put data:1
time:Mon Jan  8 17:35:35 2018

ret get:16
get data:1
time:Mon Jan  8 17:35:35 2018

ret put:16
put data:2
time:Mon Jan  8 17:35:37 2018

ret get:16
get data:2
time:Mon Jan  8 17:35:37 2018

ret put:16
put data:3
time:Mon Jan  8 17:35:39 2018

ret get:16
get data:3
time:Mon Jan  8 17:35:39 2018

ret put:16
put data:4
time:Mon Jan  8 17:35:41 2018

^C

 两个线程取消
program end

刚开始,get线程没有读出数据
后面就是put一个,get一个,没问题.

`