rio.c: added ability to fdatasync() from time to time while writing.
This commit is contained in:
parent
c4656119b6
commit
91f4213ddf
31
src/rio.c
31
src/rio.c
@ -48,9 +48,12 @@
|
|||||||
#include "fmacros.h"
|
#include "fmacros.h"
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include "rio.h"
|
#include "rio.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "crc64.h"
|
#include "crc64.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "redis.h"
|
||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure. */
|
||||||
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
|
||||||
@ -75,7 +78,18 @@ static off_t rioBufferTell(rio *r) {
|
|||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure. */
|
||||||
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
||||||
return fwrite(buf,len,1,r->io.file.fp);
|
size_t retval;
|
||||||
|
|
||||||
|
retval = fwrite(buf,len,1,r->io.file.fp);
|
||||||
|
r->io.file.buffered += len;
|
||||||
|
|
||||||
|
if (r->io.file.autosync &&
|
||||||
|
r->io.file.buffered >= r->io.file.autosync)
|
||||||
|
{
|
||||||
|
aof_fsync(fileno(r->io.file.fp));
|
||||||
|
r->io.file.buffered = 0;
|
||||||
|
}
|
||||||
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure. */
|
||||||
@ -109,6 +123,8 @@ static const rio rioFileIO = {
|
|||||||
void rioInitWithFile(rio *r, FILE *fp) {
|
void rioInitWithFile(rio *r, FILE *fp) {
|
||||||
*r = rioFileIO;
|
*r = rioFileIO;
|
||||||
r->io.file.fp = fp;
|
r->io.file.fp = fp;
|
||||||
|
r->io.file.buffered = 0;
|
||||||
|
r->io.file.autosync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rioInitWithBuffer(rio *r, sds s) {
|
void rioInitWithBuffer(rio *r, sds s) {
|
||||||
@ -123,6 +139,19 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
|
|||||||
r->cksum = crc64(r->cksum,buf,len);
|
r->cksum = crc64(r->cksum,buf,len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set the file-based rio object to auto-fsync every 'bytes' file written.
|
||||||
|
* By default this is set to zero that means no automatic file sync is
|
||||||
|
* performed.
|
||||||
|
*
|
||||||
|
* This feature is useful in a few contexts since when we rely on OS write
|
||||||
|
* buffers sometimes the OS buffers way too much, resulting in too many
|
||||||
|
* disk I/O concentrated in very little time. When we fsync in an explicit
|
||||||
|
* way instead the I/O pressure is more distributed across time. */
|
||||||
|
void rioSetAutoSync(rio *r, off_t bytes) {
|
||||||
|
redisAssert(r->read == rioFileIO.read);
|
||||||
|
r->io.file.autosync = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
/* ------------------------------ Higher level interface ---------------------------
|
/* ------------------------------ Higher level interface ---------------------------
|
||||||
* The following higher level functions use lower level rio.c functions to help
|
* The following higher level functions use lower level rio.c functions to help
|
||||||
* generating the Redis protocol for the Append Only File. */
|
* generating the Redis protocol for the Append Only File. */
|
||||||
|
@ -61,6 +61,8 @@ struct _rio {
|
|||||||
} buffer;
|
} buffer;
|
||||||
struct {
|
struct {
|
||||||
FILE *fp;
|
FILE *fp;
|
||||||
|
off_t buffered; /* Bytes written since last fsync. */
|
||||||
|
off_t autosync; /* fsync after 'autosync' bytes written. */
|
||||||
} file;
|
} file;
|
||||||
} io;
|
} io;
|
||||||
};
|
};
|
||||||
@ -97,5 +99,6 @@ size_t rioWriteBulkLongLong(rio *r, long long l);
|
|||||||
size_t rioWriteBulkDouble(rio *r, double d);
|
size_t rioWriteBulkDouble(rio *r, double d);
|
||||||
|
|
||||||
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
|
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
|
||||||
|
void rioSetAutoSync(rio *r, off_t bytes);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
Reference in New Issue
Block a user