
Redis Data Eviction Mechanism


July 24, 2016

### 1. Redis eviction policies

When using Redis, we can set a few limits that match the capabilities of our machine so that it runs as efficiently as possible. The redisServer struct holds these limits:

```
/* Limits */
// Maximum number of simultaneous client connections
unsigned int maxclients;        /* Max number of simultaneous clients */
// Maximum amount of memory to use (in bytes)
unsigned long long maxmemory;   /* Max number of memory bytes to use */
// Which eviction policy to apply
int maxmemory_policy;           /* Policy for key eviction */
// How many random key-value pairs to sample when picking an eviction
// candidate; why this matters is explained below
int maxmemory_samples;          /* Precision of random sampling */
```

Once the size of the in-memory data set grows past the configured limit, Redis starts evicting data. The available policies are:

- volatile-lru: evict the least recently used key among those with an expire set (server.db[i].expires)
- volatile-ttl: evict the key that is closest to expiring among those with an expire set (server.db[i].expires)
- volatile-random: evict a random key among those with an expire set (server.db[i].expires)
- allkeys-lru: evict the least recently used key from the whole keyspace (server.db[i].dict)
- allkeys-random: evict a random key from the whole keyspace (server.db[i].dict)
- noeviction: never evict; return an error instead

**Note**: with the TTL policy, the evicted object is not the one closest to expiring across the whole keyspace; it is chosen from maxmemory_samples randomly sampled objects. The LRU policy uses the maxmemory_samples field in the same way, inside evictionPoolPopulate.
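
For reference, these limits are normally set through redis.conf (or CONFIG SET) rather than by touching redisServer directly. A minimal example; the values here are purely illustrative:

```
# Use at most ~2 GB of memory for the dataset
maxmemory 2gb
# When the limit is hit, evict the approximately least recently used key
maxmemory-policy allkeys-lru
# Number of keys sampled per eviction attempt (precision vs. cost trade-off)
maxmemory-samples 5
# Maximum number of simultaneous client connections
maxclients 10000
```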

### 2. LRU eviction

Every Redis object is wrapped in a redisObject struct, shown below:

```
// Every Redis value is wrapped in this struct
typedef struct redisObject {
    // Object type (4 bits)
    unsigned type:4;
    // Encoding (4 bits)
    unsigned encoding:4;
    // LRU bookkeeping: when memory is full some objects must be evicted,
    // so this field records when the object was last accessed
    unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
    // Reference count
    int refcount;
    // Pointer to the actual data
    void *ptr;
} robj;
```

The field `unsigned lru:LRU_BITS;` records the last time the object was accessed.
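
As a rough sketch of how this field is consumed (modeled on Redis's `estimateObjectIdleTime`; the exact constants and clock handling depend on the Redis version), the idle time of an object is the distance between the server's LRU clock and the object's `lru` stamp, with wrap-around handled explicitly:

```
/* Rough sketch: approximate number of milliseconds since the object was
 * last accessed, derived from the LRU clock value stored in o->lru. */
unsigned long long estimateObjectIdleTime(robj *o) {
    unsigned long long lruclock = LRU_CLOCK();
    if (lruclock >= o->lru) {
        /* Common case: the clock has not wrapped since the last access. */
        return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
    } else {
        /* The LRU clock wrapped around after the object was last touched. */
        return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION;
    }
}
```

The `lru` stamp itself is refreshed to the current LRU clock whenever the key is looked up.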

**LRU eviction**: evictionPoolPopulate builds a pool of candidate keys to evict; when eviction actually has to happen, the best candidate in the pool is removed, the basic criterion being the object's idle time.
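
The eviction pool that this populate-and-pop cycle works on is, in the Redis version this article appears to follow, just a small fixed-size array kept sorted by idle time; roughly:

```
#define MAXMEMORY_EVICTION_POOL_SIZE 16

struct evictionPoolEntry {
    unsigned long long idle;    /* Estimated idle time of the sampled key. */
    sds key;                    /* Key name. */
};
```

evictionPoolPopulate samples maxmemory_samples keys, estimates each key's idle time, and merges them into this array in ascending idle order, so that freeMemoryIfNeeded (shown below) can walk it from the end and evict the most idle key that still exists.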

### 3. TTL eviction

Each Redis database keeps a table of key expiration times, redisDb.expires; the SET command, for example, accepts an option that sets a key's expiration.
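
For example, with SET's EX option (hypothetical key, illustrative output):

```
127.0.0.1:6379> SET session:42 "some-value" EX 60
OK
127.0.0.1:6379> TTL session:42
(integer) 60
```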

**TTL eviction**: a few key-value pairs are sampled at random from the expires table (redisDb.expires), and the one that will expire soonest is evicted. As with LRU, Redis does not guarantee that it picks the key expiring soonest across the whole expires table, only the soonest among the sampled keys.

### 4. Eviction logic

When Redis executes a client command, it checks whether memory usage has exceeded the limit; if it has, it evicts data first.

```
//
// This is the command-execution entry point!
//
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // Handle the quit command
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // Look up the command
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // Is the command known? Is the number of arguments correct?
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    // Check authentication
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    // If the memory limit is exceeded, free memory if possible;
    // if that is not possible, return an error.
    if (server.maxmemory) {
        // freeMemoryIfNeeded() tries to evict data using LRU, TTL or RANDOM
        int retval = freeMemoryIfNeeded();
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    // Reject write commands
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == C_ERR) ||
          server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == C_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    // Reject write commands
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    // Reject write commands
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    // If the client is in subscribe mode, only SUBSCRIBE, UNSUBSCRIBE,
    // PSUBSCRIBE, PUNSUBSCRIBE and PING may be executed.
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    // Inside a MULTI block, queue the command with queueMultiCommand
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // Append the command to the client's queue
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return C_OK;
}
```

This function is the entry point for command execution. Ignore everything else for now and look at the `if (server.maxmemory) ...` block: when a memory limit is configured, Redis decides whether data needs to be evicted by calling `freeMemoryIfNeeded(...)`.

```
//
// freeMemoryIfNeeded() tries to evict data using LRU, TTL or RANDOM.
// See the inline comments for the details.
//
int freeMemoryIfNeeded(void) {
    size_t mem_reported, mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);
    mstime_t latency, eviction_latency;
    long long delta;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    // First compute the memory usage; if we are under maxmemory, return.
    mem_reported = zmalloc_used_memory();
    if (mem_reported <= server.maxmemory) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    // Slave output buffers and the AOF buffers do not count towards the limit.
    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = listNodeValue(ln);
            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
            if (obuf_bytes > mem_used)
                mem_used = 0;
            else
                mem_used -= obuf_bytes;
        }
    }
    // The AOF buffers (AOF is a persistence mechanism) are excluded as well.
    if (server.aof_state != AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are still over the memory limit. */
    // After excluding that memory, check again whether we are over the limit.
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    // Compute how much memory has to be freed.
    mem_tofree = mem_used - server.maxmemory;
    mem_freed = 0;

    // Under MAXMEMORY_NO_EVICTION no data may be evicted.
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    latencyStartMonitor(latency);
    // Keep evicting until enough memory has been freed.
    while (mem_freed < mem_tofree) {
        int j, k, keys_freed = 0;

        // Iterate over every DB (by default the DB index runs from 0 to 15).
        for (j = 0; j < server.dbnum; j++) {
            long bestval = 0; /* just to prevent warning */
            sds bestkey = NULL;
            dictEntry *de;
            // Select the current DB: server.db[j]
            redisDb *db = server.db+j;
            dict *dict;

            // Pick the hash table that matches the eviction policy:
            // volatile policies evict from keys that have an expire set,
            // i.e. server.db[j].expires; allkeys policies evict from the
            // whole keyspace, i.e. server.db[j].dict.
            if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
            {
                dict = server.db[j].dict;
            } else {
                dict = server.db[j].expires;
            }
            if (dictSize(dict) == 0) continue;

            /* volatile-random and allkeys-random policy */
            // Random eviction
            if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
            {
                // Pick a random object from the dict to evict
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
            }

            /* volatile-lru and allkeys-lru policy */
            // LRU eviction
            else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
            {
                struct evictionPoolEntry *pool = db->eviction_pool;

                while(bestkey == NULL) {
                    // Build the pool of eviction candidates based on idle time
                    evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                    /* Go backward from best to worst element to evict. */
                    // Entries towards the end of the array are the best candidates
                    for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
                        if (pool[k].key == NULL) continue;
                        de = dictFind(dict,pool[k].key);

                        /* Remove the entry from the pool. */
                        sdsfree(pool[k].key);
                        /* Shift all elements on its right to left. */
                        memmove(pool+k,pool+k+1,
                            sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
                        /* Clear the element on the right which is empty
                         * since we shifted one position to the left. */
                        pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
                        pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;

                        /* If the key exists, is our pick. Otherwise it is
                         * a ghost and we need to try the next element. */
                        // If the key still exists, evict it and break
                        if (de) {
                            bestkey = dictGetKey(de);
                            break;
                        } else {
                            /* Ghost... */
                            continue;
                        }
                    }
                }
            }

            /* volatile-ttl */
            // TTL eviction: evict data that is about to expire
            else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
                // Sample maxmemory_samples random keys, then evict the one
                // with the closest expire time
                for (k = 0; k < server.maxmemory_samples; k++) {
                    sds thiskey;
                    long thisval;

                    de = dictGetRandomKey(dict);
                    thiskey = dictGetKey(de);
                    thisval = (long) dictGetVal(de);

                    /* Expire sooner (minor expire unix timestamp) is better
                     * candidate for deletion */
                    // A smaller expire timestamp means a better eviction candidate
                    if (bestkey == NULL || thisval < bestval) {
                        bestkey = thiskey;
                        bestval = thisval;
                    }
                }
            }

            /* Finally remove the selected key. */
            // If an eviction candidate was found, evict it now
            if (bestkey) {
                robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
                // Propagate the deletion, mainly for AOF persistence and replicas
                propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
                /* We compute the amount of memory freed by db*Delete() alone.
                 * It is possible that actually the memory needed to propagate
                 * the DEL in AOF and replication link is greater than the one
                 * we are freeing removing the key, but we can't account for
                 * that otherwise we would never exit the loop.
                 *
                 * AOF and Output buffer memory will be freed eventually so
                 * we only care about memory used by the key space. */
                delta = (long long) zmalloc_used_memory();
                latencyStartMonitor(eviction_latency);
                // If lazy eviction is enabled, delete the key asynchronously...
                if (server.lazyfree_lazy_eviction)
                    dbAsyncDelete(db,keyobj);
                else
                    // ...otherwise delete it synchronously
                    dbSyncDelete(db,keyobj);
                latencyEndMonitor(eviction_latency);
                latencyAddSampleIfNeeded("eviction-del",eviction_latency);
                latencyRemoveNestedEvent(latency,eviction_latency);
                // Recompute the memory usage
                delta -= (long long) zmalloc_used_memory();
                mem_freed += delta;
                server.stat_evictedkeys++;
                notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                    keyobj, db->id);
                decrRefCount(keyobj);
                keys_freed++;

                /* When the memory to free starts to be big enough, we may
                 * start spending so much time here that is impossible to
                 * deliver data to the slaves fast enough, so we force the
                 * transmission here inside the loop. */
                if (slaves) flushSlavesOutputBuffers();
            }
        }
        if (!keys_freed) {
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("eviction-cycle",latency);
            goto cant_free; /* nothing to free... */
        }
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
}
```

As seen above, an evicted key can be deleted in one of two ways, asynchronously or synchronously. Let's look at each in turn:
>> Asynchronous deletion
```
// Asynchronous deletion first.
//
/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // If the key has an entry in db->expires, removing that reference is
    // enough here: the key is shared with db->dict, so the real deletion
    // happens below.
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictFind(db->dict,key->ptr);
    // If the key was found in the dict
    if (de) {
        robj *val = dictGetVal(de);
        // Estimate the cost of freeing the value
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, let's put it into the
         * lazy free list. */
        // If the cost exceeds the threshold, hand the value over to the
        // lazy free list, which is drained by a bio.c thread!
        if (free_effort > LAZYFREE_THRESHOLD) {
            atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
            // Queue a background bio job to free the value
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    //
    // If the value was not handed to the bio thread, it is freed here;
    // if it was, dictSetVal(db->dict,de,NULL) already detached it, so
    // only the key is released at this point.
    //
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
```

Next, the function that estimates how expensive it is to free a value:

```
/* Return the amount of work needed in order to free an object.
 * The return value is not always the actual number of allocations the
 * object is composed of, but a number proportional to it.
 *
 * For strings the function always returns 1.
 *
 * For aggregated objects represented by hash tables or other data structures
 * the function just returns the number of elements the object is composed of.
 *
 * Objects composed of single allocations are always reported as having a
 * single item even if they are actually logically composed of multiple
 * elements.
 *
 * For lists the function returns the number of elements in the quicklist
 * representing the list. */
//
// This function simply computes the deletion cost for each type:
// - string: always 1
// - set encoded as a hash table: the hash table size
// - zset encoded as a skiplist: the skiplist length
// - hash encoded as a hash table: the hash table size
// - list: the number of quicklist nodes
//
size_t lazyfreeGetFreeEffort(robj *obj) {
    if (obj->type == OBJ_LIST) {
        quicklist *ql = obj->ptr;
        return ql->len;
    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
        zset *zs = obj->ptr;
        return zs->zsl->length;
    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else {
        return 1; /* Everything else is a single allocation. */
    }
}
```

Since `LAZYFREE_THRESHOLD` is defined as 64, any value made up of more than 64 elements (by the measure above) is pushed onto the lazy free list and reclaimed there.

So once a value has been pushed onto the lazy free list, how does the bio thread actually process it? The execution logic of the bio jobs is shown below:
![1](/public/img/grocery/redis/5.png "redis-bio-job")
```
/* Initialize the background system, spawning the thread. */
// Initializes the bio threads; nothing special here except the final
// for loop, which is the interesting part!
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    //
    // This loop is the important part: it spawns the worker threads with
    // pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg).
    //
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

//
// Once the initialization above is done, the bio threads start running.
//
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    // The usual worker-thread loop
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        // Dispatch on the job type
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            // This is where lazily deleted data is actually freed!
            // See the individual functions for the details.
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

// Called when a bio job is created: it builds a job structure and appends
// it to the bio list, which is drained by bioProcessBackgroundJobs() in the
// loop above!
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
```

>> Synchronous deletion
Synchronous deletion is much simpler: the key is removed from the dicts directly.

```
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // Remove the entry from db->expires if present
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    // Remove the key from the main dict
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
```

That covers the whole deletion path.

### 5. Overall eviction flow
![2](/public/img/grocery/redis/6.jpg "redis")