Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Jan 7, 2025
1 parent 4d5b693 commit 3e5407b
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 103 deletions.
1 change: 1 addition & 0 deletions core/file_server/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ void ConfigManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec) {
Event* pStoppedEvent = new Event(iter->mRealBaseDir, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
pStoppedEvent->SetConfigName(cmd->mConfigName);
pStoppedEvent->SetContainerID(containerInfo.mID);
iter->mStopped = true;
LOG_DEBUG(
sLogger,
("GetContainerStoppedEvent Type", pStoppedEvent->GetType())("Source", pStoppedEvent->GetSource())(
Expand Down
1 change: 1 addition & 0 deletions core/file_server/ContainerInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct ContainerInfo {
std::vector<sls_logs::LogTag> mTags; // ContainerNameTag
std::vector<sls_logs::LogTag> mMetadatas; // ExternalEnvTag and ExternalK8sLabelTag
Json::Value mJson; // this obj's json, for saving to local file
bool mStopped = false; // whether this container is stopped

static bool ParseByJSONObj(const Json::Value&, ContainerInfo&, std::string&);
static bool ParseAllByJSONObj(const Json::Value&, std::unordered_map<std::string, ContainerInfo>&, std::string&);
Expand Down
1 change: 1 addition & 0 deletions core/file_server/EventDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class EventDispatcher {
friend class FuseFileUnittest;
friend class MultiServerConfigUpdatorUnitest;
friend class EventDispatcherDirUnittest;
friend class ModifyHandlerUnittest;

void CleanEnviroments();
int32_t GetInotifyWatcherCount();
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ ContainerInfo* FileDiscoveryOptions::GetContainerPathByLogPath(const string& log
return NULL;
}
// reverse order to find the latest container
for (size_t i = mContainerInfos->size() - 1; i >= 0; --i) {
for (int i = mContainerInfos->size() - 1; i >= 0; --i) {
if (_IsSubPath((*mContainerInfos)[i].mRealBaseDir, logPath)) {
return &(*mContainerInfos)[i];
}
Expand Down
17 changes: 7 additions & 10 deletions core/file_server/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,6 @@ void CreateModifyHandler::Handle(const Event& event) {
mCreateHandlerPtr->Handle(event);
} else if (event.IsContainerStopped() && isDir) {
for (auto& pair : mModifyHandlerPtrMap) {
LOG_INFO(sLogger,
("pair config name", pair.second->GetConfigName())("event config name", event.GetConfigName()));
if (pair.second->GetConfigName() == event.GetConfigName()) {
LOG_DEBUG(sLogger,
("Handle container stopped event, config", pair.first)("Source", event.GetSource())(
Expand Down Expand Up @@ -793,14 +791,13 @@ void ModifyHandler::Handle(const Event& event) {
reader->CloseFilePtr();
} else if (reader->IsContainerStopped()) {
// update container info one more time, ensure file is hold by same cotnainer
if (reader->UpdateContainerInfo()) {
LOG_INFO(
sLogger,
("close the file", "but file is reused by a new container")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("container id", reader->GetContainerID()));
if (reader->UpdateContainerInfo() && !reader->IsContainerStopped()) {
LOG_INFO(sLogger,
("file is reused by a new container", reader->GetContainerID())(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
} else {
// release fd as quick as possible
LOG_INFO(sLogger,
Expand Down
1 change: 1 addition & 0 deletions core/file_server/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class CreateModifyHandler : public EventHandler {
friend class ConfigUpdatorUnittest;
friend class EventDispatcherTest;
friend class SenderUnittest;
friend class EventDispatcherContainerUnittest;
#endif
};

Expand Down
2 changes: 0 additions & 2 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
string path = source;
if (object.size() > 0)
path += PATH_SEPARATOR + object;
LOG_WARNING(sLogger,
("container stopped", "unregister all dir")("dir", path)("config", ev->GetConfigName()));
dispatcher->StopAllDir(path, ev->GetConfigName(), ev->GetContainerID());
} else {
EventHandler* handler = dispatcher->GetHandler(source.c_str());
Expand Down
20 changes: 14 additions & 6 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2532,16 +2532,24 @@ const std::string& LogFileReader::GetConvertedPath() const {

bool LogFileReader::UpdateContainerInfo() {
FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
ContainerInfo* containerPath = discoveryConfig.first->GetContainerPathByLogPath(mHostLogPathDir);
if (containerPath && containerPath->mID != mContainerID) {
if (discoveryConfig.first == nullptr) {
return false;
}
ContainerInfo* containerInfo = discoveryConfig.first->GetContainerPathByLogPath(mHostLogPathDir);
if (containerInfo && containerInfo->mID != mContainerID) {
LOG_INFO(sLogger,
("container info of file reader changed", "may be because container restart")(
"old container id", mContainerID)("new container id", containerInfo->mID)(
"container status", containerInfo->mStopped ? "stopped" : "running"));
// if config have wildcard path, use mWildcardPaths[0] as base path
SetDockerPath(!discoveryConfig.first->GetWildcardPaths().empty() ? discoveryConfig.first->GetWildcardPaths()[0]
: discoveryConfig.first->GetBasePath(),
containerPath->mRealBaseDir.size());
SetContainerID(containerPath->mID);
containerInfo->mRealBaseDir.size());
SetContainerID(containerInfo->mID);
mContainerStopped = containerInfo->mStopped;
mExtraTags.clear();
AddExtraTags(containerPath->mMetadatas);
AddExtraTags(containerPath->mTags);
AddExtraTags(containerInfo->mMetadatas);
AddExtraTags(containerInfo->mTags);
return true;
}
return false;
Expand Down
5 changes: 3 additions & 2 deletions core/unittest/event_handler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
cmake_minimum_required(VERSION 3.22)
project(event_handler_unittest)

# add_executable(create_modify_handler_unittest CreateModifyHandlerUnittest.cpp)
# target_link_libraries(create_modify_handler_unittest ${UT_BASE_TARGET})
add_executable(create_modify_handler_unittest CreateModifyHandlerUnittest.cpp)
target_link_libraries(create_modify_handler_unittest ${UT_BASE_TARGET})

add_executable(modify_handler_unittest ModifyHandlerUnittest.cpp)
target_link_libraries(modify_handler_unittest ${UT_BASE_TARGET})
Expand All @@ -25,5 +25,6 @@ add_executable(log_input_unittest LogInputUnittest.cpp)
target_link_libraries(log_input_unittest ${UT_BASE_TARGET})

include(GoogleTest)
gtest_discover_tests(create_modify_handler_unittest)
gtest_discover_tests(modify_handler_unittest)
gtest_discover_tests(log_input_unittest)
226 changes: 145 additions & 81 deletions core/unittest/event_handler/CreateModifyHandlerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@
#include <memory>
#include <string>

#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "config/PipelineConfig.h"
#include "file_server/ConfigManager.h"
#include "file_server/event/Event.h"
#include "file_server/event_handler/EventHandler.h"
#include "pipeline/Pipeline.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "unittest/Unittest.h"

using namespace std;

DECLARE_FLAG_STRING(ilogtail_config);

namespace logtail {
class MockModifyHandler : public ModifyHandler {
public:
MockModifyHandler() : ModifyHandler("", nullptr) {}
MockModifyHandler(const std::string& configName, const FileDiscoveryConfig& pConfig)
: ModifyHandler(configName, pConfig) {}
virtual void Handle(const Event& event) { ++handle_count; }
virtual void HandleTimeOut() { ++handle_timeout_count; }
virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) { return true; }
Expand All @@ -44,95 +51,152 @@ class MockModifyHandler : public ModifyHandler {
};

class CreateModifyHandlerUnittest : public ::testing::Test {
public:
void TestHandleContainerStoppedEvent();

protected:
static void SetUpTestCase() {
srand(time(NULL));
gRootDir = GetProcessExecutionDir();
gLogName = "test.log";
if (PATH_SEPARATOR[0] == gRootDir.at(gRootDir.size() - 1))
gRootDir.resize(gRootDir.size() - 1);
gRootDir += PATH_SEPARATOR + "ModifyHandlerUnittest";
bfs::remove_all(gRootDir);
}

static void TearDownTestCase() {}

void SetUp() override {
// mock config, log_path is related to the unittest
std::string configStr = R"""({
"log_path" : "/root/log",
"file_pattern" : "test-0.log",
"advanced" :
{
"force_multiconfig" : false,
"k8s" : {},
"tail_size_kb" : 1024
},
"aliuid" : "123456789",
"category" : "logstore-0",
"create_time" : 1647230190,
"defaultEndpoint" : "cn-huhehaote-intranet.log.aliyuncs.com",
"delay_alarm_bytes" : 0,
"delay_skip_bytes" : 0,
"discard_none_utf8" : false,
"discard_unmatch" : false,
"docker_exclude_env" : {},
"docker_exclude_label" : {},
"docker_file" : true,
"docker_include_env" : {},
"docker_include_label" : {},
"enable" : true,
"enable_tag" : false,
"file_encoding" : "utf8",
"filter_keys" : [],
"filter_regs" : [],
"group_topic" : "",
"keys" :
[
"content"
],
"local_storage" : true,
"log_begin_reg" : ".*",
"log_type" : "common_reg_log",
"log_tz" : "",
"max_depth" : 10,
"max_send_rate" : -1,
"merge_type" : "topic",
"preserve" : true,
"preserve_depth" : 1,
"priority" : 0,
"project_name" : "project-0",
"raw_log" : false,
"regex" :
[
"(.*)"
],
"region" : "cn-huhehaote",
"send_rate_expire" : 0,
"sensitive_keys" : [],
"shard_hash_key" : [],
"tail_existed" : false,
"timeformat" : "",
"topic_format" : "none",
"tz_adjust" : false,
"version" : 1
})""";
Json::Value userConfig;
Json::Reader reader;
APSARA_TEST_TRUE_FATAL(reader.parse(configStr, userConfig));
ConfigManager::GetInstance()->LoadSingleUserConfig(mConfigName, userConfig);
bfs::create_directories(gRootDir);
// create a file for reader
std::string logPath = gRootDir + PATH_SEPARATOR + gLogName;
writeLog(logPath, "a sample log\n");

// init pipeline and config
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<PipelineConfig> config;
unique_ptr<Pipeline> pipeline;

// new pipeline
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
")"
+ logPath + R"("
]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
Json::Value inputConfigJson = (*configJson)["inputs"][0];

config.reset(new PipelineConfig(mConfigName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new Pipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
ctx.SetPipeline(*pipeline.get());
ctx.SetConfigName(mConfigName);
ctx.SetProcessQueueKey(0);
discoveryOpts = FileDiscoveryOptions();
discoveryOpts.Init(inputConfigJson, ctx, "test");
discoveryOpts.SetDeduceAndSetContainerBaseDirFunc(
[](ContainerInfo& containerInfo, const PipelineContext* ctx, const FileDiscoveryOptions* opts) {
containerInfo.mRealBaseDir = containerInfo.mUpperDir;
return true;
});
mConfig = std::make_pair(&discoveryOpts, &ctx);
readerOpts.mInputType = FileReaderOptions::InputType::InputFile;

FileServer::GetInstance()->AddFileDiscoveryConfig(mConfigName, &discoveryOpts, &ctx);
FileServer::GetInstance()->AddFileReaderConfig(mConfigName, &readerOpts, &ctx);
FileServer::GetInstance()->AddMultilineConfig(mConfigName, &multilineOpts, &ctx);
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx);

// build a reader
mReaderPtr = std::make_shared<LogFileReader>(
gRootDir, gLogName, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
mReaderPtr->UpdateReaderManual();
mReaderPtr->SetContainerID("1");
APSARA_TEST_TRUE_FATAL(mReaderPtr->CheckFileSignatureAndOffset(true));

// build a modify handler
LogFileReaderPtrArray readerPtrArray{mReaderPtr};
mHandlerPtr.reset(new ModifyHandler(mConfigName, mConfig));
mHandlerPtr->mNameReaderMap[gLogName] = readerPtrArray;
mReaderPtr->SetReaderArray(&mHandlerPtr->mNameReaderMap[gLogName]);
mHandlerPtr->mDevInodeReaderMap[mReaderPtr->mDevInode] = mReaderPtr;

auto containerInfo = std::make_shared<std::vector<ContainerInfo>>();
discoveryOpts.SetContainerInfo(containerInfo);
}

void TearDown() override {}
std::string mConfigName = "##1.0##project-0$config-0";
void TearDown() override { bfs::remove_all(gRootDir); }

static std::string gRootDir;
static std::string gLogName;

private:
const std::string mConfigName = "##1.0##project-0$config-0";
FileDiscoveryOptions discoveryOpts;
FileReaderOptions readerOpts;
MultilineOptions multilineOpts;
PipelineContext ctx;
FileDiscoveryConfig mConfig;

std::shared_ptr<LogFileReader> mReaderPtr;
std::shared_ptr<ModifyHandler> mHandlerPtr;
CreateHandler mCreateHandler;

public:
void TestHandleContainerStoppedEvent() {
LOG_INFO(sLogger, ("TestFindAllSubDirAndHandler() begin", time(NULL)));
CreateModifyHandler createModifyHandler(&mCreateHandler);
MockModifyHandler* pHanlder = new MockModifyHandler(); // released by ~CreateModifyHandler
createModifyHandler.mModifyHandlerPtrMap.insert(std::make_pair(mConfigName, pHanlder));

Event event1("/not_exist", "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, 0);
createModifyHandler.Handle(event1);
APSARA_TEST_EQUAL_FATAL(pHanlder->handle_count, 0);

Event event2("/root/log", "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, 0);
createModifyHandler.Handle(event2);
APSARA_TEST_EQUAL_FATAL(pHanlder->handle_count, 1);
void writeLog(const std::string& logPath, const std::string& logContent) {
std::ofstream writer(logPath.c_str(), fstream::out | fstream::app);
writer << logContent;
writer.close();
}
};

APSARA_UNIT_TEST_CASE(CreateModifyHandlerUnittest, TestHandleContainerStoppedEvent, 0);
void CreateModifyHandlerUnittest::TestHandleContainerStoppedEvent() {
LOG_INFO(sLogger, ("TestFindAllSubDirAndHandler() begin", time(NULL)));
CreateModifyHandler createModifyHandler(&mCreateHandler);

MockModifyHandler* pHanlder = new MockModifyHandler(mConfigName, mConfig); // released by ~CreateModifyHandler
createModifyHandler.mModifyHandlerPtrMap.insert(std::make_pair(mConfigName, pHanlder));

Event event1("/not_exist", "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, 0);
event1.SetConfigName(mConfigName);
createModifyHandler.Handle(event1);
APSARA_TEST_EQUAL_FATAL(pHanlder->handle_count, 1);

Event event2(gRootDir, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, 0);
event2.SetConfigName(mConfigName);
createModifyHandler.Handle(event2);
APSARA_TEST_EQUAL_FATAL(pHanlder->handle_count, 2);

Event event3(gRootDir, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, 0);
event3.SetConfigName(mConfigName + "_test");
createModifyHandler.Handle(event3);
APSARA_TEST_EQUAL_FATAL(pHanlder->handle_count, 2);
}

std::string CreateModifyHandlerUnittest::gRootDir;
std::string CreateModifyHandlerUnittest::gLogName;

UNIT_TEST_CASE(CreateModifyHandlerUnittest, TestHandleContainerStoppedEvent);
} // end of namespace logtail

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit 3e5407b

Please sign in to comment.