一、简介

在2022.01.31周一这天发布了redis7.0RC1版本(Release Candidate),其中进行了很多优化工作,以及增加了一些新特性。多AOF就是其中一个明显的向下不兼容优化。
在2022.02.28周一发布了redis7.0RC2版本,这里主要以redis7.0RC1进行分析。

二、单个AOF文件劣势

以前写的持久化https://blog.csdn.net/happytree001/article/details/118229644

单个AOF文件的劣势在于重写的过程中

  1. 主进程需要将在重写过程中新产生的写命令保存到缓冲区中
  2. 重写子进程需要通过管道从主进程中读取在序列化内存数据期间产生的新的写命令
  3. 重写子进程需要将从主进程读取的数据存入缓冲区中
  4. 主进程和重写子进程需要3个管道进行通信(数据、命令、确认)
  5. 重写子进程结束后,主进程需要将缓冲区未处理完的数据同步到新的AOF文件中,切换AOF文件,整个重写过程结束

整个过程很消耗如下三方面的资源

  • 内存(缓冲区)
  • cpu(通信)
  • 磁盘IO (主进程写文件)

三、多AOF简述

3.1 AOF组织形式

(1) AOF文件类型

AOF分了三种类型

  • base
  • incr
  • history
    并且引入了一个清单文件(manifest),来标识每个文件的类型
    manifest文件格式
    完整数据 = base + incr
    history类型的文件是重写完成后不需要的数据,后续将会被删除。

(2) AOF文件储存位置

多个AOF文件统一存放在appenddirname指定的目录中
在这里插入图片描述

3.2 AOF重写过程

在这里插入图片描述
aof-base文件类似于RDB文件的作用,
aof-incr文件类似于aof文件的作用,
而aof-history文件则是中间文件。

  1. 服务器启动时,从manifest文件中加载相关信息
  2. 加载aof-base文件,以恢复内存数据
  3. 加载aof-incr文件,以完全恢复内存数据

其中2、3步骤类似于RDB+AOF方式的持久化,只是将恢复过程拆分成了两个步骤。

  1. 服务器将接收到的写命令写入最近的aof-incr.seq文件中
  2. 当满足某些条件触发AOF重写时,主进程将产生新的aof-incr.(seq+1)文件,并更新manifest文件,然后fork子进程进行处理。
  3. 子进程则只需要将内存中的数据序列化到临时文件中即可
  4. 主进程检测到子进程结束后,将把临时文件重命名为新的base.(seq+1)文件,然后更新manifest文件(aof-incr.seq之前的aof-incr文件和base.seq之前的base文件都将被标记为history),然后将history文件都加入到后台任务中
  5. 后台任务将history文件清除

四、多AOF实现

4.1 manifest内存组织

(1) 数据结构

/*-----------------------------------------------------------------------------
 * AOF manifest definition
 *
 * In-memory model of the manifest file: every AOF file tracked by the server
 * is described by one aofInfo, and the aofManifest groups those records by
 * file type.
 *----------------------------------------------------------------------------*/
typedef enum {
    AOF_FILE_TYPE_BASE  = 'b', /* BASE file */
    AOF_FILE_TYPE_HIST  = 'h', /* HISTORY file */
    AOF_FILE_TYPE_INCR  = 'i', /* INCR file */
} aof_file_type;

/* Description of a single AOF file tracked by the manifest. */
typedef struct {
    sds           file_name;  /* File name; the manifest loader rejects anything
                                 that is not a plain base name (no path). */
    long long     file_seq;   /* File sequence number; the manifest loader treats
                                 0 as "missing". */
    aof_file_type file_type;  /* File type, one of the aof_file_type values. */
} aofInfo;

/* The whole manifest: one optional BASE record plus the INCR and HISTORY lists. */
typedef struct {
    aofInfo     *base_aof_info;       /* BASE file information. NULL if there is no BASE file. */
    list        *incr_aof_list;       /* INCR AOFs list. There may be several INCR AOFs when
                                         a rewrite fails. */
    list        *history_aof_list;    /* HISTORY AOF list. When an AOFRW succeeds, the aofInfo
                                         records held in `base_aof_info` and `incr_aof_list`
                                         are moved to this list. The corresponding AOF files
                                         are deleted once the AOFRW finishes. */
    long long   curr_base_file_seq;   /* The sequence number used by the current BASE file. */
    long long   curr_incr_file_seq;   /* The sequence number used by the current INCR file. */
    int         dirty;                /* 1 indicates that the in-memory aofManifest differs from
                                         the one on disk and must be persisted immediately. */
} aofManifest;
struct redisServer {
	...
    char *aof_filename;             /* Basename of the AOF file and manifest file */
    char *aof_dirname;              /* Name of the AOF directory */
	...
	aofManifest *aof_manifest;       /* Used to track AOFs. */
...
};

(2) 内存结构图

在这里插入图片描述可以看出,文件以及文件名并没有被修改,而只是类型修改了,然后放到不同的list中而已。

4.2 代码实现

4.2.1 启动阶段

(1) 启动时加载manifest

逐行读取,依次解析,填充到内存结构中。

int main(int argc, char **argv) {
	...
  aofLoadManifestFromDisk();
	...
}
/* Load the manifest information from the disk to `server.aof_manifest`
 * when the Redis server start.
 *
 * During loading, this function does strict error checking and will abort
 * the entire Redis server process on error (I/O error, invalid format, etc.)
 *
 * If the AOF directory or manifest file do not exist, this will be ignored
 * in order to support seamless upgrades from previous versions which did not
 * use them. In that case `server.aof_manifest` is left as the freshly created
 * manifest.
 *
 * The file is read line by line. Each non-comment line is split into
 * key/value pairs that fill one aofInfo, which is then attached to the
 * manifest according to its file type:
 *   - BASE: at most one is allowed; it also sets `curr_base_file_seq`.
 *   - HISTORY: appended to `history_aof_list`.
 *   - INCR: appended to `incr_aof_list`; sequence numbers must be strictly
 *     increasing, and the last one becomes `curr_incr_file_seq`.
 */
#define MANIFEST_MAX_LINE 1024
void aofLoadManifestFromDisk(void) {
    const char *err = NULL;
    long long maxseq = 0; /* Highest INCR sequence number seen so far. */

    server.aof_manifest = aofManifestCreate();

    if (!dirExists(server.aof_dirname)) {
        serverLog(LL_NOTICE, "The AOF directory %s doesn't exist", server.aof_dirname);
        return;
    }

    sds am_name = getAofManifestFileName();
    sds am_filepath = makePath(server.aof_dirname, am_name);
    if (!fileExist(am_filepath)) {
        serverLog(LL_NOTICE, "The AOF manifest file %s doesn't exist", am_name);
        sdsfree(am_name);
        sdsfree(am_filepath);
        return;
    }

    FILE *fp = fopen(am_filepath, "r");
    if (fp == NULL) {
        /* The two sds strings are not released here; the process exits
         * right away. */
        serverLog(LL_WARNING, "Fatal error: can't open the AOF manifest "
            "file %s for reading: %s", am_name, strerror(errno));
        exit(1);
    }

    sdsfree(am_name);
    sdsfree(am_filepath);

    char buf[MANIFEST_MAX_LINE+1];
    sds *argv = NULL;
    int argc;
    aofInfo *ai = NULL;

    sds line = NULL;
    int linenum = 0;

    while (1) {
        if (fgets(buf, MANIFEST_MAX_LINE+1, fp) == NULL) {
            if (feof(fp)) {
                /* End of file: acceptable only if at least one line was read. */
                if (linenum == 0) {
                    err = "Found an empty AOF manifest";
                    goto loaderr;
                } else {
                    break;
                }
            } else {
                err = "Read AOF manifest failed";
                goto loaderr;
            }
        }

        linenum++;

        /* Skip comments lines */
        if (buf[0] == '#') continue;

        /* No newline in the buffer means fgets() stopped before the end of
         * the line, i.e. the line does not fit in MANIFEST_MAX_LINE bytes.
         * NOTE(review): a final line that lacks a trailing newline also
         * lands here and is rejected -- confirm this is intended. */
        if (strchr(buf, '\n') == NULL) {
            err = "The AOF manifest file contains too long line";
            goto loaderr;
        }

        line = sdstrim(sdsnew(buf), " \t\r\n");
        if (!sdslen(line)) {
            err = "The AOF manifest file is invalid format";
            goto loaderr;
        }

        argv = sdssplitargs(line, &argc);
        /* 'argc < 6' was done for forward compatibility. */
        /* Arguments come in key/value pairs, hence the even-count check. */
        if (argv == NULL || argc < 6 || (argc % 2)) {
            err = "The AOF manifest file is invalid format";
            goto loaderr;
        }

        ai = aofInfoCreate();
        for (int i = 0; i < argc; i += 2) {
            if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_NAME)) {
                ai->file_name = sdsnew(argv[i+1]);
                if (!pathIsBaseName(ai->file_name)) {
                    err = "File can't be a path, just a filename";
                    goto loaderr;
                }
            } else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_SEQ)) {
                /* NOTE(review): atoll() reports no conversion errors; a
                 * non-numeric value becomes 0 and is caught by the
                 * `!ai->file_seq` check below. */
                ai->file_seq = atoll(argv[i+1]);
            } else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_TYPE)) {
                /* Only the first character of the value is used as the type. */
                ai->file_type = (argv[i+1])[0];
            }
            /* else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_OTHER)) {} */
        }

        /* We have to make sure we load all the information. */
        if (!ai->file_name || !ai->file_seq || !ai->file_type) {
            err = "The AOF manifest file is invalid format";
            goto loaderr;
        }

        sdsfreesplitres(argv, argc);
        argv = NULL;

        if (ai->file_type == AOF_FILE_TYPE_BASE) {
            if (server.aof_manifest->base_aof_info) {
                err = "Found duplicate base file information";
                goto loaderr;
            }
            server.aof_manifest->base_aof_info = ai;
            server.aof_manifest->curr_base_file_seq = ai->file_seq;
        } else if (ai->file_type == AOF_FILE_TYPE_HIST) {
            listAddNodeTail(server.aof_manifest->history_aof_list, ai);
        } else if (ai->file_type == AOF_FILE_TYPE_INCR) {
            /* INCR records must appear with strictly increasing sequence
             * numbers. */
            if (ai->file_seq <= maxseq) {
                err = "Found a non-monotonic sequence number";
                goto loaderr;
            }
            listAddNodeTail(server.aof_manifest->incr_aof_list, ai);
            server.aof_manifest->curr_incr_file_seq = ai->file_seq;
            maxseq = ai->file_seq;
        } else {
            err = "Unknown AOF file type";
            goto loaderr;
        }

        /* `ai` is now referenced by the manifest; reset the per-line state so
         * that the error path does not free it. */
        sdsfree(line);
        line = NULL;
        ai = NULL;
    }

    fclose(fp);
    return;

loaderr:
    /* Sanitizer suppression: may report a false positive if we goto loaderr
     * and exit(1) without freeing these allocations. */
    if (argv) sdsfreesplitres(argv, argc);
    if (ai) aofInfoFree(ai);

    serverLog(LL_WARNING, "\n*** FATAL AOF MANIFEST FILE ERROR ***\n");
    if (line) {
        serverLog(LL_WARNING, "Reading the manifest file, at line %d\n", linenum);
        serverLog(LL_WARNING, ">>> '%s'\n", line);
    }
    serverLog(LL_WARNING, "%s\n", err);
    exit(1);
}

(2) 加载base以及incr文件

int main(int argc, char **argv) {
	...
  loadDataFromDisk();
	...
}
/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    ...
    if (server.aof_state == AOF_ON) {
        int ret = loadAppendOnlyFiles(server.aof_manifest);
        if (ret == AOF_FAILED || ret == AOF_OPEN_ERR)
            exit(1);
    } 
	  ...
}

先加载base文件,然后依次加载incr文件,这里incr顺序也很重要。

/* Load the AOF files according the aofManifest pointed by am. */
int loadAppendOnlyFiles(aofManifest *am) {
    serverAssert(am != NULL);
    int ret = C_OK;
    long long start;
    off_t total_size = 0;
    sds aof_name;
    int total_num, aof_num = 0, last_file;

    ...
    if (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) {
        return AOF_NOT_EXIST;
    }

    total_num = getBaseAndIncrAppendOnlyFilesNum(am);
    serverAssert(total_num > 0);

    /* Here we calculate the total size of all BASE and INCR files in
     * advance, it will be set to `server.loading_total_bytes`. */
    total_size = getBaseAndIncrAppendOnlyFilesSize(am);
    startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);

    /* Load BASE AOF if needed. */
    if (am->base_aof_info) {
        serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
        aof_name = (char*)am->base_aof_info->file_name;
        updateLoadingFileName(aof_name);
        last_file = ++aof_num == total_num;
        start = ustime();
        ret = loadSingleAppendOnlyFile(aof_name);
        if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
            serverLog(LL_NOTICE, "DB loaded from base file %s: %.3f seconds",
                aof_name, (float)(ustime()-start)/1000000);
        }

        /* If an AOF exists in the manifest but not on the disk, Or the truncated
         * file is not the last file, we consider this to be a fatal error. */
        if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
            ret = AOF_FAILED;
        }

        if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
            goto cleanup;
        }
    }

    /* Load INCR AOFs if needed. */
    if (listLength(am->incr_aof_list)) {
        listNode *ln;
        listIter li;

        listRewind(am->incr_aof_list, &li);
        while ((ln = listNext(&li)) != NULL) {
            aofInfo *ai = (aofInfo*)ln->value;
            serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
            aof_name = (char*)ai->file_name;
            updateLoadingFileName(aof_name);
            last_file = ++aof_num == total_num;
            start = ustime();
            ret = loadSingleAppendOnlyFile(aof_name);
            if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
                serverLog(LL_NOTICE, "DB loaded from incr file %s: %.3f seconds",
                    aof_name, (float)(ustime()-start)/1000000);
            }

            if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
                ret = AOF_FAILED;
            }

            if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
                goto cleanup;
            }
        }
    }

    server.aof_current_size = total_size;
    server.aof_rewrite_base_size = server.aof_current_size;
    server.aof_fsync_offset = server.aof_current_size;

cleanup:
    stopLoading(ret == AOF_OK);
    return ret;
}

(3) 打开aof

int main(int argc, char **argv) {
	...
	aofOpenIfNeededOnServerStart();
	...
}
/* Startup hook that runs after `loadDataFromDisk`. It is a no-op unless
 * `server.aof_state` is AOF_ON, in which case it:
 *
 * 1. Makes sure the AOF directory exists.
 * 2. Force-creates a BASE file if the manifest has neither a BASE nor any
 *    INCR file (i.e. the server starts with an empty dataset).
 * 3. Opens the last INCR AOF for appending, creating it if it is missing.
 * 4. Synchronously persists the manifest to disk.
 *
 * Any failure in these steps terminates the redis process with exit(1).
 */
void aofOpenIfNeededOnServerStart(void) {
    if (server.aof_state != AOF_ON) return;

    serverAssert(server.aof_manifest != NULL);
    serverAssert(server.aof_fd == -1);

    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
            server.aof_dirname, strerror(errno));
        exit(1);
    }

    /* Empty dataset: no BASE and no INCR recorded, so create a BASE now. */
    int has_base = (server.aof_manifest->base_aof_info != NULL);
    if (!has_base && listLength(server.aof_manifest->incr_aof_list) == 0) {
        sds new_base = getNewBaseFileNameAndMarkPreAsHistory(server.aof_manifest);
        sds new_base_path = makePath(server.aof_dirname, new_base);
        if (rewriteAppendOnlyFile(new_base_path) != C_OK) exit(1);
        sdsfree(new_base_path);
    }

    /* Any failure below ends the process via exit(1), so the manifest does
     * not need to be modified atomically here. */
    sds incr_name = getLastIncrAofName(server.aof_manifest);
    sds incr_path = makePath(server.aof_dirname, incr_name);

    /* O_APPEND is required: writes must always land at the end of the file. */
    server.aof_fd = open(incr_path, O_WRONLY|O_APPEND|O_CREAT, 0644);
    sdsfree(incr_path);
    if (server.aof_fd == -1) {
        serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
            incr_name, strerror(errno));
        exit(1);
    }

    /* Write the (possibly updated) manifest to disk. */
    if (persistAofManifest(server.aof_manifest) != C_OK) exit(1);

    server.aof_last_incr_size = getAppendOnlyFileSize(incr_name);
}

(4) 删除history文件集

int main(int argc, char **argv) {
	...
	 aofDelHistoryFiles();
	...
}
/* After a successful AOFRW the previous BASE and INCR AOFs become HISTORY
 * files and are moved into 'history_aof_list'.
 *
 * This function walks 'history_aof_list', hands every file to bg_unlink()
 * for removal, drops the entry from the list, and finally persists the
 * manifest.
 *
 * Returns C_OK when there is nothing to do (no manifest, auto GC disabled,
 * or empty history list); otherwise returns the result of
 * persistAofManifest().
 */
int aofDelHistoryFiles(void) {
    if (server.aof_manifest == NULL) return C_OK;
    if (server.aof_disable_auto_gc == 1) return C_OK;
    if (listLength(server.aof_manifest->history_aof_list) == 0) return C_OK;

    listIter iter;
    listNode *node;

    listRewind(server.aof_manifest->history_aof_list, &iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        aofInfo *hist = (aofInfo*)node->value;
        serverAssert(hist->file_type == AOF_FILE_TYPE_HIST);

        serverLog(LL_NOTICE, "Removing the history file %s in the background", hist->file_name);

        sds hist_path = makePath(server.aof_dirname, hist->file_name);
        bg_unlink(hist_path);
        sdsfree(hist_path);

        listDelNode(server.aof_manifest->history_aof_list, node);
    }

    /* The in-memory manifest changed, so it must be written back. */
    server.aof_manifest->dirty = 1;
    return persistAofManifest(server.aof_manifest);
}

4.2.2 AOF重写阶段

(1) 触发重写

  • 手动命令触发
  • 自动条件触发
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	...
	/* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (!hasActiveChildProcess() &&
        server.aof_rewrite_scheduled &&
        !aofRewriteLimited())
    {
        rewriteAppendOnlyFileBackground();
    }
	
	...
	
  /* Trigger an AOF rewrite if needed. */
  if (server.aof_state == AOF_ON &&
       !hasActiveChildProcess() &&
       server.aof_rewrite_perc &&
       server.aof_current_size > server.aof_rewrite_min_size &&
       !aofRewriteLimited())
   {
       long long base = server.aof_rewrite_base_size ?
           server.aof_rewrite_base_size : 1;
       long long growth = (server.aof_current_size*100/base) - 100;
       if (growth >= server.aof_rewrite_perc) {
           serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
           rewriteAppendOnlyFileBackground();
       }
   }
	...
}

(2) 子进程重写

主进程通过调用flushAppendOnlyFile将正在写的AOF文件同步刷到磁盘上,然后调用openNewIncrAofForAppend创建一个新的AOF文件进行后续的使用,然后创建子进程,主进程就去干自己的事情了;
子进程则去序列化内存数据

/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent open a new INCR AOF file to continue writing.
 * 3) When the child finished '2a' exits.
 * 4) The parent will trap the exit code, if it's OK, it will:
 *    4a) get a new BASE file name and mark the previous (if we have) as the HISTORY type
 *    4b) rename(2) the temp file in new BASE file name
 *    4c) mark the rewritten INCR AOFs as history type
 *    4d) persist AOF manifest file
 *    4e) Delete the history files use bio
 *
 * Returns C_OK if the child was started, C_ERR otherwise. Every failure that
 * happens after the "is another child active?" check is recorded in
 * `server.aof_lastbgrewrite_status`, so that the reported status matches
 * what actually happened regardless of which step failed.
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    /* Not a rewrite failure: there is simply another child running. */
    if (hasActiveChildProcess()) return C_ERR;

    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
            server.aof_dirname, strerror(errno));
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    /* We set aof_selected_db to -1 in order to force the next call to the
     * feedAppendOnlyFile() to issue a SELECT command. */
    server.aof_selected_db = -1;
    flushAppendOnlyFile(1);
    if (openNewIncrAofForAppend() != C_OK) {
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,sizeof(tmpfile),"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            server.aof_lastbgrewrite_status = C_ERR;
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        return C_OK;
    }
    return C_OK; /* unreached */
}

(3) 子进程结束,父进程处理

定时任务serverCron都会判断是否有子进程结束

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	...
	 /* Check if a background saving or AOF rewrite in progress terminated. */
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        run_with_period(1000) receiveChildInfo();
        checkChildrenDone();
    } 
    ...
}

通过系统调用waitpid,获取结束的子进程pid以及退出状态

void checkChildrenDone(void) {
    int statloc = 0;
    pid_t pid;

    if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {
        int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        /* sigKillChildHandler catches the signal and calls exit(), but we
         * must make sure not to flag lastbgsave_status, etc incorrectly.
         * We could directly terminate the child process via SIGUSR1
         * without handling it */
        if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
            bysignal = SIGUSR1;
            exitcode = 1;
        }

        if (pid == -1) {
            serverLog(LL_WARNING,"waitpid() returned an error: %s. "
                "child_type: %s, child_pid = %d",
                strerror(errno),
                strChildType(server.child_type),
                (int) server.child_pid);
        } else if (pid == server.child_pid) {
          	...
            } else if (server.child_type == CHILD_TYPE_AOF) {
                backgroundRewriteDoneHandler(exitcode, bysignal);
            } 
            ...
    }
}

如果是重写子进程结束:

  • 复制一份manifest结构数据temp_am,后续都操作这个临时结构,aofManifestDup
  • 生成一个新的base文件名new_base_filename,并将老的base文件标记为history,getNewBaseFileNameAndMarkPreAsHistory
  • 将子进程产生的临时文件temp-rewriteaof-bg-pid.aof重命名为new_base_filename,rename
  • 标记排除最后一个incr文件的所有incr文件为history,markRewrittenIncrAofAsHistory
  • 持久化temp_am ,这里才真正生效,persistAofManifest
  • 更新内存的am,aofManifestFreeAndUpdate
  • 删除历史文件aofDelHistoryFiles
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this.
 *
 * exitcode: exit status of the rewrite child.
 * bysignal: non-zero if the child was terminated by a signal (the signal
 *           number).
 *
 * On a clean exit the temp file produced by the child is promoted to the new
 * BASE file. All manifest changes are made on a duplicate (`temp_am`) and
 * only replace `server.aof_manifest` after the duplicate has been persisted,
 * so a failure half way leaves the current manifest untouched.
 *
 * Every failure path, including a failed rename() or a failed manifest
 * persist, sets `server.aof_lastbgrewrite_status` to C_ERR so that the
 * status never keeps reporting an earlier success.
 */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        char tmpfile[256];
        long long now = ustime();
        sds new_base_filename;
        aofManifest *temp_am;
        mstime_t latency;

        serverLog(LL_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Same name the child used when writing the rewritten data. */
        snprintf(tmpfile, sizeof(tmpfile), "temp-rewriteaof-bg-%d.aof",
            (int)server.child_pid);

        serverAssert(server.aof_manifest != NULL);

        /* Dup a temporary aof_manifest for subsequent modifications. */
        temp_am = aofManifestDup(server.aof_manifest);

        /* Get a new BASE file name and mark the previous (if we have)
         * as the HISTORY type. */
        new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
        serverAssert(new_base_filename != NULL);
        sds new_base_filepath = makePath(server.aof_dirname, new_base_filename);

        /* Rename the temporary aof file to 'new_base_filename'. */
        latencyStartMonitor(latency);
        if (rename(tmpfile, new_base_filepath) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                new_base_filename,
                strerror(errno));
            aofManifestFree(temp_am);
            sdsfree(new_base_filepath);
            /* The rewrite did not take effect: report it as failed. */
            server.aof_lastbgrewrite_status = C_ERR;
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename", latency);

        /* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
         * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
        markRewrittenIncrAofAsHistory(temp_am);

        /* Persist our modifications. */
        if (persistAofManifest(temp_am) == C_ERR) {
            /* The new BASE file is not referenced by any persisted manifest,
             * so remove it again. */
            bg_unlink(new_base_filepath);
            aofManifestFree(temp_am);
            sdsfree(new_base_filepath);
            /* The rewrite did not take effect: report it as failed. */
            server.aof_lastbgrewrite_status = C_ERR;
            goto cleanup;
        }
        sdsfree(new_base_filepath);

        /* We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */
        aofManifestFreeAndUpdate(temp_am);

        if (server.aof_fd != -1) {
            /* AOF enabled. */
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            server.aof_current_size = getAppendOnlyFileSize(new_base_filename) + server.aof_last_incr_size;
            server.aof_rewrite_base_size = server.aof_current_size;
            server.aof_fsync_offset = server.aof_current_size;
            server.aof_last_fsync = server.unixtime;
        }

        /* We don't care about the return value of `aofDelHistoryFiles`, because the history
         * deletion failure will not cause any problems. */
        aofDelHistoryFiles();

        server.aof_lastbgrewrite_status = C_OK;

        serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed */
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_state = AOF_ON;

        serverLog(LL_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1)
            server.aof_lastbgrewrite_status = C_ERR;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

cleanup:
    aofRemoveTempFile(server.child_pid);
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}

4.3 异常处理

(1) 重写启动限制

每次重写开始时,都会产生一个新的AOF-incr文件;重写失败后,redis会自动重试。如果每次重写都失败,AOF-incr文件就会越来越多。为了使重写失败后AOF-incr文件的个数可控,需要对重写的启动进行限制。

/* Whether to limit the execution of Background AOF rewrite.
 *
 * At present, if AOFRW fails, redis will automatically retry. If it continues
 * to fail, we may get a lot of very small INCR files, so we need an AOFRW
 * limiting measure.
 *
 * We can't directly use `server.aof_current_size` and `server.aof_last_incr_size`,
 * because there may be no new writes after AOFRW fails.
 *
 * So, we use a time delay to achieve our goal. The limit is armed once the
 * number of INCR AOFs reaches AOF_REWRITE_LIMITE_THRESHOLD:
 *
 * - The first call in that state is refused and starts a 1 minute delay.
 * - While the delay has not elapsed, every call is refused.
 * - Once the delay has elapsed, the call is allowed and the next delay is
 *   doubled (2, 4, 8, ... minutes), capped at AOF_REWRITE_LIMITE_NAX_MINUTES.
 * - As soon as the number of INCR AOFs drops below the threshold, the delay
 *   state is reset.
 *
 * The delay state is kept in function-level statics.
 *
 * Return 1 means that AOFRW is limited and cannot be executed. 0 means that we can execute
 * AOFRW, which may be that we have reached the 'next_rewrite_time' or the number of INCR
 * AOFs has not reached the limit threshold.
 * */
#define AOF_REWRITE_LIMITE_THRESHOLD    3
#define AOF_REWRITE_LIMITE_NAX_MINUTES  60 /* 1 hour */
int aofRewriteLimited(void) {
    static int limit_delay_minutes = 0;
    static time_t next_rewrite_time = 0;

    unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list);

    /* Below the threshold: nothing to limit, forget any previous delay. */
    if (incr_aof_num < AOF_REWRITE_LIMITE_THRESHOLD) {
        limit_delay_minutes = 0;
        next_rewrite_time = 0;
        return 0;
    }

    /* Still inside the current delay window. */
    if (server.unixtime < next_rewrite_time) return 1;

    int limit = 0;
    if (limit_delay_minutes == 0) {
        /* First time the threshold is hit: refuse and start with 1 minute. */
        limit = 1;
        limit_delay_minutes = 1;
    } else {
        /* The window elapsed: allow this attempt, back off further. */
        limit_delay_minutes *= 2;
    }

    if (limit_delay_minutes > AOF_REWRITE_LIMITE_NAX_MINUTES) {
        limit_delay_minutes = AOF_REWRITE_LIMITE_NAX_MINUTES;
    }

    next_rewrite_time = server.unixtime + limit_delay_minutes * 60;

    /* incr_aof_num is unsigned long, so it must be printed with %lu. */
    serverLog(LL_WARNING,
        "Background AOF rewrite has repeatedly failed %lu times and triggered the limit, will retry in %d minutes",
        incr_aof_num, limit_delay_minutes);

    return limit;
}
  • 当连续出现3次重写失败时,将开始进行延时启动AOF REWRITE
  • 延时按照指数级增长,1min-2min-4min-8min-…-最大60min
  • 对于用户的bgrewriteaof命令不影响(但看代码逻辑还是有影响:
    当执行bgrewriteaof命令时,如果有其他子进程存在,则只会设置server.aof_rewrite_scheduled = 1,
    而在serverCron中处理server.aof_rewrite_scheduled时仍然会判断aofRewriteLimited(),
    所以只有在没有子进程的情况下才能绕过限制,直接调用rewriteAppendOnlyFileBackground())
/* BGREWRITEAOF command handler.
 *
 * - An AOF rewrite child is already running: reply with an error.
 * - Some other child is active, or we are inside EXEC: only set
 *   `server.aof_rewrite_scheduled` and reply that the rewrite is scheduled.
 * - Otherwise try to start the rewrite right away and reply with the
 *   outcome. */
void bgrewriteaofCommand(client *c) {
    if (server.child_type == CHILD_TYPE_AOF) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    if (hasActiveChildProcess() || server.in_exec) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReplyError(c,"Can't execute an AOF background rewriting. "
                        "Please check the server logs for more information.");
        return;
    }

    addReplyStatus(c,"Background append only file rewriting started");
}

(2) 原子操作

每次重写过程中都涉及到多个步骤,并且都不是事务处理,不是原子的,所以每一步失败都可能导致整个重写异常。
redis设计得简单而优雅,每次操作的都是复制品,只有最后一步才持久化,这一步操作成功则整体成功。中间过程失败,都不影响原有结构,后续再次重试即可。

五、单AOF版本升级

如果是从7.0rc1以下的版本升级到redis7.0rc1, 因为aof组织结构不同,并且没有manifest文件。
在启动时,判断相应的条件,则进行升级操作

main()
	->loadDataFromDisk()
		->loadAppendOnlyFiles()
			->aofUpgradePrepare()
/* Load the AOF files according the aofManifest pointed by am. */
int loadAppendOnlyFiles(aofManifest *am) {
    serverAssert(am != NULL);
    int ret = C_OK;
    long long start;
    off_t total_size = 0;
    sds aof_name;
    int total_num, aof_num = 0, last_file;

    /* If the 'server.aof_filename' file exists in dir, we may be starting
     * from an old redis version. We will use enter upgrade mode in three situations.
     *
     * 1. If the 'server.aof_dirname' directory not exist
     * 2. If the 'server.aof_dirname' directory exists but the manifest file is missing
     * 3. If the 'server.aof_dirname' directory exists and the manifest file it contains
     *    has only one base AOF record, and the file name of this base AOF is 'server.aof_filename',
     *    and the 'server.aof_filename' file not exist in 'server.aof_dirname' directory
     * */
    if (fileExist(server.aof_filename)) {
        if (!dirExists(server.aof_dirname) ||
            (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) ||
            (am->base_aof_info != NULL && listLength(am->incr_aof_list) == 0 &&
             !strcmp(am->base_aof_info->file_name, server.aof_filename) && !aofFileExist(server.aof_filename)))
        {
            aofUpgradePrepare(am);
        }
    }
	...
}
/* Migration step run from `loadAppendOnlyFiles` when the server is started
 * on top of data written by an older redis version.
 *
 * 1) Create the AOF directory named 'server.aof_dirname'.
 * 2) Install a BASE type aofInfo for 'server.aof_filename' (sequence 1) in
 *    the aofManifest, then persist the manifest into the AOF directory.
 * 3) Move the old AOF file (server.aof_filename) into the AOF directory.
 *
 * A failure in any step terminates the process with exit(1). If any of the
 * above steps fails or a crash occurs, this will not cause any problems:
 * redis will retry the upgrade process when it restarts.
 */
void aofUpgradePrepare(aofManifest *am) {
    serverAssert(!aofFileExist(server.aof_filename));

    /* Step 1: the AOF directory. */
    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
            server.aof_dirname, strerror(errno));
        exit(1);
    }

    /* Step 2: replace whatever BASE record exists with one describing the
     * old AOF file, then write the manifest out. */
    if (am->base_aof_info) aofInfoFree(am->base_aof_info);
    aofInfo *base = aofInfoCreate();
    base->file_name = sdsnew(server.aof_filename);
    base->file_seq = 1;
    base->file_type = AOF_FILE_TYPE_BASE;
    am->base_aof_info = base;
    am->curr_base_file_seq = 1;
    am->dirty = 1;

    if (persistAofManifest(am) != C_OK) exit(1);

    /* Step 3: move the old AOF file into the AOF directory. */
    sds target_path = makePath(server.aof_dirname, server.aof_filename);
    int moved = rename(server.aof_filename, target_path);
    if (moved == -1) {
        serverLog(LL_WARNING,
            "Error trying to move the old AOF file %s into dir %s: %s",
            server.aof_filename,
            server.aof_dirname,
            strerror(errno));
        sdsfree(target_path);
        exit(1);
    }
    sdsfree(target_path);

    serverLog(LL_NOTICE, "Successfully migrated an old-style AOF file (%s) into the AOF directory (%s).",
        server.aof_filename, server.aof_dirname);
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐