<?php

/**
 * Created by Satyam on 25-07-2024 for Fast Comet to Digital Ocean Records Migration script
 */

date_default_timezone_set("Asia/Kolkata");   //India time (GMT+5:30)

// Shared secret that must be supplied as ?enc_key=... for the script to run.
// NOTE(review): this key and the database credentials below are hard-coded in
// source. They should be moved to environment/config outside version control
// and rotated; they are kept here unchanged so the script keeps working.
$eny_key = "jxDcis9sstSogJbC541sYmtrC2pKFRTug5rSgkxwo";
echo "<pre>";

// Reject the request unless the supplied key is a string equal to the secret.
// hash_equals() compares in constant time; the is_string() guard keeps array
// input such as ?enc_key[]=x from reaching it.
$suppliedKey = isset($_GET['enc_key']) ? $_GET['enc_key'] : null;
if (!is_string($suppliedKey) || !hash_equals($eny_key, $suppliedKey)) {
    echo "Un authorized request";
    exit;
}

// Configuration for FastComet (source database)
$fastcometHost = '127.0.0.1';
$fastcometUsername = 'yatradorg_usrsrkad';
$fastcometPassword = 'NAEmDZJl0x(y';
$fastcometDatabase = 'yatradorg_archlogroav';

// Configuration for DigitalOcean (destination database)
$doHost = 'db-mysql-blr1-96285-yd-live-do-user-16365556-0.c.db.ondigitalocean.com';
$doUsername = 'ydusrinv';
$doPassword = 'AVNS_c_ZgeJXjAD8FOZy3AYE';
$doDatabase = 'yatradorg_archlogroav';

$chunkSize = 500; // Rows copied (and then deleted from the source) per batch
$maxExecutionTime = 1800; // 30 minutes
$startTime = time(); // Reference point for the execution-time budget
$todayDate = date('Y-m-d 00:00:00'); // Midnight today, compared against last_run
$todayDateYMD = date('Y-m-d'); // Date prefix used in the last_run LIKE filter

ini_set('memory_limit', '1G');
set_time_limit($maxExecutionTime);

/**
 * Append a single timestamped line to a log file.
 *
 * The line has the form "[Y-m-d H:i:s] message" followed by PHP_EOL. The file
 * is opened in append mode and locked exclusively for the duration of the write.
 *
 * @param string $message Text to record.
 * @param string $logFile Path of the log file to append to.
 */
function logMessage($message, $logFile = 'fc_to_do_rc_migrates.log')
{
    $line = sprintf('[%s] %s%s', date('Y-m-d H:i:s'), $message, PHP_EOL);
    file_put_contents($logFile, $line, FILE_APPEND | LOCK_EX);
}

try {
    // Both connections throw on error and use unbuffered result sets, so every
    // SELECT below is either fully fetched or has its cursor closed before the
    // next statement runs on the same connection.
    $sourcePDO = new PDO("mysql:host=$fastcometHost;dbname=$fastcometDatabase", $fastcometUsername, $fastcometPassword, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => false
    ]);
    $destPDO = new PDO("mysql:host=$doHost;port=25060;dbname=$doDatabase", $doUsername, $doPassword, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => false
    ]);
    logMessage("");
    logMessage("-----------------------------------------Starting optimized bulk data transfer process-----------------------------------------");

    // Pick one table to work on from the management log. The parentheses make
    // the existing AND/OR precedence explicit: either the table has not run
    // today and is not in progress, or it ran today but is not completed.
    // NOTE(review): the second branch also matches rows that are 'in_progress'
    // today — confirm that resuming those is intended.
    $stmt = $destPDO->query("
        SELECT * 
        FROM data_transfer_management_log         
        WHERE ((last_run < '$todayDate' OR last_run is NULL)
        AND status != 'in_progress') OR (status != 'completed' AND last_run like '$todayDateYMD%')
        ORDER BY last_run DESC 
        LIMIT 1
    ");

    $config = $stmt->fetch(PDO::FETCH_ASSOC);
    $stmt->closeCursor();

    if (!$config) {
        logMessage("No tables found for processing or all tables are in progress.");
        exit;
    }

    $table = $config['table_name'];
    $uniqueColumn = $config['unique_column'];
    $dateColumn = $config['date_column'];
    $lastProcessedId = $config['last_processed_id'] ?? -1;
    $lastProcessedDate = $config['last_processed_date'];

    // Get the minimum and maximum ID currently present in the source table
    $rangeStmt = $sourcePDO->query("SELECT MIN(`$uniqueColumn`) AS min_id, MAX(`$uniqueColumn`) AS max_id FROM `$table`");
    $rangeRow = $rangeStmt->fetch(PDO::FETCH_ASSOC);
    $rangeStmt->closeCursor();
    $minId = $rangeRow['min_id'];
    $maxId = $rangeRow['max_id'];

    // Adjust lastProcessedId if it's outside the valid range
    if ($lastProcessedId < $minId || $lastProcessedId > $maxId) {
        logMessage("Last processed ID ($lastProcessedId) is outside the valid range ($minId - $maxId). Resetting to minimum ID.");
        $lastProcessedId = $minId - 1; // Start from the beginning
    }

    logMessage("Processing table: $table");

    $destPDO->prepare("UPDATE data_transfer_management_log SET status = 'in_progress', last_run = NOW() WHERE table_name = ?")
        ->execute([$table]);

    $totalTransferred = 0;
    $totalDeleted = 0;
    // Set only when the loop stops because of the time budget, so the final
    // status does not depend on re-reading the clock after the loop.
    $timedOut = false;

    while (true) {
        if (time() - $startTime > $maxExecutionTime - 60) { // Leave 60 seconds for cleanup
            logMessage("Approaching max execution time. Stopping gracefully.");
            $timedOut = true;
            break;
        }

        $sourcePDO->beginTransaction();
        $destPDO->beginTransaction();

        try {
            // Fetch the next chunk, ordered by the unique column (and the date
            // column when one is configured and a last date is known).
            $useDateCursor = $dateColumn && $lastProcessedDate;
            $query = $useDateCursor
                ? "SELECT * FROM `$table` WHERE (`$uniqueColumn` > :lastId OR (`$uniqueColumn` = :lastId AND `$dateColumn` > :lastDate)) ORDER BY `$uniqueColumn`, `$dateColumn` LIMIT :chunkSize"
                : "SELECT * FROM `$table` WHERE `$uniqueColumn` > :lastId ORDER BY `$uniqueColumn` LIMIT :chunkSize";

            $stmt = $sourcePDO->prepare($query);
            $stmt->bindParam(':lastId', $lastProcessedId, PDO::PARAM_INT);
            $stmt->bindParam(':chunkSize', $chunkSize, PDO::PARAM_INT);
            if ($useDateCursor) {
                $stmt->bindParam(':lastDate', $lastProcessedDate, PDO::PARAM_STR);
            }
            $stmt->execute();

            $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);

            if (count($rows) == 0) {
                // Transferred rows are deleted from the source, so anything
                // still at or below the cursor was missed by an earlier pass.
                $missedQuery = "SELECT * FROM `$table` WHERE `$uniqueColumn` <= :lastId ORDER BY `$uniqueColumn` LIMIT :chunkSize";
                $missedStmt = $sourcePDO->prepare($missedQuery);
                $missedStmt->bindParam(':lastId', $lastProcessedId, PDO::PARAM_INT);
                $missedStmt->bindParam(':chunkSize', $chunkSize, PDO::PARAM_INT);
                $missedStmt->execute();
                $missedRows = $missedStmt->fetchAll(PDO::FETCH_ASSOC);

                if (count($missedRows) == 0) {
                    logMessage("No more data to transfer for table: $table");
                    // Nothing was modified in this iteration; close both
                    // transactions so none is left open after the loop.
                    $sourcePDO->rollBack();
                    $destPDO->rollBack();
                    break;
                }

                logMessage("Found " . count($missedRows) . " missed rows. Processing them.");
                $rows = $missedRows;
            }

            // Prepare bulk insert: backtick-quote the column names and build
            // one "(?,?,...)" placeholder group per row.
            $columns = array_map(function ($col) {
                return "`" . str_replace("`", "``", $col) . "`";
            }, array_keys($rows[0]));
            $insertPlaceholders = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
            $insertValues = [];
            $successfulIds = [];

            foreach ($rows as $row) {
                // Append values one by one; merging arrays on every row would
                // recopy the accumulated values each time.
                foreach ($row as $value) {
                    $insertValues[] = $value;
                }
                $successfulIds[] = $row[$uniqueColumn];
                $lastProcessedId = $row[$uniqueColumn];
                if ($dateColumn) {
                    $lastProcessedDate = $row[$dateColumn];
                }
            }

            // Perform bulk insert.
            // NOTE(review): INSERT IGNORE silently skips rows the destination
            // rejects (e.g. duplicate keys), yet every fetched row is deleted
            // from the source below — confirm skipped rows are true duplicates.
            $insertSql = "INSERT IGNORE INTO `$table` (" . implode(',', $columns) . ") VALUES " .
                         implode(',', array_fill(0, count($rows), $insertPlaceholders));
            $insertStmt = $destPDO->prepare($insertSql);
            $insertStmt->execute($insertValues);

            $insertedCount = $insertStmt->rowCount();
            $totalTransferred += $insertedCount;

            // Perform bulk delete of the rows just copied
            if (!empty($successfulIds)) {
                $deletePlaceholders = implode(',', array_fill(0, count($successfulIds), '?'));
                $deleteSQL = "DELETE FROM `$table` WHERE `$uniqueColumn` IN ($deletePlaceholders)";
                $deleteStmt = $sourcePDO->prepare($deleteSQL);
                $deleteStmt->execute($successfulIds);
                $totalDeleted += $deleteStmt->rowCount();
            }

            // Update management log with the new cursor position
            $destPDO->prepare("
                UPDATE data_transfer_management_log
                SET last_processed_id = ?, last_processed_date = ?, rows_transferred = rows_transferred + ?
                WHERE table_name = ?
            ")->execute([$lastProcessedId, $lastProcessedDate, $insertedCount, $table]);

            // Commit the destination first. If it fails, the source delete is
            // rolled back and no data is lost. If the source commit fails
            // afterwards, the rows exist on both sides and a later pass
            // re-sends them harmlessly through INSERT IGNORE.
            $destPDO->commit();
            $sourcePDO->commit();
        } catch (Exception $e) {
            // Guarded: a failed commit or rollback above may already have
            // closed one of the transactions.
            if ($sourcePDO->inTransaction()) {
                $sourcePDO->rollBack();
            }
            if ($destPDO->inTransaction()) {
                $destPDO->rollBack();
            }
            logMessage("Error: " . $e->getMessage());
            throw $e;
        }
    }

    // Every path out of the loop has closed its transactions, so this update
    // runs in autocommit mode and needs no explicit commit.
    $status = $timedOut ? 'timeout' : 'completed';
    $destPDO->prepare("
        UPDATE data_transfer_management_log 
        SET status = ?, last_run = NOW()
        WHERE table_name = ?
    ")->execute([$status, $table]);

    logMessage("Table $table: Processing $status. Total rows transferred: $totalTransferred, Total rows deleted: $totalDeleted");
    logMessage("Data transfer and delete process $status");
    logMessage("-----------------------------------------END-----------------------------------------");
    logMessage("");

} catch (PDOException $e) {
    logMessage("Error: " . $e->getMessage());
    if (isset($destPDO) && $destPDO->inTransaction()) {
        $destPDO->rollBack();
    }
    if (isset($sourcePDO) && $sourcePDO->inTransaction()) {
        $sourcePDO->rollBack();
    }
    if (isset($destPDO) && isset($table)) {
        // Best effort: the destination connection may be the thing that failed,
        // so a failure to record the error status is logged, not rethrown.
        try {
            $destPDO->prepare("
                UPDATE data_transfer_management_log 
                SET status = 'error', last_run = NOW()
                WHERE table_name = ?
            ")->execute([$table]);
        } catch (PDOException $statusError) {
            logMessage("Error: could not record error status for table $table: " . $statusError->getMessage());
        }
    }
}