From 56a26652b1f4ae2e0f183118fa75aded7fd7f213 Mon Sep 17 00:00:00 2001
From: Illyoung Choi
Date: Mon, 22 Mar 2021 17:05:57 -0700
Subject: [PATCH] Add search by meta api to fs interface, and bugfix

---
 fs/fs.go                |  73 ++++++++
 fs/fs_test.go           |  20 +++
 irods/fs/collection.go  |   8 +-
 irods/fs/data_object.go | 372 +++++++++++++++++++++++++++++++++++++++-
 4 files changed, 464 insertions(+), 9 deletions(-)

diff --git a/fs/fs.go b/fs/fs.go
index c6e671f..49d1873 100644
--- a/fs/fs.go
+++ b/fs/fs.go
@@ -365,6 +365,11 @@ func (fs *FileSystem) List(path string) ([]*FSEntry, error) {
 	return fs.listEntries(collection.Internal.(*types.IRODSCollection))
 }
 
+// SearchByMeta returns the collections and data objects (replica number 0 only) whose metadata attribute metaname has exactly the value metavalue.
+func (fs *FileSystem) SearchByMeta(metaname string, metavalue string) ([]*FSEntry, error) {
+	return fs.searchEntriesByMeta(metaname, metavalue)
+}
+
 // RemoveDir deletes a directory
 func (fs *FileSystem) RemoveDir(path string, recurse bool, force bool) error {
 	irodsPath := util.GetCorrectIRODSPath(path)
@@ -908,6 +913,74 @@ func (fs *FileSystem) listEntries(collection *types.IRODSCollection) ([]*FSEntry
 	return fsEntries, nil
 }
 
+func (fs *FileSystem) searchEntriesByMeta(metaName string, metaValue string) ([]*FSEntry, error) {
+	conn, err := fs.Session.AcquireConnection()
+	if err != nil {
+		return nil, err
+	}
+	defer fs.Session.ReturnConnection(conn)
+
+	collections, err := irods_fs.SearchCollectionsByMeta(conn, metaName, metaValue)
+	if err != nil {
+		return nil, err
+	}
+
+	fsEntries := []*FSEntry{}
+
+	for _, coll := range collections {
+		fsEntry := &FSEntry{
+			ID:         coll.ID,
+			Type:       FSDirectoryEntry,
+			Name:       coll.Name,
+			Path:       coll.Path,
+			Owner:      coll.Owner,
+			Size:       0,
+			CreateTime: coll.CreateTime,
+			ModifyTime: coll.ModifyTime,
+			CheckSum:   "",
+			Internal:   coll,
+		}
+
+		fsEntries = append(fsEntries, fsEntry)
+
+		// add the directory entry to the file system cache
+		fs.Cache.AddEntryCache(fsEntry)
+	}
+
+	dataobjects, err := irods_fs.SearchDataObjectsMasterReplicaByMeta(conn, metaName, metaValue)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, dataobject := range dataobjects {
+		if len(dataobject.Replicas) == 0 {
+			continue
+		}
+
+		replica := dataobject.Replicas[0]
+
+		fsEntry := &FSEntry{
+			ID:         dataobject.ID,
+			Type:       FSFileEntry,
+			Name:       dataobject.Name,
+			Path:       dataobject.Path,
+			Owner:      replica.Owner,
+			Size:       dataobject.Size,
+			CreateTime: replica.CreateTime,
+			ModifyTime: replica.ModifyTime,
+			CheckSum:   replica.CheckSum,
+			Internal:   dataobject,
+		}
+
+		fsEntries = append(fsEntries, fsEntry)
+
+		// add the file entry to the file system cache
+		fs.Cache.AddEntryCache(fsEntry)
+	}
+
+	return fsEntries, nil
+}
+
 func (fs *FileSystem) getDataObject(path string) (*FSEntry, error) {
 	// check cache first
 	cachedEntry := fs.Cache.GetEntryCache(path)
diff --git a/fs/fs_test.go b/fs/fs_test.go
index ba2123c..4d1c3f4 100644
--- a/fs/fs_test.go
+++ b/fs/fs_test.go
@@ -63,6 +63,26 @@ func TestListEntries(t *testing.T) {
 	shutdown()
 }
 
+func TestListEntriesByMeta(t *testing.T) {
+	setup()
+
+	entries, err := fs.SearchByMeta("ipc_UUID", "3241af9a-c199-11e5-bd90-3c4a92e4a804")
+	if err != nil {
+		t.Errorf("err - %v", err)
+		panic(err)
+	}
+
+	if len(entries) == 0 {
+		util.LogDebug("There is no entries")
+	} else {
+		for _, entry := range entries {
+			util.LogDebugf("Entry : %v", entry)
+		}
+	}
+
+	shutdown()
+}
+
 func TestListACLs(t *testing.T) {
 	setup()
 
diff --git a/irods/fs/collection.go b/irods/fs/collection.go
index e1286af..ee058e2 100644
--- a/irods/fs/collection.go
+++ b/irods/fs/collection.go
@@ -669,9 +669,9 @@ func SearchCollectionsByMeta(conn *connection.IRODSConnection, metaName string,
 		query.AddSelect(common.ICAT_COLUMN_COLL_MODIFY_TIME, 1)
 
 		metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
-		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
+		query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_NAME, metaNameCondVal)
 		metaValueCondVal := fmt.Sprintf("= '%s'", metaValue)
-		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
+		query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_VALUE, metaValueCondVal)
 
 		queryMessage, err := query.GetMessage()
 		if err != nil {
@@ -787,9 +787,9 @@ func SearchCollectionsByMetaWildcard(conn *connection.IRODSConnection, metaName
 		query.AddSelect(common.ICAT_COLUMN_COLL_MODIFY_TIME, 1)
 
 		metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
-		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
+		query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_NAME, metaNameCondVal)
 		metaValueCondVal := fmt.Sprintf("like '%s'", metaValue)
-		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
+		query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_VALUE, metaValueCondVal)
 
 		queryMessage, err := query.GetMessage()
 		if err != nil {
diff --git a/irods/fs/data_object.go b/irods/fs/data_object.go
index 6dcdc78..46f13d6 100644
--- a/irods/fs/data_object.go
+++ b/irods/fs/data_object.go
@@ -661,11 +661,12 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection *
 					}
 
 					pagenatedDataObjects[row] = &types.IRODSDataObject{
-						ID:       -1,
-						Path:     "",
-						Name:     "",
-						Size:     0,
-						Replicas: []*types.IRODSReplica{replica},
+						ID:           -1,
+						CollectionID: collection.ID,
+						Path:         "",
+						Name:         "",
+						Size:         0,
+						Replicas:     []*types.IRODSReplica{replica},
 					}
 				}
 
@@ -1706,6 +1707,186 @@ func SearchDataObjectsByMeta(conn *connection.IRODSConnection, metaName string,
 	return mergedDataObjects, nil
 }
 
+// SearchDataObjectsMasterReplicaByMeta returns the data objects whose metadata name and value both match exactly; only replica number 0 is queried, so every result holds a single replica.
+func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, metaName string, metaValue string) ([]*types.IRODSDataObject, error) {
+	if conn == nil || !conn.IsConnected() {
+		return nil, fmt.Errorf("connection is nil or disconnected")
+	}
+
+	dataObjects := []*types.IRODSDataObject{}
+
+	continueQuery := true
+	continueIndex := 0
+	for continueQuery {
+		// select the data object columns together with the parent collection's ID and name
+		query := message.NewIRODSMessageQuery(common.MaxQueryRows, continueIndex, 0, 0)
+		query.AddSelect(common.ICAT_COLUMN_COLL_ID, 1)
+		query.AddSelect(common.ICAT_COLUMN_COLL_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_ID, 1)
+		query.AddSelect(common.ICAT_COLUMN_DATA_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_DATA_SIZE, 1)
+
+		// select the columns that describe the replica
+		query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_CREATE_TIME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_MODIFY_TIME, 1)
+
+		metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
+		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
+		metaValueCondVal := fmt.Sprintf("= '%s'", metaValue)
+		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
+		query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
+
+		queryMessage, err := query.GetMessage()
+		if err != nil {
+			return nil, fmt.Errorf("Could not make a data object query message - %v", err)
+		}
+
+		err = conn.SendMessage(queryMessage)
+		if err != nil {
+			return nil, fmt.Errorf("Could not send a data object query message - %v", err)
+		}
+
+		// read the server's response to the query
+		queryResultMessage, err := conn.ReadMessage()
+		if err != nil {
+			return nil, fmt.Errorf("Could not receive a data object query result message - %v", err)
+		}
+
+		queryResult := message.IRODSMessageQueryResult{}
+		err = queryResult.FromMessage(queryResultMessage)
+		if err != nil {
+			return nil, fmt.Errorf("Could not receive a data object query result message - %v", err)
+		}
+
+		if queryResult.RowCount == 0 {
+			break
+		}
+
+		if queryResult.AttributeCount > len(queryResult.SQLResult) {
+			return nil, fmt.Errorf("Could not receive data object attributes - requires %d, but received %d attributes", queryResult.AttributeCount, len(queryResult.SQLResult))
+		}
+
+		pagenatedDataObjects := make([]*types.IRODSDataObject, queryResult.RowCount, queryResult.RowCount)
+
+		for attr := 0; attr < queryResult.AttributeCount; attr++ {
+			sqlResult := queryResult.SQLResult[attr]
+			if len(sqlResult.Values) != queryResult.RowCount {
+				return nil, fmt.Errorf("Could not receive data object rows - requires %d, but received %d attributes", queryResult.RowCount, len(sqlResult.Values))
+			}
+
+			for row := 0; row < queryResult.RowCount; row++ {
+				value := sqlResult.Values[row]
+
+				if pagenatedDataObjects[row] == nil {
+					// first value seen for this row: allocate the data object with one placeholder replica
+					replica := &types.IRODSReplica{
+						Number:            -1,
+						Owner:             "",
+						CheckSum:          "",
+						Status:            "",
+						ResourceName:      "",
+						Path:              "",
+						ResourceHierarchy: "",
+						CreateTime:        time.Time{},
+						ModifyTime:        time.Time{},
+					}
+
+					pagenatedDataObjects[row] = &types.IRODSDataObject{
+						ID:           -1,
+						CollectionID: -1,
+						Path:         "",
+						Name:         "",
+						Size:         0,
+						Replicas:     []*types.IRODSReplica{replica},
+					}
+				}
+
+				switch sqlResult.AttributeIndex {
+				case int(common.ICAT_COLUMN_COLL_ID):
+					collID, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse collection id - %s", value)
+					}
+					pagenatedDataObjects[row].CollectionID = collID
+				case int(common.ICAT_COLUMN_COLL_NAME):
+					if len(pagenatedDataObjects[row].Path) > 0 {
+						pagenatedDataObjects[row].Path = util.MakeIRODSPath(value, pagenatedDataObjects[row].Path)
+					} else {
+						pagenatedDataObjects[row].Path = value
+					}
+				case int(common.ICAT_COLUMN_D_DATA_ID):
+					objID, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object id - %s", value)
+					}
+					pagenatedDataObjects[row].ID = objID
+				case int(common.ICAT_COLUMN_DATA_NAME):
+					if len(pagenatedDataObjects[row].Path) > 0 {
+						pagenatedDataObjects[row].Path = util.MakeIRODSPath(pagenatedDataObjects[row].Path, value)
+					} else {
+						pagenatedDataObjects[row].Path = value
+					}
+					pagenatedDataObjects[row].Name = value
+				case int(common.ICAT_COLUMN_DATA_SIZE):
+					objSize, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object size - %s", value)
+					}
+					pagenatedDataObjects[row].Size = objSize
+				case int(common.ICAT_COLUMN_DATA_REPL_NUM):
+					repNum, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object replica number - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].Number = repNum
+				case int(common.ICAT_COLUMN_D_OWNER_NAME):
+					pagenatedDataObjects[row].Replicas[0].Owner = value
+				case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
+					pagenatedDataObjects[row].Replicas[0].CheckSum = value
+				case int(common.ICAT_COLUMN_D_DATA_STATUS):
+					pagenatedDataObjects[row].Replicas[0].Status = value
+				case int(common.ICAT_COLUMN_D_RESC_NAME):
+					pagenatedDataObjects[row].Replicas[0].ResourceName = value
+				case int(common.ICAT_COLUMN_D_DATA_PATH):
+					pagenatedDataObjects[row].Replicas[0].Path = value
+				case int(common.ICAT_COLUMN_D_RESC_HIER):
+					pagenatedDataObjects[row].Replicas[0].ResourceHierarchy = value
+				case int(common.ICAT_COLUMN_D_CREATE_TIME):
+					cT, err := util.GetIRODSDateTime(value)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse create time - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].CreateTime = cT
+				case int(common.ICAT_COLUMN_D_MODIFY_TIME):
+					mT, err := util.GetIRODSDateTime(value)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse modify time - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].ModifyTime = mT
+				default:
+					// values of any other column are skipped
+				}
+			}
+		}
+
+		dataObjects = append(dataObjects, pagenatedDataObjects...)
+
+		continueIndex = queryResult.ContinueIndex
+		if continueIndex == 0 {
+			continueQuery = false
+		}
+	}
+
+	return dataObjects, nil
+}
+
 // SearchDataObjectsByMetaWildcard searches data objects by metadata
 // Caution: This is a very slow operation
 func SearchDataObjectsByMetaWildcard(conn *connection.IRODSConnection, metaName string, metaValue string) ([]*types.IRODSDataObject, error) {
@@ -1904,3 +2085,184 @@ func SearchDataObjectsByMetaWildcard(conn *connection.IRODSConnection, metaName
 
 	return mergedDataObjects, nil
 }
+
+// SearchDataObjectsMasterReplicaByMetaWildcard is like SearchDataObjectsMasterReplicaByMeta, except that metaValue is matched with the "like" operator instead of "="; only replica number 0 is queried.
+// Caution: This is a very slow operation
+func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnection, metaName string, metaValue string) ([]*types.IRODSDataObject, error) {
+	if conn == nil || !conn.IsConnected() {
+		return nil, fmt.Errorf("connection is nil or disconnected")
+	}
+
+	dataObjects := []*types.IRODSDataObject{}
+
+	continueQuery := true
+	continueIndex := 0
+	for continueQuery {
+		// select the data object columns together with the parent collection's ID and name
+		query := message.NewIRODSMessageQuery(common.MaxQueryRows, continueIndex, 0, 0)
+		query.AddSelect(common.ICAT_COLUMN_COLL_ID, 1)
+		query.AddSelect(common.ICAT_COLUMN_COLL_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_ID, 1)
+		query.AddSelect(common.ICAT_COLUMN_DATA_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_DATA_SIZE, 1)
+
+		// select the columns that describe the replica
+		query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_CREATE_TIME, 1)
+		query.AddSelect(common.ICAT_COLUMN_D_MODIFY_TIME, 1)
+
+		metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
+		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
+		metaValueCondVal := fmt.Sprintf("like '%s'", metaValue)
+		query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
+		query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
+
+		queryMessage, err := query.GetMessage()
+		if err != nil {
+			return nil, fmt.Errorf("Could not make a data object query message - %v", err)
+		}
+
+		err = conn.SendMessage(queryMessage)
+		if err != nil {
+			return nil, fmt.Errorf("Could not send a data object query message - %v", err)
+		}
+
+		// read the server's response to the query
+		queryResultMessage, err := conn.ReadMessage()
+		if err != nil {
+			return nil, fmt.Errorf("Could not receive a data object query result message - %v", err)
+		}
+
+		queryResult := message.IRODSMessageQueryResult{}
+		err = queryResult.FromMessage(queryResultMessage)
+		if err != nil {
+			return nil, fmt.Errorf("Could not receive a data object query result message - %v", err)
+		}
+
+		if queryResult.RowCount == 0 {
+			break
+		}
+
+		if queryResult.AttributeCount > len(queryResult.SQLResult) {
+			return nil, fmt.Errorf("Could not receive data object attributes - requires %d, but received %d attributes", queryResult.AttributeCount, len(queryResult.SQLResult))
+		}
+
+		pagenatedDataObjects := make([]*types.IRODSDataObject, queryResult.RowCount, queryResult.RowCount)
+
+		for attr := 0; attr < queryResult.AttributeCount; attr++ {
+			sqlResult := queryResult.SQLResult[attr]
+			if len(sqlResult.Values) != queryResult.RowCount {
+				return nil, fmt.Errorf("Could not receive data object rows - requires %d, but received %d attributes", queryResult.RowCount, len(sqlResult.Values))
+			}
+
+			for row := 0; row < queryResult.RowCount; row++ {
+				value := sqlResult.Values[row]
+
+				if pagenatedDataObjects[row] == nil {
+					// first value seen for this row: allocate the data object with one placeholder replica
+					replica := &types.IRODSReplica{
+						Number:            -1,
+						Owner:             "",
+						CheckSum:          "",
+						Status:            "",
+						ResourceName:      "",
+						Path:              "",
+						ResourceHierarchy: "",
+						CreateTime:        time.Time{},
+						ModifyTime:        time.Time{},
+					}
+
+					pagenatedDataObjects[row] = &types.IRODSDataObject{
+						ID:           -1,
+						CollectionID: -1,
+						Path:         "",
+						Name:         "",
+						Size:         0,
+						Replicas:     []*types.IRODSReplica{replica},
+					}
+				}
+
+				switch sqlResult.AttributeIndex {
+				case int(common.ICAT_COLUMN_COLL_ID):
+					collID, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse collection id - %s", value)
+					}
+					pagenatedDataObjects[row].CollectionID = collID
+				case int(common.ICAT_COLUMN_COLL_NAME):
+					if len(pagenatedDataObjects[row].Path) > 0 {
+						pagenatedDataObjects[row].Path = util.MakeIRODSPath(value, pagenatedDataObjects[row].Path)
+					} else {
+						pagenatedDataObjects[row].Path = value
+					}
+				case int(common.ICAT_COLUMN_D_DATA_ID):
+					objID, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object id - %s", value)
+					}
+					pagenatedDataObjects[row].ID = objID
+				case int(common.ICAT_COLUMN_DATA_NAME):
+					if len(pagenatedDataObjects[row].Path) > 0 {
+						pagenatedDataObjects[row].Path = util.MakeIRODSPath(pagenatedDataObjects[row].Path, value)
+					} else {
+						pagenatedDataObjects[row].Path = value
+					}
+					pagenatedDataObjects[row].Name = value
+				case int(common.ICAT_COLUMN_DATA_SIZE):
+					objSize, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object size - %s", value)
+					}
+					pagenatedDataObjects[row].Size = objSize
+				case int(common.ICAT_COLUMN_DATA_REPL_NUM):
+					repNum, err := strconv.ParseInt(value, 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse data object replica number - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].Number = repNum
+				case int(common.ICAT_COLUMN_D_OWNER_NAME):
+					pagenatedDataObjects[row].Replicas[0].Owner = value
+				case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
+					pagenatedDataObjects[row].Replicas[0].CheckSum = value
+				case int(common.ICAT_COLUMN_D_DATA_STATUS):
+					pagenatedDataObjects[row].Replicas[0].Status = value
+				case int(common.ICAT_COLUMN_D_RESC_NAME):
+					pagenatedDataObjects[row].Replicas[0].ResourceName = value
+				case int(common.ICAT_COLUMN_D_DATA_PATH):
+					pagenatedDataObjects[row].Replicas[0].Path = value
+				case int(common.ICAT_COLUMN_D_RESC_HIER):
+					pagenatedDataObjects[row].Replicas[0].ResourceHierarchy = value
+				case int(common.ICAT_COLUMN_D_CREATE_TIME):
+					cT, err := util.GetIRODSDateTime(value)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse create time - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].CreateTime = cT
+				case int(common.ICAT_COLUMN_D_MODIFY_TIME):
+					mT, err := util.GetIRODSDateTime(value)
+					if err != nil {
+						return nil, fmt.Errorf("Could not parse modify time - %s", value)
+					}
+					pagenatedDataObjects[row].Replicas[0].ModifyTime = mT
+				default:
+					// values of any other column are skipped
+				}
+			}
+		}
+
+		dataObjects = append(dataObjects, pagenatedDataObjects...)
+
+		continueIndex = queryResult.ContinueIndex
+		if continueIndex == 0 {
+			continueQuery = false
+		}
+	}
+
+	return dataObjects, nil
+}