인접 리스트 (Adjacency List)
인접 리스트는 그래프 표현 방법 중 하나로, 각 정점(vertex)에 연결된 인접 정점들을 리스트 형태로 저장하는 방식이다.
특히 희소 그래프(sparse graph), 즉 간선의 수가 정점 수의 제곱에 비해 훨씬 적은 그래프를 표현할 때 메모리 효율성이 뛰어나다.
인접 리스트는 그래프 표현 방법 중에서 가장 널리 사용되는 방식 중 하나이다.
특히 희소 그래프를 효율적으로 표현할 수 있어 많은 실제 문제에 적합하다.
인접 리스트의 주요 장점을 요약하면 다음과 같다:
- 공간 효율성: 간선이 적은 그래프에서 메모리 사용량이 O(V + E)로 효율적이다.
- 정점 순회 효율성: 특정 정점의 인접 정점을 빠르게 순회할 수 있다.
- 동적 그래프 지원: 정점이나 간선의 추가/삭제가 효율적이다.
- 그래프 알고리즘 효율성: DFS, BFS, 다익스트라 등 많은 그래프 알고리즘이 인접 리스트에서 효율적으로 동작한다.
다만, 두 정점 간의 간선 존재 여부를 빠르게 확인해야 하는 경우에는 인접 행렬이 더 적합할 수 있다.
인접 리스트는 소셜 네트워크, 웹 그래프, 교통 네트워크, 생물학적 네트워크 등 다양한 분야에서 그래프 문제를 해결하는 데 중요한 역할을 한다. 최신 그래프 처리 시스템과 라이브러리들은 인접 리스트를 기반으로 하여 확장성과 성능을 최적화하고 있다.
인접 리스트의 기본 개념
인접 리스트는 각 정점마다 해당 정점에 인접한(직접 연결된) 정점들의 목록을 유지한다.
일반적으로 다음과 같은 구조를 가진다:
각 리스트는 해당 정점에서 출발하는 간선들의 목적지 정점들을 포함한다.
인접 리스트의 구현 방식
인접 리스트는 다양한 방식으로 구현할 수 있다.
배열(또는 벡터)의 배열
가장 기본적인 구현 방식으로, 정점 수만큼의 배열을 만들고 각 배열 요소는 인접 정점들의 리스트를 가리킨다.
연결 리스트의 배열
각 정점마다 연결 리스트를 사용하여 인접 정점들을 저장한다.
이 방식은 동적 메모리 할당이 필요하지만, 정점이나 간선을 추가/삭제하는 작업이 효율적이다.
|
|
해시 테이블(또는 딕셔너리)의 컬렉션
정점 식별자가 문자열이나 다른 복잡한 객체일 때 유용한 방식이다.
각 정점에 대해 해시 테이블을 사용하여 인접 정점들을 저장한다.
|
|
가중치 그래프를 위한 인접 리스트
가중치 그래프의 경우, 각 인접 정점과 함께 해당 간선의 가중치도 저장해야 한다.
|
|
인접 리스트의 장단점
장점
- 공간 효율성: 희소 그래프에서 O(V + E) 공간을 사용하여 메모리 효율적이다.
- 정점 순회 효율성: 특정 정점의 모든 인접 정점을 순회하는 데 O(degree(v)) 시간이 소요된다.
- 그래프 전체 순회 효율성: 모든 정점과 간선을 순회하는 데 O(V + E) 시간이 소요된다.
- 동적 그래프: 정점이나 간선을 추가/삭제하는 작업이 효율적이다.
단점
- 간선 확인의 비효율성: 두 정점 사이의 간선 존재 여부를 확인하는 데 O(degree(v)) 시간이 소요된다. 인접 행렬에서는 O(1)이다.
- 구현 복잡성: 인접 행렬에 비해 구현이 약간 더 복잡할 수 있다.
- 메모리 간접 참조: 포인터나 인덱스를 통한 간접 참조로 인해 캐시 효율성이 떨어질 수 있다.
인접 리스트의 연산
인접 리스트를 사용하여 그래프에서 수행할 수 있는 기본적인 연산들
간선 추가
시간 복잡도: O(1)
간선 제거
1 2 3 4 5 6 7 8 9 10 11 12
def remove_edge(self, u, v): # 인접 리스트에서 v를 찾아 제거 for i, (vertex, _) in enumerate(self.adj_list[u]): if vertex == v: self.adj_list[u].pop(i) break # 무방향 그래프의 경우 아래 코드도 추가 # for i, (vertex, _) in enumerate(self.adj_list[v]): # if vertex == u: # self.adj_list[v].pop(i) # break
시간 복잡도: O(degree(u)) - u의 차수에 비례
인접 정점 확인
시간 복잡도: O(degree(u)) - u의 차수에 비례
정점의 차수 계산
시간 복잡도: O(1)
인접 정점 순회
시간 복잡도: O(degree(vertex)) - 정점의 차수에 비례
인접 리스트를 이용한 그래프 알고리즘
깊이 우선 탐색(DFS)
시간 복잡도: O(V + E) - V는 정점 수, E는 간선 수
너비 우선 탐색(BFS)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from collections import deque def bfs(self, start_vertex): visited = set([start_vertex]) queue = deque([start_vertex]) while queue: vertex = queue.popleft() print(vertex, end=' ') # 인접한 정점들을 큐에 추가 for neighbor, _ in self.adj_list[vertex]: if neighbor not in visited: visited.add(neighbor) queue.append(neighbor)
시간 복잡도: O(V + E) - V는 정점 수, E는 간선 수
다익스트라 알고리즘
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import heapq def dijkstra(self, start_vertex): # 최단 거리를 저장할 딕셔너리 distances = {vertex: float('infinity') for vertex in range(self.num_vertices)} distances[start_vertex] = 0 # 우선순위 큐 초기화 priority_queue = [(0, start_vertex)] while priority_queue: current_distance, current_vertex = heapq.heappop(priority_queue) # 현재 거리가 이미 계산된 거리보다 크면 무시 if current_distance > distances[current_vertex]: continue # 인접한 정점들을 확인 for neighbor, weight in self.adj_list[current_vertex]: distance = current_distance + weight # 더 짧은 경로를 발견한 경우 업데이트 if distance < distances[neighbor]: distances[neighbor] = distance heapq.heappush(priority_queue, (distance, neighbor)) return distances
시간 복잡도: O((V + E) log V) - 우선순위 큐 사용 시
응용 분야별 구현 예시
소셜 네트워크 분석
소셜 네트워크는 일반적으로 사용자 수에 비해 연결(친구 관계)이 상대적으로 적은 희소 그래프이다.
인접 리스트를 사용하여 친구 관계를 효율적으로 표현할 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
class SocialNetwork: def __init__(self): self.users = {} # 사용자 ID를 키로 하는 딕셔너리 def add_user(self, user_id, name): if user_id not in self.users: self.users[user_id] = {"name": name, "friends": []} def add_friendship(self, user1, user2): if user1 in self.users and user2 in self.users: # 이미 친구인지 확인 if user2 not in [friend["id"] for friend in self.users[user1]["friends"]]: self.users[user1]["friends"].append({"id": user2, "name": self.users[user2]["name"]}) self.users[user2]["friends"].append({"id": user1, "name": self.users[user1]["name"]}) def get_friends(self, user_id): if user_id in self.users: return self.users[user_id]["friends"] return [] def recommend_friends(self, user_id, max_recommendations=5): """친구의 친구를 추천 (현재 친구가 아닌 사람들)""" if user_id not in self.users: return [] current_friends = set(friend["id"] for friend in self.users[user_id]["friends"]) current_friends.add(user_id) # 자기 자신 추가 potential_friends = {} # 모든 친구의 친구를 확인 for friend in self.users[user_id]["friends"]: friend_id = friend["id"] for friend_of_friend in self.users[friend_id]["friends"]: fof_id = friend_of_friend["id"] if fof_id not in current_friends: if fof_id in potential_friends: potential_friends[fof_id] += 1 # 공통 친구 수 증가 else: potential_friends[fof_id] = 1 # 공통 친구 수로 정렬하여 추천 recommendations = sorted(potential_friends.items(), key=lambda x: x[1], reverse=True) return [(fof_id, self.users[fof_id]["name"], count) for fof_id, count in recommendations[:max_recommendations]]
웹 크롤러
웹 페이지 간의 링크 관계를 그래프로 표현할 때, 인접 리스트를 사용하여 각 페이지에서 링크하는 다른 페이지들을 저장할 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
class WebCrawler: def __init__(self): self.graph = {} # URL을 키로 하는 딕셔너리 self.visited = set() def add_page(self, url, linked_urls): self.graph[url] = linked_urls def crawl(self, start_url, max_depth=2): self.visited = set() self._dfs_crawl(start_url, 0, max_depth) def _dfs_crawl(self, url, depth, max_depth): if depth > max_depth or url in self.visited: return self.visited.add(url) print(f"Crawling: {url}") # 실제 구현에서는 여기서 페이지를 다운로드하고 링크 추출 # 인접 페이지들을 방문 if url in self.graph: for linked_url in self.graph[url]: self._dfs_crawl(linked_url, depth + 1, max_depth)
교통 네트워크
도시 간 교통 네트워크를 가중치 그래프로 표현할 때, 인접 리스트를 사용하여 각 도시에서 연결된 다른 도시와 이동 비용(거리, 시간 등)을 저장할 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
class TransportationNetwork: def __init__(self): self.cities = {} # 도시 이름을 키로 하는 딕셔너리 def add_city(self, city): if city not in self.cities: self.cities[city] = [] # 연결된 도시와 비용을 저장할 리스트 def add_route(self, city1, city2, distance, time): if city1 not in self.cities: self.add_city(city1) if city2 not in self.cities: self.add_city(city2) # 양방향 연결 (무방향 그래프) self.cities[city1].append({"to": city2, "distance": distance, "time": time}) self.cities[city2].append({"to": city1, "distance": distance, "time": time}) def find_shortest_path(self, start_city, end_city, metric="distance"): """다익스트라 알고리즘을 사용하여 최단 경로 찾기""" if start_city not in self.cities or end_city not in self.cities: return None # 거리/시간을 저장할 딕셔너리 costs = {city: float('infinity') for city in self.cities} costs[start_city] = 0 # 경로를 저장할 딕셔너리 previous = {city: None for city in self.cities} # 방문하지 않은 도시들 unvisited = list(self.cities.keys()) while unvisited: # 현재 최소 비용 도시 찾기 current_city = min(unvisited, key=lambda city: costs[city]) if current_city == end_city: break if costs[current_city] == float('infinity'): break unvisited.remove(current_city) # 인접 도시 확인 for connection in self.cities[current_city]: cost = costs[current_city] + connection[metric] if cost < costs[connection["to"]]: costs[connection["to"]] = cost previous[connection["to"]] = current_city # 경로 구성 path = [] current = end_city while current: path.append(current) current = previous[current] # 경로 역순으로 변경 path.reverse() if path[0] != start_city: return None # 경로가 없음 return {"path": path, "cost": costs[end_city]}
인접 리스트의 최적화 기법
캐시 지역성 향상
인접 리스트의 성능을 향상시키기 위해, 데이터의 지역성(locality)을 개선하여 캐시 효율성을 높일 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
# 정점들을 그래프 순회 순서로 재정렬 def optimize_vertex_ordering(self): # BFS를 사용하여 정점 순서 결정 new_order = [] visited = set() for start in range(self.num_vertices): if start not in visited: queue = deque([start]) visited.add(start) while queue: vertex = queue.popleft() new_order.append(vertex) for neighbor, _ in self.adj_list[vertex]: if neighbor not in visited: visited.add(neighbor) queue.append(neighbor) # 새 순서에 따라 인접 리스트 재구성 new_adj_list = [[] for _ in range(self.num_vertices)] vertex_map = {old: new for new, old in enumerate(new_order)} for old_vertex, neighbors in enumerate(self.adj_list): new_vertex = vertex_map[old_vertex] for neighbor, weight in neighbors: new_neighbor = vertex_map[neighbor] new_adj_list[new_vertex].append((new_neighbor, weight)) self.adj_list = new_adj_list
압축 인접 리스트
메모리 사용량을 더 줄이기 위해, 연속된 메모리에 인접 리스트를 저장하는 압축 방식을 사용할 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
class CompressedGraph: def __init__(self, num_vertices, edges): self.num_vertices = num_vertices # 각 정점의 인접 리스트 시작 인덱스 self.offsets = [0] * (num_vertices + 1) # 간선 정보 저장 edges_by_vertex = [[] for _ in range(num_vertices)] for u, v, weight in edges: edges_by_vertex[u].append((v, weight)) # 오프셋 계산 current_offset = 0 for i in range(num_vertices): self.offsets[i] = current_offset current_offset += len(edges_by_vertex[i]) self.offsets[num_vertices] = current_offset # 인접 정점과 가중치 저장 total_edges = current_offset self.neighbors = [0] * total_edges self.weights = [0] * total_edges for i in range(num_vertices): edges_by_vertex[i].sort() # 정점 ID로 정렬 for idx, (v, w) in enumerate(edges_by_vertex[i]): pos = self.offsets[i] + idx self.neighbors[pos] = v self.weights[pos] = w def get_neighbors(self, vertex): start = self.offsets[vertex] end = self.offsets[vertex + 1] return [(self.neighbors[i], self.weights[i]) for i in range(start, end)]
병렬 처리를 위한 최적화
병렬 알고리즘을 위해 그래프 데이터 구조를 최적화할 수 있다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
import multiprocessing def parallel_bfs(graph, start_vertex, num_processes=4): """병렬 처리를 활용한 BFS""" from collections import deque import multiprocessing as mp def process_level(vertices, visited): """한 레벨의 정점들을 처리하여 다음 레벨의 정점들 반환""" next_level = [] local_visited = set() for vertex in vertices: for neighbor, _ in graph.adj_list[vertex]: if neighbor not in visited and neighbor not in local_visited: local_visited.add(neighbor) next_level.append(neighbor) return next_level, local_visited # 초기 설정 visited = set([start_vertex]) current_level = [start_vertex] all_vertices = [start_vertex] while current_level: # 현재 레벨의 정점들을 청크로 나누기 chunk_size = max(1, len(current_level) // num_processes) chunks = [current_level[i:i + chunk_size] for i in range(0, len(current_level), chunk_size)] # 병렬 처리를 위한 풀 생성 pool = mp.Pool(processes=num_processes) # 각 청크를 병렬로 처리 results = [pool.apply_async(process_level, args=(chunk, visited)) for chunk in chunks] # 결과 수집 next_level = [] for result in results: level_vertices, level_visited = result.get() next_level.extend(level_vertices) visited.update(level_visited) pool.close() pool.join() # 처리된 정점들 추가 all_vertices.extend(next_level) current_level = next_level return all_vertices
고급 응용: 실제 시스템에서의 인접 리스트
대규모 그래프 처리 시스템
대규모 그래프(수백만 개 이상의 노드와 간선)를 처리하기 위한 분산 시스템에서는 특수한 인접 리스트 구현이 필요하다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
class DistributedGraph: def __init__(self, num_shards=10): """노드 ID를 기준으로 그래프를 여러 샤드로 분할""" self.num_shards = num_shards self.shards = [{} for _ in range(num_shards)] def _get_shard_id(self, vertex_id): """정점 ID에 해당하는 샤드 ID 반환""" return hash(vertex_id) % self.num_shards def add_edge(self, from_vertex, to_vertex, weight=1): """그래프에 간선 추가""" shard_id = self._get_shard_id(from_vertex) if from_vertex not in self.shards[shard_id]: self.shards[shard_id][from_vertex] = [] self.shards[shard_id][from_vertex].append((to_vertex, weight)) def get_neighbors(self, vertex): """정점의 인접 정점들 반환""" shard_id = self._get_shard_id(vertex) if vertex in self.shards[shard_id]: return self.shards[shard_id][vertex] return [] def parallel_process(self, processing_function): """각 샤드를 병렬로 처리""" pool = multiprocessing.Pool(processes=self.num_shards) results = pool.map(processing_function, self.shards) pool.close() pool.join() return results
동적 그래프를 위한 시간 복잡도 최적화
많은 간선 추가/삭제가 발생하는 동적 그래프에서는 효율적인 자료구조가 필요하다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
class DynamicGraph: def __init__(self): self.nodes = {} # 정점 ID를 키로 하는 딕셔너리 def add_node(self, node_id, attributes=None): """정점 추가""" if node_id not in self.nodes: self.nodes[node_id] = { "attrs": attributes or {}, "out_edges": {}, # {목적지 정점: 가중치} "in_edges": {} # {출발 정점: 가중치} } def add_edge(self, from_node, to_node, weight=1, bidirectional=False): """간선 추가""" # 정점이 존재하지 않으면 추가 if from_node not in self.nodes: self.add_node(from_node) if to_node not in self.nodes: self.add_node(to_node) # 간선 추가 self.nodes[from_node]["out_edges"][to_node] = weight self.nodes[to_node]["in_edges"][from_node] = weight if bidirectional: self.nodes[to_node]["out_edges"][from_node] = weight self.nodes[from_node]["in_edges"][to_node] = weight def remove_edge(self, from_node, to_node, bidirectional=False): """간선 제거""" if from_node in self.nodes and to_node in self.nodes: if to_node in self.nodes[from_node]["out_edges"]: del self.nodes[from_node]["out_edges"][to_node] if from_node in self.nodes[to_node]["in_edges"]: del self.nodes[to_node]["in_edges"][from_node] if bidirectional: if from_node in self.nodes[to_node]["out_edges"]: del self.nodes[to_node]["out_edges"][from_node] if to_node in self.nodes[from_node]["in_edges"]: del self.nodes[from_node]["in_edges"][to_node] def get_out_neighbors(self, node): """출발 정점의 인접 정점들 반환""" if node in self.nodes: return list(self.nodes[node]["out_edges"].items()) return [] def get_in_neighbors(self, node): """도착 정점의 인접 정점들 반환""" if node in self.nodes: return list(self.nodes[node]["in_edges"].items()) return []
대규모 그래프 분석을 위한 인접 리스트
소셜 네트워크나 웹 그래프와 같은 대규모 그래프를 분석할 때는 효율적인 인접 리스트 구현이 필수적이다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
class GraphAnalyzer: def __init__(self, graph): self.graph = graph def pagerank(self, damping=0.85, iterations=100, tolerance=1e-6): """PageRank 알고리즘 구현""" num_vertices = len(self.graph.nodes) ranks = {node: 1.0 / num_vertices for node in self.graph.nodes} for _ in range(iterations): prev_ranks = ranks.copy() # 각 정점의 랭크 계산 for node in self.graph.nodes: rank_sum = 0 # 해당 정점으로 들어오는 간선들 처리 for source, _ in self.graph.get_in_neighbors(node): out_degree = len(self.graph.get_out_neighbors(source)) if out_degree > 0: rank_sum += prev_ranks[source] / out_degree # PageRank 공식 적용 ranks[node] = (1 - damping) / num_vertices + damping * rank_sum # 수렴 확인 diff = sum(abs(ranks[node] - prev_ranks[node]) for node in ranks) if diff < tolerance: break return ranks def community_detection(self, max_iterations=100): """레이블 전파 방식의 커뮤니티 탐지 알고리즘""" # 초기에 각 정점에 고유한 레이블 할당 labels = {node: idx for idx, node in enumerate(self.graph.nodes)} for _ in range(max_iterations): changed = False # 모든 정점에 대해 레이블 업데이트 for node in self.graph.nodes: # 이웃 정점들의 레이블 수집 neighbor_labels = {} for neighbor, _ in self.graph.get_out_neighbors(node): label = labels[neighbor] neighbor_labels[label] = neighbor_labels.get(label, 0) + 1 if not neighbor_labels: continue # 가장 많이 등장하는 레이블 선택 max_count = 0 max_label = labels[node] for label, count in neighbor_labels.items(): if count > max_count: max_count = count max_label = label # 레이블 업데이트 if max_label != labels[node]: labels[node] = max_label changed = True # 변화가 없으면 종료 if not changed: break # 커뮤니티 구성 communities = {} for node, label in labels.items(): if label not in communities: communities[label] = [] communities[label].append(node) return list(communities.values())
특수한 그래프를 위한 인접 리스트 변형
이분 그래프(Bipartite Graph)
이분 그래프는 정점을 두 그룹으로 나눌 수 있고, 같은 그룹 내의 정점 간에는 간선이 없는 그래프이다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
class BipartiteGraph: def __init__(self): self.left_vertices = {} # 왼쪽 정점 집합 self.right_vertices = {} # 오른쪽 정점 집합 def add_left_vertex(self, vertex, attributes=None): """왼쪽 집합에 정점 추가""" if vertex not in self.left_vertices: self.left_vertices[vertex] = { "attrs": attributes or {}, "edges": {} # {오른쪽 정점: 가중치} } def add_right_vertex(self, vertex, attributes=None): """오른쪽 집합에 정점 추가""" if vertex not in self.right_vertices: self.right_vertices[vertex] = { "attrs": attributes or {}, "edges": {} # {왼쪽 정점: 가중치} } def add_edge(self, left_vertex, right_vertex, weight=1): """간선 추가 (왼쪽 정점에서 오른쪽 정점으로만 연결 가능)""" # 정점이 존재하지 않으면 추가 if left_vertex not in self.left_vertices: self.add_left_vertex(left_vertex) if right_vertex not in self.right_vertices: self.add_right_vertex(right_vertex) # 간선 추가 self.left_vertices[left_vertex]["edges"][right_vertex] = weight self.right_vertices[right_vertex]["edges"][left_vertex] = weight def maximum_bipartite_matching(self): """최대 이분 매칭 알고리즘 (Hopcroft-Karp)""" # 매칭 결과 (왼쪽 정점 -> 오른쪽 정점) match_left = {} # 매칭 결과 (오른쪽 정점 -> 왼쪽 정점) match_right = {} def bfs(): queue = [] for left in self.left_vertices: if left not in match_left: dist[left] = 0 queue.append(left) else: dist[left] = float('inf') dist[None] = float('inf') while queue: left = queue.pop(0) if dist[left] < dist[None]: for right in self.left_vertices[left]["edges"]: if dist[match_right.get(right, None)] == float('inf'): dist[match_right.get(right, None)] = dist[left] + 1 queue.append(match_right.get(right, None)) return dist[None] != float('inf') def dfs(left): if left is None: return True for right in self.left_vertices[left]["edges"]: if dist[match_right.get(right, None)] == dist[left] + 1: if dfs(match_right.get(right, None)): match_left[left] = right match_right[right] = left return True dist[left] = float('inf') return False # 매칭 알고리즘 실행 dist = {} while bfs(): for left in self.left_vertices: if left not in match_left: dfs(left) return match_left
방향성 비순환 그래프(DAG)
방향성 비순환 그래프(Directed Acyclic Graph, DAG)는 방향이 있고 순환이 없는 그래프이다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
class DAG: def __init__(self): self.nodes = {} # 정점 ID를 키로 하는 딕셔너리 def add_node(self, node_id, attributes=None): """정점 추가""" if node_id not in self.nodes: self.nodes[node_id] = { "attrs": attributes or {}, "out_edges": {}, # {목적지 정점: 가중치} "in_edges": {} # {출발 정점: 가중치} } def add_edge(self, from_node, to_node, weight=1): """간선 추가 (순환이 생기지 않는 경우만)""" # 정점이 존재하지 않으면 추가 if from_node not in self.nodes: self.add_node(from_node) if to_node not in self.nodes: self.add_node(to_node) # 추가할 간선이 순환을 만드는지 확인 if self._will_create_cycle(from_node, to_node): raise ValueError(f"Adding edge {from_node} -> {to_node} would create a cycle") # 간선 추가 self.nodes[from_node]["out_edges"][to_node] = weight self.nodes[to_node]["in_edges"][from_node] = weight def _will_create_cycle(self, from_node, to_node): """간선을 추가했을 때 순환이 생기는지 확인""" # from_node에서 출발하여 to_node로 가는 경로가 이미 있는지 확인 visited = set() def dfs(node): if node == from_node: return True visited.add(node) for next_node in self.nodes[node]["out_edges"]: if next_node not in visited: if dfs(next_node): return True return False # to_node에서 시작하여 from_node로 가는 경로가 있는지 확인 return dfs(to_node) def topological_sort(self): """위상 정렬 수행""" # 각 정점의 진입 차수 계산 in_degree = {node: len(self.nodes[node]["in_edges"]) for node in self.nodes} # 진입 차수가 0인 정점들을 큐에 삽입 queue = [node for node in self.nodes if in_degree[node] == 0] result = [] while queue: # 큐에서 정점을 하나 꺼내서 결과에 추가 node = queue.pop(0) result.append(node) # 해당 정점에서 나가는 간선들을 제거 for next_node in list(self.nodes[node]["out_edges"].keys()): in_degree[next_node] -= 1 # 진입 차수가 0이 되면 큐에 삽입 if in_degree[next_node] == 0: queue.append(next_node) # 모든 정점을 방문하지 못했다면 순환이 있는 것 if len(result) != len(self.nodes): raise ValueError("Graph contains a cycle") return result def longest_path(self, start_node, end_node): """DAG에서 최장 경로 찾기""" # 위상 정렬 수행 topo_order = self.topological_sort() # 각 정점까지의 최장 거리와 이전 정점 저장 dist = {node: float('-inf') for node in self.nodes} prev = {node: None for node in self.nodes} dist[start_node] = 0 # 위상 정렬된 순서대로 정점 처리 for node in topo_order: # 현재 정점까지의 거리가 무한대가 아니면 처리 if dist[node] != float('-inf'): # 인접한 정점들의 거리 갱신 for next_node, weight in self.nodes[node]["out_edges"].items(): if dist[node] + weight > dist[next_node]: dist[next_node] = dist[node] + weight prev[next_node] = node # 경로 구성 if dist[end_node] == float('-inf'): return None # 경로가 없음 path = [] node = end_node while node: path.append(node) node = prev[node] # 경로 역순으로 변경 path.reverse() return {"path": path, "distance": dist[end_node]}
인접 리스트의 병렬 처리 최적화
현대의 많은 그래프 알고리즘은 병렬 처리를 통해 성능을 향상시킬 수 있다.
인접 리스트는 병렬 처리에 적합한 형태로 최적화될 수 있다.
정점 기반 병렬화
각 정점을 독립적으로 처리하는 병렬화 방식이다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
import concurrent.futures class ParallelGraph: def __init__(self, num_vertices): self.num_vertices = num_vertices self.adj_list = [[] for _ in range(num_vertices)] def add_edge(self, u, v, weight=1): self.adj_list[u].append((v, weight)) def parallel_process_vertices(self, vertex_function, max_workers=None): """각 정점에 대해 병렬로 함수 실행""" results = [None] * self.num_vertices with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # 각 정점에 대해 작업 제출 future_to_vertex = { executor.submit(vertex_function, vertex, self.adj_list[vertex]): vertex for vertex in range(self.num_vertices) } # 결과 수집 for future in concurrent.futures.as_completed(future_to_vertex): vertex = future_to_vertex[future] try: results[vertex] = future.result() except Exception as exc: print(f'Vertex {vertex} generated an exception: {exc}') return results
간선 기반 병렬화
각 간선을 독립적으로 처리하는 병렬화 방식이다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
def parallel_process_edges(self, edge_function, max_workers=None): """모든 간선에 대해 병렬로 함수 실행""" # 모든 간선 수집 edges = [] for u in range(self.num_vertices): for v, weight in self.adj_list[u]: edges.append((u, v, weight)) results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # 각 간선에 대해 작업 제출 futures = [executor.submit(edge_function, u, v, weight) for u, v, weight in edges] # 결과 수집 for future in concurrent.futures.as_completed(futures): try: results.append(future.result()) except Exception as exc: print(f'An edge generated an exception: {exc}') return results
메모리 최적화를 위한 인접 리스트 확장
대규모 그래프를 처리할 때는 메모리 사용량이 중요한 요소가 된다.
인접 리스트를 메모리 효율적으로 구현하는 방법을 살펴보면:
외부 메모리 그래프 처리
그래프가 매우 커서 메모리에 모두 로드할 수 없는 경우, 외부 저장소(디스크)를 활용한 처리가 필요하다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
class ExternalMemoryGraph: def __init__(self, file_path, mode='r'): """파일 기반 그래프 표현""" self.file_path = file_path self.mode = mode # 파일이 존재하지 않으면 생성 if mode == 'w' or not os.path.exists(file_path): with open(file_path, 'w') as f: f.write("# Vertex Count: 0\n") f.write("# Format: source_vertex target_vertex weight\n") # 정점 정보 캐싱 self.vertex_cache = {} self.vertex_count = 0 # 파일에서 정점 수 읽기 with open(file_path, 'r') as f: first_line = f.readline().strip() if first_line.startswith("# Vertex Count:"): self.vertex_count = int(first_line.split(':')[1].strip()) def add_edge(self, u, v, weight=1): """간선 추가""" if self.mode != 'w': raise ValueError("Graph is not in write mode") with open(self.file_path, 'a') as f: f.write(f"{u} {v} {weight}\n") # 캐시 업데이트 self.vertex_count = max(self.vertex_count, u + 1, v + 1) # 정점 수 업데이트 with open(self.file_path, 'r+') as f: f.seek(0) f.write(f"# Vertex Count: {self.vertex_count}\n") def get_neighbors(self, vertex): """정점의 인접 정점 가져오기""" # 캐시 확인 if vertex in self.vertex_cache: return self.vertex_cache[vertex] neighbors = [] # 파일에서 간선 정보 읽기 with open(self.file_path, 'r') as f: # 헤더 건너뛰기 f.readline() f.readline() for line in f: if line.startswith('#'): continue parts = line.strip().split() if len(parts) >= 3 and int(parts[0]) == vertex: target = int(parts[1]) weight = float(parts[2]) neighbors.append((target, weight)) # 캐시에 저장 self.vertex_cache[vertex] = neighbors return neighbors def bfs(self, start_vertex): """외부 메모리 BFS 구현""" visited = set([start_vertex]) queue = [start_vertex] result = [] while queue: vertex = queue.pop(0) result.append(vertex) # 인접 정점 가져오기 neighbors = self.get_neighbors(vertex) for neighbor, _ in neighbors: if neighbor not in visited: visited.add(neighbor) queue.append(neighbor) return result
압축 인접 리스트
대규모 그래프에서 메모리 사용량을 줄이기 위한 압축 기법.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
import array class CompressedGraph: def __init__(self, num_vertices, edges=None): self.num_vertices = num_vertices # 타입코드: 'i'는 int형 배열을 의미 self.offsets = array.array('i', [0] * (num_vertices + 1)) self.edges = [] self.weights = [] if edges: self.build_from_edges(edges) def build_from_edges(self, edges): """간선 목록에서 압축 인접 리스트 구축""" # 정점별 간선 목록 생성 adj_lists = [[] for _ in range(self.num_vertices)] for u, v, weight in edges: adj_lists[u].append((v, weight)) # 각 정점별로 정렬 for i in range(self.num_vertices): adj_lists[i].sort() # 오프셋 계산 및 간선/가중치 배열 구성 current_offset = 0 for i in range(self.num_vertices): self.offsets[i] = current_offset for v, weight in adj_lists[i]: self.edges.append(v) self.weights.append(weight) current_offset += 1 self.offsets[self.num_vertices] = current_offset # 배열로 변환 self.edges = array.array('i', self.edges) self.weights = array.array('f', self.weights) def get_neighbors(self, vertex): """정점의 인접 정점 가져오기""" start = self.offsets[vertex] end = self.offsets[vertex + 1] return [(self.edges[i], self.weights[i]) for i in range(start, end)] def memory_usage(self): """메모리 사용량 추정 (바이트 단위)""" # 각 배열의 크기 계산 offsets_size = (self.num_vertices + 1) * array.array('i').itemsize edges_size = len(self.edges) * array.array('i').itemsize weights_size = len(self.weights) * array.array('f').itemsize return offsets_size + edges_size + weights_size
인접 리스트와 현대적 그래프 라이브러리
현대적인 그래프 처리 라이브러리들은 대부분 인접 리스트를 기반으로 구현되어 있다.
이러한 라이브러리들이 어떻게 인접 리스트를 확장하고 최적화하는지 살펴보면:
NetworkX
NetworkX는 Python의 대표적인 그래프 분석 라이브러리이다.
내부적으로 딕셔너리 기반의 인접 리스트를 사용한다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import networkx as nx # 그래프 생성 G = nx.Graph() # 무방향 그래프 # G = nx.DiGraph() # 방향 그래프 # 정점 추가 G.add_node(1, name="Alice") G.add_node(2, name="Bob") G.add_node(3, name="Charlie") # 간선 추가 G.add_edge(1, 2, weight=0.5) G.add_edge(2, 3, weight=0.7) G.add_edge(1, 3, weight=0.9) # 인접 정점 확인 neighbors = list(G.neighbors(1)) print(f"Neighbors of node 1: {neighbors}") # 그래프 알고리즘 shortest_path = nx.shortest_path(G, source=1, target=3, weight='weight') print(f"Shortest path from 1 to 3: {shortest_path}") # NetworkX 내부 표현 확인 print("Graph adjacency representation:") print(G.adj)
그래프 데이터베이스: Neo4j
Neo4j는 그래프 데이터베이스로, 노드와 관계를 저장하고 쿼리할 수 있다. 내부적으로 최적화된 인접 리스트 구조를 사용한다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
from neo4j import GraphDatabase # Neo4j 연결 uri = "neo4j://localhost:7687" driver = GraphDatabase.driver(uri, auth=("neo4j", "password")) def create_graph(tx): # 노드 생성 tx.run("CREATE (a:Person {name: 'Alice'})") tx.run("CREATE (b:Person {name: 'Bob'})") tx.run("CREATE (c:Person {name: 'Charlie'})") # 관계 생성 tx.run("MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) " "CREATE (a)-[:KNOWS {weight: 0.5}]->(b)") tx.run("MATCH (b:Person {name: 'Bob'}), (c:Person {name: 'Charlie'}) " "CREATE (b)-[:KNOWS {weight: 0.7}]->(c)") tx.run("MATCH (a:Person {name: 'Alice'}), (c:Person {name: 'Charlie'}) " "CREATE (a)-[:KNOWS {weight: 0.9}]->(c)") def find_friends_of_friends(tx, name): result = tx.run("MATCH (p:Person {name: $name})-[:KNOWS]->(friend)-[:KNOWS]->(fof) " "WHERE p <> fof AND NOT (p)-[:KNOWS]->(fof) " "RETURN fof.name as name", name=name) return [record["name"] for record in result] # 그래프 생성 with driver.session() as session: session.write_transaction(create_graph) # 2단계 연결 조회 friends_of_friends = session.read_transaction(find_friends_of_friends, "Alice") print(f"Friends of friends for Alice: {friends_of_friends}") driver.close()