一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”

目录

  • 业务背景

  • 方案确定

  • 迁移阶段

    • 迁移优化

  • 复盘

只要还有一根头发,说明你还能努力一把:dog:

业务背景

之前公司投票系统的统计用的是 HBase 进行存储,历史数据大概是四亿条,总监说现在需要将 HBase 数据迁移到mongodb,只保存最近两年的数据,其他的数据磁盘备份就行,要求有三:

  1. 不丢数据

  2. 平滑迁移

  3. 不停机

于是作为一个刚刚毕业充满激情和热血的有志美少女,我开完早会第一时刻就去百度搜索方案:

一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”
数据迁移方案

点开看了前面几条之后,摸着键盘的手有点发抖,就差留下两行清澈的泪水,然后默默的我打开了:

一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”
人生就是这么戏剧

方案确定

于是我老大就带着我快乐的分析需求,主要是迁移投票统计数据,有每日投票记录,每日活动投票数记录,每个用户投票记录等等。最终确定三亿条数据分布在两张表里面,那两张数据量大的表由他迁移,另外七张表总共数据量差不多快一个亿,由我来迁移。

因为不能影响到线上的客户,所有我这边迁移方案最终是 新老double write + offline data sync and check
,其实就是 线上双写+线下复核

我需要迁移的表,有五张表是数据量比较小的就十万条数据左右,于是我先拿着这些表出气,这些表由于数据量不大我是进行全表迁移的,就是从 HBase 查询到所有数据直接往 mongodb 里面倒。

这是我出的数据迁移方案:

数据平滑迁移方案

采用上线期间对新增的数据进行双写策略:

  1. 先上线一版对"增删改"数据写两份数据库的措施,在代码原来只操作hbase的情况下,加上mongodb库的操作逻辑;

  2. 编写数据迁移的 python 脚本和接口,将数据从 hbase 迁移到 mongodb;

  3. 将从 hbase 查询的逻辑旁边全部加上从 mongodb 库查询的逻辑;

  4. 迁移完毕,上线的时候,将hbase逻辑全部下掉,查询全部走 mongo;

  5. 考虑到数据迁移幂等性问题,迁移代码全部写成覆盖而不是增加票数;

迁移阶段

方案确定了,一切好像都在按照排期有条不紊的进行着:

  1. 建立 mongodb 数据表,建立好索引

  2. 上线双写代码,投票写操作同时记录在 mongodb 库,那样迁移期间投票数据记录就不会丢失;

  3. 拿着迁移数据量小的表出气,全部查出来一股脑往 mongodb 倾倒;

// 查询 hbase

public List<VoteRecordable> listVoteRecord(VoteRecordable begin, VoteRecordable end) {

byte[] startRow = begin.getRowKey();

byte[] endRow = end.getRowKey();

Scan scan = new Scan(startRow, endRow);

return this.hbaseTemplate.find(begin.getTableName(), scan, this.getRowMapper(begin.getClass()));

}

// mongodb 入库

public void batchInsertTotalActivityVoteRecordDoc(List<TotalActivityVoteRecordDoc> totalActivityVoteRecordDocs) {

BulkOperations operations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, TotalActivityVoteRecordDoc.class);

List<Pair<Query, Update>> upsertList = new ArrayList<>(totalActivityVoteRecordDocs.size());

totalActivityVoteRecordDocs.forEach(data -> {

Query query = new Query(Criteria.where("activityId").is(data.getActivityId()));

Update update = new Update();

update.set("voteCount", data.getVoteCount());

Pair<Query, Update> upsertPair = Pair.of(query, update);

upsertList.add(upsertPair);

});


operations.upsert(upsertList);

operations.execute();

}

一次 scan 查询会返回大量数据,因此客户端发起一次scan请求,实际并不会一次就将所有数据加载到本地,而是分成多次 RPC 请求进行加载,数据量小的话可以不计较得失愉快的scan,但是数据量太大会有两个问题躲不掉:

  1. 严重消耗网络带宽,从而影响其他业务;

  2. 本地客户端发生OOM;

  3. 请求太大太集中会把 HBase 打爆,因为我的请求是 scan 方式而不是 rowKey 等值查询(等值查询的话需要拼接详细的活动ID或者投票ID);

五张表我愉快的迁移了,但是迁移大表的时候报应来了,一个是查询慢,第二个是查询完插入过程中需要new 很多对象,愉快的 OOM 了,于是这种方式我也就是想着小表先给整了,大表还要另外寻出路。

迁移优化

优化思路:

  1. scan 危险的话,那就等值查询喽,拼接要的东西我又不是没有;

  2. 查询慢要提高速度,那就多线程走起;

  3. 本地迁移太耽误事情了,放到服务器里面迁移,内存大各种访问走内网还快;

  4. 迁移期间调用接口请求失败了需要重试;

于是我在Java代码里面写好查询数据和插入数据的逻辑:

@ResponseBody

public void insertHourlyVoteRecordDoc2(@RequestParam(value = "voteItemId") String voteItemId,

@RequestParam(value = "beginDate") String beginDate,

@RequestParam(value = "endDate") String endDate){

VoteRecordable begin = new HourlyVoteRecord();

begin.setTraceId(voteItemId);

begin.setDate(beginDate);


VoteRecordable end = new HourlyVoteRecord();

end.setTraceId(voteItemId);

end.setDate(endDate);


List<VoteRecordable> voteRecordables = voteItemStatService.listVoteRecords(begin, end);

System.out.println(voteRecordables.size() + "voteRecordables大小是多少 insertHourlyVoteRecordDoc");

if (CollectionUtils.isEmpty(voteRecordables)) {

return;

}

List<HourlyVoteRecordDoc> hourlyVoteRecordDocs = new ArrayList<>(voteRecordables.size());

for (VoteRecordable voteRecordable1 : voteRecordables) {

HourlyVoteRecord hourlyVoteRecord = (HourlyVoteRecord) voteRecordable1;

HourlyVoteRecordDoc hourlyVoteRecordDoc = new HourlyVoteRecordDoc();

hourlyVoteRecordDoc.setVoteItemId(new ObjectId(hourlyVoteRecord.getVoteItemId()));

hourlyVoteRecordDoc.setVoteCount(hourlyVoteRecord.getVoteCount());

hourlyVoteRecordDoc.setHour(hourlyVoteRecord.getHour());


hourlyVoteRecordDocs.add(hourlyVoteRecordDoc);

}

voteRecordService.batchInsertHourlyVoteRecordDoc(hourlyVoteRecordDocs);


}

然后在python写脚本进行多线程并发请求迁移:

"""

迁移

DailyVoteRecordDoc

HourlyVoteRecord

HourlyActivtiyVoteRecord

"""


import threading

import time

from datetime import datetime, timedelta


from core_service import vote_service

from scripts.biz.vote.migrate_user_vote_records import MigrateLimitException



class MyThread(threading.Thread):

def __init__(self, thread_id, name, archive_activities):

threading.Thread.__init__(self)

self.threadID = thread_id

self.name = name

self.archive_activities = archive_activities


def run(self):

print("Starting " + self.name)

main(self.archive_activities)

print("exiting " + self.name)



def main(archive_activities):

"""

迁移投票记录

:param archive_activities:

:return:

"""


final_date_end_timestamp = 1595433600


for activity in archive_activities:

date_start = time.strftime('%Y-%m-%d', time.localtime(int(activity['dateStart'] / 1000)))

hour_start = time.strftime('%Y-%m-%d-%H', time.localtime(int(activity['dateStart'] / 1000)))


date_end_timestamp = int(activity['dateEnd'] / 1000 + 24 * 60 * 60)


if final_date_end_timestamp < date_end_timestamp:

date_end_timestamp = final_date_end_timestamp


date_end = time.strftime('%Y-%m-%d', time.localtime(date_end_timestamp))

hour_end = time.strftime('%Y-%m-%d-%H', time.localtime(date_end_timestamp))

## 迁移 hourly_activity_vote_records

vote_service.batch_insert_hourly_activity_vote_records(activity.get('_id'), hour_start, hour_end)

print(activity)


vote_items = vote_service.list_vote_items(str(activity['_id']))

try:

for vote_item in vote_items:

# print("user_vote_record: activityId voteItemId score", activity.get('_id'), vote_item.get('_id'))

# print(vote_item)

if vote_item['score'] != 0:

## 迁移 daily_vote_records

vote_service.batch_insert_daily_vote_records(vote_item.get('_id'), date_start, date_end)

## 迁移 hourly_vote_records

print("开始结束时间时间", hour_start, hour_end)

print("开始结束日期", date_start, date_end)

vote_service.batch_insert_hourly_vote_records(vote_item.get('_id'), hour_start, hour_end)


except MigrateLimitException:

print("migrate limit error")




if __name__ == "__main__":

// 调用Java代码接口,查询到所有的活动

archive_activities_local = vote_service.archive_activities_v3()

# 开启多线程

i = 1

available_activities = []

threads = []


for index, local_activity in enumerate(archive_activities_local):

# 每循环700次开一个线程

if index == i * 700:

thread = MyThread(i, "Thread-" + str(i), available_activities)

thread.start()

threads.append(thread)

i += 1

available_activities = []

available_activities.append(local_activity)


if available_activities:

thread = MyThread(i, "Thread-" + str(i), available_activities)

thread.start()

threads.append(thread)


for t in threads:

t.join()


请求如果失败了重试三次

@retry(3)

def batch_insert_hourly_activity_vote_records(activityId, beginDate, endDate):

"""

批量插入活动小时投票记录

:param activityId:

:param beginDate:

:param endDate:

:return:

"""

url = "http://xxx/vote/api/internal/dbMigration/insertHourlyActivityVoteRecordDoc?" /

"activityId={activityId}&beginDate={beginDate}&endDate={endDate}".format(activityId=activityId, beginDate=beginDate, endDate=endDate)


r = requests.session().get(url)


最终放到服务器里面跑了两个小时就跑完了。我老大的三亿条数据跑了一天跑完,我一开始以为我老大说的一天12小时(实际是24小时算的),那我想着我的数据不到他的三分之一,三小时就差不多,于是本地跑,结果跑了一整天。

这便是所有的实现了。

但是这个过程并没有我说的这么顺利和轻松,架构师给了我五天我延期了三天,我浪费时间的部分有以下几个方面:

  1. 私人原因,中间有一天看了下小破牙请了半天假,痛的灵魂出窍无心工作,但是我周末自己在家赶了很多双写代码的进度;

  2. 前期迁移还算顺利,但是由于缺少数据迁移的经验,后期大数据表迁移出现了很多意外;

  3. 大数据表迁移,而且没有处理好中间中断情况的log记录,导致我开始了不敢轻易中断,然后如果代码逻辑有bug,要从来就很要命;

  4. 预估不到数据迁移时间,其中有一天我写完逻辑在本地跑数据迁移,跑了六个小时,严重影响到了我的操作;

  5. 线下数据复核的时候,有七张表数据,对的我有点头昏眼花,然后有三张大表数据一直对不上;业务里面有礼物投票刷数据的情况一开始没有想到这个龟孙儿,导致我一直以为数据迁移有问题,拼了命的找原因又找不到逻辑漏洞

全程写的很痛苦,感觉根本不在我的掌控之中。说了这么多,菜是原罪,努力学习吧,只要还有一根头发就不放弃学习!!!

欢迎批评指正,公众号《阿甘的码路》欢迎和菜鸟号主一起成长,有收获的朋友点个在看或者分享鼓励一下吧,十分感谢~

一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”
关注我,一起成长

原文 

http://mp.weixin.qq.com/s?__biz=MzU0ODYzMzc0MA==&mid=2247484014&idx=1&sn=ac4dbb437201921321970903bbeb501a

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 一个HBase数据迁移到Mongodb需求,架构师说“你怎么有那么多意外”

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址