
OpenStack Swift Source Code Analysis: ObjectReplicator


1. A Detailed Look at the Replicator Run Code

The previous article described in detail how the Replicator is started; this one walks through how its run code is implemented, starting with the replicate method:

    def replicate(self, override_devices=None, override_partitions=None):
        """Run a replication pass"""
        self.start = time.time()
        self.suffix_count = 0
        self.suffix_sync = 0
        self.suffix_hash = 0
        self.replication_count = 0
        self.last_replication_count = -1
        self.partition_times = []

        if override_devices is None:
            override_devices = []
        if override_partitions is None:
            override_partitions = []
        # heartbeat logs progress periodically; per the config, 300s if unset
        stats = eventlet.spawn(self.heartbeat)
        # detect_lockups watches for a stalled replication pass
        lockup_detector = eventlet.spawn(self.detect_lockups)
        eventlet.sleep()  # Give spawns a cycle

        try:
            # pool size = number of concurrent replication workers
            self.run_pool = GreenPool(size=self.concurrency)
            # Returns a sorted list of jobs (dictionaries) that specify the
            # partitions, nodes, etc to be synced.
            jobs = self.collect_jobs()
            for job in jobs:
                # honor device overrides
                if override_devices and job['device'] not in override_devices:
                    continue
                # honor partition overrides
                if override_partitions and \
                        job['partition'] not in override_partitions:
                    continue
                dev_path = join(self.devices_dir, job['device'])
                if self.mount_check and not ismount(dev_path):
                    self.logger.warn(_('%s is not mounted'), job['device'])
                    continue
                # abort the pass if the ring has changed
                if not self.check_ring():
                    self.logger.info(_("Ring change detected. Aborting "
                                       "current replication pass."))
                    return
                if job['delete']:
                    # partition no longer belongs here: push it away, then remove
                    self.run_pool.spawn(self.update_deleted, job)
                else:
                    # normal case: sync the partition with its peers
                    self.run_pool.spawn(self.update, job)
            with Timeout(self.lockup_timeout):
                self.run_pool.waitall()
        except (Exception, Timeout):
            self.logger.exception(_("Exception in top-level replication loop"))
            self.kill_coros()
        finally:
            stats.kill()
            lockup_detector.kill()
            self.stats_line()

replicate begins with setup work for the pass. The most important step is collecting the jobs to run, done by the collect_jobs method; its implementation follows:

    def collect_jobs(self):
        """
        Returns a sorted list of jobs (dictionaries) that specify the
        partitions, nodes, etc to be synced.
        """
        jobs = []
        ips = whataremyips()
        # replication_ip and replication_port are added when the RingBuilder
        # loads the ring; self.object_ring = Ring(self.swift_dir, ring_name='object')
        for local_dev in [dev for dev in self.object_ring.devs
                          if dev and dev['replication_ip'] in ips and
                          dev['replication_port'] == self.port]:
            dev_path = join(self.devices_dir, local_dev['device'])
            obj_path = join(dev_path, 'objects')
            tmp_path = join(dev_path, 'tmp')
            if self.mount_check and not ismount(dev_path):
                self.logger.warn(_('%s is not mounted'), local_dev['device'])
                continue
            # Remove any file under tmp_path (e.g. /srv/1/node/sdb1/tmp)
            # that was last modified before reclaim_age ago
            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
            if not os.path.exists(obj_path):
                try:
                    mkdirs(obj_path)
                except Exception:
                    self.logger.exception('ERROR creating %s' % obj_path)
                continue
            # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
            # 13069  133971  4799  58208  94238
            for partition in os.listdir(obj_path):
                try:
                    job_path = join(obj_path, partition)
                    if isfile(job_path):
                        # Clean up any (probably zero-byte) files where a
                        # partition should be.
                        self.logger.warning('Removing partition directory '
                                            'which was a file: %s', job_path)
                        os.remove(job_path)
                        continue
                    # the nodes assigned to this partition
                    part_nodes = \
                        self.object_ring.get_part_nodes(int(partition))
                    # nodes = the replica - 1 nodes other than this machine
                    nodes = [node for node in part_nodes
                             if node['id'] != local_dev['id']]
                    # every partition under objects/ yields a job, so len(jobs)
                    # is at most the number of partitions whose replica set
                    # includes (or used to include) this device
                    jobs.append(
                        dict(path=job_path,
                             device=local_dev['device'],
                             nodes=nodes,
                             # len(nodes) > len(part_nodes) - 1 means this node
                             # is no longer one of the partition's assigned
                             # devices (e.g. the device was removed or rebalanced)
                             delete=len(nodes) > len(part_nodes) - 1,
                             partition=partition))
                except (ValueError, OSError):
                    continue
        # shuffle so devices and partitions are visited in random order
        random.shuffle(jobs)
        if self.handoffs_first:
            # Move the handoff parts to the front of the list
            jobs.sort(key=lambda job: not job['delete'])
        self.job_count = len(jobs)
        return jobs

In the inner for loop, os.listdir(obj_path) lists every partition under the objects directory. When an object is stored, a directory named after the partition it maps to is created under objects; inside it, a suffix directory named after the last three characters of the object's hash; inside that, a directory named after the full hash; and the object itself lands there as a file named after its upload timestamp with a .data extension. From consistent hashing we know that with virtual nodes each device corresponds to several partitions, so if a device is assigned n partitions, obj_path holds at most n subdirectories; not every object in the cluster maps to a partition on this device. The loop first checks whether an entry under obj_path is a plain file and removes it if so; otherwise it takes the entry as a partition number, looks up the replica nodes for that partition (three by default), and keeps in nodes those whose id differs from the local device's. It then appends path, device, nodes, delete, and partition to jobs. Finally the list is shuffled and, if handoffs_first is set, handoff jobs are sorted to the front before jobs is returned.
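To make that on-disk layout concrete, here is a minimal sketch of how an object's path is derived. All names and the hash-path suffix are illustrative; the real logic lives in swift.common.utils.hash_path and the ring's get_part, and a real cluster reads its hash path suffix from /etc/swift/swift.conf.

    import hashlib
    import os
    import struct

    HASH_PATH_SUFFIX = b'changeme'  # illustrative; the real value comes from swift.conf

    def object_data_path(devices, device, part_power, account, container, obj, ts):
        """Sketch of <devices>/<device>/objects/<partition>/<suffix>/<hash>/<ts>.data."""
        name = '/%s/%s/%s' % (account, container, obj)
        digest = hashlib.md5(name.encode('utf-8') + HASH_PATH_SUFFIX)
        hsh = digest.hexdigest()
        # the ring maps the top bits of the hash to a partition number
        partition = struct.unpack_from('>I', digest.digest())[0] >> (32 - part_power)
        suffix = hsh[-3:]  # the last three hex characters name the suffix dir
        return os.path.join(devices, device, 'objects', str(partition),
                            suffix, hsh, '%s.data' % ts)

    print(object_data_path('/srv/1/node', 'sdb1', 18, 'AUTH_test',
                           'photos', 'cat.jpg', '1400000000.00000'))

Back in replicate, consider first the case where job['delete'] is False, which runs the update method: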

    def update(self, job):
        """
        High-level method that replicates a single partition.

        :param job: a dict containing info about the partition to be replicated
        """
        self.replication_count += 1
        self.logger.increment('partition.update.count.%s' % (job['device'],))
        begin = time.time()
        try:
            # get_hashes reads (and updates) hashes.pkl for this partition;
            # job['path'] is join(obj_path, partition). local_hash is the
            # deserialized content of hashes.pkl and hashed is the number of
            # suffix dirs that were rehashed.
            hashed, local_hash = tpool_reraise(
                get_hashes, job['path'],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)
            self.suffix_hash += hashed
            self.logger.update_stats('suffix.hashes', hashed)
            attempts_left = len(job['nodes'])
            # job['nodes'] excludes the local node, and
            # get_more_nodes(int(job['partition'])) yields every node outside
            # the partition's assigned set, so nodes iterates over all
            # candidate peers except this one
            nodes = itertools.chain(
                job['nodes'],
                self.object_ring.get_more_nodes(int(job['partition'])))
            # attempts_left is 2 when the replica count is 3
            while attempts_left > 0:
                # If this throws StopIteration it will be caught way below
                node = next(nodes)
                attempts_left -= 1
                try:
                    with Timeout(self.http_timeout):
                        # the REPLICATE verb is handled by the object
                        # server's REPLICATE method
                        resp = http_connect(
                            node['replication_ip'], node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '', headers=self.headers).getresponse()
                        if resp.status == HTTP_INSUFFICIENT_STORAGE:
                            self.logger.error(_('%(ip)s/%(device)s responded'
                                                ' as unmounted'), node)
                            attempts_left += 1
                            continue
                        if resp.status != HTTP_OK:
                            self.logger.error(_("Invalid response %(resp)s "
                                                "from %(ip)s"),
                                              {'resp': resp.status,
                                               'ip': node['replication_ip']})
                            continue
                        # remote_hash is what the REPLICATE request returned
                        remote_hash = pickle.loads(resp.read())
                        del resp
                    # suffixes whose local hash differs from the remote one
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    # nothing differs, so move on to the next node
                    if not suffixes:
                        continue

                    # effectively reruns get_hashes, recalculating the
                    # differing suffixes
                    hashed, recalc_hash = tpool_reraise(
                        get_hashes,
                        job['path'], recalculate=suffixes,
                        reclaim_age=self.reclaim_age)
                    self.logger.update_stats('suffix.hashes', hashed)
                    local_hash = recalc_hash
                    # e.g. local_hash has suffixes 123 321 122 and remote_hash
                    # has 123 321 124: 122 is the one that changed
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    # push the differing suffixes to that node; replication is
                    # push-based, so local data is sent out
                    self.sync(node, job, suffixes)
                    with Timeout(self.http_timeout):
                        conn = http_connect(
                            node['replication_ip'], node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '/' + '-'.join(suffixes),
                            headers=self.headers)
                        conn.getresponse().read()
                    self.suffix_sync += len(suffixes)
                    self.logger.update_stats('suffix.syncs', len(suffixes))
                except (Exception, Timeout):
                    self.logger.exception(_("Error syncing with node: %s") %
                                          node)
            # suffix count, used when logging stats
            self.suffix_count += len(local_hash)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since('partition.update.timing', begin)

Inside update, the first step obtains the local suffix hashes for this device's partition from its hashes.pkl file, something like {'a83': '0db7b416c9808517a1bb2157af20b09b'}. The key is a suffix: the last three characters of an object's hash (the md5 of its /account/container/object path). The value is the md5 over the names of all .data files beneath that suffix directory's hash-named subdirectories, in effect a checksum of all the file names.
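As a quick way to see that structure, the cached hashes of a partition can be inspected directly. A minimal sketch, with an illustrative partition path:

    import pickle

    # Illustrative: dump the cached suffix hashes of one partition.
    with open('/srv/1/node/sdb1/objects/94238/hashes.pkl', 'rb') as f:
        local_hash = pickle.load(f)

    # e.g. {'a83': '0db7b416c9808517a1bb2157af20b09b', '310': None}
    # (a None value marks a suffix that was invalidated and will be rehashed
    # on the next get_hashes call)
    for suffix, md5sum in sorted(local_hash.items()):
        print(suffix, md5sum)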

    hashed, local_hash = tpool_reraise(
        get_hashes, job['path'],
        do_listdir=(self.replication_count % 10) == 0,
        reclaim_age=self.reclaim_age)

The tpool_reraise fragment above runs the get_hashes method, passing the remaining arguments through to it:

    def get_hashes(partition_dir, recalculate=None, do_listdir=False,
                   reclaim_age=ONE_WEEK):
        """
        Get a list of hashes for the suffix dir.  do_listdir causes it to
        mistrust the hash cache for suffix existence at the (unexpectedly high)
        cost of a listdir.  reclaim_age is just passed on to hash_suffix.

        :param partition_dir: absolute path of partition to get hashes for
        :param recalculate: list of suffixes which should be recalculated when
                            got, e.g. recalculate=['a83']; a suffix directory
                            such as 310 sits next to hashes.pkl inside the
                            partition directory
        :param do_listdir: force existence check for all hashes in the
                           partition
        :param reclaim_age: age at which to remove tombstones

        :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
        """

Since recalculate is not passed here, only do_listdir=True forces a recalculation of the hash over all file names beneath each suffix directory. The file names are timestamps, and a changed name means the object was updated, so the two sides must be compared to see whether they hold the same version; when they differ, the local version has to be pushed to the remote server.
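A simplified sketch of how one suffix directory's hash could be computed from those timestamp file names (the real hash_suffix also reclaims old tombstones while walking the tree, which is omitted here):

    import hashlib
    import os

    def hash_suffix_simplified(suffix_dir):
        """md5 over the file names (upload timestamps) beneath a suffix dir;
        simplified: no tombstone reclamation, no error handling."""
        md5 = hashlib.md5()
        for hsh in sorted(os.listdir(suffix_dir)):       # hash-named object dirs
            hsh_path = os.path.join(suffix_dir, hsh)
            for fname in sorted(os.listdir(hsh_path)):   # e.g. 1400000000.00000.data
                md5.update(fname.encode('utf-8'))
        return md5.hexdigest()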

    attempts_left = len(job['nodes'])
    # job['nodes'] excludes the local node; get_more_nodes() yields every
    # node outside the partition's assigned set
    nodes = itertools.chain(
        job['nodes'],
        self.object_ring.get_more_nodes(int(job['partition'])))

In the snippet above, attempts_left is the number of replica nodes for this job's partition, excluding the local one. nodes is then built as an iterator chaining job['nodes'] with get_more_nodes, which yields every node outside the partition's assigned set, so nodes iterates over all nodes except the local one. A minimal sketch of this chaining, using hypothetical node dicts, follows.
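    import itertools

    # Hypothetical nodes: the partition's other primaries first, then handoffs.
    primaries = [{'id': 2, 'ip': '10.0.0.2'}, {'id': 3, 'ip': '10.0.0.3'}]
    handoffs = iter([{'id': 7, 'ip': '10.0.0.7'}, {'id': 9, 'ip': '10.0.0.9'}])

    nodes = itertools.chain(primaries, handoffs)
    attempts_left = len(primaries)  # 2 when the replica count is 3

    while attempts_left > 0:
        node = next(nodes)  # primaries come out first, handoffs only afterwards
        attempts_left -= 1
        print('would send REPLICATE to', node['ip'])
        # on a 507 (unmounted) response the real loop does attempts_left += 1,
        # which pulls the next handoff node off the chain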

Next comes the while loop, which makes up to attempts_left requests:

    resp = http_connect(
        node['replication_ip'], node['replication_port'],
        node['device'], job['partition'], 'REPLICATE',
        '', headers=self.headers).getresponse()

The REPLICATE request goes to whichever node the iterator yields; since the partition's replica nodes come first, they are tried first. On success, the response body is read to obtain remote_hash, the remote device's suffix hashes for the same partition:

    suffixes = [suffix for suffix in local_hash if
                local_hash[suffix] !=
                remote_hash.get(suffix, -1)]
    # nothing differs, so move on to the next node
    if not suffixes:
        continue

This compares the hashes.pkl of the two devices for the same partition, collecting the keys whose values differ. If suffixes is empty, the local files match this remote replica and the loop moves on to the next node. If it is not empty, there is work to do: the local hashes are recalculated first, because another replica may have pushed new updates to this server while the request was in flight; the comparison is then repeated against the fresh local_hash to get the final set of differing suffixes for this partition. The comparison itself is a plain dict diff, as the sketch below shows.
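    # Illustrative values: the suffix comparison is a plain dict diff.
    local_hash = {'123': 'aaa', '321': 'bbb', '122': 'ccc'}
    remote_hash = {'123': 'aaa', '321': 'bbb', '124': 'ddd'}

    suffixes = [suffix for suffix in local_hash
                if local_hash[suffix] != remote_hash.get(suffix, -1)]
    print(suffixes)  # ['122'] -- only this suffix needs to be pushed

The differing suffixes are then synchronized: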

    self.sync(node, job, suffixes)  # sync what changed

For the actual transfer the author currently uses the rsync method rather than ssync, but the hook for ssync is already in place; once ssync is stable it will replace rsync:
    def sync(self, node, job, suffixes):  # Just exists for doc anchor point
        """
        Synchronize local suffix directories from a partition with a remote
        node.

        :param node: the "dev" entry for the remote node to sync with
        :param job: information about the partition being synced
        :param suffixes: a list of suffixes which need to be pushed

        :returns: boolean indicating success or failure
        """
        # self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
        # with no sync_method configured, the class's own rsync method runs
        return self.sync_method(node, job, suffixes)
sync_method is looked up as follows; when nothing is configured, the rsync method is used:

    self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
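A minimal, self-contained sketch of this getattr dispatch (the class and conf values are hypothetical):

    class SyncDispatch(object):
        """Hypothetical stand-in showing how sync_method is resolved."""

        def __init__(self, conf):
            # falls back to self.rsync when 'sync_method' is absent or empty
            self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')

        def rsync(self, node, job, suffixes):
            return 'rsync'

        def ssync(self, node, job, suffixes):
            return 'ssync'

    print(SyncDispatch({}).sync_method(None, None, None))                        # rsync
    print(SyncDispatch({'sync_method': 'ssync'}).sync_method(None, None, None))  # ssync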

    def rsync(self, node, job, suffixes):
        """
        Uses rsync to implement the sync method. This was the first
        sync method in Swift.
        """
        if not os.path.exists(job['path']):
            return False
        args = [
            'rsync',
            '--recursive',
            '--whole-file',
            '--human-readable',
            '--xattrs',
            '--itemize-changes',
            '--ignore-existing',
            '--timeout=%s' % self.rsync_io_timeout,
            '--contimeout=%s' % self.rsync_io_timeout,
            '--bwlimit=%s' % self.rsync_bwlimit,
        ]
        node_ip = rsync_ip(node['replication_ip'])
        # the rsync module embeds the remote IP (and, in vm test mode, port)
        if self.vm_test_mode:
            rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
        else:
            rsync_module = '%s::object' % node_ip
        had_any = False
        for suffix in suffixes:
            spath = join(job['path'], suffix)
            if os.path.exists(spath):
                args.append(spath)
                had_any = True
        if not had_any:
            return False
        args.append(join(rsync_module, node['device'],
                    'objects', job['partition']))
        # args now carries everything needed: the source suffix dirs plus the
        # remote device and partition to push to
        return self._rsync(args) == 0
rsync gathers everything it receives into args and then calls the _rsync method. For a hypothetical node and job, the assembled argument list looks roughly like this (all values illustrative):
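    # Hypothetical values: roughly what args holds just before _rsync(args)
    # for one suffix of partition 94238 pushed to device sdb2 on 10.0.0.2.
    args = [
        'rsync',
        '--recursive', '--whole-file', '--human-readable', '--xattrs',
        '--itemize-changes', '--ignore-existing',
        '--timeout=30', '--contimeout=30', '--bwlimit=0',
        '/srv/1/node/sdb1/objects/94238/310',   # local suffix dir (source)
        '10.0.0.2::object/sdb2/objects/94238',  # remote rsync module (destination)
    ]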

    def _rsync(self, args):
        """
        Execute the rsync binary to replicate a partition.

        :returns: return code of rsync process. 0 is successful
        """
        start_time = time.time()
        ret_val = None
        try:
            with Timeout(self.rsync_timeout):
                # this is the actual sync: a push to the remote node
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)
                results = proc.stdout.read()
                ret_val = proc.wait()
        except Timeout:
            self.logger.error(_("Killing long-running rsync: %s"), str(args))
            proc.kill()
            return 1  # failure response code
        total_time = time.time() - start_time
        for result in results.split('\n'):
            if result == '':
                continue
            if result.startswith('cd+'):
                continue
            if not ret_val:
                self.logger.info(result)
            else:
                self.logger.error(result)
        if ret_val:
            error_line = _('Bad rsync return code: %(ret)d <- %(args)s') % \
                {'args': str(args), 'ret': ret_val}
            if self.rsync_error_log_line_length:
                error_line = error_line[:self.rsync_error_log_line_length]
            self.logger.error(error_line)
        elif results:
            self.logger.info(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {'src': args[-2], 'dst': args[-1], 'time': total_time})
        else:
            self.logger.debug(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {'src': args[-2], 'dst': args[-1], 'time': total_time})
        return ret_val
The fragment below is what performs the actual push:

    # this is the actual sync: a push to the remote node
    proc = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

job['delete'] is True typically because adding or removing devices triggered a ring rebalance, so the partition's replica set no longer includes this server's device id. For instance, if partition 45678 was replicated to devices [1, 2, 3] before the rebalance and the local device id is 1, the set may become [4, 2, 3] afterwards, and job['delete'] comes out True, as the sketch below illustrates.
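    # Illustrative: how collect_jobs arrives at delete=True after a rebalance.
    # Hypothetical ids: partition 45678 moved from devices [1, 2, 3] to
    # [4, 2, 3], and the local device id is 1.
    local_dev_id = 1
    part_nodes = [{'id': 4}, {'id': 2}, {'id': 3}]  # get_part_nodes(45678)

    nodes = [n for n in part_nodes if n['id'] != local_dev_id]
    delete = len(nodes) > len(part_nodes) - 1  # 3 > 2 -> True
    print(delete)  # the partition is pushed away and then removed locally

With that in mind, here is the implementation of update_deleted: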

    def update_deleted(self, job):
        """
        High-level method that replicates a single partition that doesn't
        belong on this node.

        :param job: a dict containing info about the partition to be replicated
        """
        # collect the suffix dirs under the partition
        def tpool_get_suffixes(path):
            return [suff for suff in os.listdir(path)
                    if len(suff) == 3 and isdir(join(path, suff))]
        self.replication_count += 1
        self.logger.increment('partition.delete.count.%s' % (job['device'],))
        begin = time.time()
        try:
            responses = []
            suffixes = tpool.execute(tpool_get_suffixes, job['path'])
            if suffixes:
                for node in job['nodes']:
                    # push everything to each node that should hold it
                    success = self.sync(node, job, suffixes)
                    if success:
                        with Timeout(self.http_timeout):
                            conn = http_connect(
                                node['replication_ip'],
                                node['replication_port'],
                                node['device'], job['partition'], 'REPLICATE',
                                '/' + '-'.join(suffixes), headers=self.headers)
                            conn.getresponse().read()
                    responses.append(success)
            if self.handoff_delete:
                # delete handoff if we have had handoff_delete successes
                delete_handoff = len([resp for resp in responses if resp]) >= \
                    self.handoff_delete
            else:
                # delete handoff if all syncs were successful
                delete_handoff = len(responses) == len(job['nodes']) and \
                    all(responses)
            # remove the local partition once there are no suffixes or all
            # target nodes have responded successfully
            if not suffixes or delete_handoff:
                self.logger.info(_("Removing partition: %s"), job['path'])
                tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing handoff partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since('partition.delete.timing', begin)


That concludes the walkthrough of the replicate operation. If any of the interpretation here seems off, corrections are welcome. Thanks!

OpenStack Swift Source Code Analysis: ObjectReplicator (Part 2)

