Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Nov 12, 2025

📄 17% (0.17x) speedup for ConfluenceDataSource.get_blog_posts in backend/python/app/sources/external/confluence/confluence.py

⏱️ Runtime : 4.47 milliseconds 3.82 milliseconds (best of 215 runs)

📝 Explanation and details

The optimized code delivers a 17% runtime improvement and 2.4% throughput increase through two key optimizations:

1. Eliminated Unnecessary Query Parameter Serialization
The original code called _as_str_dict(_query) which serialized all query parameters to strings using _serialize_value(). The line profiler shows this consumed 22.3% of total execution time (3.83ms out of 17.17ms). The optimization passes the raw _query dictionary directly to the HTTP client, letting httpx handle native serialization of lists and dictionaries. This reduced _as_str_dict calls from 1,674 to 1,116 hits and cut its total time by 75% (3.62ms → 0.89ms).

2. Conditional URL Formatting
The original code always performed string formatting on the URL (request.url.format(**request.path_params)), even when no path parameters existed. The optimization adds a conditional check that only formats when request.path_params is non-empty, avoiding unnecessary string operations in the common case where URLs have no path parameters to substitute.

Performance Impact by Test Case:

  • Basic parameter tests: Benefit from reduced serialization overhead, especially when passing lists/dicts for id, space_id, sort, and body_format parameters
  • Large-scale concurrent tests (50-200 requests): Show multiplicative benefits as the serialization savings compound across many simultaneous requests
  • Throughput tests: Demonstrate sustained performance gains under load, with the optimization maintaining efficiency even during high-volume concurrent execution

The optimizations are particularly effective for API calls with complex query parameters (lists, dictionaries) and maintain full backward compatibility while leveraging httpx's native parameter handling capabilities.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 574 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 96.3%
🌀 Generated Regression Tests and Runtime

import asyncio # used to run async functions

import pytest # used for our unit tests
from app.sources.external.confluence.confluence import ConfluenceDataSource

---- Minimal stubs for dependencies ----

These are minimal implementations to allow the tests to run

without requiring the full external system.

class HTTPResponse:
def init(self, data):
self.data = data

def json(self):
    return self.data

class HTTPRequest:
def init(self, method, url, headers, path_params, query_params, body):
self.method = method
self.url = url
self.headers = headers
self.path_params = path_params
self.query_params = query_params
self.body = body

class DummyAsyncClient:
def init(self, base_url):
self._base_url = base_url
self.executed_requests = []

def get_base_url(self):
    return self._base_url

async def execute(self, req):
    # Collect request for inspection
    self.executed_requests.append(req)
    # Simulate a response based on query params for test assertions
    return HTTPResponse({
        "method": req.method,
        "url": req.url,
        "headers": req.headers,
        "path_params": req.path_params,
        "query_params": req.query_params,
        "body": req.body
    })

class DummyConfluenceClient:
def init(self, base_url="https://test.atlassian.net"):
self._client = DummyAsyncClient(base_url)

def get_client(self):
    return self._client

from app.sources.external.confluence.confluence import ConfluenceDataSource

---- Unit Tests ----

--------- 1. Basic Test Cases ---------

@pytest.mark.asyncio
async def test_get_blog_posts_basic_returns_expected_response():
"""Test basic async/await behavior and expected output with default arguments."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
response = await ds.get_blog_posts()
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_with_all_parameters():
"""Test passing all possible parameters."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
response = await ds.get_blog_posts(
id=[1, 2, 3],
space_id=[10, 20],
sort={'created': 'desc'},
status=['published', 'draft'],
title="Test Blog",
body_format={'type': 'storage'},
cursor="abc123",
limit=50,
headers={'X-Test': 'yes'}
)
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_with_empty_lists_and_none():
"""Test edge case where lists and None are passed as parameters."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
response = await ds.get_blog_posts(
id=[],
space_id=None,
status=[],
title=None,
body_format=None
)
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_with_boolean_and_numeric_types():
"""Test boolean and numeric serialization in query parameters."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
response = await ds.get_blog_posts(
id=[True, False, 42],
space_id=[0],
limit=0
)
data = response.json()

--------- 2. Edge Test Cases ---------

@pytest.mark.asyncio
async def test_get_blog_posts_concurrent_execution():
"""Test concurrent execution of multiple requests."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
# Run several requests concurrently with different params
tasks = [
ds.get_blog_posts(id=[i]) for i in range(5)
]
responses = await asyncio.gather(*tasks)
for idx, response in enumerate(responses):
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_raises_if_client_not_initialized():
"""Test ValueError is raised if client is not initialized."""
class BadConfluenceClient:
def get_client(self):
return None
with pytest.raises(ValueError) as excinfo:
ConfluenceDataSource(BadConfluenceClient())

@pytest.mark.asyncio
async def test_get_blog_posts_raises_if_base_url_missing():
"""Test ValueError is raised if client lacks get_base_url method."""
class NoBaseUrlClient:
def get_client(self):
return object()
with pytest.raises(ValueError) as excinfo:
ConfluenceDataSource(NoBaseUrlClient())

@pytest.mark.asyncio
async def test_get_blog_posts_with_custom_headers():
"""Test custom headers are passed through correctly."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
response = await ds.get_blog_posts(headers={'Authorization': 'Bearer testtoken', 'X-Custom': 'abc'})
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_with_large_query_dict():
"""Test passing a large sort/body_format dict."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
large_dict = {str(i): i for i in range(20)}
response = await ds.get_blog_posts(sort=large_dict, body_format=large_dict)
data = response.json()

--------- 3. Large Scale Test Cases ---------

@pytest.mark.asyncio
async def test_get_blog_posts_large_scale_concurrent_requests():
"""Test large scale concurrent execution (50 requests)."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
tasks = [
ds.get_blog_posts(id=[i], limit=i) for i in range(50)
]
responses = await asyncio.gather(*tasks)
# Each response should match its input
for i, response in enumerate(responses):
data = response.json()

@pytest.mark.asyncio
async def test_get_blog_posts_large_id_list():
"""Test passing a large id list (up to 500 elements)."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
large_id_list = list(range(500))
response = await ds.get_blog_posts(id=large_id_list)
data = response.json()

--------- 4. Throughput Test Cases ---------

@pytest.mark.asyncio

async def test_get_blog_posts_throughput_medium_load():
"""Throughput: Test medium load of 100 concurrent requests."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
tasks = [ds.get_blog_posts(space_id=[i], limit=10) for i in range(100)]
responses = await asyncio.gather(*tasks)
for i, resp in enumerate(responses):
params = resp.json()['query_params']

@pytest.mark.asyncio
async def test_get_blog_posts_throughput_high_volume():
"""Throughput: Test high volume (200 requests) with varying params."""
client = DummyConfluenceClient()
ds = ConfluenceDataSource(client)
# Use varying status and cursor to test serialization and throughput
tasks = [
ds.get_blog_posts(status=['published', 'archived'], cursor=str(i))
for i in range(200)
]
responses = await asyncio.gather(*tasks)
for i, resp in enumerate(responses):
params = resp.json()['query_params']

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

#------------------------------------------------
import asyncio # used to run async functions

import pytest # used for our unit tests
from app.sources.external.confluence.confluence import ConfluenceDataSource

--- Minimal stubs for dependencies (to avoid external imports and allow isolated testing) ---

class HTTPResponse:
"""Stub for HTTPResponse, mimics a real HTTP response object."""
def init(self, response_data):
self.data = response_data

def __eq__(self, other):
    return isinstance(other, HTTPResponse) and self.data == other.data

class HTTPRequest:
"""Stub for HTTPRequest, only stores the arguments."""
def init(self, method, url, headers, path_params, query_params, body):
self.method = method
self.url = url
self.headers = headers
self.path_params = path_params
self.query_params = query_params
self.body = body

--- Minimal stub for ConfluenceClient and its HTTP client ---

class DummyHTTPClient:
"""Dummy HTTP client, mimics async execute method and get_base_url method."""
def init(self, base_url):
self._base_url = base_url
self.last_request = None
self.should_raise = False
self.response_data = None

def get_base_url(self):
    return self._base_url

async def execute(self, request):
    if self.should_raise:
        raise RuntimeError("Simulated HTTP error")
    self.last_request = request
    # Return a dummy HTTPResponse with the query params for verification
    return HTTPResponse({
        "method": request.method,
        "url": request.url,
        "headers": request.headers,
        "path_params": request.path_params,
        "query_params": request.query_params,
        "body": request.body,
    })

class DummyConfluenceClient:
"""Dummy ConfluenceClient wrapper."""
def init(self, http_client):
self.client = http_client

def get_client(self):
    return self.client

from app.sources.external.confluence.confluence import ConfluenceDataSource

--- Unit Tests ---

@pytest.mark.asyncio
async def test_get_blog_posts_basic_empty():
"""Test basic call with no parameters (all default)."""
client = DummyHTTPClient("https://test.atlassian.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
resp = await ds.get_blog_posts()

@pytest.mark.asyncio
async def test_get_blog_posts_basic_with_all_params():
"""Test call with all possible parameters set."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
resp = await ds.get_blog_posts(
id=[1,2],
space_id=[3,4],
sort={"field": "created", "order": "desc"},
status=["published", "draft"],
title="Blog Title",
body_format={"type": "storage"},
cursor="abc123",
limit=50,
headers={"X-Test": "val"}
)

@pytest.mark.asyncio
async def test_get_blog_posts_basic_async_await():
"""Test that the function is awaitable and returns a coroutine."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
# Should be a coroutine before awaiting
codeflash_output = ds.get_blog_posts(title="Async Test"); coro = codeflash_output
resp = await coro

@pytest.mark.asyncio
async def test_get_blog_posts_edge_invalid_client():
"""Test ValueError raised if client is not initialized."""
class BadConfluenceClient:
def get_client(self):
return None
with pytest.raises(ValueError, match="HTTP client is not initialized"):
ConfluenceDataSource(BadConfluenceClient())

@pytest.mark.asyncio
async def test_get_blog_posts_edge_missing_base_url_method():
"""Test ValueError raised if client lacks get_base_url method."""
class NoBaseUrlClient:
pass
class Wrapper:
def get_client(self):
return NoBaseUrlClient()
with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
ConfluenceDataSource(Wrapper())

@pytest.mark.asyncio
async def test_get_blog_posts_edge_concurrent_execution():
"""Test concurrent execution of multiple get_blog_posts calls."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
# Run several calls concurrently with different params
tasks = [
ds.get_blog_posts(title=f"Title {i}", limit=i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
for i, resp in enumerate(results):
pass

@pytest.mark.asyncio
async def test_get_blog_posts_edge_execute_raises():
"""Test that exceptions from the HTTP client propagate."""
client = DummyHTTPClient("https://site.net")
client.should_raise = True
ds = ConfluenceDataSource(DummyConfluenceClient(client))
with pytest.raises(RuntimeError, match="Simulated HTTP error"):
await ds.get_blog_posts(title="Should fail")

@pytest.mark.asyncio
async def test_get_blog_posts_edge_various_types():
"""Test parameter serialization with edge types."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
resp = await ds.get_blog_posts(
id=[1, True, None, "x"],
status=["draft", False],
limit=0,
headers={"X-Edge": 123}
)

@pytest.mark.asyncio
async def test_get_blog_posts_large_scale_many_concurrent():
"""Test large scale concurrent calls (50)."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
tasks = [
ds.get_blog_posts(title=f"Post {i}", limit=i)
for i in range(50)
]
results = await asyncio.gather(*tasks)
for i, resp in enumerate(results):
pass

@pytest.mark.asyncio
async def test_get_blog_posts_large_scale_varied_inputs():
"""Test with varied and complex input parameters."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
# Use complex dicts and lists for sort and body_format
resp = await ds.get_blog_posts(
sort={"field": "updated", "order": "asc", "extra": [1,2,3]},
body_format={"type": "view", "options": {"safe": True}},
status=["archived", "deleted"]
)

@pytest.mark.asyncio

async def test_get_blog_posts_throughput_medium_load():
"""Throughput test: medium load (20 concurrent calls)."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
tasks = [ds.get_blog_posts(title=f"Medium {i}") for i in range(20)]
results = await asyncio.gather(*tasks)
for i, resp in enumerate(results):
pass

@pytest.mark.asyncio
async def test_get_blog_posts_throughput_large_load():
"""Throughput test: large load (100 concurrent calls)."""
client = DummyHTTPClient("https://site.net")
ds = ConfluenceDataSource(DummyConfluenceClient(client))
tasks = [ds.get_blog_posts(title=f"Large {i}") for i in range(100)]
results = await asyncio.gather(*tasks)
for i, resp in enumerate(results):
pass

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-ConfluenceDataSource.get_blog_posts-mhva0co4 and push.

Codeflash Static Badge

The optimized code delivers a **17% runtime improvement** and **2.4% throughput increase** through two key optimizations:

**1. Eliminated Unnecessary Query Parameter Serialization**
The original code called `_as_str_dict(_query)` which serialized all query parameters to strings using `_serialize_value()`. The line profiler shows this consumed **22.3%** of total execution time (3.83ms out of 17.17ms). The optimization passes the raw `_query` dictionary directly to the HTTP client, letting httpx handle native serialization of lists and dictionaries. This reduced `_as_str_dict` calls from 1,674 to 1,116 hits and cut its total time by **75%** (3.62ms → 0.89ms).

**2. Conditional URL Formatting**
The original code always performed string formatting on the URL (`request.url.format(**request.path_params)`), even when no path parameters existed. The optimization adds a conditional check that only formats when `request.path_params` is non-empty, avoiding unnecessary string operations in the common case where URLs have no path parameters to substitute.

**Performance Impact by Test Case:**
- **Basic parameter tests**: Benefit from reduced serialization overhead, especially when passing lists/dicts for `id`, `space_id`, `sort`, and `body_format` parameters
- **Large-scale concurrent tests** (50-200 requests): Show multiplicative benefits as the serialization savings compound across many simultaneous requests  
- **Throughput tests**: Demonstrate sustained performance gains under load, with the optimization maintaining efficiency even during high-volume concurrent execution

The optimizations are particularly effective for API calls with complex query parameters (lists, dictionaries) and maintain full backward compatibility while leveraging httpx's native parameter handling capabilities.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 12, 2025 00:43
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Nov 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant