Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,17 @@ import (
const etcAmazonEfs = "/etc/amazon/efs"

func main() {
var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI Endpoint")
version = flag.Bool("version", false, "Print the version and exit")
efsUtilsCfgDirPath = flag.String("efs-utils-config-dir-path", "/var/amazon/efs", "The preferred path for the efs-utils config directory. efs-utils-config-legacy-dir-path will be used if it is not empty, otherwise efs-utils-config-dir-path will be used.")
efsUtilsCfgLegacyDirPath = flag.String("efs-utils-config-legacy-dir-path", "/etc/amazon/efs-legacy", "The path to the legacy efs-utils config directory mounted from the host path /etc/amazon/efs")
efsUtilsStaticFilesPath = flag.String("efs-utils-static-files-path", "/etc/amazon/efs-static-files/", "The path to efs-utils static files directory")
volMetricsOptIn = flag.Bool("vol-metrics-opt-in", false, "Opt in to emit volume metrics")
volMetricsRefreshPeriod = flag.Float64("vol-metrics-refresh-period", 240, "Refresh period for volume metrics in minutes")
volMetricsFsRateLimit = flag.Int("vol-metrics-fs-rate-limit", 5, "Volume metrics routines rate limiter per file system")
deleteAccessPointRootDir = flag.Bool("delete-access-point-root-dir", false,
"Opt in to delete access point root directory by DeleteVolume. By default, DeleteVolume will delete the access point behind Persistent Volume and deleting access point will not delete the access point root directory or its contents.")
adaptiveRetryMode = flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests.")
tags = flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'")
maxInflightMountCallsOptIn = flag.Bool("max-inflight-mount-calls-opt-in", false, "Opt in to use max inflight mount calls limit.")
maxInflightMountCalls = flag.Int64("max-inflight-mount-calls", driver.UnsetMaxInflightMountCounts, "New NodePublishVolume operation will be blocked if maximum number of inflight calls is reached. If maxInflightMountCallsOptIn is true, it has to be set to a positive value.")
volumeAttachLimitOptIn = flag.Bool("volume-attach-limit-opt-in", false, "Opt in to use volume attach limit.")
volumeAttachLimit = flag.Int64("volume-attach-limit", driver.UnsetVolumeAttachLimit, "Maximum number of volumes that can be attached to a node. If volumeAttachLimitOptIn is true, it has to be set to a positive value.")
forceUnmountAfterTimeout = flag.Bool("force-unmount-after-timeout", false, "Enable force unmount if normal unmount times out during NodeUnpublishVolume.")
unmountTimeout = flag.Duration("unmount-timeout", driver.DefaultUnmountTimeout, "Timeout for unmounting a volume during NodePublishVolume when forceUnmountAfterTimeout is true. If the timeout is reached, the volume will be forcibly unmounted. The default value is 30 seconds.")
)
klog.InitFlags(nil)

options := driver.NewOptions()
flag.Parse()

if *version {
if err := options.Validate(); err != nil {
klog.ErrorS(err, "Invalid options")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if *options.Version {
info, err := driver.GetVersionJSON()
if err != nil {
klog.Fatalln(err)
Expand All @@ -63,11 +50,11 @@ func main() {
}

// chose which configuration directory we will use and create a symlink to it
err := driver.InitConfigDir(*efsUtilsCfgLegacyDirPath, *efsUtilsCfgDirPath, etcAmazonEfs)
err := driver.InitConfigDir(*options.EfsUtilsCfgLegacyDirPath, *options.EfsUtilsCfgDirPath, etcAmazonEfs)
if err != nil {
klog.Fatalln(err)
}
drv := driver.NewDriver(*endpoint, etcAmazonEfs, *efsUtilsStaticFilesPath, *tags, *volMetricsOptIn, *volMetricsRefreshPeriod, *volMetricsFsRateLimit, *deleteAccessPointRootDir, *adaptiveRetryMode, *maxInflightMountCallsOptIn, *maxInflightMountCalls, *volumeAttachLimitOptIn, *volumeAttachLimit, *forceUnmountAfterTimeout, *unmountTimeout)
drv := driver.NewDriver(options, etcAmazonEfs)
if err := drv.Run(); err != nil {
klog.Fatalln(err)
}
Expand Down
35 changes: 16 additions & 19 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ const (
driverName = "efs.csi.aws.com"

// AgentNotReadyTaintKey contains the key of taints to be removed on driver startup
AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready"
UnsetMaxInflightMountCounts = -1
UnsetVolumeAttachLimit = -1
DefaultUnmountTimeout = 30 * time.Second
AgentNotReadyNodeTaintKey = "efs.csi.aws.com/agent-not-ready"
)

type Driver struct {
Expand All @@ -62,34 +59,34 @@ type Driver struct {
unmountTimeout time.Duration
}

func NewDriver(endpoint, efsUtilsCfgPath, efsUtilsStaticFilesPath, tags string, volMetricsOptIn bool, volMetricsRefreshPeriod float64, volMetricsFsRateLimit int, deleteAccessPointRootDir bool, adaptiveRetryMode bool, maxInflightMountCallsOptIn bool, maxInflightMountCalls int64, volumeAttachLimitOptIn bool, volumeAttachLimit int64, forceUnmountAfterTimeout bool, unmountTimeout time.Duration) *Driver {
cloud, err := cloud.NewCloud(adaptiveRetryMode)
func NewDriver(options *Options, efsUtilsCfgPath string) *Driver {
cloud, err := cloud.NewCloud(*options.AdaptiveRetryMode)
if err != nil {
klog.Fatalln(err)
}

nodeCaps := SetNodeCapOptInFeatures(volMetricsOptIn)
watchdog := newExecWatchdog(efsUtilsCfgPath, efsUtilsStaticFilesPath, "amazon-efs-mount-watchdog")
nodeCaps := SetNodeCapOptInFeatures(*options.VolMetricsOptIn)
watchdog := newExecWatchdog(efsUtilsCfgPath, *options.EfsUtilsStaticFilesPath, "amazon-efs-mount-watchdog")
return &Driver{
endpoint: endpoint,
endpoint: *options.Endpoint,
nodeID: cloud.GetMetadata().GetInstanceID(),
mounter: newNodeMounter(),
efsWatchdog: watchdog,
cloud: cloud,
nodeCaps: nodeCaps,
volStatter: NewVolStatter(),
volMetricsOptIn: volMetricsOptIn,
volMetricsRefreshPeriod: volMetricsRefreshPeriod,
volMetricsFsRateLimit: volMetricsFsRateLimit,
volMetricsOptIn: *options.VolMetricsOptIn,
volMetricsRefreshPeriod: *options.VolMetricsRefreshPeriod,
volMetricsFsRateLimit: *options.VolMetricsFsRateLimit,
gidAllocator: NewGidAllocator(),
deleteAccessPointRootDir: deleteAccessPointRootDir,
adaptiveRetryMode: adaptiveRetryMode,
tags: parseTagsFromStr(strings.TrimSpace(tags)),
deleteAccessPointRootDir: *options.DeleteAccessPointRootDir,
adaptiveRetryMode: *options.AdaptiveRetryMode,
tags: parseTagsFromStr(strings.TrimSpace(*options.Tags)),
lockManager: NewLockManagerMap(),
inFlightMountTracker: NewInFlightMountTracker(getMaxInflightMountCalls(maxInflightMountCallsOptIn, maxInflightMountCalls)),
volumeAttachLimit: getVolumeAttachLimit(volumeAttachLimitOptIn, volumeAttachLimit),
forceUnmountAfterTimeout: forceUnmountAfterTimeout,
unmountTimeout: unmountTimeout,
inFlightMountTracker: NewInFlightMountTracker(getMaxInflightMountCalls(*options.MaxInflightMountCallsOptIn, *options.MaxInflightMountCalls)),
volumeAttachLimit: getVolumeAttachLimit(*options.VolumeAttachLimitOptIn, *options.VolumeAttachLimit),
forceUnmountAfterTimeout: *options.ForceUnmountAfterTimeout,
unmountTimeout: *options.UnmountTimeout,
}
}

Expand Down
10 changes: 0 additions & 10 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,6 @@ func tryRemoveNotReadyTaintUntilSucceed(interval time.Duration, removeFn func()
}

func getMaxInflightMountCalls(maxInflightMountCallsOptIn bool, maxInflightMountCalls int64) int64 {
if maxInflightMountCallsOptIn && maxInflightMountCalls <= 0 {
klog.Errorf("Fatal error: maxInflightMountCalls must be greater than 0 when maxInflightMountCallsOptIn is true!")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if !maxInflightMountCallsOptIn {
klog.V(4).Infof("MaxInflightMountCallsOptIn is false, setting maxInflightMountCalls to %d and inflight check is disabled", UnsetMaxInflightMountCounts)
return UnsetMaxInflightMountCounts
Expand All @@ -583,11 +578,6 @@ func getMaxInflightMountCalls(maxInflightMountCallsOptIn bool, maxInflightMountC
}

func getVolumeAttachLimit(volumeAttachLimitOptIn bool, volumeAttachLimit int64) int64 {
if volumeAttachLimitOptIn && volumeAttachLimit <= 0 {
klog.Errorf("Fatal error: volumeAttachLimit must be greater than 0 when volumeAttachLimitOptIn is true!")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if !volumeAttachLimitOptIn {
klog.V(4).Infof("VolumeAttachLimitOptIn is false, setting maxVolumesPerNode to zero so that container orchestrator will decide the value")
return 0
Expand Down
61 changes: 6 additions & 55 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,6 @@ func TestGetMaxInflightMountCalls(t *testing.T) {
maxInflightMountCallsOptIn bool
maxInflightMountCalls int64
expected int64
expectFatal bool
}{
{
name: "opt-in false returns unset",
Expand All @@ -1154,37 +1153,13 @@ func TestGetMaxInflightMountCalls(t *testing.T) {
maxInflightMountCalls: 5,
expected: 5,
},
{
name: "opt-in true with zero value should fatal",
maxInflightMountCallsOptIn: true,
maxInflightMountCalls: 0,
expectFatal: true,
},
{
name: "opt-in true with negative value should fatal",
maxInflightMountCallsOptIn: true,
maxInflightMountCalls: UnsetMaxInflightMountCounts,
expectFatal: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expectFatal {
if os.Getenv("FORK") == "1" {
// If it is in forked process, run the fatal code directly and let klog.Fatal exit
getMaxInflightMountCalls(tc.maxInflightMountCallsOptIn, tc.maxInflightMountCalls)
return
}
err := runForkFatalTest("TestGetMaxInflightMountCalls/" + tc.name)
if err == nil {
t.Fatal("expected process to exit with error")
}
} else {
result := getMaxInflightMountCalls(tc.maxInflightMountCallsOptIn, tc.maxInflightMountCalls)
if result != tc.expected {
t.Errorf("Expected %d, got %d", tc.expected, result)
}
result := getMaxInflightMountCalls(tc.maxInflightMountCallsOptIn, tc.maxInflightMountCalls)
if result != tc.expected {
t.Errorf("Expected %d, got %d", tc.expected, result)
}
})
}
Expand All @@ -1210,37 +1185,13 @@ func TestGetVolumeAttachLimit(t *testing.T) {
volumeAttachLimit: 50,
expected: 50,
},
{
name: "opt-in true with zero value should fatal",
volumeAttachLimitOptIn: true,
volumeAttachLimit: 0,
expectFatal: true,
},
{
name: "opt-in true with negative value should fatal",
volumeAttachLimitOptIn: true,
volumeAttachLimit: -1,
expectFatal: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expectFatal {
// If it is in forked process, run the fatal code directly and let klog.Fatal exit
if os.Getenv("FORK") == "1" {
getVolumeAttachLimit(tc.volumeAttachLimitOptIn, tc.volumeAttachLimit)
return
}
err := runForkFatalTest("TestGetVolumeAttachLimit/" + tc.name)
if err == nil {
t.Fatal("expected process to exit with error")
}
} else {
result := getVolumeAttachLimit(tc.volumeAttachLimitOptIn, tc.volumeAttachLimit)
if result != tc.expected {
t.Errorf("Expected %d, got %d", tc.expected, result)
}
result := getVolumeAttachLimit(tc.volumeAttachLimitOptIn, tc.volumeAttachLimit)
if result != tc.expected {
t.Errorf("Expected %d, got %d", tc.expected, result)
}
})
}
Expand Down
68 changes: 68 additions & 0 deletions pkg/driver/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package driver

import (
"errors"
"flag"
"time"
)

const (
UnsetMaxInflightMountCounts = -1
UnsetVolumeAttachLimit = -1
DefaultUnmountTimeout = 30 * time.Second
)

type Options struct {
Endpoint *string
Version *bool
EfsUtilsCfgDirPath *string
EfsUtilsCfgLegacyDirPath *string
EfsUtilsStaticFilesPath *string
VolMetricsOptIn *bool
VolMetricsRefreshPeriod *float64
VolMetricsFsRateLimit *int
DeleteAccessPointRootDir *bool
AdaptiveRetryMode *bool
Tags *string
MaxInflightMountCallsOptIn *bool
MaxInflightMountCalls *int64
VolumeAttachLimitOptIn *bool
VolumeAttachLimit *int64
ForceUnmountAfterTimeout *bool
UnmountTimeout *time.Duration
}

func NewOptions() *Options {
return &Options{
Endpoint: flag.String("endpoint", "unix://tmp/csi.sock", "CSI Endpoint"),
Version: flag.Bool("version", false, "Print the version and exit"),
EfsUtilsCfgDirPath: flag.String("efs-utils-config-dir-path", "/var/amazon/efs", "The preferred path for the efs-utils config directory. efs-utils-config-legacy-dir-path will be used if it is not empty, otherwise efs-utils-config-dir-path will be used."),
EfsUtilsCfgLegacyDirPath: flag.String("efs-utils-config-legacy-dir-path", "/etc/amazon/efs-legacy", "The path to the legacy efs-utils config directory mounted from the host path /etc/amazon/efs"),
EfsUtilsStaticFilesPath: flag.String("efs-utils-static-files-path", "/etc/amazon/efs-static-files/", "The path to efs-utils static files directory"),
VolMetricsOptIn: flag.Bool("vol-metrics-opt-in", false, "Opt in to emit volume metrics"),
VolMetricsRefreshPeriod: flag.Float64("vol-metrics-refresh-period", 240, "Refresh period for volume metrics in minutes"),
VolMetricsFsRateLimit: flag.Int("vol-metrics-fs-rate-limit", 5, "Volume metrics routines rate limiter per file system"),
DeleteAccessPointRootDir: flag.Bool("delete-access-point-root-dir", false,
"Opt in to delete access point root directory by DeleteVolume. By default, DeleteVolume will delete the access point behind Persistent Volume and deleting access point will not delete the access point root directory or its contents."),
AdaptiveRetryMode: flag.Bool("adaptive-retry-mode", true, "Opt out to use standard sdk retry configuration. By default, adaptive retry mode will be used to more heavily client side rate limit EFS API requests."),
Tags: flag.String("tags", "", "Space separated key:value pairs which will be added as tags for EFS resources. For example, 'environment:prod region:us-east-1'"),
MaxInflightMountCallsOptIn: flag.Bool("max-inflight-mount-calls-opt-in", false, "Opt in to use max inflight mount calls limit."),
MaxInflightMountCalls: flag.Int64("max-inflight-mount-calls", UnsetMaxInflightMountCounts, "New NodePublishVolume operation will be blocked if maximum number of inflight calls is reached. If maxInflightMountCallsOptIn is true, it has to be set to a positive value."),
VolumeAttachLimitOptIn: flag.Bool("volume-attach-limit-opt-in", false, "Opt in to use volume attach limit."),
VolumeAttachLimit: flag.Int64("volume-attach-limit", UnsetVolumeAttachLimit, "Maximum number of volumes that can be attached to a node. If volumeAttachLimitOptIn is true, it has to be set to a positive value."),
ForceUnmountAfterTimeout: flag.Bool("force-unmount-after-timeout", false, "Enable force unmount if normal unmount times out during NodeUnpublishVolume."),
UnmountTimeout: flag.Duration("unmount-timeout", DefaultUnmountTimeout, "Timeout for unmounting a volume during NodePublishVolume when forceUnmountAfterTimeout is true. If the timeout is reached, the volume will be forcibly unmounted. The default value is 30 seconds."),
}
}

func (o *Options) Validate() error {
if *o.MaxInflightMountCallsOptIn && *o.MaxInflightMountCalls <= 0 {
return errors.New("maxInflightMountCallsOptIn is true, but maxInflightMountCalls is not set to a positive value")
}

if *o.VolumeAttachLimitOptIn && *o.VolumeAttachLimit <= 0 {
return errors.New("volumeAttachLimitOptIn is true, but volumeAttachLimit is not set to a positive value")
}

return nil
}
Loading
Loading