Skip to content
6 changes: 3 additions & 3 deletions endpoints/openapi_v1/bfe_pool/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func UpdateAction(req *http.Request) (interface{}, error) {
return nil, xerror.WrapRecordNotExist("Instance Pool")
}

pi := &icluster_conf.InstancePool{
p := &icluster_conf.InstancePool{
Name: one.Name,
Instances: product_pool.Instancesc2i(param.Instances),
}
err = container.InstancePoolManager.UpdateInstances(req.Context(), one, pi)
err = container.InstancePoolManager.UpdateInstances(req.Context(), one, p)
if err != nil {
return nil, err
}

return product_pool.NewOneData(one, pi), err
return product_pool.NewOneData(one, p), err
}
6 changes: 3 additions & 3 deletions model/icluster_conf/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func ClusterList2MapByID(list []*Cluster) map[int64]*Cluster {

func NewClusterManager(txn itxn.TxnStorager, storager ClusterStorager,
subClusterStorager SubClusterStorager, bfeClusterStorager ibasic.BFEClusterStorager,
poolInstancesManager *InstancePoolManager,
instancePoolManager *InstancePoolManager,
versionControlManager *iversion_control.VersionControlManager,
deleteCheckers map[string]func(context.Context, *ibasic.Product, *Cluster) error) *ClusterManager {

Expand All @@ -245,7 +245,7 @@ func NewClusterManager(txn itxn.TxnStorager, storager ClusterStorager,
storager: storager,
subClusterStorager: subClusterStorager,
bfeClusterStorager: bfeClusterStorager,
poolInstancesManager: poolInstancesManager,
instancePoolManager: instancePoolManager,
versionControlManager: versionControlManager,

deleteCheckers: deleteCheckers,
Expand All @@ -268,7 +268,7 @@ type ClusterManager struct {
subClusterStorager SubClusterStorager
bfeClusterStorager ibasic.BFEClusterStorager

poolInstancesManager *InstancePoolManager
instancePoolManager *InstancePoolManager
versionControlManager *iversion_control.VersionControlManager

deleteCheckers map[string]func(context.Context, *ibasic.Product, *Cluster) error
Expand Down
2 changes: 1 addition & 1 deletion model/icluster_conf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (rm *ClusterManager) clusterTableConfGenerator(ctx context.Context) (*ivers
}

// maybe rpc in db transaction
piMap, err := rm.poolInstancesManager.BatchFetchInstances(ctx, PoolMap2List(pools))
piMap, err := rm.instancePoolManager.BatchFetchInstances(ctx, PoolMap2List(pools))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

piMap => poolMap

if err != nil {
return nil, err
}
Expand Down
216 changes: 2 additions & 214 deletions model/icluster_conf/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ package icluster_conf

import (
"context"
"strings"

"github.com/bfenetworks/api-server/lib/xerror"
"github.com/bfenetworks/api-server/model/ibasic"
"github.com/bfenetworks/api-server/model/itxn"
)

var (
Expand Down Expand Up @@ -58,226 +54,18 @@ func (p *Pool) SetDefaultInstances(is []Instance) {
p.instances = is
Comment thread
githublaohu marked this conversation as resolved.
}

func (p *Pool) GetDefaultInstances() *InstancePool {
func (p *Pool) GetDefaultPool() *InstancePool {
return &InstancePool{
Name: p.Name,
Instances: p.instances,
}
Comment thread
githublaohu marked this conversation as resolved.
}

type PoolStorager interface {
type PoolStorage interface {
FetchPool(ctx context.Context, name string) (*Pool, error)
FetchPools(ctx context.Context, param *PoolFilter) ([]*Pool, error)

CreatePool(ctx context.Context, product *ibasic.Product, data *PoolParam) (*Pool, error)
UpdatePool(ctx context.Context, oldData *Pool, diff *PoolParam) error
DeletePool(ctx context.Context, pool *Pool) error
}

type PoolManager struct {
storager PoolStorager
bfeClusterStorager ibasic.BFEClusterStorager
subClusterStorager SubClusterStorager
txn itxn.TxnStorager

poolInstancesManager *InstancePoolManager
}

func NewPoolManager(txn itxn.TxnStorager, storager PoolStorager,
bfeClusterStorager ibasic.BFEClusterStorager, subClusterStorager SubClusterStorager,
poolInstancesManager *InstancePoolManager) *PoolManager {

return &PoolManager{
txn: txn,
storager: storager,
bfeClusterStorager: bfeClusterStorager,
subClusterStorager: subClusterStorager,

poolInstancesManager: poolInstancesManager,
}
}

func (rppm *PoolManager) FetchPoolByName(ctx context.Context, name string) (one *Pool, err error) {
err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
one, err = rppm.storager.FetchPool(ctx, name)
return err
})

return
}

func (rppm *PoolManager) FetchBFEPool(ctx context.Context, name string) (one *Pool, err error) {
return rppm.FetchProductPool(ctx, ibasic.BuildinProduct, name)
}

func (rppm *PoolManager) FetchProductPool(ctx context.Context, product *ibasic.Product, name string) (one *Pool, err error) {
name, err = poolNameJudger(product.Name, name)
if err != nil {
return
}

err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
one, err = rppm.storager.FetchPool(ctx, name)
return err
})

return
}

func (rppm *PoolManager) FetchBFEPools(ctx context.Context) (list []*Pool, err error) {
return rppm.FetchProductPools(ctx, ibasic.BuildinProduct)
}

func (rppm *PoolManager) FetchProductPools(ctx context.Context, product *ibasic.Product) (list []*Pool, err error) {
err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
list, err = rppm.storager.FetchPools(ctx, &PoolFilter{
ProductID: &product.ID,
})
return err
})

return
}

func poolNameJudger(productName string, poolName string) (realName string, err error) {
ss := strings.SplitN(poolName, ".", 2)
if len(ss) == 2 {
if ss[0] != productName {
return "", xerror.WrapParamErrorWithMsg("Pool Name Must Use Product Name as Prefix")
}

return poolName, nil
}

return productName + "." + poolName, nil
}

// CanDelete check whether pool can be deleted, Check Logic:
// 1. Not BFE Cluster Refer To
// 2. Not SubCluster Refer To
func (rppm *PoolManager) CanDelete(ctx context.Context, pool *Pool) error {
bfeClusters, err := rppm.bfeClusterStorager.FetchBFEClusters(ctx, &ibasic.BFEClusterFilter{
Pool: &pool.Name,
})
if err != nil {
return err
}
if len(bfeClusters) != 0 {
return xerror.WrapModelErrorWithMsg("BFECluster %s Refer To This Pool", bfeClusters[0].Name)
}

subClusters, err := rppm.subClusterStorager.FetchSubClusterList(ctx, &SubClusterFilter{
InstancePool: pool,
})
if err != nil {
return err
}
if len(subClusters) != 0 {
return xerror.WrapModelErrorWithMsg("SubCluster %s Refer To This Pool", subClusters[0].Name)
}

return nil
}

func (rppm *PoolManager) DeleteBFEPool(ctx context.Context, name string) (one *Pool, err error) {
return rppm.DeleteProductPool(ctx, ibasic.BuildinProduct, name)
}

func (rppm *PoolManager) DeleteProductPool(ctx context.Context, product *ibasic.Product, name string) (one *Pool, err error) {
name, err = poolNameJudger(product.Name, name)
if err != nil {
return
}

err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
one, err = rppm.storager.FetchPool(ctx, name)
if err != nil {
return err
}

if one == nil {
return xerror.WrapRecordNotExist("Pool")
}

if err = rppm.CanDelete(ctx, one); err != nil {
return err
}

return rppm.storager.DeletePool(ctx, one)
})

return
}

func (rppm *PoolManager) CreateBFEPool(ctx context.Context, pool *PoolParam, pis *InstancePool) (one *Pool, err error) {
pool.Tag = &PoolTagBFE
return rppm.CreateProductPool(ctx, ibasic.BuildinProduct, pool, pis)
}

func (rppm *PoolManager) CreateProductPool(ctx context.Context, product *ibasic.Product, pool *PoolParam, pis *InstancePool) (one *Pool, err error) {
var pN string
pN, err = poolNameJudger(product.Name, *pool.Name)
if err != nil {
return
}
pool.Name = &pN
if pool.Tag == nil {
pool.Tag = &PoolTagProduct
}

err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
old, err := rppm.storager.FetchPool(ctx, *pool.Name)
if err != nil {
return err
}
if old != nil {
return xerror.WrapRecordExisted()
}

one, err = rppm.storager.CreatePool(ctx, product, pool)
if err != nil {
return err
}

if pis != nil {
err = rppm.poolInstancesManager.UpdateInstances(ctx, one, pis)
}
return err
})

return
}

func (rppm *PoolManager) UpdateBFEPool(ctx context.Context, pool *Pool, diff *PoolParam) (err error) {
return rppm.UpdateProductPool(ctx, ibasic.BuildinProduct, pool, diff)
}

func (rppm *PoolManager) UpdateProductPool(ctx context.Context, product *ibasic.Product, pool *Pool, diff *PoolParam) (err error) {
err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
return rppm.storager.UpdatePool(ctx, pool, diff)
})

return
}

func PoolList2Map(list []*Pool) map[int64]*Pool {
m := map[int64]*Pool{}
for _, one := range list {
m[one.ID] = one
}

return m
}

func (rppm *PoolManager) GetPoolByName(ctx context.Context, poolName *string) (pool *Pool, err error) {
err = rppm.txn.AtomExecute(ctx, func(ctx context.Context) error {
if poolName == nil || *poolName == "" {
return xerror.WrapParamErrorWithMsg("Pool Name Illegal")
}

pool, err = rppm.storager.FetchPool(ctx, *poolName)
return err
})

return
}
26 changes: 13 additions & 13 deletions model/icluster_conf/pool_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,38 @@ const (
InstancePoolTypeNacos int8 = 2
)

type InstancePoolStorager interface {
type InstancePoolStorage interface {
UpdateInstances(context.Context, *Pool, *InstancePool) error

BatchFetchInstances(context.Context, []*Pool) (map[string]*InstancePool, error)
}

type InstancePoolManager struct {
instancePoolStorages map[int8]InstancePoolStorager
instancePoolStorages map[int8]InstancePoolStorage
}

func NewInstancePoolManager(instancePoolStorages map[int8]InstancePoolStorager) *InstancePoolManager {
func NewInstancePoolManager(instancePoolStorages map[int8]InstancePoolStorage) *InstancePoolManager {
return &InstancePoolManager{
instancePoolStorages: instancePoolStorages,
}
}

func (pim *InstancePoolManager) BatchFetchInstances(ctx context.Context, pools []*Pool) (map[string]*InstancePool, error) {
type2InstancePoolList := map[int8][]*Pool{}
func (m *InstancePoolManager) BatchFetchInstances(ctx context.Context, pools []*Pool) (map[string]*InstancePool, error) {
type2PoolList := map[int8][]*Pool{}
for _, one := range pools {
type2InstancePoolList[one.Type] = append(type2InstancePoolList[one.Type], one)
type2PoolList[one.Type] = append(type2PoolList[one.Type], one)
}

for typ := range type2InstancePoolList {
_, ok := pim.instancePoolStorages[typ]
for type2 := range type2PoolList {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type2 => type?
如果type是保留字,可以用typeName。
type2不知道是什么意思

_, ok := m.instancePoolStorages[type2]
if !ok {
return nil, xerror.WrapModelErrorWithMsg("Type %d not register Storager", typ)
return nil, xerror.WrapModelErrorWithMsg("Type %d not register Storager", type2)
}
}

rst := map[string]*InstancePool{}
for typ, pisList := range type2InstancePoolList {
storager := pim.instancePoolStorages[typ]
for type2, pisList := range type2PoolList {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同line 74

storager := m.instancePoolStorages[type2]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storager => storage

r, err := storager.BatchFetchInstances(ctx, pisList)
if err != nil {
return nil, err
Expand All @@ -94,8 +94,8 @@ func (pim *InstancePoolManager) BatchFetchInstances(ctx context.Context, pools [
return rst, nil
}

func (pim *InstancePoolManager) UpdateInstances(ctx context.Context, pool *Pool, pis *InstancePool) error {
storager, ok := pim.instancePoolStorages[pool.Type]
func (m *InstancePoolManager) UpdateInstances(ctx context.Context, pool *Pool, pis *InstancePool) error {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pis => iPool

storager, ok := m.instancePoolStorages[pool.Type]
if !ok {
return xerror.WrapModelErrorWithMsg("Type %d not register Storager", pool.Type)
}
Expand Down
Loading