pgdlock

Version: v0.0.6
Published: Mar 25, 2025 License: MIT Imports: 10 Imported by: 0

README

go-pg-distributed-lock

A robust PostgreSQL-based distributed locking library for Go applications.

⭐️ Star This Project ⭐️

If you find this project helpful, please give it a star on GitHub! Your support is greatly appreciated.

Overview

go-pg-distributed-lock provides a reliable distributed locking mechanism using PostgreSQL as the coordination backend. It's designed for distributed systems that need synchronization across multiple nodes or processes.

Key features:

  • Lease-based locks with configurable timeouts
  • Heartbeat mechanism to maintain locks during long operations
  • Automatic cleanup of inactive/stale locks
  • Automatic lock cleanup for crashed nodes
  • Configurable retry strategies
  • Per-lock customization of timing parameters

Installation

go get github.com/tzahifadida/go-pg-distributed-lock

Quick Start

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	// Blank import: registers the "pgx" driver with database/sql
	_ "github.com/jackc/pgx/v5/stdlib"
	pgdlock "github.com/tzahifadida/go-pg-distributed-lock"
)

func main() {
	// Open a database handle (sql.Open validates its arguments; the first
	// connection is established lazily, on first use)
	db, err := sql.Open("pgx", "postgres://username:password@localhost:5432/dbname")
	if err != nil {
		log.Fatalf("Failed to open database: %v", err)
	}
	defer db.Close()

	// Create context
	ctx := context.Background()

	// Create lock manager with default config
	lockManager, err := pgdlock.NewLockManager(ctx, db, nil)
	if err != nil {
		log.Fatalf("Failed to create lock manager: %v", err)
	}
	// Properly clean up resources when done
	defer lockManager.Close()

	// Create a lock for a specific resource
	lock := lockManager.NewDistributedLock("my-resource")

	// Try to acquire the lock
	if err := lock.Lock(ctx); err != nil {
		log.Printf("Could not acquire lock: %v", err)
		return
	}

	log.Println("Lock acquired, performing work...")

	// Simulate work
	time.Sleep(5 * time.Second)

	// Release the lock
	if err := lock.Unlock(ctx); err != nil {
		log.Printf("Error releasing lock: %v", err)
		return
	}

	log.Println("Lock released")
}
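
In longer functions it is easy to miss an Unlock on an early return. One way to avoid that is a small wrapper that defers the release. The following is a minimal sketch, not part of the library: the `locker` interface and the `withLock` helper are hypothetical names, and `fakeLock` is an in-memory stand-in so the example runs without a database. A `*pgdlock.DistributedLock` should fit the interface, since its documented Lock and Unlock methods have these signatures.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// locker is a hypothetical interface matching the documented Lock and Unlock
// method signatures of DistributedLock.
type locker interface {
	Lock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// withLock (hypothetical helper) acquires l, runs fn, and always attempts to
// release l afterwards, even when fn returns an error or panics.
func withLock(ctx context.Context, l locker, fn func(context.Context) error) (err error) {
	if lerr := l.Lock(ctx); lerr != nil {
		return fmt.Errorf("acquire lock: %w", lerr)
	}
	defer func() {
		// Keep both the work error and any unlock error.
		if uerr := l.Unlock(ctx); uerr != nil {
			err = errors.Join(err, fmt.Errorf("release lock: %w", uerr))
		}
	}()
	return fn(ctx)
}

// fakeLock is an in-memory stand-in used only to exercise withLock here.
type fakeLock struct{ held bool }

func (f *fakeLock) Lock(context.Context) error   { f.held = true; return nil }
func (f *fakeLock) Unlock(context.Context) error { f.held = false; return nil }

// demo runs a failing job under the fake lock and reports whether the lock
// was released, together with the error returned by withLock.
func demo() (released bool, err error) {
	l := &fakeLock{}
	err = withLock(context.Background(), l, func(context.Context) error {
		return errors.New("job failed")
	})
	return !l.held, err
}

func main() {
	released, err := demo()
	fmt.Println("released:", released, "err:", err)
}
```

With the real library, the call would look like `withLock(ctx, lockManager.NewDistributedLock("my-resource"), doWork)`, where `doWork` is your own function.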

Configuration Options

You can customize the lock behavior with the Config struct:

config := pgdlock.Config{
    MaxAttempts:            3,                      // Maximum attempts to acquire a lock (default: 1)
    RetryDelay:             100 * time.Millisecond, // Delay between retry attempts
    HeartbeatInterval:      5 * time.Second,        // How often to send heartbeats
    LeaseTime:              30 * time.Second,       // How long a lock is valid
    SchemaName:             "public",               // Database schema
    TablePrefix:            "myapp_",               // Prefix for the locks table (results in "myapp_distributed_locks")
    CleanupInterval:        24 * time.Hour,         // How often to clean up inactive locks (default: 24h)
    InactivityThreshold:    12 * time.Hour,         // Time after which inactive locks are cleaned up (default: 12h)
    EnableAutomaticCleanup: true,                   // Whether to automatically run cleanup (default: true)
}

lockManager, err := pgdlock.NewLockManager(ctx, db, &config)

The TablePrefix is particularly useful when:

  • You need to separate locks used by different applications in the same database
  • You want to avoid table name collisions in your database
  • You need to namespace different lock sets for different components

For example, you might use different prefixes for different services:

// For the auth service
authConfig := pgdlock.Config{
    TablePrefix: "auth_",  // Table will be "auth_distributed_locks"
    // ... other config options
}

// For the payment service
paymentConfig := pgdlock.Config{
    TablePrefix: "payment_",  // Table will be "payment_distributed_locks"
    // ... other config options
}
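
The naming rule in the comments above can be written down as a small function. This is only a sketch of the convention as described in this README (the prefix followed by `distributed_locks`). The helper name `lockTableName` is hypothetical, and qualifying the table with the schema name is an assumption made for illustration, not a statement about the library's internals.

```go
package main

import "fmt"

// lockTableName (hypothetical helper) mirrors the naming described above:
// TablePrefix followed by "distributed_locks". Prepending the schema when
// SchemaName is non-empty is an assumption, not documented behavior.
func lockTableName(schemaName, tablePrefix string) string {
	table := tablePrefix + "distributed_locks"
	if schemaName == "" {
		return table
	}
	return schemaName + "." + table
}

func main() {
	fmt.Println(lockTableName("", "auth_"))
	fmt.Println(lockTableName("public", "payment_"))
}
```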

Use Cases

  • Coordinating access to shared resources in distributed systems
  • Implementing leader election
  • Ensuring exclusive access to critical sections
  • Preventing race conditions in distributed job systems
  • Implementing distributed cron jobs that should run on only one node
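
The last use case can be sketched as follows: every node calls the same function on its schedule, the node that gets the lock runs the job, and the others skip that tick. This is an illustration under stated assumptions rather than library code. `runOnOneNode`, `locker`, and `memLock` are hypothetical, and `errAlreadyHeld` is a local stand-in for `pgdlock.ErrLockAlreadyHeld` so the example runs without a database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errAlreadyHeld is a local stand-in for pgdlock.ErrLockAlreadyHeld.
var errAlreadyHeld = errors.New("lock is already held by another node")

// locker is a hypothetical interface matching the documented Lock and Unlock
// method signatures of DistributedLock.
type locker interface {
	Lock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// runOnOneNode (hypothetical helper) runs job only if this node wins the lock.
// Losing the lock to another node is reported as ran == false, not as an error.
func runOnOneNode(ctx context.Context, l locker, job func(context.Context) error) (ran bool, err error) {
	if lerr := l.Lock(ctx); lerr != nil {
		if errors.Is(lerr, errAlreadyHeld) {
			return false, nil // another node is running the job; skip this tick
		}
		return false, lerr
	}
	defer func() {
		if uerr := l.Unlock(ctx); uerr != nil {
			err = errors.Join(err, uerr)
		}
	}()
	return true, job(ctx)
}

// memLock simulates one node's view of a shared lock; owner is shared state.
type memLock struct {
	node  string
	owner *string
}

func (m memLock) Lock(context.Context) error {
	if *m.owner != "" && *m.owner != m.node {
		return errAlreadyHeld
	}
	*m.owner = m.node
	return nil
}

func (m memLock) Unlock(context.Context) error {
	if *m.owner == m.node {
		*m.owner = ""
	}
	return nil
}

// demo has node B try the job while node A holds the lock, then again after
// node A has released it. It returns whether the job ran in each case.
func demo() []bool {
	ctx := context.Background()
	owner := "node-a" // node A currently holds the lock
	b := memLock{node: "node-b", owner: &owner}
	job := func(context.Context) error { return nil }

	first, _ := runOnOneNode(ctx, b, job)
	owner = "" // node A releases the lock
	second, _ := runOnOneNode(ctx, b, job)
	return []bool{first, second}
}

func main() {
	fmt.Println(demo())
}
```

This pattern relies on Lock returning promptly when the lock is taken, which matches the default of a single acquisition attempt described in this README.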

Error Handling

The library provides specific error types to handle common locking scenarios:

if err := lock.Lock(ctx); err != nil {
    if errors.Is(err, pgdlock.ErrLockAlreadyHeld) {
        // Resource is already locked by another process
    } else if errors.Is(err, pgdlock.ErrLockExpired) {
        // Lock has expired
    } else {
        // Other errors
    }
}
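
Using errors.Is rather than `==` matters when an error has been wrapped with `%w` somewhere between the lock call and your handler. The sketch below shows that behavior with local stand-ins for the package's sentinel errors (same messages as listed under Variables). The `classify` function and the action names it returns are hypothetical, and whether the library itself wraps its errors is not something this README states.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for pgdlock.ErrLockAlreadyHeld and pgdlock.ErrLockExpired.
var (
	errLockAlreadyHeld = errors.New("lock is already held by another node")
	errLockExpired     = errors.New("lock has expired")
)

// classify (hypothetical helper) maps an error from a lock operation to a
// coarse action the caller might take.
func classify(err error) string {
	switch {
	case err == nil:
		return "proceed"
	case errors.Is(err, errLockAlreadyHeld):
		return "skip"
	case errors.Is(err, errLockExpired):
		return "reacquire"
	default:
		return "fail"
	}
}

// demo classifies a nil error, a wrapped sentinel, a bare sentinel, and an
// unrelated error.
func demo() []string {
	wrapped := fmt.Errorf("nightly-report: %w", errLockAlreadyHeld)
	return []string{
		classify(nil),
		classify(wrapped),
		classify(errLockExpired),
		classify(errors.New("connection refused")),
	}
}

func main() {
	fmt.Println(demo())
}
```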

Advanced Usage

Extending a Lock Lease

// Extend the current lock by an additional 30 seconds
if err := lock.ExtendLease(ctx, 30 * time.Second); err != nil {
    log.Printf("Failed to extend lease: %v", err)
}

Checking Lock Status

isLocked, err := lock.IsLocked(ctx)
if err != nil {
    log.Printf("Error checking lock status: %v", err)
} else if isLocked {
    log.Println("Lock is currently held by this node")
} else {
    log.Println("Lock is not currently held")
}

Managing Lock Cleanup

The library includes functionality to clean up locks that haven't been used for a specified period. This helps prevent database bloat from abandoned locks.

Automatic cleanup is enabled by default with a 24-hour cleanup interval and a 12-hour inactivity threshold. You can customize these settings:

config := pgdlock.Config{
    // ... other options
    CleanupInterval:       12 * time.Hour,    // Run cleanup every 12 hours
    InactivityThreshold:   6 * time.Hour,     // Clean locks inactive for 6+ hours
    EnableAutomaticCleanup: true,             // Enable automatic cleanup
}

You can also manually stop, start, or trigger the cleanup process:

// Manually run cleanup once
if err := lockManager.Cleanup(ctx); err != nil {
    log.Printf("Failed to run cleanup: %v", err)
}

// Stop automatic cleanup
lockManager.StopCleanup()

// Restart automatic cleanup
if err := lockManager.StartCleanup(ctx); err != nil {
    log.Printf("Failed to start cleanup: %v", err)
}
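
One way to read the two settings: a lock counts as stale once it has been inactive for longer than InactivityThreshold, and a cleanup pass runs once per CleanupInterval. The sketch below expresses the staleness test in plain Go. It assumes inactivity is measured from the last heartbeat (LockInfo has a LastHeartbeat field); `lockRecord` and `staleResources` are hypothetical, and the library's actual cleanup query may differ.

```go
package main

import (
	"fmt"
	"time"
)

// lockRecord is a hypothetical, reduced view of a lock row.
type lockRecord struct {
	Resource      string
	LastHeartbeat time.Time
}

// staleResources returns the resources whose last heartbeat is older than
// threshold, measured against now.
func staleResources(records []lockRecord, now time.Time, threshold time.Duration) []string {
	var stale []string
	for _, r := range records {
		if now.Sub(r.LastHeartbeat) > threshold {
			stale = append(stale, r.Resource)
		}
	}
	return stale
}

// demo applies a 6-hour threshold to one recently active and one abandoned lock.
func demo() []string {
	now := time.Date(2025, time.March, 25, 12, 0, 0, 0, time.UTC)
	records := []lockRecord{
		{Resource: "active-job", LastHeartbeat: now.Add(-5 * time.Minute)},
		{Resource: "abandoned-job", LastHeartbeat: now.Add(-7 * time.Hour)},
	}
	return staleResources(records, now, 6*time.Hour)
}

func main() {
	fmt.Println(demo())
}
```
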

Customizing Lock Acquisition Attempts

By default, the library does not retry lock acquisition if it fails (MaxAttempts=1). If you want automatic retries, you can either:

  1. Configure it globally for all locks:

config := pgdlock.Config{
    MaxAttempts: 5,                  // Try 5 times before giving up
    RetryDelay: 50 * time.Millisecond, // Wait 50ms between attempts
    // other config options...
}
lockManager, err := pgdlock.NewLockManager(ctx, db, &config)

  2. Configure it for individual locks:

// Create a lock with custom retry options
lock := lockManager.NewDistributedLock(
    "my-critical-resource",
    pgdlock.WithMaxAttempts(5),              // Try up to 5 times
    pgdlock.WithRetryDelay(50 * time.Millisecond), // Use a shorter retry delay
)

// Acquire the lock with the custom retry behavior
if err := lock.Lock(ctx); err != nil {
    log.Printf("Failed to acquire lock despite 5 attempts: %v", err)
}
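
Conceptually, MaxAttempts and RetryDelay describe a bounded retry loop. The following is a sketch of such a loop, not the library's implementation. `acquireWithRetry` and `errHeld` are hypothetical names, and stopping early on context cancellation is a design choice of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errHeld is a local stand-in for pgdlock.ErrLockAlreadyHeld.
var errHeld = errors.New("lock is already held by another node")

// acquireWithRetry calls try up to maxAttempts times, waiting retryDelay
// between attempts. It returns the number of attempts made and the last error.
func acquireWithRetry(ctx context.Context, maxAttempts int, retryDelay time.Duration, try func() error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always make at least one attempt
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = try(); err == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break // no point in waiting after the final attempt
		}
		select {
		case <-ctx.Done():
			return attempt, ctx.Err()
		case <-time.After(retryDelay):
		}
	}
	return maxAttempts, err
}

// demo simulates a lock that becomes free on the third attempt.
func demo() (int, error) {
	calls := 0
	try := func() error {
		calls++
		if calls < 3 {
			return errHeld
		}
		return nil
	}
	return acquireWithRetry(context.Background(), 5, time.Millisecond, try)
}

func main() {
	attempts, err := demo()
	fmt.Println("attempts:", attempts, "err:", err)
}
```
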

Customizing Timing Parameters For Individual Locks

You can customize the heartbeat interval and lease time for individual locks:

// Create a lock with custom timing parameters
lock := lockManager.NewDistributedLock(
    "my-long-running-task",
    pgdlock.WithHeartbeatInterval(3 * time.Second),  // Send heartbeats more frequently
    pgdlock.WithLeaseTime(2 * time.Minute),         // Use a longer lease time
)

// You can combine multiple options
criticalLock := lockManager.NewDistributedLock(
    "my-critical-resource",
    pgdlock.WithMaxAttempts(5),                     // Try up to 5 times
    pgdlock.WithRetryDelay(50 * time.Millisecond),  // Use a shorter retry delay
    pgdlock.WithHeartbeatInterval(1 * time.Second), // Send frequent heartbeats
    pgdlock.WithLeaseTime(10 * time.Second),        // Use a shorter lease time
)

This gives you fine-grained control over lock behavior based on specific resource requirements:

  • Use shorter lease times for high-contention resources
  • Use longer lease times for long-running operations
  • Adjust heartbeat frequency based on network reliability and operation duration
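
A rule of thumb that follows from the lease and heartbeat design: keep the heartbeat interval comfortably shorter than the lease time, so that a single delayed heartbeat does not let the lease lapse. The check below is a hypothetical helper (`checkTiming`), and the minimum of two heartbeat intervals per lease is an arbitrary margin chosen for illustration. The library is not documented to enforce any such rule.

```go
package main

import (
	"fmt"
	"time"
)

// checkTiming (hypothetical helper) returns how many heartbeat intervals fit
// into one lease, and an error if fewer than two do.
func checkTiming(heartbeat, lease time.Duration) (int, error) {
	if heartbeat <= 0 || lease <= 0 {
		return 0, fmt.Errorf("heartbeat (%v) and lease (%v) must be positive", heartbeat, lease)
	}
	n := int(lease / heartbeat)
	if n < 2 {
		return n, fmt.Errorf("lease %v leaves room for only %d heartbeat interval(s) of %v", lease, n, heartbeat)
	}
	return n, nil
}

// demo checks the DefaultConfig values, the values from the combined-options
// example above, and a pair that is too tight.
func demo() (int, int, bool) {
	defaults, _ := checkTiming(10*time.Second, 60*time.Second)
	combined, _ := checkTiming(1*time.Second, 10*time.Second)
	_, err := checkTiming(30*time.Second, 30*time.Second)
	return defaults, combined, err != nil
}

func main() {
	defaults, combined, rejected := demo()
	fmt.Println(defaults, combined, rejected)
}
```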

Performance Considerations

  • The library uses a heartbeat mechanism that requires periodic database operations.
  • Consider tuning the HeartbeatInterval and LeaseTime based on your workload.
  • Adjust CleanupInterval and InactivityThreshold based on your application's lock usage patterns. Using shorter cleanup intervals in high-churn systems can help maintain database performance.
  • For high-frequency lock operations, you may need to adjust your PostgreSQL connection pool size.
  • Per-lock customization allows you to optimize timing parameters for different usage patterns.
  • Always call lockManager.Close() when shutting down your application to properly release resources.

License

This project is licensed under the MIT License. See the LICENSE file for details.


Thank you for using the library! If you have any questions, suggestions, or encounter any issues, please don't hesitate to open an issue on the GitHub repository. Your feedback and contributions are greatly appreciated and help make this library better for everyone.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrLockAlreadyHeld is returned when attempting to acquire a lock that is already held by another node.
	ErrLockAlreadyHeld = errors.New("lock is already held by another node")

	// ErrLockNotHeld is returned when attempting to perform an operation on a lock that is not held by the current node.
	ErrLockNotHeld = errors.New("lock is not held by this node")

	// ErrLockExpired is returned when attempting to perform an operation on a lock that has expired.
	ErrLockExpired = errors.New("lock has expired")

	// ErrDatabaseError is returned when a database operation fails.
	ErrDatabaseError = errors.New("database operation failed")
)
var DefaultConfig = Config{
	MaxAttempts:            1,
	RetryDelay:             1 * time.Millisecond,
	HeartbeatInterval:      10 * time.Second,
	LeaseTime:              60 * time.Second,
	TablePrefix:            "",
	SchemaName:             "",
	CleanupInterval:        24 * time.Hour,
	InactivityThreshold:    12 * time.Hour,
	EnableAutomaticCleanup: true,
}

DefaultConfig provides default configuration values.

Functions

This section is empty.

Types

type Config

type Config struct {
	// MaxAttempts is the maximum number of attempts to acquire the lock.
	MaxAttempts int

	// RetryDelay is the duration to wait between lock acquisition attempts.
	RetryDelay time.Duration

	// HeartbeatInterval is the duration between heartbeats to maintain the lock.
	HeartbeatInterval time.Duration

	// LeaseTime is the duration for which the lock is considered valid.
	LeaseTime time.Duration

	// TablePrefix is the prefix to use for database tables.
	// For example, with TablePrefix "myapp_", the table will be "myapp_distributed_locks".
	TablePrefix string

	// SchemaName is the database schema to use.
	SchemaName string

	// CleanupInterval is the interval at which to run the cleanup process.
	// Default is 24 hours.
	CleanupInterval time.Duration

	// InactivityThreshold is the duration of inactivity after which a lock is considered stale and can be cleaned up.
	// Default is 12 hours.
	InactivityThreshold time.Duration

	// EnableAutomaticCleanup determines whether to automatically run the cleanup process.
	// Default is true.
	EnableAutomaticCleanup bool
}

Config holds the configuration options for the distributed lock system.

type DistributedLock

type DistributedLock struct {
	// contains filtered or unexported fields
}

DistributedLock represents a distributed lock implementation.

func (*DistributedLock) ExtendLease

func (dl *DistributedLock) ExtendLease(ctx context.Context, extension time.Duration) error

ExtendLease extends the lease time of the lock.

Parameters:

  • ctx: The context for the operation.
  • extension: The duration by which to extend the lease.

Returns:

  • An error if the lease cannot be extended, nil otherwise.

func (*DistributedLock) IsLocked

func (dl *DistributedLock) IsLocked(ctx context.Context) (bool, error)

IsLocked checks if the lock is currently held by this node.

Parameters:

  • ctx: The context for the operation.

Returns:

  • A boolean indicating whether the lock is held, and an error if the check fails.

func (*DistributedLock) Lock

func (dl *DistributedLock) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

Parameters:

  • ctx: The context for the operation.

Returns:

  • An error if the lock cannot be acquired, nil otherwise.

By default, Lock makes a single attempt (MaxAttempts is 1); retries are performed only if configured.

func (*DistributedLock) Unlock

func (dl *DistributedLock) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

Parameters:

  • ctx: The context for the operation.

Returns:

  • An error if the lock cannot be released, nil otherwise.

type LockInfo

type LockInfo struct {
	NodeID        uuid.UUID `json:"nodeID" db:"node_id"`
	Resource      string    `json:"resource" db:"resource"`
	ExpiresAt     time.Time `json:"expiresAt" db:"expires_at"`
	LastHeartbeat time.Time `json:"lastHeartbeat" db:"last_heartbeat"`
}

LockInfo holds information about a lock.

type LockManager

type LockManager struct {
	Config *Config
	// contains filtered or unexported fields
}

LockManager is responsible for managing distributed locks in PostgreSQL.

func NewLockManager

func NewLockManager(ctx context.Context, db *sql.DB, config *Config) (*LockManager, error)

NewLockManager creates a new LockManager with the given configuration and database connection.

func (*LockManager) Cleanup added in v0.0.6

func (lm *LockManager) Cleanup(ctx context.Context) error

Cleanup removes locks that have been inactive for longer than the configured threshold. This method can be called manually if automatic cleanup is disabled.

func (*LockManager) Close added in v0.0.6

func (lm *LockManager) Close() error

Close releases resources held by the LockManager, including stopping the cleanup process.

func (*LockManager) NewDistributedLock

func (lm *LockManager) NewDistributedLock(resource string, options ...LockOption) *DistributedLock

NewDistributedLock creates a new DistributedLock instance.

Parameters:

  • resource: The name of the resource being locked.
  • options: Optional parameters for customizing the lock behavior.

Returns:

  • A pointer to the newly created DistributedLock.

func (*LockManager) StartCleanup added in v0.0.6

func (lm *LockManager) StartCleanup(ctx context.Context) error

StartCleanup starts the background process for cleaning up inactive locks. This method is automatically called by NewLockManager if EnableAutomaticCleanup is true.

func (*LockManager) StopCleanup added in v0.0.6

func (lm *LockManager) StopCleanup()

StopCleanup stops the background cleanup process.

func (*LockManager) WithClock

func (lm *LockManager) WithClock(clock clockwork.Clock) *LockManager

WithClock sets a custom clock for the LockManager (mainly for testing purposes).

func (*LockManager) WithLogger

func (lm *LockManager) WithLogger(logger *slog.Logger) *LockManager

WithLogger sets a custom logger for the LockManager.

type LockOption

type LockOption func(*DistributedLock)

LockOption defines a function type for customizing a DistributedLock.

func WithHeartbeatInterval added in v0.0.5

func WithHeartbeatInterval(heartbeatInterval time.Duration) LockOption

WithHeartbeatInterval sets a custom heartbeat interval for the lock.

func WithLeaseTime added in v0.0.5

func WithLeaseTime(leaseTime time.Duration) LockOption

WithLeaseTime sets a custom lease time for the lock.

func WithMaxAttempts added in v0.0.5

func WithMaxAttempts(maxAttempts int) LockOption

WithMaxAttempts sets a custom maximum number of attempts for lock acquisition.

func WithRetryDelay

func WithRetryDelay(retryDelay time.Duration) LockOption

WithRetryDelay sets a custom delay between retry attempts for lock acquisition.
