o
    ࡡi.                     @   sX   d Z ddlZddlZddlZddlZddlmZ G dd deZedkr*e	  dS dS )a  
Tests for greenlet behavior during interpreter shutdown (Py_FinalizeEx).

Prior to the safe finalization fix, active greenlets being deallocated
during interpreter shutdown could trigger SIGSEGV or SIGABRT on Python
< 3.11, because green_dealloc attempted to throw GreenletExit via
g_switch() into a partially-torn-down interpreter.

The fix adds _Py_IsFinalizing() guards (on Python < 3.11 only) that
call murder_in_place() instead of g_switch() when the interpreter is
shutting down, avoiding the crash at the cost of not running cleanup
code inside the greenlet.

These tests verify:
  1. No crashes on ANY Python version (the core safety guarantee).
  2. GreenletExit cleanup code runs correctly during normal thread exit
     (the standard production path, e.g. uWSGI worker threads).
    N)TestCasec                   @   s\   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd ZdS )TestInterpreterShutdownc                 C   s6   t |}tjtjd|gddddd}|j|j|jfS )z
        Run a Python script in a subprocess that exercises greenlet
        during interpreter shutdown. Returns (returncode, stdout, stderr).
        z-cT   F)capture_outputtexttimeoutcheck)	textwrapdedent
subprocessrunsys
executable
returncodestdoutstderr)selfscript_bodyfull_scriptresult r   y/var/www/html/asbeauty/laura_geller_scraping/env/lib/python3.10/site-packages/greenlet/tests/test_interpreter_shutdown.py_run_shutdown_script   s   

z,TestInterpreterShutdown._run_shutdown_scriptc              	   C   >   |  d\}}}| |dd| d| |  | d| dS )a8  
        An active (suspended) greenlet that is deallocated during
        interpreter shutdown should not crash the process.

        Before the fix, this would SIGSEGV on Python < 3.11 because
        _green_dealloc_kill_started_non_main_greenlet tried to call
        g_switch() during Py_FinalizeEx.
        aT              import greenlet

            def worker():
                greenlet.getcurrent().parent.switch("from worker")
                return "done"

            g = greenlet.greenlet(worker)
            result = g.switch()
            assert result == "from worker", result
            print("OK: exiting with active greenlet")
        r   Process crashed (rc=):
z OK: exiting with active greenletNr   assertEqualassertInr   rcr   r   r   r   r   )test_active_greenlet_at_shutdown_no_crash1   s   	zATestInterpreterShutdown.test_active_greenlet_at_shutdown_no_crashc              	   C   r   )zm
        Multiple suspended greenlets at shutdown should all be cleaned
        up without crashing.
        a              import greenlet

            def worker(name):
                greenlet.getcurrent().parent.switch(f"hello from {name}")
                return "done"

            greenlets = []
            for i in range(10):
                g = greenlet.greenlet(worker)
                result = g.switch(f"g{i}")
                greenlets.append(g)

            print(f"OK: {len(greenlets)} active greenlets at shutdown")
        r   r   r   z#OK: 10 active greenlets at shutdownNr   r   r   r   r   *test_multiple_active_greenlets_at_shutdownI   s   zBTestInterpreterShutdown.test_multiple_active_greenlets_at_shutdownc              	   C   r   )zQ
        Nested (chained parent) greenlets at shutdown should not crash.
        a              import greenlet

            def inner():
                greenlet.getcurrent().parent.switch("inner done")

            def outer():
                g_inner = greenlet.greenlet(inner)
                g_inner.switch()
                greenlet.getcurrent().parent.switch("outer done")

            g = greenlet.greenlet(outer)
            result = g.switch()
            assert result == "outer done", result
            print("OK: nested greenlets at shutdown")
        r   r   r   z OK: nested greenlets at shutdownNr   r   r   r   r   !test_nested_greenlets_at_shutdown`   s   z9TestInterpreterShutdown.test_nested_greenlets_at_shutdownc              	   C   r   )zm
        Greenlets in worker threads that are still referenced at
        shutdown should not crash.
        a              import greenlet
            import threading

            results = []

            def thread_worker():
                def greenlet_func():
                    greenlet.getcurrent().parent.switch("from thread greenlet")
                    return "done"

                g = greenlet.greenlet(greenlet_func)
                val = g.switch()
                results.append((g, val))

            threads = []
            for _ in range(3):
                t = threading.Thread(target=thread_worker)
                t.start()
                threads.append(t)

            for t in threads:
                t.join()

            print(f"OK: {len(results)} threaded greenlets at shutdown")
        r   r   r   z$OK: 3 threaded greenlets at shutdownNr   r   r   r   r   #test_threaded_greenlets_at_shutdownw   s   z;TestInterpreterShutdown.test_threaded_greenlets_at_shutdownc              	   C   J   |  d\}}}| |dd| d| |  | d| | d| dS )z
        When a thread exits normally while holding active greenlets,
        GreenletExit IS thrown and cleanup code runs.  This is the
        standard cleanup path used in production (e.g. uWSGI worker
        threads finishing a request).
        a4              import os
            import threading
            import greenlet

            _write = os.write

            def thread_func():
                def worker(_w=_write,
                           _GreenletExit=greenlet.GreenletExit):
                    try:
                        greenlet.getcurrent().parent.switch("suspended")
                    except _GreenletExit:
                        _w(1, b"CLEANUP: GreenletExit caught\n")
                        raise

                g = greenlet.greenlet(worker)
                g.switch()
                # Thread exits with active greenlet -> thread-state
                # cleanup triggers GreenletExit

            t = threading.Thread(target=thread_func)
            t.start()
            t.join()
            print("OK: thread cleanup done")
        r   r   r   OK: thread cleanup donezCLEANUP: GreenletExit caughtNr   r   r   r   r   (test_greenlet_cleanup_during_thread_exit   s   z@TestInterpreterShutdown.test_greenlet_cleanup_during_thread_exitc              	   C   r%   )zl
        try/finally blocks in active greenlets run correctly when the
        owning thread exits.
        aR              import os
            import threading
            import greenlet

            _write = os.write

            def thread_func():
                def worker(_w=_write):
                    try:
                        greenlet.getcurrent().parent.switch("suspended")
                    finally:
                        _w(1, b"FINALLY: cleanup executed\n")

                g = greenlet.greenlet(worker)
                g.switch()

            t = threading.Thread(target=thread_func)
            t.start()
            t.join()
            print("OK: thread cleanup done")
        r   r   r   r&   zFINALLY: cleanup executedNr   r   r   r   r   %test_finally_block_during_thread_exit   s   z=TestInterpreterShutdown.test_finally_block_during_thread_exitc              	   C   r   )z
        Stress test: many active greenlets with cleanup code at shutdown.
        Ensures no crashes regardless of deallocation order.
        a              import sys
            import greenlet

            cleanup_count = 0

            def worker(idx):
                global cleanup_count
                try:
                    greenlet.getcurrent().parent.switch(f"ready-{idx}")
                except greenlet.GreenletExit:
                    cleanup_count += 1
                    raise

            greenlets = []
            for i in range(50):
                g = greenlet.greenlet(worker)
                result = g.switch(i)
                greenlets.append(g)

            print(f"OK: {len(greenlets)} greenlets about to shut down")
            # Note: we can't easily print cleanup_count during shutdown
            # since it happens after the main module's code runs.
        r   r   r   z#OK: 50 greenlets about to shut downNr   r   r   r   r   ,test_many_greenlets_with_cleanup_at_shutdown   s   zDTestInterpreterShutdown.test_many_greenlets_with_cleanup_at_shutdownc              	   C   r   )z
        Deeply nested greenlet parent chains at shutdown.
        Tests that the deallocation order doesn't cause issues.
        a              import greenlet

            def level(depth, max_depth):
                if depth < max_depth:
                    g = greenlet.greenlet(level)
                    g.switch(depth + 1, max_depth)
                greenlet.getcurrent().parent.switch(f"depth-{depth}")

            g = greenlet.greenlet(level)
            result = g.switch(0, 10)
            print(f"OK: nested to depth 10, got {result}")
        r   r   r   zOK: nested to depth 10Nr   r   r   r   r   (test_deeply_nested_greenlets_at_shutdown  s   z@TestInterpreterShutdown.test_deeply_nested_greenlets_at_shutdownc              	   C   r   )z
        A greenlet that has an active exception context when it's
        suspended should not crash during shutdown cleanup.
        a              import greenlet

            def worker():
                try:
                    raise ValueError("test error")
                except ValueError:
                    # Suspend while an exception is active on the stack
                    greenlet.getcurrent().parent.switch("suspended with exc")
                return "done"

            g = greenlet.greenlet(worker)
            result = g.switch()
            assert result == "suspended with exc"
            print("OK: greenlet with active exception at shutdown")
        r   r   r   z.OK: greenlet with active exception at shutdownNr   r   r   r   r   (test_greenlet_with_traceback_at_shutdown&  s   z@TestInterpreterShutdown.test_greenlet_with_traceback_at_shutdownN)__name__
__module____qualname__r   r!   r"   r#   r$   r'   r(   r)   r*   r+   r   r   r   r   r      s    6% r   __main__)
__doc__r   r   unittestr	   greenlet.testsr   r   r,   mainr   r   r   r   <module>   s     %