
Inventory Items Retrieval Module


This document provides a comprehensive overview of the modules responsible for retrieving Inventory Data and Levels from Linnworks. These modules fetch data from Linnworks and subsequently update the database with the received inventory data.

Code Overview

The focus of this guide is on three primary modules:

  • Inventory
  • Threepl_inventory
  • Levels

Most of the code is shared between these modules, as they fetch and load data from the same source.

Inventory

This module is specifically designed to retrieve inventory items using the Linnworks API. It comprises several functions and a class that collectively manage the retrieval and updating of inventory data.

Database Connection Context Manager

  • get_db_cursor - A context manager for handling database connections. It establishes a connection, yields it for use, and ensures the connection is closed afterwards. It also handles any errors that occur during database operations. This context manager is reused throughout the code.
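
The pattern described above can be sketched as follows. This is a minimal illustration, not the repository code: it assumes SQLite through the standard-library sqlite3 module, the db_path parameter is hypothetical, and it yields a cursor (as the function name suggests) rather than the connection itself.

```python
import logging
import sqlite3
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def get_db_cursor(db_path=":memory:"):
    """Yield a cursor, commit on success, roll back on error, always close."""
    # Assumption: SQLite stands in for whatever database the pipeline uses.
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        yield cursor
        conn.commit()
    except sqlite3.Error as exc:
        # Undo the partial work, record the failure, and let the caller decide.
        conn.rollback()
        logger.error("Database operation failed: %s", exc)
        raise
    finally:
        conn.close()
```

A caller would wrap each unit of database work in `with get_db_cursor() as cursor:`. Re-raising after logging is a design choice in this sketch; the real function may swallow the error instead.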

InventoryStock Class

  • InventoryStock - This class extends the LinnApi class and includes methods for managing inventory stock using the Linnworks API. Its key method is get_stock_items, which retrieves stock items from the Linnworks inventory.

Retrieve Inventory Data Function

  • retrieve_inventory_data - This function takes the data yielded by get_stock_items and updates the database with it.
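
One way to write such an update is an upsert, so that re-running the pipeline refreshes existing rows instead of duplicating them. The sketch below assumes SQLite, and the table name, column names, and item keys (StockItemId, SKU, Title) are hypothetical; the real schema is not shown in this guide.

```python
import sqlite3

# Hypothetical table and column names, chosen only for illustration.
UPSERT_SQL = """
INSERT INTO inventory (stock_item_id, sku, title)
VALUES (:StockItemId, :SKU, :Title)
ON CONFLICT(stock_item_id) DO UPDATE SET
    sku = excluded.sku,
    title = excluded.title
"""


def retrieve_inventory_data(cursor, item):
    """Insert one stock item, or update the existing row with the same ID."""
    # The item dictionary supplies the named parameters in UPSERT_SQL.
    cursor.execute(UPSERT_SQL, item)
```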

3PL Integration

This module is used for extracting item titles, descriptions, prices, extended properties, and images. Linnworks provides a 3PL endpoint, Stock/GetStockItemsFullByIds, which we can use to retrieve inventory items, provided we have the inventory ID. In the Inventory module, we have already fetched the main inventory data which contains the inventory ID. So, we fetch the IDs from our database and send them to the API asynchronously. This module has one main function and several helper functions that assist the main function in retrieving the inventory items.

Main Function

  • get_all_data - This asynchronous function retrieves and processes inventory data from the Linnworks API. It sends POST requests to the Stock/GetStockItemsFullByIds endpoint, using the inventory IDs from params_list. The function creates a task for each request and runs them concurrently with asyncio.gather. If an error occurs during an HTTP request, such as a client error or a timeout, it is logged and the function returns. Once the responses have been retrieved, the function processes the data and inserts it into the database by calling the process_responses helper once for each type of data (titles, prices, descriptions, images, and extended properties).
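
The control flow described above can be sketched without a network dependency. In this illustration, post_json and handle_responses are hypothetical callables standing in for the aiohttp request and the process_responses calls, and ConnectionError stands in for the HTTP client's own error type.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

ENDPOINT = "Stock/GetStockItemsFullByIds"


async def get_all_data(post_json, params_list, handle_responses):
    """POST every payload concurrently, then pass the responses on."""
    # One coroutine per payload; gather schedules them all at once.
    requests = [post_json(ENDPOINT, params) for params in params_list]
    try:
        responses = await asyncio.gather(*requests)
    except (ConnectionError, asyncio.TimeoutError) as exc:
        # Log and give up on this call, as the description above states.
        logger.error("Request to %s failed: %s", ENDPOINT, exc)
        return None
    await handle_responses(responses)
    return len(responses)
```

asyncio.gather returns results in the same order as the awaitables it was given, so each response can be matched to its entry in params_list.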

Note: This is just a code snippet; find the actual code in the code repository.

Helper Functions

There are several helper functions:

  • process_responses - This asynchronous function transforms the data fetched from the API. It processes the responses in batches of a predefined size (BATCH_SIZE). For each batch, it extracts the required data using the data_extractor function and inserts it into the database using the provided SQL query (query). If an error occurs during data insertion, it skips that batch and continues with the next one.


  • insert_data - This function writes the data to the database after it has been extracted and transformed.

  • Extract Functions - These functions include extract_titles_data, extract_prices_data, extract_description_data, extract_images_data, and extract_extended_properties_data. They are designed to extract data from a batch of items. The batch is expected to be a list of dictionaries, where each dictionary represents an item with its associated properties. Each one is passed from the main function to process_responses, which calls it on every batch.
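
The three helpers fit together roughly as sketched below. This is an illustration under stated assumptions: it uses SQLite, it is synchronous for brevity (the real process_responses is asynchronous), and the BATCH_SIZE value, the conn parameter, and the item keys StockItemId and ItemTitle are hypothetical.

```python
import sqlite3

BATCH_SIZE = 2  # hypothetical value; the real batch size is not stated


def extract_titles_data(batch):
    """Turn a batch of item dictionaries into (stock_item_id, title) rows."""
    # The key names are assumptions about the shape of the API response.
    return [(item["StockItemId"], item.get("ItemTitle", "")) for item in batch]


def insert_data(conn, query, rows):
    """Write one batch of rows and commit; roll back and re-raise on failure."""
    try:
        conn.executemany(query, rows)
        conn.commit()
    except sqlite3.Error:
        conn.rollback()
        raise


def process_responses(conn, responses, data_extractor, query):
    """Extract and insert in BATCH_SIZE chunks, skipping any batch that fails."""
    inserted = 0
    for start in range(0, len(responses), BATCH_SIZE):
        batch = responses[start:start + BATCH_SIZE]
        rows = data_extractor(batch)
        try:
            insert_data(conn, query, rows)
        except sqlite3.Error:
            continue  # skip this batch and carry on with the next one
        inserted += len(rows)
    return inserted
```

Passing the extractor and the query as arguments is what lets a single process_responses serve titles, prices, descriptions, images, and extended properties alike.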

Levels

Stock levels are a crucial part of inventory management and need to be updated regularly. While we could have used the Stock/GetStockItemsFullByIds endpoint to retrieve the levels, this would also mean regularly fetching items, which isn’t necessary. In this module, we use the Stock/GetStockLevel_Batch endpoint to fetch the levels on a regular basis in a more efficient manner. The code is quite similar to that of the 3PL Integration, as they perform almost the same function. This module contains only one function: update_stock_levels.

update_stock_levels

This asynchronous function is designed to update stock levels regularly. It makes multiple HTTP requests concurrently, gathers the responses, and updates the stock levels in the database.
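
A rough sketch of that request, gather, and update sequence follows. The fetch_levels and save_levels callables are hypothetical stand-ins for the Stock/GetStockLevel_Batch request and the database write. Collecting exceptions with return_exceptions=True, so that one failed request does not discard the others, is an assumption of this sketch and not necessarily what the real function does.

```python
import asyncio


async def update_stock_levels(fetch_levels, save_levels, id_chunks):
    """Request levels for every chunk of IDs concurrently, then store them."""
    responses = await asyncio.gather(
        *(fetch_levels(chunk) for chunk in id_chunks),
        return_exceptions=True,  # failures come back as values, not raises
    )
    levels = []
    for response in responses:
        if isinstance(response, Exception):
            continue  # a real implementation would log the failure here
        levels.extend(response)
    save_levels(levels)
    return len(levels)
```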

Usage

To run this pipeline, the functions are imported into a single module, from which they are executed.

Main Inventory

  • main_inventory - This asynchronous function is the main entry point for managing inventory. It retrieves stock items and processes each item to retrieve its inventory data.


The function starts by initializing an InventoryStock object. It then calls the get_stock_items method of this object to retrieve a list of stock items. The function iterates over each item in this list asynchronously. For each item, it calls the retrieve_inventory_data function to process the item and retrieve its inventory data.
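
The flow in the paragraph above might look like the sketch below. It assumes get_stock_items is an asynchronous generator, which is one reading of "iterates asynchronously". The InventoryStock class here is a hypothetical stand-in that yields canned items, and the sink argument exists only so the example can run without a database.

```python
import asyncio


class InventoryStock:
    """Hypothetical stand-in; the real class extends LinnApi and calls Linnworks."""

    async def get_stock_items(self):
        for n in range(3):
            await asyncio.sleep(0)  # placeholder for awaiting an API page
            yield {"StockItemId": f"id-{n}"}


async def retrieve_inventory_data(item, sink):
    """Placeholder for the database update; records the item ID instead."""
    sink.append(item["StockItemId"])


async def main_inventory(sink):
    """Create the stock client, then process each item as it arrives."""
    stock = InventoryStock()
    async for item in stock.get_stock_items():
        await retrieve_inventory_data(item, sink)
    return len(sink)
```

Such an entry point would typically be started with `asyncio.run(main_inventory(...))`.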

3PL Integration

  • update_inventory_data - This asynchronous function is designed to update inventory data. It fetches parameters from the database in batches and makes multiple HTTP requests concurrently to retrieve the data. After all requests for a batch are completed, it waits for 60 seconds before proceeding with the next batch. This process continues until there are no more parameters to fetch from the database.

The function starts by initializing an aiohttp client session. It then enters a loop where it fetches parameters from the database in batches of 150 (up to a maximum of 200 batches per minute). For each batch, it creates a list of tasks where each task is a call to the get_all_data function with the current session and parameters. It then executes all tasks concurrently using asyncio.gather(). After all tasks for the current batch are completed, it waits for 60 seconds before proceeding with the next batch. The function keeps track of the total time taken to update the inventory data.
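
The loop can be sketched as follows. To keep the example self-contained, the aiohttp session is left out, and fetch_params and get_all_data are hypothetical callables supplied by the caller; fetch_params is assumed to return an empty list once the database has no more parameters. The pause is a parameter so the sketch can be exercised without waiting a full minute.

```python
import asyncio
import time


async def update_inventory_data(fetch_params, get_all_data, pause_seconds=60):
    """Process parameter batches until none remain, pausing between batches."""
    started = time.monotonic()
    batches_done = 0
    while True:
        params_batch = fetch_params()  # assumed to return [] when exhausted
        if not params_batch:
            break
        # Run one get_all_data call per parameter set, all concurrently.
        await asyncio.gather(*(get_all_data(params) for params in params_batch))
        batches_done += 1
        await asyncio.sleep(pause_seconds)  # wait before the next batch
    elapsed = time.monotonic() - started
    return batches_done, elapsed
```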

Levels Data

This asynchronous function is designed to update level data. It fetches parameters from the database in batches and uses these parameters to update the stock levels. This process continues until there are no more parameters to fetch from the database.


The function starts by initializing an aiohttp client session. It then enters a loop where it fetches parameters from the database in batches of 200 (up to a maximum of 50 batches at a time). For each batch, it calls the update_stock_levels function with the current session and parameters. The loop continues until there are no more parameters to fetch from the database.

Conclusion

This is how the ETL pipeline is designed to function. The processes are scheduled to run at their respective times. This documentation has provided an overview of the Inventory Items Retrieval Module's functionality, its components, and their usage. The module is designed for efficient, automated inventory management.