1、推送系统实践(一)最近由于项目需要做一个推送系统,借此机会对 nodejs 和 pomelo 有了一次实践,在此将整个实践过程记录并分享。什么是 pomeloPomelo 是基于 Node.js 的高性能、分布式游戏服务器框架。它包括基础的开发框架和相关的扩展组件(库和工具包),可以帮助你省去游戏开发枯燥中的重复劳动和底层逻辑的开发。Pomelo 不但适用于游戏服务器开发, 也可用于开发高实时 Web 应用,它的分布式架构可以使 Pomelo 比普通的实时 Web 框架扩展性更好。 更多详情参见 http:/ 为什么选 pomelo初次接触 pomelo 也是项目关系,当时我们是要做一个实时在
2、线对战游戏,当时选的就是 pomelo,效果不错。所以在做推送系统的时候,也就把 pomelo 放到技术选型方案中,评估后觉得也比较适合,于是继续使用 pomelo,下面进入正题。pomelo 安装Ubuntua、安装 nodejs 要求版本0.8b、到 http:/nodejs.org/获取源码包的下载地址,下载 nodejs 的 gz 包wget http:/nodejs.org/dist/v0.10.28/node-v0.10.28.tar.gzc、解压编译安装tar zxvf node-v0.10.28.tar.gzcd node-v0.10.28./configuremake sud
3、o make installd、检查是否安装成功#node -v2. 安装 g+ gcc 等 apt-get install build-essential3. 安装 npm 并设置成 http 连接apt-get updateapt-get install npmnpm config set strict-ssl falsenpm config set registry “http:/registry.npmjs.org/“npm install pomelo -g详细安装过程可以参见 http:/ 项目功能介绍主要功能是实现针对移动端的广告实时推送系统,详细需求如下:1. 用户状态维护2.
4、 在线用户实时推送3. 对用户进行分类并打 tag。4. 挖掘用户特征及 tag5. 根据 tag 区分用户并制定推送策略,推送相应的广告。举个例子解释一下,比如有如下场景:两个用户 A 和 B,其中 A 为男性,B 为女性。- AB 为用户, 性别为 tag广告库里面有两个广告 C 和 D,C 是汽车类的,D 是化妆品类广告。 - 广告C 是包段定向投放给男性,D 是全网投放( 无论男女都投放 ) - 推广策略Tag 从任意维度给用户打的一个标签。比如性别,年龄,地域等等策略 广告检索的方法。 说白了就是如何从一堆的广告到当中根据什么样的算法去选取广告并进行推送。那这里要实现的就是这样一个系
5、统: 能够通过离线的数据挖掘对用户进行分类,从而打上不同的 tag, 再将广告通过不同的推广策略推广给用户一个平台。其中第 4 点,挖掘用户特征不在本文范围内,本系统着重在于实现一个可扩展的实时在线推送平台。整体架构关于如何实现可扩展的分布式系统及系统架构方面,比如在线用户登录及状态维护,如何实现分布式系统方面属于 pomelo 范畴,本文也不在重述,有兴趣的童鞋可以查看 pomelo的官方文档,本文着重分享的是如何设计和实现业务系统架构。从以上的例子分析其实可以看出,这里面其实有四个方面:用户,tag,策略和广告。用户与 tag 是多对一的关系,一个用户可以有多个 tag。tag 与策略是多
6、对多的关系,一个策略可以应用在多个 tag;一个 tag 也可以存在多个策略tag 通过 pomelo 的 channel 来实现,一个 channel 代表一个 tag。策略通过 component 来实现,一个 component 代表一个策略。代码结构如下:appcomponentsdaomysqldomainmonitorserversareaconnectorgate其中, components 目录下面存放的是推广策略代码,一个推广策略是一个文件dao 目录存放数据存取,比如 mysqldomain 目录存放各种抽象类monitor 目录存放监控代码servers 目录存放的是三种
7、类型的 server:a. area 负责逻辑应用b. connector 负责用户连接维护c. gate 负责负载均衡下面直接上代码。负载均衡代码,gate/handler/gateHandler.jsvar Code = require(./././././shared/code);var dispatcher = require(./././util/dispatcher);var utils= require(./././util/utils);/* Gate handler that dispatch user to connectors.*/module.exports = fun
8、ction(app) return new Handler(app);var Handler = function(app) this.app = app;Handler.prototype.queryEntry = function(msg, session, next) var uid = msg.uid;if(!uid) next(null, code: Code.FAIL);return;var connectors = this.app.getServersByType(connector);if(!connectors | connectors.length = 0) next(n
9、ull, code: Code.GATE.FA_NO_SERVER_AVAILABLE);return;var res = dispatcher.dispatch(uid, connectors);next(null, code: Code.OK, host: res.pubHost, port: res.clientPort);其中 dispatcher 采用随机分流,var crc = require(crc);var utils = require(./utils)module.exports.dispatch = function(uid, connectors) /var index
10、 = Number(uid) % connectors.length;var index = Math.abs(crc.crc32(uid) % connectors.length;return connectorsindex;connector 验证用户登录connector/handler/entryHandler.jsvar Code = require(./././././shared/code);var logger = require(pomelo-logger).getLogger(_filename);var https = require(https);var async =
11、 require(async);module.exports = function(app) return new Handler(app);var Handler = function(app) this.app = app;var httpConfig = app.get(httpConf);this.option = hostname: httpConfig.avsHost,port: httpConfig.avsPort,path: httpConfig.playerValidate,method: “POST“,headers: httpConfig.avsHeaders;/cons
12、ole.error(JSON.stringify(this.option);/* New client entry.* param Object msg request message* param Object session current session object* param Function next next step callback* return Void*/Handler.prototype.playerValidate = function(uid, cb) var content = JSON.stringify(uid:uid);var req = https.r
13、equest(this.option, function(res)logger.debug(“playerValidate return code:“ + res.statusCode);cb(null, res.statusCode););req.write(content);req.end();return;Handler.prototype.entry = function(msg, session, next) var self = this;var uid = msg.uid;async.waterfall(function(cb) self.playerValidate(uid,
14、cb);,function(resCode, cb)if(resCode != Code.OK) next(null,code: Code.FAIL,msg: “invalid user“);return;var sessionService = self.app.get(sessionService);/duplicate log inif( ! sessionService.getByUid(uid) next(null, code: Code.FAIL,msg: duplicate login);return;session.bind(uid);/*session.set(gid, gi
15、d);session.push(gid, function(err) if(err) logger.error(set gid for session service failed! error is : %j, err.stack);next(null, code: Code.FAIL,msg: set gid failed);return;);*/session.on(closed, onUserLeave.bind(null, self.app);self.app.rpc.area.areaRemote.add(session, uid, self.app.get(serverId),f
16、unction(data)next(null, code: data.code, msg: data.msg);return;/cb(null, data.code); );return;, function(err) if (err) logger.error(err.stack); );/* User log out handler* param Object app current application* param Object session current session object*/var onUserLeave = function(app, session) var c
17、hannelName = session.get(channelName);if(!session | !session.uid | !channelName) logger.warn(“invalid user kicked“);return;app.rpc.area.areaRemote.kick(session, channelName, session.uid, function(data) if(data.code = Code.FAIL) logger.error(user leave error);area/remote/areaRemote.js/* Module depend
18、encies*/var logger = require(pomelo-logger).getLogger(_filename);var pomelo = require(pomelo);var Queue = require(pomelo-collection).queue;var Code = require(./././././shared/code);module.exports = function(app) return new areaRemote(app, app.get(adPushService);var areaRemote = function(app, adPushS
19、ervice) this.app = app;/this.self = this;this.adPushService = adPushService;areaRemote.prototype.add = function(uid, sid, cb)this.adPushService.initService();this.adPushService.add(uid, sid, cb);return;/*areaRemote.prototype.restoreChannel = function(channelName, cb) this.gameService.restoreChannel(
20、channelName, cb);return;*/areaRemote.prototype.kick = function(channelName, uid, cb) this.adPushService.kick(channelName, uid, cb);return;areaRemote.js 主要负责用户登录时将用户添加到相应通道里。area/remote/adPushService.js/* Module dependencies*/var logger = require(pomelo-logger).getLogger(_filename);var pomelo = requi
21、re(pomelo);var Code = require(./././././shared/code);var AdPushChannel = require(./././domain/adPushGlobalChannel);var Ssp = require(./././domain/ssp);var EventEmitter = require(events).EventEmitter;var util = require(util);var Const = require(./././././shared/const);var adPushService = function(app
22、) EventEmitter.call(this);this.app = app;this.initFlag = false;this.channelService = null;this.channelOnDict = ;util.inherits(adPushService, EventEmitter);module.exports = adPushService;adPushService.prototype.initService = function() if (! this.initFlag) this.channelService = this.app.get(“channelS
23、ervice“);this.backeSessionService = this.app.get(backendSessionService);for (var channelName in Const.PUSH_CHANNEL) var opts = channelName: Const.PUSH_CHANNELchannelName,app: this.app;var pushChannel = new AdPushChannel(opts);this.channelOnDictchannelName = pushChannel;this.initFlag = true;/*adPushS
24、ervice.prototype.pushAd = function(adPushChannel, ads)/logger.debug(“Schedule job:“+data.name+“execute“);/var ads = url:http:/ = function(uid, sid, cb)var selGame = null;var opts = uid: uid,sid: sidvar ssp = new Ssp(opts);/TODO get ssp strategy here/*每个用户都会有一个或者多个特征,从而属于一个或者多个 push 通道,这里需要扩展的是:从离线挖掘
25、系统中找出 ssp 属于哪些 push 策略从而把这些通道名称赋予该 ssp,然后会把 ssp 以此加到所属通道中,再由各个通道策略负责推送广告add set ssp strategy here */if (ssp.adPushStrategy in this.channelOnDict) this.channelOnDictadPushStrategy.addSsp(ssp, cb); else this.channelOnDictConst.PUSH_CHANNEL.DEFAULT.addSsp(ssp, cb);return;adPushService.prototype.kick =
26、function(channelName, uid, cb) var game = null;if (channelName in this.channelOnDict) game = this.channelOnDictchannelName;if (!game) cb(code: Code.FAIL, msg: channelName does not existed);logger.warn(“+channelName+“ does not existed“);return;game.kickSsp(uid, cb);return;domain/adPushGlobalChannel.j
27、s主要是负责将提供真实的能力将用户添加到 channel 里面和向用户推送具体广告。/* Module dependencies*/var EventEmitter = require(events).EventEmitter;var util = require(util);var Code = require(./././shared/code);var logger = require(pomelo-logger).getLogger(_filename);var self= null;var sType = connector;/var playerPerGame = 2;/var i
28、d = 1;/var playerAdded = “playerAdded“;/* Initialize a new Game with the given opts.* Game inherits EventEmitter* param Object opts* api public*/var adPushGlobalChannel = function(opts) EventEmitter.call(this);this.channelName = opts.channelName;this.app = opts.app;this.ssps = ;this.sspNames = ;this
29、.sspNum = 0;/this.channel = this.app.get(“globalChannelService“).getChannel(this.channelName, true);this.globalChannelService = this.app.get(globalChannelService);self = this;/this.initListener();util.inherits(adPushGlobalChannel, EventEmitter);/* Expose Game constructor*/module.exports = adPushGlobalChannel;