Extracts methods for code reuse.

This commit is contained in:
Toran Bruce Richards
2024-07-21 14:46:39 +01:00
parent 3e1f59814d
commit ad1699eccf

View File

@@ -17,6 +17,16 @@ def extract_spreadsheet_id(url: str) -> str:
return match.group(1)
raise ValueError("Invalid Google Sheets URL")
def get_google_sheets_service(access_token: str):
"""Create and return a Google Sheets service object."""
credentials = Credentials(access_token)
return build("sheets", "v4", credentials=credentials)
def get_spreadsheet_and_id(spreadsheet_url: str, access_token: str):
"""Extract spreadsheet ID and create a Google Sheets service object."""
spreadsheet_id = extract_spreadsheet_id(spreadsheet_url)
service = get_google_sheets_service(access_token)
return service.spreadsheets(), spreadsheet_id
class GoogleSheetsWriter(Block):
class Input(BlockSchema):
@@ -51,12 +61,8 @@ class GoogleSheetsWriter(Block):
def run(self, input_data: Input) -> BlockOutput:
try:
spreadsheet_id = extract_spreadsheet_id(input_data.spreadsheet_url)
credentials = Credentials(input_data.access_token)
service = build("sheets", "v4", credentials=credentials)
sheet = service.spreadsheets()
sheet, spreadsheet_id = get_spreadsheet_and_id(input_data.spreadsheet_url, input_data.access_token)
# Get the current sheet data to find the next empty row
result = (
sheet.values()
.get(spreadsheetId=spreadsheet_id, range=f"{input_data.sheet_name}!A:A")
@@ -111,6 +117,7 @@ class GoogleSheetsReader(Block):
row_number: int = SchemaField(
description="If reading a single row, which row number do you want? (e.g., 3 for the third row)",
placeholder="3",
default=1,
)
cell_range: str = SchemaField(
description="If reading a range, what range of cells do you want? (e.g., 'A1:E10' for the first 10 rows of columns A to E)",
@@ -137,21 +144,14 @@ class GoogleSheetsReader(Block):
def run(self, input_data: Input) -> BlockOutput:
try:
spreadsheet_id = extract_spreadsheet_id(input_data.spreadsheet_url)
credentials = Credentials(input_data.access_token)
service = build("sheets", "v4", credentials=credentials)
sheet = service.spreadsheets()
sheet, spreadsheet_id = get_spreadsheet_and_id(input_data.spreadsheet_url, input_data.access_token)
if input_data.read_single_row:
range_to_read = f"{input_data.sheet_name}!A{input_data.row_number}:ZZ{input_data.row_number}"
else:
range_to_read = f"{input_data.sheet_name}!{input_data.cell_range}"
result = (
sheet.values()
.get(spreadsheetId=spreadsheet_id, range=range_to_read)
.execute()
)
result = sheet.values().get(spreadsheetId=spreadsheet_id, range=range_to_read).execute()
values = result.get("values", [])
if not values: