2011-08-02 49 views
9

我正在测试内核异步io函数(而不是posix aio),并试图弄清楚它是如何工作的。下面的代码是一个完整的程序,我只需将一个数组重复写入使用O_DIRECT打开的文件。我在回调函数“write missed bytes expect 1024 got 0”中看到错误(请参阅work_done()中的fprintf语句)。linux内核aio功能

对于那些不熟悉内核的AIO,下面的代码执行以下操作:

  1. 初始化一些结构
  2. 准备AIO(io_prep_pwrite)
  3. 提交的IO请求(io_submit)
  4. 检查事件完成(io_getevents)
  5. 调用回调函数以查看一切是否正常。

我在步骤5得到一个错误。如果我不使用O_DIRECT打开文件,事情工作正常,但它击败了异步写入的目的。 有人能告诉我我做错了什么吗?这是内核aio的正确用法,例如,我使用回调是否正确?对O_DIRECT的使用有任何限制吗?

我编译使用 'GCC -Wall test.c的-laio' 提前

感谢。

/* 
* File: myaiocp.c 
* Author: kmehta 
* 
* Created on July 11, 2011, 12:50 PM 
* 
* 
* Testing kernel aio. 
* Program creates a 2D matrix and writes it multiple times to create a file of desired size. 
* Writes are performed using kernel aio functions (io_prep_pwrite, io_submit, etc.) 
*/ 
#define _GNU_SOURCE 
#define _XOPEN_SOURCE 600 

#include <stdio.h> 
#include <stdlib.h> 
#include <getopt.h> 
#include <pthread.h> 
#include <fcntl.h> 
#include <string.h> 
#include <sys/uio.h> 
#include <sys/time.h> 
#include <omp.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/stat.h> 
#include <errno.h> 
#include <libaio.h> 

char ** buf; 
long seg_size; 
int seg_rows; 
double total_size; 
char * filename; 
static int wait_count = 0; 

void io_task(); 
void cleanup(); 
void allocate_2D_matrix(int[]); 
int file_open(char *); 
void wr_done(io_context_t ctx, struct iocb* iocb, long res, long res2); 

int main(int argc, char **argv) { 
    total_size = 1048576;  //1MB 
    seg_size = 1024;   //1kB 
    seg_rows = 1024; 
    filename = "aio.out"; 

    int dims[] = {seg_rows, seg_size}; 
    allocate_2D_matrix(dims); //Creates 2D matrix 

    io_task(); 
    cleanup(); 

    return 0; 
} 

/* 
* Create a 2D matrix 
*/ 
void allocate_2D_matrix(int dims[2]) { 
    int i; 
    char *data; 

    //create the matrix 
    data = (char *) calloc(1, dims[0] * dims[1] * sizeof (char)); 
    if (data == NULL) { 
     printf("\nCould not allocate memory for matrix.\n"); 
     exit(1); 
    } 

    buf = (char **) malloc(dims[0] * sizeof (char *)); 
    if (buf == NULL) { 
     printf("\nCould not allocate memory for matrix.\n"); 
     exit(1); 
    } 

    for (i = 0; i < dims[0]; i++) { 
     buf[i] = &(data[i * dims[1]]); 
    } 
} 

static void io_error(const char *func, int rc) 
{ 
    if (rc == -ENOSYS) 
     fprintf(stderr, "AIO not in this kernel\n"); 
    else if (rc < 0) 
     fprintf(stderr, "%s: %s\n", func, strerror(-rc)); 
    else 
     fprintf(stderr, "%s: error %d\n", func, rc); 

    exit(1); 
} 

/* 
* Callback function 
*/ 
static void work_done(io_context_t ctx, struct iocb *iocb, long res, long res2) 
{ 

    if (res2 != 0) { 
     io_error("aio write", res2); 
     } 

     if (res != iocb->u.c.nbytes) { 
      fprintf(stderr, "write missed bytes expect %lu got %ld\n", 
        iocb->u.c.nbytes, res2); 
      exit(1); 
     } 
     wait_count --; 
     printf("%d ", wait_count); 
} 

/* 
* Wait routine. Get events and call the callback function work_done() 
*/ 
int io_wait_run(io_context_t ctx, long iter) 
{ 
     struct io_event events[iter]; 
     struct io_event *ep; 
     int ret, n; 

     /* 
     * get up to aio_maxio events at a time. 
     */ 
     ret = n = io_getevents(ctx, iter, iter, events, NULL); 
     printf("got %d events\n", n); 
     /* 
     * Call the callback functions for each event. 
     */ 
     for (ep = events ; n-- > 0 ; ep++) { 
      io_callback_t cb = (io_callback_t)ep->data ; struct iocb *iocb = ep->obj ; cb(ctx, iocb, ep->res, ep->res2); 
     } 
     return ret; 
} 

void io_task() { 
    long offset = 0; 
    int bufIndex = 0; 

    //Open file 
    int fd = file_open(filename); 

    //Initialize structures 
    long i; 
    long iter = total_size/seg_size; //No. of iterations to reach desired file size (total_size) 
    io_context_t myctx; 
    if(0 != io_queue_init(iter, &myctx)) 
    { 
     perror("Could not initialize io queue"); 
     exit(EXIT_FAILURE); 
    } 
    struct iocb * ioq[iter]; 

    //loop through iter times to reach desired file size 
    for (i = 0; i < iter; i++) { 
     struct iocb *io = (struct iocb*) malloc(sizeof (struct iocb)); 
     io_prep_pwrite(io, fd, buf[bufIndex], seg_size, offset); 
     io_set_callback(io, work_done); 
     ioq[i] = io; 

     offset += seg_size; 
     bufIndex ++; 
     if (bufIndex > seg_rows - 1) //If entire matrix written, start again from index 0 
      bufIndex = 0; 
    } 

    printf("done preparing. Now submitting..\n"); 
    if(iter != io_submit(myctx, iter, ioq)) 
    { 
     perror("Failure on submit"); 
     exit(EXIT_FAILURE); 
    } 

    printf("now awaiting completion..\n"); 
    wait_count = iter; 
    int res; 

    while (wait_count) { 
     res = io_wait_run(myctx, iter); 
     if (res < 0) 
      io_error("io_wait_run", res); 
    } 

    close(fd); 
} 

void cleanup() { 
    free(buf[0]); 
    free(buf); 
} 

int file_open(char *filename) { 
    int fd; 
    if (-1 == (fd = open(filename, O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC, 0666))) { 
     printf("\nError opening file. \n"); 
     exit(-1); 
    } 

    return fd; 
} 

回答

7

首先,抓好使用libaio而不是POSIX aio

对O_DIRECT的使用有任何限制吗?

我不是100%肯定这是真正的问题,但O_DIRECT有一定的要求(从TLPI报价居多):

  • 正在传输的数据缓冲区必须在内存对齐边界是块大小的倍数(使用posix_memalign
  • 数据传输开始的文件或设备偏移量必须是块大小的倍数
  • 要传输的数据的长度必须是块大小的倍数

一眼就可以看出您没有采取aby预防措施来对齐allocate_2D_matrix中的内存。

如果我不使用O_DIRECT打开文件,一切工作正常,但它 比拥有异步写入的目的。

这种情况并非如此。异步I/O运行良好,没有O_DIRECT(例如,想到系统调用的数量被削减)。

+0

+1对于TLPI参考。这是有史以来最棒的书。 – hari

+0

@cnicutar太棒了。这解决了它。有几个问题: 1。你为什么建议使用libaio而不是posix aio?我没有太多posix aio的经验,所以我不知道。 2.你为什么说异步I/O不适合O/DIR?它如何减少没有。系统调用?事实上,我认为O_DIRECT更有效,因为它绕过了内核缓冲。 – jitihsk

+1

'mmap'可能是比'posix_memalign'获得4k对齐内存更好的方法。后者肯定不得不在分配开始时浪费4k,因为它在页面粒度上进行分配(假设你足够的分配,'posix_memalign'通过'mmap'服务请求,而不是' brk')。 –