Skip to content

Commit

Permalink
s
Browse files Browse the repository at this point in the history
  • Loading branch information
weizhoublue committed Nov 28, 2024
1 parent 02de076 commit 442f958
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 27 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ all: build-bin install-bin

.PHONY: all build install

阿斯蒂芬


CONTROLLER_BIN_SUBDIRS := cmd/spiderpool-controller cmd/spiderpoolctl cmd/spiderpool-init
AGENT_BIN_SUBDIRS := cmd/spiderpool-agent cmd/spiderpool cmd/coordinator
COORDINATOR_BIN_SUBDIRS := cmd/coordinator
Expand Down
75 changes: 65 additions & 10 deletions pkg/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,46 +128,101 @@ func getIPDiffSet(ipSourceList, ipExcludeList []net.IP, sorted bool, expectCount
}

// FindAvailableIPs find available ip list in range
func FindAvailableIPs(ipRanges []string, ipList []net.IP, count int) []net.IP {
func FindAvailableIPs(ipRanges []string, ipList []net.IP, count int) ([]net.IP, error) {
if count < 0 {
return nil, fmt.Errorf("count must be non-negative")
}

if len(ipRanges) == 0 {
return nil, fmt.Errorf("ipRanges cannot be empty")
}

// Use efficient map with [16]byte key for faster lookups
ipMap := make(map[[16]byte]struct{}, len(ipList))
ipVersion := -1 // -1: unset, 4: IPv4, 6: IPv6

// Validate and store existing IPs
for _, ip := range ipList {
if ip != nil {
ipMap[[16]byte(ip.To16())] = struct{}{}
if ip == nil {
continue
}

// Determine and validate IP version consistency
if ip.To4() != nil {
if ipVersion == 6 {
return nil, fmt.Errorf("mixed IPv4 and IPv6 addresses are not supported")
}
ipVersion = 4
} else {
if ipVersion == 4 {
return nil, fmt.Errorf("mixed IPv4 and IPv6 addresses are not supported")
}
ipVersion = 6
}
ipMap[[16]byte(ip.To16())] = struct{}{}
}

var availableIPs []net.IP


// Process each IP range
for _, ipRange := range ipRanges {
if count == 0 {
break
}

ips := strings.Split(ipRange, "-")
if len(ips) > 2 {
return nil, fmt.Errorf("invalid IP range format: %s", ipRange)
}

startIP := net.ParseIP(ips[0])
if startIP == nil {
return nil, fmt.Errorf("invalid start IP: %s", ips[0])
}

var endIP net.IP
if len(ips) == 2 {
endIP = net.ParseIP(ips[1])
if endIP == nil {
return nil, fmt.Errorf("invalid end IP: %s", ips[1])
}
} else {
endIP = startIP
}
if startIP == nil || endIP == nil {
continue

// Validate IP version consistency within range
if (startIP.To4() != nil) != (endIP.To4() != nil) {
return nil, fmt.Errorf("IP range %s contains mixed IPv4 and IPv6 addresses", ipRange)
}
if bytes.Compare(startIP, endIP) == 1 {
continue

// Validate IP version consistency with existing IPs
if ipVersion != -1 {
if (startIP.To4() != nil) != (ipVersion == 4) {
return nil, fmt.Errorf("IP range %s version mismatch with existing IPs", ipRange)
}
} else {
ipVersion = map[bool]int{true: 4, false: 6}[startIP.To4() != nil]
}

// Validate range order
if bytes.Compare(startIP, endIP) > 0 {
return nil, fmt.Errorf("invalid IP range: start IP %s is greater than end IP %s", startIP, endIP)
}

// Find available IPs in range using efficient lookup
stop := nextIP(endIP)
for ip := startIP; !ip.Equal(stop) && count > 0; ip = nextIP(ip) {
if _, exists := ipMap[[16]byte(ip.To16())]; !exists {
availableIPs = append(availableIPs, ip)
newIP := make(net.IP, len(ip))
copy(newIP, ip)
availableIPs = append(availableIPs, newIP)
ipMap[[16]byte(ip.To16())] = struct{}{} // Prevent duplicates across ranges
count--
}
}
}

return availableIPs
return availableIPs, nil
}

func nextIP(ip net.IP) net.IP {
Expand Down
119 changes: 102 additions & 17 deletions pkg/ipam/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -482,8 +483,21 @@ func (i *ipam) allocateIPsFromAllCandidates(ctx context.Context, tt ToBeAllocate
}

func (i *ipam) allocateIPFromCandidate(ctx context.Context, c *PoolCandidate, nic string, cleanGateway bool, pod *corev1.Pod, podController types.PodTopController) (*types.AllocationResult, error) {
if c == nil {
return nil, fmt.Errorf("pool candidate cannot be nil")
}
if nic == "" {
return nil, fmt.Errorf("NIC name cannot be empty")
}
if pod == nil {
return nil, fmt.Errorf("pod cannot be nil")
}

logger := logutils.FromContext(ctx)
const maxRetries = 3
var lastErr error

// Check failure history first
for _, oldRes := range i.failure.getFailureIPs(string(pod.UID)) {
for _, ipPool := range c.PToIPPool {
if oldRes.IP.IPPool == ipPool.Name && *oldRes.IP.Nic == nic {
Expand All @@ -495,31 +509,65 @@ func (i *ipam) allocateIPFromCandidate(ctx context.Context, c *PoolCandidate, ni
}
}

var errs []error
var result *types.AllocationResult
for _, pool := range c.Pools {
ip, err := i.ipPoolManager.AllocateIP(ctx, pool, nic, pod, podController)
if err != nil {
logger.Sugar().Warnf("Failed to allocate IPv%d IP address to NIC %s from IPPool %s: %v", c.IPVersion, nic, pool, err)
errs = append(errs, err)
continue
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context cancelled while retrying IP allocation: %v", ctx.Err())
case <-time.After(time.Duration(attempt*100) * time.Millisecond):
}
logger.Sugar().Infof("Retrying IP allocation attempt %d/%d", attempt+1, maxRetries)
}

logger.Sugar().Infof("Allocate IPv%d IP %s to NIC %s from IPPool %s", c.IPVersion, *ip.Address, nic, pool)
result = &types.AllocationResult{
IP: ip,
Routes: convert.ConvertSpecRoutesToOAIRoutes(nic, c.PToIPPool[pool].Spec.Routes),
CleanGateway: cleanGateway,
var errs []error
var result *types.AllocationResult

for _, pool := range c.Pools {
if err := i.checkPoolHealth(ctx, pool); err != nil {
logger.Sugar().Warnf("Pool %s health check failed: %v", pool, err)
errs = append(errs, err)
continue
}

ip, err := i.ipPoolManager.AllocateIP(ctx, pool, nic, pod, podController)
if err != nil {
logger.Sugar().Warnf("Failed to allocate IPv%d IP address to NIC %s from IPPool %s: %v", c.IPVersion, nic, pool, err)
errs = append(errs, err)
lastErr = err
continue
}

logger.Sugar().Infof("Allocated IPv%d IP %s to NIC %s from IPPool %s", c.IPVersion, *ip.Address, nic, pool)
result = &types.AllocationResult{
IP: ip,
Routes: convert.ConvertSpecRoutesToOAIRoutes(nic, c.PToIPPool[pool].Spec.Routes),
CleanGateway: cleanGateway,
}

if err := i.validateAllocatedIP(ctx, result, pod); err != nil {
logger.Sugar().Warnf("IP validation failed: %v", err)
if releaseErr := i.ipPoolManager.ReleaseIP(ctx, pool, []types.IPAndUID{{IP: *ip.Address, UID: string(pod.UID)}}); releaseErr != nil {
logger.Sugar().Errorf("Failed to release invalid IP: %v", releaseErr)
}
errs = append(errs, err)
continue
}

return result, nil
}

break
if len(errs) == len(c.Pools) {
lastErr = utilerrors.NewAggregate(errs)
continue
}
}

if len(errs) == len(c.Pools) {
return nil, fmt.Errorf("failed to allocate any IPv%d IP address to NIC %s from IPPools %v: %w", c.IPVersion, nic, c.Pools, utilerrors.NewAggregate(errs))
if lastErr != nil {
return nil, fmt.Errorf("failed to allocate any IPv%d IP address to NIC %s after %d attempts: %w",
c.IPVersion, nic, maxRetries, lastErr)
}

return result, nil
return nil, fmt.Errorf("failed to allocate IP address after %d attempts", maxRetries)
}

func (i *ipam) precheckPoolCandidates(ctx context.Context, t *ToBeAllocated) error {
Expand Down Expand Up @@ -781,3 +829,40 @@ func sortPoolCandidates(preliminary ToBeAllocateds) {
}
}
}

func (i *ipam) checkPoolHealth(ctx context.Context, poolName string) error {
pool, err := i.ipPoolManager.GetIPPoolByName(ctx, poolName, constant.UseCache)
if err != nil {
return fmt.Errorf("failed to get pool %s: %v", poolName, err)
}

if pool.DeletionTimestamp != nil {
return fmt.Errorf("pool %s is being deleted", poolName)
}

if pool.Spec.Disable != nil && *pool.Spec.Disable {
return fmt.Errorf("pool %s is disabled", poolName)
}

return nil
}

func (i *ipam) validateAllocatedIP(ctx context.Context, result *types.AllocationResult, pod *corev1.Pod) error {
if result == nil || result.IP == nil || result.IP.Address == nil {
return fmt.Errorf("invalid allocation result")
}

// Verify IP is not already in use by another pod
endpoints, err := i.endpointManager.GetEndpointsByIPs(ctx, []string{*result.IP.Address}, constant.UseCache)

Check failure on line 856 in pkg/ipam/allocate.go

View workflow job for this annotation

GitHub Actions / lint-golang

i.endpointManager.GetEndpointsByIPs undefined (type workloadendpointmanager.WorkloadEndpointManager has no field or method GetEndpointsByIPs) (typecheck)
if err != nil {
return fmt.Errorf("failed to check IP usage: %v", err)
}

for _, endpoint := range endpoints {
if endpoint.Status.Current.UID != string(pod.UID) {
return fmt.Errorf("IP %s is already in use by pod %s/%s", *result.IP.Address, endpoint.Namespace, endpoint.Name)
}
}

return nil
}

0 comments on commit 442f958

Please sign in to comment.