Skip to content

Commit

Permalink
fix bug: sBuffer empty donot send response.
Browse files Browse the repository at this point in the history
  • Loading branch information
marklightning committed Jun 14, 2020
1 parent 580fb3e commit ee62df2
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 50 deletions.
14 changes: 12 additions & 2 deletions src/FlowControl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// **********************************************************************
// This file was generated by a TARS parser!
// TARS version 2.0.0.
// TARS version 2.4.2.
// **********************************************************************

#ifndef __FLOWCONTROL_H_
Expand Down Expand Up @@ -228,11 +228,20 @@ namespace Base
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.put("tars_ret", _ret);

vector<char> sTupResponseBuffer;
tarsAttr.encode(sTupResponseBuffer);
current->sendResponse(tars::TARSSERVERSUCCESS, sTupResponseBuffer);
}
else if (current->getRequestVersion() == JSONVERSION)
{
tars::JsonValueObjPtr _p = new tars::JsonValueObj();
_p->value["tars_ret"] = tars::JsonOutput::writeJson(_ret);
vector<char> sJsonResponseBuffer;
tars::TC_Json::writeValue(_p, sJsonResponseBuffer);
current->sendResponse(tars::TARSSERVERSUCCESS, sJsonResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriterVector> _os;
Expand Down Expand Up @@ -282,11 +291,12 @@ namespace Base
tars::Int32 _ret = report(flow,ip, _current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION )
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriterVector, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.put("tars_ret", _ret);
tarsAttr.encode(_sResponseBuffer);
}
else if (_current->getRequestVersion() == JSONVERSION)
Expand Down
76 changes: 39 additions & 37 deletions src/httpproxy/StationManager.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#include "StationManager.h"
#include "util/tc_mysql.h"
#include "util/tc_md5.h"
#include "HttpProxy.h"

#include "util/tc_md5.h"
#include "util/tc_mysql.h"

StationManager::StationManager()
{
_isLoading = false;
}

bool StationManager::load(TC_Config& conf)
bool StationManager::load(TC_Config &conf)
{
_conf = conf;
// 这里用个状态位进行简单处理,不是很严谨
Expand All @@ -28,13 +27,13 @@ bool StationManager::load(TC_Config& conf)

if (ret)
{
flushObj();
flushObj();
}

return ret;
}

bool StationManager::loadHttp(TC_Config& conf)
bool StationManager::loadHttp(TC_Config &conf)
{
_conf = conf;
// 这里用个状态位进行简单处理,不是很严谨
Expand All @@ -51,17 +50,19 @@ bool StationManager::loadHttp(TC_Config& conf)

if (ret)
{
flushObj();
flushObj();
}
return ret;
}

bool StationManager::loadComm()
{
bool ret = loadBlackList();
ret &= loadWhiteList();
return ret;
}

bool StationManager::init(TC_Config& conf)
bool StationManager::init(TC_Config &conf)
{
map<string, string> m = conf.getDomainMap("/main/db");
TC_DBConf tcDBConf;
Expand All @@ -71,7 +72,7 @@ bool StationManager::init(TC_Config& conf)
return load(conf);
}

bool StationManager::loadHttpRouterConf(vector<RouterParam>& paramList)
bool StationManager::loadHttpRouterConf(vector<RouterParam> &paramList)
{
try
{
Expand All @@ -95,7 +96,7 @@ bool StationManager::loadHttpRouterConf(vector<RouterParam>& paramList)

return true;
}
catch(const std::exception& e)
catch (const std::exception &e)
{
TLOGERROR(e.what() << endl);
}
Expand All @@ -115,16 +116,16 @@ bool StationManager::loadRouter()
{
RouterParam rp;
rp.id = TC_Common::strto<int>(data[i]["f_id"]);
rp.serverName = TC_Common::trim(data[i]["f_server_name"]);
rp.location = TC_Common::trim(data[i]["f_path_rule"]);
rp.serverName = TC_Common::trim(data[i]["f_server_name"]);
rp.location = TC_Common::trim(data[i]["f_path_rule"]);
rp.proxyPass = TC_Common::trim(data[i]["f_proxy_pass"]);
rp.stationId = TC_Common::trim(data[i]["f_station_id"]);
if (rp.location.empty() || rp.proxyPass.empty())
{
TLOGERROR("error db conf:" << rp.id << "|" << rp.serverName << "|" << rp.location << "|" << rp.proxyPass << "|" << rp.stationId << endl);
continue;
}

paramList.push_back(rp);
}
}
Expand All @@ -136,7 +137,8 @@ bool StationManager::loadRouter()
// 合并配置文件内容
loadHttpRouterConf(paramList);

return Router->reload(paramList);;
return Router->reload(paramList);
;
}

void StationManager::terminate()
Expand Down Expand Up @@ -167,7 +169,7 @@ bool StationManager::loadMonitor()
TLOGERROR("exception:" << e.what() << endl);
}

return false;
return false;
}

bool StationManager::loadBlackList()
Expand All @@ -177,16 +179,16 @@ bool StationManager::loadBlackList()
string sql = "select f_station_id, f_ip from t_blacklist where f_valid = 1 limit 10000";
TC_Mysql::MysqlData data = _mysql.queryRecord(sql);
TLOGDEBUG(sql << " ===> result size=" << data.size() << endl);

unordered_map<string, set<string>> blackList;
unordered_map<string, set<string>> blackListPat;
unordered_map<string, set<string>> blackListPat;
for (size_t i = 0; i < data.size(); i++)
{
string stationId = TC_Common::trim(data[i]["f_station_id"]);
string ip = TC_Common::trim(data[i]["f_ip"]);
if (ip.find("*") != string::npos)
{
blackListPat[stationId].insert(ip);
blackListPat[stationId].insert(ip);
}
else
{
Expand All @@ -206,7 +208,7 @@ bool StationManager::loadBlackList()
TLOGERROR("exception:" << e.what() << endl);
}

return false;
return false;
}
bool StationManager::loadWhiteList()
{
Expand All @@ -216,14 +218,14 @@ bool StationManager::loadWhiteList()
TC_Mysql::MysqlData data = _mysql.queryRecord(sql);
TLOGDEBUG(sql << " ===> result size=" << data.size() << endl);
unordered_map<string, set<string>> whiteList;
unordered_map<string, set<string>> whiteListPat;
unordered_map<string, set<string>> whiteListPat;
for (size_t i = 0; i < data.size(); i++)
{
string stationId = TC_Common::trim(data[i]["f_station_id"]);
string ip = TC_Common::trim(data[i]["f_ip"]);
if (ip.find("*") != string::npos)
{
whiteListPat[stationId].insert(ip);
whiteListPat[stationId].insert(ip);
}
else
{
Expand All @@ -243,32 +245,32 @@ bool StationManager::loadWhiteList()
TLOGERROR("exception:" << e.what() << endl);
}

return false;
return false;
}

void StationManager::run()
{
while(!_terminate)
while (!_terminate)
{
try
{
flushObj();
}
catch(exception &ex)
catch (exception &ex)
{
TLOGERROR("exception:" << ex.what() << endl);
}
catch(...)
catch (...)
{
TLOGERROR("exception unknown error." << endl);
}

TC_ThreadLock::Lock lock(*this);
timedWait(1000 * 30);
}
}
}

string StationManager::getMonitorUrl(const string& stationId)
string StationManager::getMonitorUrl(const string &stationId)
{
TC_ThreadRLock r(_rwLock);
auto it = _stationMonitorUrl.find(stationId);
Expand All @@ -292,7 +294,7 @@ string StationManager::getMonitorUrl(const string& stationId)
// return TC_MD5::md5str(ss.str());
// }

string StationManager::genAddrVer(const vector<UpstreamInfo>& addrList)
string StationManager::genAddrVer(const vector<UpstreamInfo> &addrList)
{
stringstream ss;
for (auto itv = addrList.begin(); itv != addrList.end(); ++itv)
Expand All @@ -303,7 +305,7 @@ string StationManager::genAddrVer(const vector<UpstreamInfo>& addrList)
return TC_MD5::md5str(ss.str());
}

void StationManager::addObjUpstream(const string& obj)
void StationManager::addObjUpstream(const string &obj)
{
TC_ThreadWLock w(_rwLock);
_stationObj.insert(obj);
Expand Down Expand Up @@ -357,9 +359,9 @@ void StationManager::flushObj()
_ObjProxy[*it].ver = ver;
}

HttpProxyFactory::getInstance()->getHttpProxy(*it)->setAddr(addrList, ver);
HttpProxyFactory::getInstance()->getHttpProxy(*it)->setAddr(addrList, ver);
}
else
else
{
TLOGERROR(*it << ", has no valid tars enpoint." << endl);
}
Expand Down Expand Up @@ -389,8 +391,8 @@ bool StationManager::isInBlackList(const string &stationId, const string &ip)
}
}
}
return false;

return false;
}

bool StationManager::checkWhiteList(const string &stationId, const string &ip)
Expand Down Expand Up @@ -421,8 +423,8 @@ bool StationManager::checkWhiteList(const string &stationId, const string &ip)
}
}
}
return ret;

return ret;
}

bool StationManager::loadUpstream()
Expand All @@ -439,15 +441,15 @@ bool StationManager::loadUpstream()
UpstreamInfo info;
info.addr = TC_Common::trim(data[i]["f_addr"]);
info.weight = TC_Common::strto<int>(data[i]["f_weight"]);
info.fusingOnOff = (TC_Common::strto<int>(data[i]["f_fusing_onoff"]) == 1 ? true:false);
info.fusingOnOff = (TC_Common::strto<int>(data[i]["f_fusing_onoff"]) == 1 ? true : false);
proxy[TC_Common::trim(data[i]["f_upstream"])].addrList.push_back(info);
//proxy[TC_Common::trim(data[i]["f_upstream"])].addrList.push_back(make_pair(TC_Common::trim(data[i]["f_addr"]), TC_Common::strto<int>(data[i]["f_weight"])));
}

for (auto it = proxy.begin(); it != proxy.end(); ++it)
{
it->second.ver = genAddrVer(it->second.addrList);
HttpProxyFactory::getInstance()->getHttpProxy(it->first)->setAddr(it->second.addrList, it->second.ver);
HttpProxyFactory::getInstance()->getHttpProxy(it->first)->setAddr(it->second.addrList, it->second.ver);
}

return true;
Expand Down
10 changes: 5 additions & 5 deletions src/tupproxy/TupCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ int TupCallback::onDispatch(ReqMessagePtr msg)
}
else if (getType() == "json")
{
if (!msg->response->sBuffer.empty())
//if (!msg->response->sBuffer.empty())
{
ReportHelper::reportProperty("response_iVersion_json");
doResponse_json(msg->response);
Expand Down Expand Up @@ -287,10 +287,10 @@ void TupCallback::doResponseException(int ret, const vector<char> &buffer)

void TupCallback::handleResponse()
{
if (_rspBuffer.size() == 0)
{
return;
}
// if (_rspBuffer.size() == 0)
// {
// return;
// }
TLOGDEBUG("rsp size:" << _rspBuffer.size() << endl);

bool bGzipOk = !_stParam.pairAcceptZip.first.empty();
Expand Down
Loading

0 comments on commit ee62df2

Please sign in to comment.