Flag Counter
© 2017 Tao Peng. All rights reserved.

Redis 持久化策略


2016年07月24日

###1. Redis持久化介绍

每次讲到redis和memcache的区别, 大家第一个能想到的都是持久化. 所谓持久化就是, 为防止数据丢失,需要将 Redis 中的数据从内存中 dump 到磁盘。 Redis 提供两种持久化方式: RDB 和 AOF。Redis 允许两者结合,也允许两者同时关闭。

RDB: 可以定时备份内存中的数据集。服务器启动的时候,可以从 RDB 文件中恢复数据集。

AOF(append only file): 可以记录服务器的所有写操作。在服务器重新启动的时候,会把所有的写操作重新执行一遍,从而实现数据备份。

**区别**

: RDB记录的是数据本身, 一般记录文件名叫做dump.rdb, 而AOF记录的是对于数据的写操作, 即, 如果数据被改变, 那么 会被AOF记录.

###2. RDB方法

save RDB文件有两种方法, 第一: redis后台会定时进行备份, 就在我们前面说过的serverCron定时函数中, 这个函数会周期执行, 里面 执行了很多的定时任务, 包括后台备份RDB. 第二: client可以发生save命令来使得server备份RDB文件.

***1). 先看一下定时任务执行的逻辑:***
//
// 非常重要的函数: 用于调用所有的定时操作
//
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    // ...  其他任务

    /* If there is not a background saving/rewrite in progress check if
     * we have to save/rewrite now */
     for (j = 0; j < server.saveparamslen; j++) {
        struct saveparam *sp = server.saveparams+j;

        /* Save if we reached the given amount of changes,
         * the given amount of seconds, and if the latest bgsave was
         * successful or if, in case of an error, at least
         * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
         if (server.dirty >= sp->changes &&
            server.unixtime-server.lastsave > sp->seconds &&
             (server.unixtime-server.lastbgsave_try >
             CONFIG_BGSAVE_RETRY_DELAY ||
             server.lastbgsave_status == C_OK))
         {
            serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
              sp->changes, (int)sp->seconds);
            // 在后台fork一个进程进行save RDB工作
            //
            rdbSaveBackground(server.rdb_filename);
            break;
         }
     }

     // ...  其他任务
}

下面具体看一下rdbSaveBackground函数:

//
// serverCron定时 JOB会周期性在在后台执行saveRDB
// 这个函数会fork一个子进程来执行save任务
// 执行实际的save操作的函数是: rdbSave(...)
//
int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    start = ustime();
    // fork = 0说明是子进程
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        // 在子进程中执行saveRDB操作
        // 具体的执行sav的操作函数 ! ! !
        //
        retval = rdbSave(filename);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        // fork > 0说明当前是父进程, 返回的是子进程的pid
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}


***2). 看一下client发送save指令来进行save:***
struct redisCommand redisCommandTable[] = {
    // 其他命令 ...
    {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0},
    // 其他命令 ...
};

如果发送save命令, 那么执行的函数是saveCommand:

void saveCommand(client *c) {
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }
    // 执行实际的save操作还是rdbSave函数
    //
    if (rdbSave(server.rdb_filename) == C_OK) {
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}

里面实际的执行save操作的函数是rdbSave(…)函数.

同样, 如果client发送bgsave命令, 那么显然是要求redis执行在后台进行save操作! 这个执行的函数是bgsaveCommand:

void bgsaveCommand(client *c) {
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {
        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");

      // 实际的操作函数是rdbSaveBackground, 和redis执行的定时任务是一样的
      //
    } else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
        addReplyStatus(c,"Background saving started");
    } else {
        addReply(c,shared.err);
    }
}


综上所述, 不管是后台还是client请求save, 都会调用到的函数是: rdbSave, 下面具体看一下这个函数:

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename) {
    char tmpfile[256];
    FILE *fp;
    rio rdb;
    int error = 0;
    // 根据进程号生成一个临时的rdb文件
    //
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return C_ERR;
    }

    // 初始化rdb结构体。rdb 结构体内指定了读写文件的函数,已写/读字符统计等数据
    rioInitWithFile(&rdb,fp);
    // 存储rdb, 具体写入rdb操作函数
    //
    if (rdbSaveRio(&rdb,&error) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    // 下面检查错误
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

下面看一下具体生成rdb文件的函数rdbSaveRio:

/* Produces a dump of the database in RDB format sending it to the specified
 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
 * is returned and part of the output, or all the output, can be
 * missing because of I/O errors.
 *
 * When the function returns C_ERR and if 'error' is not NULL, the
 * integer pointed by 'error' is set to the value of errno just after the I/O
 * error. */
//
// 生成dump.rdb文件
//
int rdbSaveRio(rio *rdb, int *error) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    long long now = mstime();
    uint64_t cksum;
    // 首先进行校验和
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    // 写入版本号
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;

    // 遍历所有的数据集DB
    for (j = 0; j < server.dbnum; j++) {
        // 当前数据集
        redisDb *db = server.db+j;
        // 获取k-v数据
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // 获取迭代器
        di = dictGetSafeIterator(d);
        if (!di) return C_ERR;

        /* Write the SELECT DB opcode */
        // 写入RDB操作码: 类型 + 大小
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint32_t db_size, expires_size;
        //
        // 下面写入hash表的大小, 包括dict和expires
        //
        db_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                dictSize(db->dict) :
                                UINT32_MAX;
        expires_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                dictSize(db->expires) :
                                UINT32_MAX;
        // 写入: 类型 + dict 大小 + expire 大小
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* Iterate this DB writing every entry */
        // 迭代当前DB所有的entry, 然后将数据写进rdb文件
        // 这是最重要的一步! ! !
        //
        while((de = dictNext(di)) != NULL) {
            // 获取key 和 value
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            // 获取超时时间
            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            //
            // 将key, value, 超时时间写入文件
            //
            if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
        }
        // 释放迭代器
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don't release it again on error. */

    /* EOF opcode */
    // 写入结束符
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

关于rdbSaveKeyValuePair, rdbSaveLen和rdbSaveType函数比较简单, 就是根据不同的类型进行写文件操作, 没什么好说的.


OK, 现在比较重要的一点是, 我们能够知道RDB数据的组织方式, 大致的情况是:

版本号
DB 0
操作码
数据
操作码
数据
...
结束码
DB 1
...

具体的可以看NoSQLFun这篇文章: Redis RDB文件格式全解析

###3. AOF方法

AOF 持久化和 RDB 持久化的最主要区别在于,前者记录了数据的变更,而后者是保存了数据本身。

redis AOF 有后台执行和边服务边备份两种方式。

***1). 后台执行***

后台执行的方式和 RDB 有类似的地方,fork 一个子进程,主进程仍进行服务,子进程执行AOF 持久 化,数据被dump 到磁盘上。 与 RDB 不同的是,后台子进程持久化过程中,主进程会记录期间的所有数据变更(主进程还在服务), 并存储在 server.aof_rewrite_buf_blocks 中;后台子进程结束后,Redis 更新缓存追加到AOF文件中,是RDB持久化所不具备的。

这一块怎么理解? ? ?

redis在产生数据更新的时候, 会将更新记录写入到server.aof_buf中, 等到一定的时机, 会将这个缓冲插入到 server.aof_rewrite_buf_blocks链表中, 然后进行写AOF操作.
注意, 在AOF子进程进程写AOF过程中, 还会出现更新数据情况, 主进程会将更新缓存在server.aof_rewrite_buf_blocks, 等到子进程结束, Redis 更新缓存追 加到 AOF 文件中.

那么首先看下定时任务执行AOF情况:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // 其他任务...

    /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 &&
        server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size)
    {
        long long base = server.aof_rewrite_base_size ?
                        server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
            //
            // 执行写AOF操作
            //
            rewriteAppendOnlyFileBackground();
        }
    }

    // 其他任务...

AOF操作函数入口是:rewriteAppendOnlyFileBackground, 这个函数同样是是fork一个子进程来进行AOF

// 在后台进行AOF操作
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    start = ustime();
    // 子进程
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        // 临时AOF文件名
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        // AOF操作
        // 写AOF文件具体函数
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        // 父进程
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

下面看一下rewriteAppendOnlyFile函数:

//
// AOF写文件
// 操作和RDB文件写入是很类似的
//
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    // 生成临时文件
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);

    // 对于所有的数据集都进行AOF
    for (j = 0; j < server.dbnum; j++) {
        // 第一步写入select db-idx ^_^
        // 这个格式在后面我们会说
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        // 获取迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return C_ERR;
        }

        /* SELECT the new DB */
        // 写入操作命令 + db-idx j
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        // 下面迭代写入所有的操作
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            // 获取t-l-v, 超时时间
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            // 根据相关的类型写入
            // 类型 + 数据
            if (o->type == OBJ_STRING) {
                /* Emit a SET command */
                // 一个set指令写入
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                // 写入k-v
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                // list类型
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                // set类型
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                // zset类型
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                // hash类型
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }
            /* Save the expire time */
            //
            // 下面写入超时时间
            //
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
            /* Read some diff from the parent process from time to time. */
            if (aof.processed_bytes > processed+1024*10) {
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }

    // 下面会进行一些检查操作
    //
    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

上面的代码中, 我们可以看到, 对于不同类型的数据, 进行相应的写入操作.

***2). 边服务边备份的方式***

这种情况下, Redis 服务器会把所有的数据变更存储在server.aof_buf中, 并在特定时机将更新 缓存写入预设定的文件(server.aof_filename).
这个时机有下面几种:

进入事件循环之前
Redis 服务器定时程序 serverCron() 中
停止 AOF 策略的 stopAppendOnly() 中

注意执行具体的AOF的函数是: flushAppendOnlyFile,

这个在源码中的位置分别是:

// 1: 进入事件循环之前
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
// 进入mainloop之前需要执行
void beforeSleep(struct aeEventLoop *eventLoop) {
    // 其他任务...

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    // 其他任务...
}


// 2: serverCron()
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // 其他任务...

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    // 其他任务...


// 3: stopAppendOnly函数
/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command. */
//
// 这个函数是从"appendonly yes"转成"appendonly no"时候会执行
// 在停止AOF之前, 需要进行最后一次备份!
//
void stopAppendOnly(void) {
    serverAssert(server.aof_state != AOF_OFF);
    // 将数据flush进文件
    flushAppendOnlyFile(1);

    aof_fsync(server.aof_fd);
    close(server.aof_fd);

    server.aof_fd = -1;
    server.aof_selected_db = -1;
    server.aof_state = AOF_OFF;
    /* rewrite operation in progress? kill it, wait child exit */
    if (server.aof_child_pid != -1) {
        int statloc;

        serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
            (long) server.aof_child_pid);
        if (kill(server.aof_child_pid,SIGUSR1) != -1) {
            while(wait3(&statloc,0,NULL) != server.aof_child_pid);
        }
        /* reset the buffer accumulating changes while the child saves */
        aofRewriteBufferReset();
        aofRemoveTempFile(server.aof_child_pid);
        server.aof_child_pid = -1;
        server.aof_rewrite_time_start = -1;
        /* close pipes used for IPC between the two processes. */
        aofClosePipes();
    }
}

OK, 下面具体看一下这个函数flushAppendOnlyFile, 这个函数相对来讲比较复杂, 下面具体看一下:

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;
    // 如果server.aof_buf缓冲区没有数据, 那么没有要写入AOF的
    if (sdslen(server.aof_buf) == 0) return;
    // 创建线程任务, 调用fsync()
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
    // 如果没有设置强制同步的选项,可能不会立即进行同步
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        // 如果不用及时更新, 即可以异步更新, 那么放到后台进行执行
        // 如果进程还在执行中, 那么需要等待2s时间
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                // 设置延迟flush时间选项
                server.aof_flush_postponed_start = server.unixtime;
                return;
                // 没有超过2s,直接结束
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            // 下面需要写入磁盘
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    latencyStartMonitor(latency);
    //
    // 核心语句: 将server.aof_buf中的所有缓存数据写入文件
    //
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    // 写入错误! ! !
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
      // 文件写入成功! ! !
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    // 更新AOF文件大小
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    // 当server.aof_buf 足够小, 重新利用空间,防止频繁的内存分配。
    // 相反,当server.aof_buf 占据大量的空间,采取的策略是释放空间
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}

到此为止, 边服务边备份的过程也已经结束, 我们知道备份的是server.aof_buf缓冲区中的数据, 但是到目前为止, 我们还不知道这个数据是从哪个地方获取的. 在前面已经提到过, 每次数据更新 都会将变更记录都会写入 server.aof_buf 中, 同时如果后台子进程在持久化,变更记录还会被写入 server.server.aof_rewrite_buf_blocks 中。

***在redis中, 只要有数据变, 都会进行广播通知, 会执行函数propagrate, 那么在这个函数中, 会进行AOF的server.aof\_buf缓冲区更新!!!
***

下面看一下propagrate函数:

/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + PROPAGATE_NONE (no propagation of command at all)
 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + PROPAGATE_REPL (propagate into the replication link)
 *
 * This should not be used inside commands implementation. Use instead
 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
 */
//
// 记录数据更新~
// 在数据更新的时候, 需要做两件事:
// 1). 是否执行AOF
// 2). 是否写入repl_backlog文件, 用于同步主从机器
//
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // 如果aof被打开 && 允许PROPAGATE_AOF: 添加AOF
    // 下面函数会进行server.aof\_buf缓冲区更新
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // 需要写入repl_backlog, 用于主从机器数据同步
    // 下面函数包括两个部分:
    // 1). 向在线的从机发送数据
    // 2). 将数据写入repl_backlog, 防止从机掉线, 方便下次上线进行更新
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

下面看一下函数feedAppendOnlyFile:

// 追加AOF
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    //
    // 下面将数据更新到server.aof\_buf, 如果存在子进程AOF, 那么也更新到server.aof_rewrite_buf_blocks中
    //
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    // 将数据的变更缓存到aof-buf中
    //
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    // 如果存在子进程正在AOF, 同时将数据更新到server.aof_rewrite_buf_blocks链表中
    // 这个链中的内容会在子进程结束后追加到文件
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

根据上面代码的两行注释, 就清楚了之前讲的那些, 那么现在其实是将生成的AOF追加到server.aof_buf 中。 在server在下一次进入事件循环之前, aof_buf 中的内容将会写到磁盘上. 这个之前已经说了, 边服务边备份.

那么现在还有一个问题, 这个链表中的内容是怎么被消费的呢?? 那么这就需要看上面的函数aofRewriteBufferAppend:

/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
// 将data添加到AOF缓冲区链表中: server.aof_rewrite_buf_blocks
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    // 将数据加入链表
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        // 添加到链表
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        // 分配 && 添加
        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    //
    // 核心: 注意, 在此处注册了一个事件用于将aof_rewrite_buf_blocks链表中的数据进行追加到AOF中,
    // 那么下面具体看一下执行事件的函数aofChildWriteDiffData
    //
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

看下函数aofChildWriteDiffData:

// 这个函数比较简单, 基本思路是取出链表数据, 然后写入指定的fd中,
// 这个fd是一个pipe管道, 用于父子进程通信, 这里是将链表中的内容写入
// 子进程中
// 具体的管道的创建是在rewriteAppendOnlyFileBackground函数中有一个aofCreatePipes用于创建父子管道!!!
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            //
            // 将数据写进子进程管道! ! !完成追加
            //
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

至此, AOF过程就结束了.


AOF数据组织方式: 首先我们需要知道, 记录的仅仅是写操作相关的动作, 例如执行命令SET name PT, 那么 生成的AOF内容是:

*3  # 代表有两个参数, 此处是SET, name和PT
$3  # 第一个参数长度是3
SET
$4  # 第二个参数长度是4
name
$2  # 第三个参数长度是2
PT

AOF可以根据这些操作, 将数据进行恢复!

###4. RDB和AOF逻辑图

RDB:

1

AOF:

2